Skip to content

Commit

Permalink
Merge b221ad0 into d2e8e10
Browse files Browse the repository at this point in the history
  • Loading branch information
TRybina132 committed Jan 3, 2024
2 parents d2e8e10 + b221ad0 commit 2cc6a15
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.x
dotnet-version: 8.0.x

# run build and test
- name: Restore dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v1
with:
dotnet-version: 6.0.x
dotnet-version: 8.0.x

- name: Restore dependencies
run: dotnet restore
Expand Down
4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageReadmeFile>README.md</PackageReadmeFile>
<Product>Managed Code - Queue</Product>
<Version>0.1.7</Version>
<PackageVersion>0.1.7</PackageVersion>
<Version>8.0.0</Version>
<PackageVersion>8.0.0</PackageVersion>
</PropertyGroup>
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
Expand Down
62 changes: 40 additions & 22 deletions ManagedCode.Queue.AzureServiceBus/AzureServiceBusReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,28 @@

namespace ManagedCode.Queue.AzureServiceBus;

public class AzureServiceBusQueue: IQueueSender, IQueueReceiver, IQueueManager, IAsyncDisposable
public class AzureServiceBusQueue : IQueueSender, IQueueReceiver, IQueueManager, IAsyncDisposable
{
private readonly AzureServiceBusOptions _options;
private readonly ServiceBusClient _client;
private readonly ServiceBusAdministrationClient _adminClient;
private readonly ILogger<AzureServiceBusQueue> _logger;
private Dictionary<string,ServiceBusSender> _senders = new ();

private Dictionary<string, ServiceBusSender> _senders = new();

public AzureServiceBusQueue(ILogger<AzureServiceBusQueue> logger, AzureServiceBusOptions options)
{
_logger = logger;
_options = options;
_client = new ServiceBusClient(options.ConnectionString);
_adminClient = new ServiceBusAdministrationClient(options.ConnectionString);
_options = options;
_client = new ServiceBusClient(options.ConnectionString);
_adminClient = new ServiceBusAdministrationClient(options.ConnectionString);
if (!string.IsNullOrWhiteSpace(_options.ConnectionString))
{
_client = new ServiceBusClient(_options.ConnectionString);
_adminClient = new ServiceBusAdministrationClient(_options.ConnectionString);
}
else
{
_client = new ServiceBusClient(_options.FullyQualifiedNamespace, _options.DefaultAzureCredential);
_adminClient = new ServiceBusAdministrationClient(_options.FullyQualifiedNamespace, _options.DefaultAzureCredential);
}
}

public async ValueTask DisposeAsync()
Expand All @@ -35,10 +39,10 @@ public async ValueTask DisposeAsync()
{
await _sender.DisposeAsync();
}

await _client.DisposeAsync();
}

public Task SendMessageAsync(string queue, Message message, CancellationToken cancellationToken = default)
{
if (!_senders.ContainsKey(queue))
Expand All @@ -47,23 +51,25 @@ public Task SendMessageAsync(string queue, Message message, CancellationToken ca
}

_logger.LogInformation($"SendMessageAsync to queue {queue}");

return _senders[queue].SendMessageAsync(new ServiceBusMessage(message.Body), cancellationToken);
}

public Task SendMessageAsync(string queue, string topic, Message message, CancellationToken cancellationToken = default)
public Task SendMessageAsync(string queue, string topic, Message message,
CancellationToken cancellationToken = default)
{
if (!_senders.ContainsKey(topic))
{
_senders[topic] = _client.CreateSender(topic);
}

_logger.LogInformation($"SendMessageAsync to queue {topic}");

return _senders[topic].SendMessageAsync(new ServiceBusMessage(message.Body), cancellationToken);
}

