Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a synchronous write loop for connections. #1392

Open
wants to merge 1 commit into
base: 6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
# Vim
.sw?
.*.sw?


#################
## JetBrains Rider
#################
.idea/
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
/// </summary>
public bool TopologyRecoveryEnabled { get; set; } = true;

/// <summary>
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
/// once requests become asynchronous. Defaults to false.
/// </summary>
public bool EnableSynchronousWriteLoop { get; set; } = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the default value is false, maybe we could remove the init value

Suggested change
public bool EnableSynchronousWriteLoop { get; set; } = false;
public bool EnableSynchronousWriteLoop { get; set; }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the default more visible is good idea. I'd not remove it.

Copy link
Contributor

@WeihanLi WeihanLi Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I think it's a kind of code style. I prefer to remove it while I would not argue for this.

Have a test on this, the default value would be removed when built in release mode, but would not when in debug mode


/// <summary>
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
Expand Down Expand Up @@ -642,7 +649,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
return ConfigureFrameHandler(fh);
}

Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ static class IProtocolExtensions
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout,
TimeSpan readTimeout,
TimeSpan writeTimeout)
TimeSpan writeTimeout,
bool enableSynchronousWriteLoop)
{
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
{
MemoryPool = pool
};
Expand Down
48 changes: 41 additions & 7 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
private readonly byte[] _frameHeaderBuffer;
private bool _closed;
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
private readonly bool _enableSynchronousWriteLoop;

public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
{
_endpoint = endpoint;
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
Expand Down Expand Up @@ -134,7 +136,15 @@ class SocketFrameHandler : IFrameHandler
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
if (_enableSynchronousWriteLoop)
{
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@WeihanLi WeihanLi Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When just using the Task.Run, the thread would be borrowed from the thread pool, the long-running task on a thread-pool thread would affect the thread-pool thread scheduling.
Here's the LongRunning flag is specified, it would use a separate thread, so I think it's ok.

https://github.com/dotnet/runtime/blob/93381bf9c745e5925fb68ac4da2b38526b36222a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ThreadPoolTaskScheduler.cs#L42

}
else
{
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
}
}

public AmqpTcpEndpoint Endpoint
Expand Down Expand Up @@ -270,17 +280,41 @@ private async Task WriteLoop()
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out var memory))
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
MemoryPool.Return(segment.Array);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
if (segment.Array != null)
{
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
MemoryPool.Return(segment.Array);
}
}
}

await _writer.FlushAsync().ConfigureAwait(false);
}
}

private void SynchronousWriteLoop()
{
while (_channelReader.WaitToReadAsync().AsTask().Result)
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
if (segment.Array != null)
{
_writer.Write(segment.Array, segment.Offset, segment.Count);
MemoryPool.Return(segment.Array);
}
}
}
_writer.Flush();
}
}

private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ namespace RabbitMQ.Client
public RabbitMQ.Client.ICredentialsProvider CredentialsProvider { get; set; }
public RabbitMQ.Client.ICredentialsRefresher CredentialsRefresher { get; set; }
public bool DispatchConsumersAsync { get; set; }
public bool EnableSynchronousWriteLoop { get; set; }
public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; }
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
Expand Down
14 changes: 14 additions & 0 deletions projects/Unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
}
}

[Test]
public void TestCreateConnectionWithSynchronousWriteLoop()
{
var cf = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = "localhost",
EnableSynchronousWriteLoop = true
};
using (IConnection conn = cf.CreateConnection()){
Assert.AreEqual(5672, conn.Endpoint.Port);
}
}

[Test]
public void TestCreateConnectionUsesDefaultPort()
{
Expand Down