From f520061b071315cbbe06370059e3dba11f811354 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 8 Jul 2022 16:34:44 +0200 Subject: [PATCH] Offload multicast operations to the EventLoop if not called from within an EventLoop thread Motivation: LinuxSocket is not thread-safe and so we need to ensure we only access it from without the EventLoop thread. Modifications: Change EpollDatagramChannel to always dispatch to the EventLoop if not called from the EventLoop thread Result: No more threading issues --- .../channel/epoll/EpollDatagramChannel.java | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 0ca449b6654..90da64a4996 100644 --- a/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -30,7 +30,6 @@ import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.UncheckedBooleanSupplier; @@ -177,13 +176,30 @@ public ChannelFuture joinGroup( ObjectUtil.checkNotNull(multicastAddress, "multicastAddress"); ObjectUtil.checkNotNull(networkInterface, "networkInterface"); + if (eventLoop().inEventLoop()) { + joinGroup0(multicastAddress, networkInterface, source, promise); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + joinGroup0(multicastAddress, networkInterface, source, promise); + } + }); + } + return promise; + } + + private void joinGroup0( + final InetAddress multicastAddress, final NetworkInterface networkInterface, + final InetAddress source, final ChannelPromise promise) { + assert eventLoop().inEventLoop(); + try { socket.joinGroup(multicastAddress, networkInterface, source); promise.setSuccess(); } catch (IOException e) { promise.setFailure(e); } - return promise; } @Override @@ -228,13 +244,30 @@ public ChannelFuture leaveGroup( ObjectUtil.checkNotNull(multicastAddress, "multicastAddress"); ObjectUtil.checkNotNull(networkInterface, "networkInterface"); + if (eventLoop().inEventLoop()) { + leaveGroup0(multicastAddress, networkInterface, source, promise); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + leaveGroup0(multicastAddress, networkInterface, source, promise); + } + }); + } + return promise; + } + + private void leaveGroup0( + final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source, + final ChannelPromise promise) { + assert eventLoop().inEventLoop(); + try { socket.leaveGroup(multicastAddress, networkInterface, source); promise.setSuccess(); } catch (IOException e) { promise.setFailure(e); } - return promise; } @Override @@ -252,7 +285,7 @@ public ChannelFuture block( ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock"); ObjectUtil.checkNotNull(networkInterface, "networkInterface"); - promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + promise.setFailure(new UnsupportedOperationException("Multicast block not supported")); return promise; }