Skip to content

Commit

Permalink
MessageBus: ReceiveAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
phongnguyend committed May 28, 2023
1 parent c312a24 commit e4a3588
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.BackgroundServer.HostedServices;

internal class MessageBusReceiver : BackgroundService
internal sealed class MessageBusReceiver : BackgroundService
{
private readonly ILogger<MessageBusReceiver> _logger;
private readonly IMessageReceiver<FileUploadedEvent> _fileUploadedEventMessageReceiver;
Expand All @@ -25,23 +24,23 @@ internal class MessageBusReceiver : BackgroundService

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_fileUploadedEventMessageReceiver?.Receive(async (data, metaData) =>
_fileUploadedEventMessageReceiver?.ReceiveAsync(async (data, metaData) =>
{
string message = data.FileEntry.Id.ToString();
_logger.LogInformation(message);
_logger.LogInformation("{Message}", message);
await Task.Delay(5000); // simulate long running task
});
}, stoppingToken);

_fileDeletedEventMessageReceiver?.Receive(async (data, metaData) =>
_fileDeletedEventMessageReceiver?.ReceiveAsync(async (data, metaData) =>
{
string message = data.FileEntry.Id.ToString();
_logger.LogInformation(message);
_logger.LogInformation("{Message}", message);
await Task.Delay(5000); // simulate long running task
});
}, stoppingToken);

return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;

public interface IMessageReceiver<T>
{
void Receive(Action<T, MetaData> action);
Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.MessageBrokers.AzureEventHub;
Expand All @@ -29,29 +30,22 @@ public void Dispose()
{
}

public void Receive(Action<T, MetaData> action)
{
ReceiveAsync(action).GetAwaiter().GetResult();
}

public async Task ReceiveAsync(Action<T, MetaData> action)
public async Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
var storageClient = new BlobContainerClient(_storageConnectionString, _storageContainerName);

Task ProcessEventHandler(ProcessEventArgs eventArgs)
async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
try
{
var messageAsString = Encoding.UTF8.GetString(eventArgs.Data.EventBody);
var message = JsonSerializer.Deserialize<Message<T>>(messageAsString);
action(message.Data, message.MetaData);
await action(message.Data, message.MetaData);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}

return Task.CompletedTask;
}

Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
Expand All @@ -76,6 +70,6 @@ Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
await processor.StartProcessingAsync(cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.MessageBrokers.AzureQueue;
Expand All @@ -19,57 +20,47 @@ public AzureQueueReceiver(string connectionString, string queueName, QueueMessag
_messageEncoding = messageEncoding;
}

public void Receive(Action<T, MetaData> action)
public async Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
Task.Factory.StartNew(() => ReceiveAsync(action));
}

private Task ReceiveAsync(Action<T, MetaData> action)
{
return ReceiveStringAsync(retrievedMessage =>
await ReceiveStringAsync(async retrievedMessage =>
{
var message = JsonSerializer.Deserialize<Message<T>>(retrievedMessage);
action(message.Data, message.MetaData);
});
}

public void ReceiveString(Action<string> action)
{
Task.Factory.StartNew(() => ReceiveStringAsync(action));
await action(message.Data, message.MetaData);
}, cancellationToken);
}

private async Task ReceiveStringAsync(Action<string> action)
private async Task ReceiveStringAsync(Func<string, Task> action, CancellationToken cancellationToken)
{
var queueClient = new QueueClient(_connectionString, _queueName, new QueueClientOptions
{
MessageEncoding = _messageEncoding,
});

await queueClient.CreateIfNotExistsAsync();
await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);

