Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc changes #1560

Merged
merged 14 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ jobs:
Receive-Job -Job $tx; `
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
dotnet test `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' `
--environment 'PASSWORD=grapefruit' `
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
Expand Down Expand Up @@ -114,7 +114,12 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Sequential Integration Tests
run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
run: dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'PASSWORD=grapefruit' `
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
"${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Maybe upload RabbitMQ logs
if: failure()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -182,8 +187,8 @@ jobs:
- name: Integration Tests
run: |
dotnet test \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
Expand Down Expand Up @@ -222,7 +227,10 @@ jobs:
- name: Sequential Integration Tests
run: |
dotnet test \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
"${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Maybe upload RabbitMQ logs
if: failure()
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ build:
test:
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
dotnet test --environment 'GITHUB_ACTIONS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
"$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed'
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
$(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'

# Note:
# You must have the expected OAuth2 environment set up for this target
Expand Down
88 changes: 88 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
#nullable enable
internal struct AsyncEventingWrapper<T>
{
private event AsyncEventHandler<T>? _event;
private Delegate[]? _handlers;
private string? _context;
private Func<Exception, string, Task>? _onException;

public readonly bool IsEmpty => _event is null;

public AsyncEventingWrapper(string context, Func<Exception, string, Task> onException)
{
_event = null;
_handlers = null;
_context = context;
_onException = onException;
}

public void AddHandler(AsyncEventHandler<T>? handler)
{
_event += handler;
_handlers = null;
}

public void RemoveHandler(AsyncEventHandler<T>? handler)
{
_event -= handler;
_handlers = null;
}

// Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied)
public Task InvokeAsync(object sender, T parameter)
{
Delegate[]? handlers = _handlers;
if (handlers is null)
{
handlers = _event?.GetInvocationList();
if (handlers is null)
{
return Task.CompletedTask;
}

_handlers = handlers;
}

return InternalInvoke(handlers, sender, parameter);
}

private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter)
{
foreach (AsyncEventHandler<T> action in handlers.Cast<AsyncEventHandler<T>>())
{
try
{
await action(sender, parameter)
.ConfigureAwait(false);
}
catch (Exception exception)
{
if (_onException != null)
{
await _onException(exception, _context!)
.ConfigureAwait(false);
}
else
{
throw;
}
}
}
}

public void Takeover(in AsyncEventingWrapper<T> other)
{
_event = other._event;
_handlers = other._handlers;
_context = other._context;
_onException = other._onException;
}
}
}
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ await FinishCloseAsync(cts.Token)
_heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite);
}
}
catch (OperationCanceledException)
{
if (false == _mainLoopCts.IsCancellationRequested)
{
throw;
}
}
catch (ObjectDisposedException)
{
// timer is already disposed,
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -335,6 +336,13 @@ await _session0.TransmitAsync(method, cancellationToken)
.ConfigureAwait(false);
}
}
catch (ChannelClosedException)
{
if (false == abort)
{
throw;
}
}
catch (AlreadyClosedException)
{
if (false == abort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)

protected override async Task ProcessChannelAsync(CancellationToken token)
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
try
{
while (_reader.TryRead(out WorkStruct work))
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
{
using (work)
while (_reader.TryRead(out WorkStruct work))
{
try
using (work)
{
Task task = work.WorkType switch
try
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
Task task = work.WorkType switch
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),

WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),

WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),

WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),

WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),

_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
}
}
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
{
throw;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)

protected override async Task ProcessChannelAsync(CancellationToken token)
{
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
try
{
while (_reader.TryRead(out var work))
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
{
using (work)
while (_reader.TryRead(out WorkStruct work))
{
try
using (work)
{
IBasicConsumer consumer = work.Consumer;
string? consumerTag = work.ConsumerTag;
switch (work.WorkType)
try
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
.ConfigureAwait(false);
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
break;
case WorkType.CancelOk:
consumer.HandleBasicCancelOk(consumerTag);
break;
case WorkType.ConsumeOk:
consumer.HandleBasicConsumeOk(consumerTag);
break;
case WorkType.Shutdown:
consumer.HandleChannelShutdown(_channel, work.Reason);
break;
IBasicConsumer consumer = work.Consumer;
string? consumerTag = work.ConsumerTag;
switch (work.WorkType)
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
.ConfigureAwait(false);
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
break;
case WorkType.CancelOk:
consumer.HandleBasicCancelOk(consumerTag);
break;
case WorkType.ConsumeOk:
consumer.HandleBasicConsumeOk(consumerTag);
break;
case WorkType.Shutdown:
consumer.HandleChannelShutdown(_channel, work.Reason);
break;
}
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
catch (Exception e)
{
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
}
}
}
}
catch (OperationCanceledException)
{
if (false == token.IsCancellationRequested)
{
throw;
}
}
}
}
}
Loading