Skip to content

Commit

Permalink
Offload multicast operations to the EventLoop if not called from with…
Browse files Browse the repository at this point in the history
…in an EventLoop thread

Motivation:

LinuxSocket is not thread-safe and so we need to ensure we only access it from without the EventLoop thread.

Modifications:

Change EpollDatagramChannel to always dispatch to the EventLoop if not called from the EventLoop thread

Result:

No more threading issues
  • Loading branch information
normanmaurer committed Jul 8, 2022
1 parent c949fa6 commit f520061
Showing 1 changed file with 37 additions and 4 deletions.
Expand Up @@ -30,7 +30,6 @@
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.UncheckedBooleanSupplier;
Expand Down Expand Up @@ -177,13 +176,30 @@ public ChannelFuture joinGroup(
ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
ObjectUtil.checkNotNull(networkInterface, "networkInterface");

if (eventLoop().inEventLoop()) {
joinGroup0(multicastAddress, networkInterface, source, promise);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
joinGroup0(multicastAddress, networkInterface, source, promise);
}
});
}
return promise;
}

private void joinGroup0(
final InetAddress multicastAddress, final NetworkInterface networkInterface,
final InetAddress source, final ChannelPromise promise) {
assert eventLoop().inEventLoop();

try {
socket.joinGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}

@Override
Expand Down Expand Up @@ -228,13 +244,30 @@ public ChannelFuture leaveGroup(
ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
ObjectUtil.checkNotNull(networkInterface, "networkInterface");

if (eventLoop().inEventLoop()) {
leaveGroup0(multicastAddress, networkInterface, source, promise);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
leaveGroup0(multicastAddress, networkInterface, source, promise);
}
});
}
return promise;
}

private void leaveGroup0(
final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
final ChannelPromise promise) {
assert eventLoop().inEventLoop();

try {
socket.leaveGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
}

@Override
Expand All @@ -252,7 +285,7 @@ public ChannelFuture block(
ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
ObjectUtil.checkNotNull(networkInterface, "networkInterface");

promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
return promise;
}

Expand Down

0 comments on commit f520061

Please sign in to comment.