while (true)
while (!cancellationToken.IsCancellationRequested)
{
try
{
var retrievedMessages = (await queueClient.ReceiveMessagesAsync()).Value;
var retrievedMessages = (await queueClient.ReceiveMessagesAsync(cancellationToken)).Value;

if (retrievedMessages.Length > 0)
{
foreach (var retrievedMessage in retrievedMessages)
{
action(retrievedMessage.Body.ToString());
await queueClient.DeleteMessageAsync(retrievedMessage.MessageId, retrievedMessage.PopReceipt);
await action(retrievedMessage.Body.ToString());
await queueClient.DeleteMessageAsync(retrievedMessage.MessageId, retrievedMessage.PopReceipt, cancellationToken);
}
}
else
{
await Task.Delay(1000);
await Task.Delay(1000, cancellationToken);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
await Task.Delay(1000);
await Task.Delay(1000, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.MessageBrokers.AzureServiceBus;
Expand All @@ -18,42 +19,32 @@ public AzureServiceBusReceiver(string connectionString, string queueName)
_queueName = queueName;
}

public void Receive(Action<T, MetaData> action)
public async Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
Task.Factory.StartNew(() => ReceiveAsync(action));
}

private Task ReceiveAsync(Action<T, MetaData> action)
{
return ReceiveStringAsync(retrievedMessage =>
await ReceiveStringAsync(async retrievedMessage =>
{
var message = JsonSerializer.Deserialize<Message<T>>(retrievedMessage);
action(message.Data, message.MetaData);
});
}

public void ReceiveString(Action<string> action)
{
Task.Factory.StartNew(() => ReceiveStringAsync(action));
await action(message.Data, message.MetaData);
}, cancellationToken);
}

private async Task ReceiveStringAsync(Action<string> action)
private async Task ReceiveStringAsync(Func<string, Task> action, CancellationToken cancellationToken)
{
await using var client = new ServiceBusClient(_connectionString);
ServiceBusReceiver receiver = client.CreateReceiver(_queueName);

while (true)
while (!cancellationToken.IsCancellationRequested)
{
var retrievedMessage = await receiver.ReceiveMessageAsync();
var retrievedMessage = await receiver.ReceiveMessageAsync(cancellationToken: cancellationToken);

if (retrievedMessage != null)
{
action(Encoding.UTF8.GetString(retrievedMessage.Body));
await receiver.CompleteMessageAsync(retrievedMessage);
await action(Encoding.UTF8.GetString(retrievedMessage.Body));
await receiver.CompleteMessageAsync(retrievedMessage, cancellationToken);
}
else
{
await Task.Delay(1000);
await Task.Delay(1000, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.MessageBrokers.AzureServiceBus;
Expand All @@ -20,42 +21,32 @@ public AzureServiceBusSubscriptionReceiver(string connectionString, string topic
_subscriptionName = subscriptionName;
}

public void Receive(Action<T, MetaData> action)
public async Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
Task.Factory.StartNew(() => ReceiveAsync(action));
}

private Task ReceiveAsync(Action<T, MetaData> action)
{
return ReceiveStringAsync(retrievedMessage =>
await ReceiveStringAsync(async retrievedMessage =>
{
var message = JsonSerializer.Deserialize<Message<T>>(retrievedMessage);
action(message.Data, message.MetaData);
});
}

public void ReceiveString(Action<string> action)
{
Task.Factory.StartNew(() => ReceiveStringAsync(action));
await action(message.Data, message.MetaData);
}, cancellationToken);
}

private async Task ReceiveStringAsync(Action<string> action)
private async Task ReceiveStringAsync(Func<string, Task> action, CancellationToken cancellationToken)
{
await using var client = new ServiceBusClient(_connectionString);
ServiceBusReceiver receiver = client.CreateReceiver(_topicName, _subscriptionName);

while (true)
while (!cancellationToken.IsCancellationRequested)
{
var retrievedMessage = await receiver.ReceiveMessageAsync();
var retrievedMessage = await receiver.ReceiveMessageAsync(cancellationToken: cancellationToken);

if (retrievedMessage != null)
{
action(Encoding.UTF8.GetString(retrievedMessage.Body));
await receiver.CompleteMessageAsync(retrievedMessage);
await action(Encoding.UTF8.GetString(retrievedMessage.Body));
await receiver.CompleteMessageAsync(retrievedMessage, cancellationToken);
}
else
{
await Task.Delay(1000);
await Task.Delay(1000, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.MessageBrokers.Fake;

public class FakeReceiver<T> : IMessageReceiver<T>
{
public void Receive(Action<T, MetaData> action)
public Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
// do nothing
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,9 @@ public void Dispose()
_consumer.Dispose();
}

public void Receive(Action<T, MetaData> action)
public async Task ReceiveAsync(Func<T, MetaData, Task> action, CancellationToken cancellationToken)
{
CancellationTokenSource cts = new CancellationTokenSource();
var cancellationToken = cts.Token;

Task.Factory.StartNew(() =>
{
try
{
StartReceiving(action, cancellationToken);
}
catch (OperationCanceledException)
{
Console.WriteLine("Closing consumer.");
_consumer.Close();
}
});
}

private void StartReceiving(Action<T, MetaData> action, CancellationToken cancellationToken)
{
while (true)
while (!cancellationToken.IsCancellationRequested)
{
try
{
Expand All @@ -62,7 +43,7 @@ private void StartReceiving(Action<T, MetaData> action, CancellationToken cancel
}

var message = JsonSerializer.Deserialize<Message<T>>(consumeResult.Message.Value);
action(message.Data, message.MetaData);
await action(message.Data, message.MetaData);
}
catch (ConsumeException e)
{
Expand Down
Loading

0 comments on commit e4a3588

Please sign in to comment.