public async IAsyncEnumerable<Message> ReceiveMessages(string queue, string topic, CancellationToken cancellationToken = default)
public async IAsyncEnumerable<Message> ReceiveMessages(string queue, string topic,
CancellationToken cancellationToken = default)
{
var subscriptionName = topic + "Subscription";

Expand All @@ -77,7 +83,7 @@ public async IAsyncEnumerable<Message> ReceiveMessages(string queue, string topi
await using var processor = _client.CreateProcessor(
topic,
subscriptionName,
new ServiceBusProcessorOptions {ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete});
new ServiceBusProcessorOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

await foreach (var message in ProcessMessagesAsync(cancellationToken, processor))
{
Expand All @@ -89,7 +95,7 @@ await foreach (var message in ProcessMessagesAsync(cancellationToken, processor)
public async IAsyncEnumerable<Message> ReceiveMessages(string queue, CancellationToken cancellationToken = default)
{
await using var processor = _client.CreateProcessor(queue,
new ServiceBusProcessorOptions {ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete});
new ServiceBusProcessorOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

await foreach (var message in ProcessMessagesAsync(cancellationToken, processor))
{
Expand Down Expand Up @@ -134,8 +140,9 @@ public async Task DeleteTopicAsync(string queue, string topic, CancellationToken
await _adminClient.DeleteTopicAsync(topic, cancellationToken);
}
}

private static async IAsyncEnumerable<Message> ProcessMessagesAsync([EnumeratorCancellation] CancellationToken cancellationToken, ServiceBusProcessor processor)

private static async IAsyncEnumerable<Message> ProcessMessagesAsync(
[EnumeratorCancellation] CancellationToken cancellationToken, ServiceBusProcessor processor)
{
var reusableAwaiter = new ReusableAwaiter<Message>();

Expand Down Expand Up @@ -164,7 +171,7 @@ private static async IAsyncEnumerable<Message> ProcessMessagesAsync([EnumeratorC
reusableAwaiter.Reset();
}

if (message is not null)
if (message is not null)
yield return message;
}

Expand All @@ -177,17 +184,18 @@ Task OnProcessMessageAsync(ProcessMessageEventArgs args)
Id = args.Message.MessageId,
ReceiptHandle = args.Message.To,
},

Body = args.Message.Body.ToString()
});


return Task.CompletedTask;
}

Task OnProcessErrorAsync(ProcessErrorEventArgs args)
{
reusableAwaiter.TrySetResult(new Message{
reusableAwaiter.TrySetResult(new Message
{
Id = new MessageId(),
Body = string.Empty,
Error = new Error(args.Exception)
Expand All @@ -196,4 +204,14 @@ Task OnProcessErrorAsync(ProcessErrorEventArgs args)
return Task.CompletedTask;
}
}

private void InitializeClients()
{
if (!string.IsNullOrWhiteSpace(_options.ConnectionString))
{

return;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.8.0" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Azure.Identity" Version="1.10.4" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.1" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
namespace ManagedCode.Queue.AzureServiceBus.Options;
using Azure.Identity;

namespace ManagedCode.Queue.AzureServiceBus.Options;

public class AzureServiceBusOptions
{
public string ConnectionString { get; set; }

public string FullyQualifiedNamespace { get; set; }
public DefaultAzureCredential DefaultAzureCredential { get; set; }
}

2 changes: 1 addition & 1 deletion ManagedCode.Queue.Core/ManagedCode.Queue.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class InMemoryQueueTests
private readonly IQueueSender _queueSender;
private readonly IQueueReceiver _queueReceiver;
private readonly IQueueManager _queueManager;

public InMemoryQueueTests()
{
var services = new ServiceCollection();
Expand Down
16 changes: 8 additions & 8 deletions ManagedCode.Queue.Tests/ManagedCode.Queue.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<IsPackable>false</IsPackable>
<LangVersion>10</LangVersion>
Expand All @@ -17,20 +17,20 @@
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PackageReference Include="xunit" Version="2.6.4" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.2">
<PackageReference Include="coverlet.collector" Version="6.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.msbuild" Version="3.1.2">
<PackageReference Include="coverlet.msbuild" Version="6.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down

0 comments on commit 2cc6a15

Please sign in to comment.