Skip to content

Commit

Permalink
First cleanup pass for UDPSocket.
Browse files Browse the repository at this point in the history
  • Loading branch information
headius committed Mar 12, 2012
1 parent ccde9e3 commit bef4afe
Showing 1 changed file with 127 additions and 65 deletions.
192 changes: 127 additions & 65 deletions src/org/jruby/ext/socket/RubyUDPSocket.java
Expand Up @@ -37,9 +37,12 @@
import java.net.UnknownHostException;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.DatagramChannel;

import java.nio.channels.SelectionKey;

import jnr.netdb.Service;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
Expand Down Expand Up @@ -72,6 +75,7 @@ static void createUDPSocket(Ruby runtime) {

runtime.getObject().setConstant("UDPsocket", rb_cUDPSocket);
}

private static ObjectAllocator UDPSOCKET_ALLOCATOR = new ObjectAllocator() {

public IRubyObject allocate(Ruby runtime, RubyClass klass) {
Expand All @@ -90,13 +94,17 @@ public IRubyObject initialize(ThreadContext context) {
try {
DatagramChannel channel = DatagramChannel.open();
initSocket(runtime, new ChannelDescriptor(channel, newModeFlags(runtime, ModeFlags.RDWR)));

} catch (ConnectException e) {
throw runtime.newErrnoECONNREFUSEDError();

} catch (UnknownHostException e) {
throw sockerr(runtime, "initialize: name or service not known");

} catch (IOException e) {
throw sockerr(runtime, "initialize: name or service not known");
}

return this;
}

Expand All @@ -105,80 +113,89 @@ public IRubyObject initialize(ThreadContext context, IRubyObject protocol) {
// we basically ignore protocol. let someone report it...
return initialize(context);
}

@Deprecated
public IRubyObject bind(IRubyObject host, IRubyObject port) {
return bind(getRuntime().getCurrentContext(), host, port);
}

@JRubyMethod
public IRubyObject bind(ThreadContext context, IRubyObject host, IRubyObject port) {
Ruby runtime = context.runtime;
InetSocketAddress addr = null;

try {
Channel channel = getChannel();

if (host.isNil()
|| ((host instanceof RubyString)
&& ((RubyString) host).isEmpty())) {

// host is nil or the empty string, bind to INADDR_ANY
addr = new InetSocketAddress(RubyNumeric.fix2int(port));

} else if (host instanceof RubyFixnum) {

// passing in something like INADDR_ANY
int intAddr = RubyNumeric.fix2int(host);
RubyModule socketMod = context.getRuntime().getModule("Socket");
RubyModule socketMod = runtime.getModule("Socket");
if (intAddr == RubyNumeric.fix2int(socketMod.getConstant("INADDR_ANY"))) {
addr = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), RubyNumeric.fix2int(port));
}

} else {
// passing in something like INADDR_ANY
addr = new InetSocketAddress(InetAddress.getByName(host.convertToString().toString()), RubyNumeric.fix2int(port));
}

if (this.multicastStateManager == null) {
((DatagramChannel) this.getChannel()).socket().bind(addr);
if (multicastStateManager == null) {
((DatagramChannel) channel).socket().bind(addr);
} else {
this.multicastStateManager.rebindToPort(RubyNumeric.fix2int(port));
multicastStateManager.rebindToPort(RubyNumeric.fix2int(port));
}

return RubyFixnum.zero(context.getRuntime());
return RubyFixnum.zero(runtime);

} catch (UnknownHostException e) {
throw sockerr(context.getRuntime(), "bind: name or service not known");
throw sockerr(runtime, "bind: name or service not known");

} catch (SocketException e) {
throw sockerr(context.getRuntime(), "bind: name or service not known");
throw sockerr(runtime, "bind: name or service not known");

} catch (IOException e) {
throw sockerr(context.getRuntime(), "bind: name or service not known");
throw sockerr(runtime, "bind: name or service not known");

} catch (Error e) {

// Workaround for a bug in Sun's JDK 1.5.x, see
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6303753
if (e.getCause() instanceof SocketException) {
throw sockerr(context.getRuntime(), "bind: name or service not known");
throw sockerr(runtime, "bind: name or service not known");
} else {
throw e;
}

}
}
@Deprecated
public IRubyObject connect(IRubyObject host, IRubyObject port) {
return connect(getRuntime().getCurrentContext(), host, port);
}

@JRubyMethod
public IRubyObject connect(ThreadContext context, IRubyObject host, IRubyObject port) {
Ruby runtime = context.runtime;

try {
InetSocketAddress addr;
addr = new InetSocketAddress(InetAddress.getByName(host.convertToString().toString()), RubyNumeric.fix2int(port));
InetSocketAddress addr = new InetSocketAddress(InetAddress.getByName(host.convertToString().toString()), RubyNumeric.fix2int(port));

((DatagramChannel) this.getChannel()).connect(addr);
return RubyFixnum.zero(context.getRuntime());

return RubyFixnum.zero(runtime);

} catch (UnknownHostException e) {
throw sockerr(context.getRuntime(), "connect: name or service not known");
throw sockerr(runtime, "connect: name or service not known");

} catch (IOException e) {
throw sockerr(context.getRuntime(), "connect: name or service not known");
throw sockerr(runtime, "connect: name or service not known");
}
}

@Deprecated
public IRubyObject recvfrom(IRubyObject[] args) {
return recvfrom(getRuntime().getCurrentContext(), args);
}

@JRubyMethod(required = 1, rest = true)
public IRubyObject recvfrom(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;

try {
InetSocketAddress sender = null;
int length = RubyNumeric.fix2int(args[0]);
Expand All @@ -187,74 +204,87 @@ public IRubyObject recvfrom(ThreadContext context, IRubyObject[] args) {
DatagramPacket recv = new DatagramPacket(buf2, buf2.length);

if (this.multicastStateManager == null) {
((DatagramChannel) this.getChannel()).configureBlocking(false);
DatagramChannel channel = (DatagramChannel)getChannel();

channel.configureBlocking(false);
context.getThread().select(this, SelectionKey.OP_READ);
sender = (InetSocketAddress) ((DatagramChannel) this.getChannel()).receive(buf);
sender = (InetSocketAddress)channel.receive(buf);

} else {
MulticastSocket ms = this.multicastStateManager.getMulticastSocket();

ms.receive(recv);
sender = (InetSocketAddress) recv.getSocketAddress();
}

// see JRUBY-4678
if (sender == null) {
throw context.getRuntime().newErrnoECONNRESETError();
throw runtime.newErrnoECONNRESETError();
}

IRubyObject addressArray = context.getRuntime().newArray(new IRubyObject[]{
context.getRuntime().newString("AF_INET"),
context.getRuntime().newFixnum(sender.getPort()),
context.getRuntime().newString(sender.getHostName()),
context.getRuntime().newString(sender.getAddress().getHostAddress())
});
IRubyObject addressArray = runtime.newArray(
runtime.newString("AF_INET"),
runtime.newFixnum(sender.getPort()),
runtime.newString(sender.getHostName()),
runtime.newString(sender.getAddress().getHostAddress())
);

IRubyObject result = null;

if (this.multicastStateManager == null) {
result = context.getRuntime().newString(new ByteList(buf.array(), 0, buf.position()));
result = runtime.newString(new ByteList(buf.array(), 0, buf.position()));
} else {
result = context.getRuntime().newString(new ByteList(recv.getData(), 0, recv.getLength()));
result = runtime.newString(new ByteList(recv.getData(), 0, recv.getLength()));
}

return context.getRuntime().newArray(new IRubyObject[]{result, addressArray});
return runtime.newArray(result, addressArray);

} catch (UnknownHostException e) {
throw sockerr(context.getRuntime(), "recvfrom: name or service not known");
throw sockerr(runtime, "recvfrom: name or service not known");

} catch (PortUnreachableException e) {
throw context.getRuntime().newErrnoECONNREFUSEDError();
throw runtime.newErrnoECONNREFUSEDError();

} catch (IOException e) {
throw sockerr(context.getRuntime(), "recvfrom: name or service not known");
throw sockerr(runtime, "recvfrom: name or service not known");
}
}

@Override
@JRubyMethod(rest = true)
public IRubyObject recv(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;

try {
DatagramChannel channel = (DatagramChannel)getChannel();

int length = RubyNumeric.fix2int(args[0]);
ByteBuffer buf = ByteBuffer.allocate(length);
((DatagramChannel) this.getChannel()).configureBlocking(false);

channel.configureBlocking(false);
context.getThread().select(this, SelectionKey.OP_READ);
InetSocketAddress sender = (InetSocketAddress) ((DatagramChannel) this.getChannel()).receive(buf);
InetSocketAddress sender = (InetSocketAddress)channel.receive(buf);

// see JRUBY-4678
if (sender == null) {
throw context.getRuntime().newErrnoECONNRESETError();
throw runtime.newErrnoECONNRESETError();
}

return context.getRuntime().newString(new ByteList(buf.array(), 0, buf.position()));
return runtime.newString(new ByteList(buf.array(), 0, buf.position()));

} catch (IOException e) {
throw sockerr(context.getRuntime(), "recv: name or service not known");
}
}
throw sockerr(runtime, "recv: name or service not known");

@Deprecated
public IRubyObject send(IRubyObject[] args) {
return send(getRuntime().getCurrentContext(), args);
}
}

@JRubyMethod(required = 1, rest = true)
public IRubyObject send(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;

try {
int written;

if (args.length >= 3) { // host and port given
RubyString nameStr = args[2].convertToString();
RubyString data = args[0].convertToString();
Expand All @@ -265,59 +295,91 @@ public IRubyObject send(ThreadContext context, IRubyObject[] args) {

int port;
if (args[3] instanceof RubyString) {
jnr.netdb.Service service = jnr.netdb.Service.getServiceByName(args[3].asJavaString(), "udp");

Service service = Service.getServiceByName(args[3].asJavaString(), "udp");

if (service != null) {
port = service.getPort();
} else {
port = (int)args[3].convertToInteger("to_i").getLongValue();
}

} else {
port = (int)args[3].convertToInteger().getLongValue();
}

InetAddress address = SocketUtils.getRubyInetAddress(nameStr.getByteList());
InetSocketAddress addr =
new InetSocketAddress(address, port);
InetSocketAddress addr = new InetSocketAddress(address, port);

if (this.multicastStateManager == null) {
written = ((DatagramChannel) this.getChannel()).send(buf, addr);
}
else {

} else {
sendDP = new DatagramPacket(buf2, buf2.length, address, port);
MulticastSocket ms = this.multicastStateManager.getMulticastSocket();

ms.send(sendDP);
written = sendDP.getLength();
}

} else {
RubyString data = args[0].convertToString();
ByteBuffer buf = ByteBuffer.wrap(data.getBytes());

written = ((DatagramChannel) this.getChannel()).write(buf);
}
return context.getRuntime().newFixnum(written);

return runtime.newFixnum(written);

} catch (UnknownHostException e) {
throw sockerr(context.getRuntime(), "send: name or service not known");
throw sockerr(runtime, "send: name or service not known");

} catch (IOException e) {
throw sockerr(context.getRuntime(), "send: name or service not known");
throw sockerr(runtime, "send: name or service not known");
}
}
@Deprecated
public static IRubyObject open(IRubyObject recv, IRubyObject[] args, Block block) {
return open(recv.getRuntime().getCurrentContext(), recv, args, block);
}

@JRubyMethod(rest = true, meta = true)
public static IRubyObject open(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
RubyUDPSocket sock = (RubyUDPSocket) recv.callMethod(context, "new", args);

if (!block.isGiven()) {
return sock;
}

try {
return block.yield(context, sock);

} finally {
if (sock.openFile.isOpen()) {
sock.close();
}
}
}

@Deprecated
public IRubyObject bind(IRubyObject host, IRubyObject port) {
return bind(getRuntime().getCurrentContext(), host, port);
}

@Deprecated
public IRubyObject connect(IRubyObject host, IRubyObject port) {
return connect(getRuntime().getCurrentContext(), host, port);
}

@Deprecated
public IRubyObject recvfrom(IRubyObject[] args) {
return recvfrom(getRuntime().getCurrentContext(), args);
}

@Deprecated
public IRubyObject send(IRubyObject[] args) {
return send(getRuntime().getCurrentContext(), args);
}

@Deprecated
public static IRubyObject open(IRubyObject recv, IRubyObject[] args, Block block) {
return open(recv.getRuntime().getCurrentContext(), recv, args, block);
}
}// RubyUDPSocket

0 comments on commit bef4afe

Please sign in to comment.