From aa847f076b853e6ab1a6026dd0e98e51b240e3d6 Mon Sep 17 00:00:00 2001 From: Michael Christiansen Date: Thu, 21 Sep 2023 16:13:10 -0700 Subject: [PATCH] Added a synchronous write loop for connections. --- .gitignore | 6 +++ .../client/api/ConnectionFactory.cs | 9 +++- .../client/impl/IProtocolExtensions.cs | 5 +- .../client/impl/SocketFrameHandler.cs | 48 ++++++++++++++++--- .../Unit/APIApproval.Approve.verified.txt | 1 + projects/Unit/TestConnectionFactory.cs | 14 ++++++ 6 files changed, 73 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 3e8850c4b0..8d9fe0e6bc 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml # Vim .sw? .*.sw? + + +################# +## JetBrains Rider +################# +.idea/ diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index ae8f7c1dad..0a6c7a6c15 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout /// public bool TopologyRecoveryEnabled { get; set; } = true; + /// + /// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent + /// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete + /// once requests become asynchronous. Defaults to false. + /// + public bool EnableSynchronousWriteLoop { get; set; } = false; + /// /// Filter to include/exclude entities from topology recovery. /// Default filter includes all entities in topology recovery. @@ -642,7 +649,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) { IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory, - RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout); + RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop); return ConfigureFrameHandler(fh); } diff --git a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs index e8c2acd125..8edce34190 100644 --- a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs +++ b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs @@ -45,9 +45,10 @@ static class IProtocolExtensions Func socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, - TimeSpan writeTimeout) + TimeSpan writeTimeout, + bool enableSynchronousWriteLoop) { - return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout) + return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop) { MemoryPool = pool }; diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index 141e02aee0..9553bfec43 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler private readonly byte[] _frameHeaderBuffer; private bool _closed; private ArrayPool _pool = ArrayPool.Shared; + private readonly bool _enableSynchronousWriteLoop; public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func socketFactory, - TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout) + TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop) { _endpoint = endpoint; + _enableSynchronousWriteLoop = enableSynchronousWriteLoop; _frameHeaderBuffer = new byte[6]; var channel = Channel.CreateUnbounded>( new UnboundedChannelOptions @@ -134,7 +136,15 @@ class SocketFrameHandler : IFrameHandler _writer = new BufferedStream(netstream, _socket.Client.SendBufferSize); WriteTimeout = writeTimeout; - _writerTask = Task.Run(WriteLoop, CancellationToken.None); + if (_enableSynchronousWriteLoop) + { + TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach; + _writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default); + } + else + { + _writerTask = Task.Run(WriteLoop, CancellationToken.None); + } } public AmqpTcpEndpoint Endpoint @@ -270,17 +280,41 @@ private async Task WriteLoop() while (await _channelReader.WaitToReadAsync().ConfigureAwait(false)) { _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); - while (_channelReader.TryRead(out var memory)) + while (_channelReader.TryRead(out ReadOnlyMemory memory)) { - MemoryMarshal.TryGetArray(memory, out ArraySegment segment); - await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false); - MemoryPool.Return(segment.Array); + if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment)) + { + if (segment.Array != null) + { + await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false); + MemoryPool.Return(segment.Array); + } + } } - await _writer.FlushAsync().ConfigureAwait(false); } } + private void SynchronousWriteLoop() + { + while (_channelReader.WaitToReadAsync().AsTask().Result) + { + _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); + while (_channelReader.TryRead(out ReadOnlyMemory memory)) + { + if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment)) + { + if (segment.Array != null) + { + _writer.Write(segment.Array, segment.Offset, segment.Count); + MemoryPool.Return(segment.Array); + } + } + } + _writer.Flush(); + } + } + private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint) { return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork; diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index d7bcdaab0c..78bb462a28 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -101,6 +101,7 @@ namespace RabbitMQ.Client public RabbitMQ.Client.ICredentialsProvider CredentialsProvider { get; set; } public RabbitMQ.Client.ICredentialsRefresher CredentialsRefresher { get; set; } public bool DispatchConsumersAsync { get; set; } + public bool EnableSynchronousWriteLoop { get; set; } public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; } public System.Func, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; } public System.TimeSpan HandshakeContinuationTimeout { get; set; } diff --git a/projects/Unit/TestConnectionFactory.cs b/projects/Unit/TestConnectionFactory.cs index c21b19dedc..f75ea9a880 100644 --- a/projects/Unit/TestConnectionFactory.cs +++ b/projects/Unit/TestConnectionFactory.cs @@ -196,6 +196,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName() } } + [Test] + public void TestCreateConnectionWithSynchronousWriteLoop() + { + var cf = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + HostName = "localhost", + EnableSynchronousWriteLoop = true + }; + using (IConnection conn = cf.CreateConnection()){ + Assert.AreEqual(5672, conn.Endpoint.Port); + } + } + [Test] public void TestCreateConnectionUsesDefaultPort() {