diff --git a/.github/workflows/main-ci.yaml b/.github/workflows/aqs-sqs-ci.yaml similarity index 69% rename from .github/workflows/main-ci.yaml rename to .github/workflows/aqs-sqs-ci.yaml index 540d443..ae81074 100644 --- a/.github/workflows/main-ci.yaml +++ b/.github/workflows/aqs-sqs-ci.yaml @@ -1,7 +1,10 @@ on: - push: + pull_request: branches: - - main + - "main" + paths: + - "lib/Dequeueable.AmazonSQS/**" + - "tests/Dequeueable.AmazonSQS.**/" jobs: build: diff --git a/.github/workflows/aqs-sqs-publish.yaml b/.github/workflows/aqs-sqs-publish.yaml new file mode 100644 index 0000000..2c758b5 --- /dev/null +++ b/.github/workflows/aqs-sqs-publish.yaml @@ -0,0 +1,50 @@ +on: + push: + tags: + - "aws-sqs_v[0-9]+.[0-9]+.[0-9]" + - "aws-sqs_v[0-9]+.[0-9]+.[0-9]-preview*" + - "aws-sqs_v[0-9]+.[0-9]+.[0-9]-beta*" + - "aws-sqs_v[0-9]+.[0-9]+.[0-9]-alpha*" + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - name: Show the Github context for the triggered event + run: echo "$GITHUB_CONTEXT" + env: + GITHUB_CONTEXT: ${{ toJson(github) }} + - name: Set VERSION variable from tag + run: echo "VERSION=${GITHUB_REF_NAME/aws-sqs_v/}" >> $GITHUB_ENV + - name: Show default environment variables + run: | + echo "The job_id is: $GITHUB_JOB" # reference the default environment variables + echo "The id of this action is: $GITHUB_ACTION" # reference the default environment variables + echo "The run id is: $GITHUB_RUN_ID" + echo "The GitHub Actor's username is: $GITHUB_ACTOR" + echo "GitHub SHA: $GITHUB_SHA" + echo "GitHub REF: $GITHUB_REF" + echo "GitHub REF NAME: $GITHUB_REF_NAME" + echo "VERSION: $VERSION" + echo "dotnet version ${DOTNET_VERSION}" + env: + DOTNET_VERSION: ${{ vars.DOTNET_VERSION }} + - uses: actions/checkout@v3 + - name: Setup .NET Core SDK + uses: actions/setup-dotnet@v3 + with: + dotnet-version: "6.x.x" + + - name: Install dependencies + run: dotnet restore + - name: Build + run: dotnet build --configuration Release /p:Version=${VERSION} --no-restore + - name: Test + run: dotnet test --configuration Release /p:Version=${VERSION} --no-build + - name: Pack + run: dotnet pack lib/Dequeueable.AmazonSQS/Dequeueable.AmazonSQS.csproj --configuration Release /p:Version=${VERSION} --no-build --output . + - name: Push + run: dotnet nuget push Dequeueable.AmazonSQS.${VERSION}.nupkg --source https://api.nuget.org/v3/index.json --api-key ${NUGET_KEY} + env: + NUGET_KEY: ${{secrets.NUGET_KEY}} diff --git a/Dequeueable.sln b/Dequeueable.sln index 23ee326..2ea143a 100644 --- a/Dequeueable.sln +++ b/Dequeueable.sln @@ -17,6 +17,16 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AzureQueueStora EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AzureQueueStorage.SampleListener", "samples\Dequeueable.AzureQueueStorage.SampleListener\Dequeueable.AzureQueueStorage.SampleListener.csproj", "{9663C5AD-A70B-49B9-A764-599119EEDEDA}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AmazonSQS", "lib\Dequeueable.AmazonSQS\Dequeueable.AmazonSQS.csproj", "{952BD452-FFBE-4253-81BE-A85B955EC94A}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AmazonSQS.SampleJob", "samples\Dequeueable.AmazonSQS.SampleJob\Dequeueable.AmazonSQS.SampleJob.csproj", "{8B16AFB4-6124-4D59-86CE-386559E51C7C}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AmazonSQS.SampleListener", "samples\Dequeueable.AmazonSQS.SampleListener\Dequeueable.AmazonSQS.SampleListener.csproj", "{8EBE6EF9-3A32-4C5D-B617-D231CCDC332D}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dequeueable.AmazonSQS.UnitTests", "tests\Dequeueable.AmazonSQS.UnitTests\Dequeueable.AmazonSQS.UnitTests.csproj", "{3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dequeueable.AmazonSQS.IntegrationTests", "tests\Dequeueable.AmazonSQS.IntegrationTests\Dequeueable.AmazonSQS.IntegrationTests.csproj", "{70FB0E93-C10E-45CB-8E82-5C1370B92CE2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -43,6 +53,26 @@ Global {9663C5AD-A70B-49B9-A764-599119EEDEDA}.Debug|Any CPU.Build.0 = Debug|Any CPU {9663C5AD-A70B-49B9-A764-599119EEDEDA}.Release|Any CPU.ActiveCfg = Release|Any CPU {9663C5AD-A70B-49B9-A764-599119EEDEDA}.Release|Any CPU.Build.0 = Release|Any CPU + {952BD452-FFBE-4253-81BE-A85B955EC94A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {952BD452-FFBE-4253-81BE-A85B955EC94A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {952BD452-FFBE-4253-81BE-A85B955EC94A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {952BD452-FFBE-4253-81BE-A85B955EC94A}.Release|Any CPU.Build.0 = Release|Any CPU + {8B16AFB4-6124-4D59-86CE-386559E51C7C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8B16AFB4-6124-4D59-86CE-386559E51C7C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8B16AFB4-6124-4D59-86CE-386559E51C7C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8B16AFB4-6124-4D59-86CE-386559E51C7C}.Release|Any CPU.Build.0 = Release|Any CPU + {8EBE6EF9-3A32-4C5D-B617-D231CCDC332D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8EBE6EF9-3A32-4C5D-B617-D231CCDC332D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8EBE6EF9-3A32-4C5D-B617-D231CCDC332D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8EBE6EF9-3A32-4C5D-B617-D231CCDC332D}.Release|Any CPU.Build.0 = Release|Any CPU + {3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF}.Release|Any CPU.Build.0 = Release|Any CPU + {70FB0E93-C10E-45CB-8E82-5C1370B92CE2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {70FB0E93-C10E-45CB-8E82-5C1370B92CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {70FB0E93-C10E-45CB-8E82-5C1370B92CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {70FB0E93-C10E-45CB-8E82-5C1370B92CE2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -52,6 +82,10 @@ Global {B89684E3-D232-4819-9B1F-8CF0CD937E8C} = {BF8F3BFE-5BF7-472E-A8B5-6F9957FDE3C0} {347E828A-0A4B-493E-BE44-E1A387323DC3} = {F6808A89-2B57-49B1-9D5C-E4ACE0CEC44A} {9663C5AD-A70B-49B9-A764-599119EEDEDA} = {F6808A89-2B57-49B1-9D5C-E4ACE0CEC44A} + {8B16AFB4-6124-4D59-86CE-386559E51C7C} = {F6808A89-2B57-49B1-9D5C-E4ACE0CEC44A} + {8EBE6EF9-3A32-4C5D-B617-D231CCDC332D} = {F6808A89-2B57-49B1-9D5C-E4ACE0CEC44A} + {3B23FE9B-4AEC-4EF0-8B7C-7A14F36B09DF} = {BF8F3BFE-5BF7-472E-A8B5-6F9957FDE3C0} + {70FB0E93-C10E-45CB-8E82-5C1370B92CE2} = {BF8F3BFE-5BF7-472E-A8B5-6F9957FDE3C0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FA24EF80-390F-45DE-B60C-2632F0952E02} diff --git a/README.md b/README.md index 6189a58..9a65e85 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,5 @@ A framework to simplify event driven applications in containerized host environm ## Libraries - [Azure Queue Storage](lib/Dequeueable.AzureQueueStorage/README.md) Framework that handles the messages on the Azure Queue. A function will be invoked when new messages are detected on the queue. Dequeueing, exception handling and distributed singleton are handled for you. +- [Amazon Simple Queue Service](lib/Dequeueable.AmazonSQS/README.md) +Framework that handles the messages on the AWS SQS. A function will be invoked when new messages are detected on the queue. diff --git a/lib/Dequeueable.AmazonSQS/Configurations/HostBuilder.cs b/lib/Dequeueable.AmazonSQS/Configurations/HostBuilder.cs new file mode 100644 index 0000000..5ac93ad --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Configurations/HostBuilder.cs @@ -0,0 +1,85 @@ +using Dequeueable.AmazonSQS.Services.Hosts; +using Dequeueable.AmazonSQS.Services.Queues; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; + +namespace Dequeueable.AmazonSQS.Configurations +{ + internal class HostBuilder : IDequeueableHostBuilder + { + private readonly IServiceCollection _services; + + public HostBuilder(IServiceCollection services) + { + _services = services; + } + + public IDequeueableHostBuilder RunAsJob(Action? options = null) + { + _services.AddOptions().BindConfiguration(HostOptions.Dequeueable) + .ValidateDataAnnotations() + .ValidateOnStart(); + + if (options is not null) + { + _services.Configure(options); + } + + _services.AddHostedService(); + _services.AddSingleton(); + + _services.TryAddSingleton(provider => + { + var opt = provider.GetRequiredService>(); + return opt.Value; + }); + + return this; + } + + public IDequeueableHostBuilder RunAsListener(Action? options = null) + { + _services.AddOptions().BindConfiguration(HostOptions.Dequeueable) + .Validate(ListenerHostOptions.ValidatePollingInterval, $"The '{nameof(ListenerHostOptions.MinimumPollingIntervalInMilliseconds)}' must not be greater than the '{nameof(ListenerHostOptions.MaximumPollingIntervalInMilliseconds)}'.") + .Validate(ListenerHostOptions.ValidateNewBatchThreshold, $"The '{nameof(ListenerHostOptions.NewBatchThreshold)}' must not be greater than the '{nameof(ListenerHostOptions.BatchSize)}'.") + .ValidateDataAnnotations() + .ValidateOnStart(); + + if (options is not null) + { + _services.Configure(options); + } + + _services.AddHostedService(); + _services.AddSingleton(); + + _services.TryAddSingleton(provider => + { + var opt = provider.GetRequiredService>(); + return opt.Value; + }); + + return this; + } + + public IDequeueableHostBuilder AsSingleton() + { + _services.AddTransient(); + _services.AddTransient(); + _services.AddTransient(provider => + { + var singletonManager = provider.GetRequiredService(); + var executor = provider.GetRequiredService(); + + return new SingletonQueueMessageExecutor(executor, singletonManager); + }); + + _services.PostConfigure(options => options.AttributeNames = options.AttributeNames.Concat(new List { "MessageGroupId" }).ToList()); + _services.PostConfigure(options => options.AttributeNames = options.AttributeNames.Concat(new List { "MessageGroupId" }).ToList()); + _services.PostConfigure(options => options.NewBatchThreshold = 0); + + return this; + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Configurations/HostOptions.cs b/lib/Dequeueable.AmazonSQS/Configurations/HostOptions.cs new file mode 100644 index 0000000..0970321 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Configurations/HostOptions.cs @@ -0,0 +1,49 @@ +using System.ComponentModel.DataAnnotations; + +namespace Dequeueable.AmazonSQS.Configurations +{ + /// + /// HostOptions to configure the settings of the host + /// + public class HostOptions : IHostOptions + { + private List _attributeNames = new(); + + /// + /// Constant string used to bind the appsettings.*.json + /// + public static string Dequeueable => nameof(Dequeueable); + + /// + /// The URL of the Amazon SQS queue from which messages are received. + /// + [Required(AllowEmptyStrings = false, ErrorMessage = "{0} cannot be empty.")] + public string QueueUrl { get; set; } = string.Empty; + + /// + /// The maximum number of messages processed in parallel. Valid values: 1 to 10. + /// + [Range(1, 10, + ErrorMessage = "Value for {0} must be between {1} and {2}.")] + public int BatchSize { get; set; } = 4; + + /// + /// The timeout after the queue message is visible again for other services. Valid values: 30 to 43200 (12 hours) seconds. + /// + [Range(30, 43200, + ErrorMessage = "Value for {0} must be between {1} and {2}.")] + public int VisibilityTimeoutInSeconds { get; set; } = 300; + + /// + /// A list of attributes that need to be returned along with each message . + /// + public List AttributeNames + { + get => _attributeNames.Distinct().ToList(); + set + { + _attributeNames = value ?? new(); + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Configurations/IDequeueableHostBuilder.cs b/lib/Dequeueable.AmazonSQS/Configurations/IDequeueableHostBuilder.cs new file mode 100644 index 0000000..ecf8547 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Configurations/IDequeueableHostBuilder.cs @@ -0,0 +1,26 @@ +namespace Dequeueable.AmazonSQS.Configurations +{ + /// + /// Interface to builds and setup the dequeueable host + /// + public interface IDequeueableHostBuilder + { + /// + /// Runs the function as a Distributed Singleton. Queue messages containing the same MessageGroupId will not run in parallel + /// + /// + IDequeueableHostBuilder AsSingleton(); + /// + /// The application will run as a job, from start to finish, and will automatically shutdown when the messages are executed. + /// + /// Action to configure the + /// + IDequeueableHostBuilder RunAsJob(Action? options = null); + /// + /// The application will run as a listener, the queue will periodically be polled for new message. + /// + /// Action to configure the + /// + IDequeueableHostBuilder RunAsListener(Action? options = null); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Configurations/IHostOptions.cs b/lib/Dequeueable.AmazonSQS/Configurations/IHostOptions.cs new file mode 100644 index 0000000..364bfa8 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Configurations/IHostOptions.cs @@ -0,0 +1,25 @@ +namespace Dequeueable.AmazonSQS.Configurations +{ + /// + /// Use the IHostOptions to configure the settings of the host + /// + public interface IHostOptions + { + /// + /// The maximum number of messages processed in parallel. Valid values: 1 to 10. + /// + int BatchSize { get; set; } + /// + /// The timeout after the queue message is visible again for other services. Valid values: 30 to 43200 (12 hours) seconds. + /// + int VisibilityTimeoutInSeconds { get; set; } + /// + /// The URL of the Amazon SQS queue from which messages are received. + /// + string QueueUrl { get; set; } + /// + /// A list of attributes that need to be returned along with each message . + /// + List AttributeNames { get; set; } + } +} \ No newline at end of file diff --git a/lib/Dequeueable.AmazonSQS/Configurations/ListenerHostOptions.cs b/lib/Dequeueable.AmazonSQS/Configurations/ListenerHostOptions.cs new file mode 100644 index 0000000..48445a4 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Configurations/ListenerHostOptions.cs @@ -0,0 +1,41 @@ +using System.ComponentModel.DataAnnotations; + +namespace Dequeueable.AmazonSQS.Configurations +{ + /// + /// HostOptions to configure the settings of the host + /// + public class ListenerHostOptions : HostOptions + { + /// + /// The threshold at which a new batch of messages will be fetched. + /// + public int? NewBatchThreshold { get; set; } + + /// + /// The minimum polling interval to check the queue for new messages. + /// + [Range(1, long.MaxValue, ErrorMessage = "Value for {0} must be lower than {1}.")] + public long MinimumPollingIntervalInMilliseconds { get; set; } = 5; + + /// + /// The maximum polling interval to check the queue for new messages. + /// + [Range(1, long.MaxValue, ErrorMessage = "Value for {0} must be lower than {1}.")] + public long MaximumPollingIntervalInMilliseconds { get; set; } = 10000; + + /// + /// The delta used to randomize the polling interval. + /// + public TimeSpan? DeltaBackOff { get; set; } + + internal static bool ValidatePollingInterval(ListenerHostOptions options) + { + return options.MinimumPollingIntervalInMilliseconds < options.MaximumPollingIntervalInMilliseconds; + } + internal static bool ValidateNewBatchThreshold(ListenerHostOptions options) + { + return options.NewBatchThreshold is null || options.NewBatchThreshold <= options.BatchSize; + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Dequeueable.AmazonSQS.csproj b/lib/Dequeueable.AmazonSQS/Dequeueable.AmazonSQS.csproj new file mode 100644 index 0000000..7b0c57b --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Dequeueable.AmazonSQS.csproj @@ -0,0 +1,35 @@ + + + + Dequeueable.AmazonSQS + 1.0.0 + Lennart ten Wolde + Amazon;AWS;Simple Queue Service;SQS;Queues;Queue;QueueMessage;QueueMessages;Message;Events;Event + Dequeueable for AWS Simple Queue Service library + + This client library simplifies dequeuing queue messages from Amazon Simple Queue Service. It makes it easy to retrieve messages from the queue: dequeueing, exception handling and distributed singleton are handled for you. + + MIT + ./README.md + https://github.com/lenndewolten/Dequeueable + Git + https://github.com/lenndewolten/Dequeueable + https://github.com/lenndewolten/Dequeueable + true + + + + + + + + + + + + + + + + + diff --git a/lib/Dequeueable.AmazonSQS/Extentions/ServiceCollectionExtentions.cs b/lib/Dequeueable.AmazonSQS/Extentions/ServiceCollectionExtentions.cs new file mode 100644 index 0000000..a67233a --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Extentions/ServiceCollectionExtentions.cs @@ -0,0 +1,35 @@ +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Factories; +using Dequeueable.AmazonSQS.Services.Queues; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Dequeueable.AmazonSQS.Extentions +{ + /// + /// Extension methods for adding configuration related of the Queue services to the DI container via . + /// + public static class ServiceCollectionExtentions + { + /// + /// Adds the Amazon Simple Queue Service and the function of the type specified in to the + /// specified . + /// + /// The type implementing the + /// The to register with. + /// + public static IDequeueableHostBuilder AddAmazonSQSServices(this IServiceCollection services) + where TFunction : class, IAmazonSQSFunction + { + services.AddSingleton(); + services.TryAddSingleton(); + services.AddTransient(); + services.AddTransient(); + + services.AddTransient(); + services.AddTransient(); + + return new HostBuilder(services); + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Factories/AmazonSQSClientFactory.cs b/lib/Dequeueable.AmazonSQS/Factories/AmazonSQSClientFactory.cs new file mode 100644 index 0000000..b303ef8 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Factories/AmazonSQSClientFactory.cs @@ -0,0 +1,10 @@ +using Amazon.SQS; + +namespace Dequeueable.AmazonSQS.Factories +{ + internal sealed class AmazonSQSClientFactory : IAmazonSQSClientFactory + { + private AmazonSQSClient? _client; + public AmazonSQSClient Create() => _client ??= new(); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Factories/IAmazonSQSClientFactory.cs b/lib/Dequeueable.AmazonSQS/Factories/IAmazonSQSClientFactory.cs new file mode 100644 index 0000000..8da79db --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Factories/IAmazonSQSClientFactory.cs @@ -0,0 +1,15 @@ +using Amazon.SQS; + +namespace Dequeueable.AmazonSQS.Factories +{ + /// + /// Factory used to create the . This interface can be used when mocking the queue client or when you want to override the default client setup. + /// + public interface IAmazonSQSClientFactory + { + /// + /// Creates the + /// + AmazonSQSClient Create(); + } +} diff --git a/lib/Dequeueable.AmazonSQS/IAmazonSQSFunction.cs b/lib/Dequeueable.AmazonSQS/IAmazonSQSFunction.cs new file mode 100644 index 0000000..ea06150 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/IAmazonSQSFunction.cs @@ -0,0 +1,23 @@ + +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS +{ + /// + /// Interface to bind a function to the framework + /// + public interface IAmazonSQSFunction + { + /// + /// Interface that binds the class that will be invoked when a message is retrieved from the queue + /// + /// + /// The Queue Message on the queue + /// + /// + /// to propagate + /// notifications that the operation should be cancelled. + /// + Task ExecuteAsync(Message message, CancellationToken cancellationToken); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Models/Message.cs b/lib/Dequeueable.AmazonSQS/Models/Message.cs new file mode 100644 index 0000000..05b7d94 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Models/Message.cs @@ -0,0 +1,70 @@ +namespace Dequeueable.AmazonSQS.Models +{ + /// + /// Queue message retrieved from the qeueue. + /// + public class Message + { + /// + /// The unqiue id of the message + /// + public string MessageId { get; } + + /// + /// MessageGroupId is the tag that specifies that a message belongs to a specific message group. This is used as scope for singletons. + /// + public string? MessageGroupId => Attributes.GetValueOrDefault("MessageGroupId"); + + /// + /// Unique receipt of the Queue Message. + /// + public string ReceiptHandle { get; internal set; } + + /// + /// of the queue message when it is visibile again for other clients. + /// + public DateTimeOffset NextVisibleOn { get; } + + /// + /// of the body. + /// + public BinaryData Body { get; } + + /// + /// A list of attributes that need to be returned along with each message . + /// + public Dictionary Attributes { get; } = new(); + + /// + /// Creates an instance of the queue message. + /// + /// Id of the Queue Message. + /// Unique receipt of the Queue Message. + /// of the queue message when it is visibile again for other clients. + /// of the body. + public Message(string messageId, string receiptHandle, DateTimeOffset nextVisibleOn, BinaryData body) + { + MessageId = messageId; + ReceiptHandle = receiptHandle; + NextVisibleOn = nextVisibleOn; + Body = body; + } + + /// + /// Creates an instance of the queue message. + /// + /// Id the Queue Message. + /// Unique receipt of the Queue Message. + /// of the queue message when it is visibile again for other clients. + /// of the body. + /// A list of attributes that need to be returned along with each message . + public Message(string messageId, string receiptHandle, DateTimeOffset nextVisibleOn, BinaryData body, Dictionary attributes) + { + MessageId = messageId; + ReceiptHandle = receiptHandle; + NextVisibleOn = nextVisibleOn; + Body = body; + Attributes = attributes; + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/README.md b/lib/Dequeueable.AmazonSQS/README.md new file mode 100644 index 0000000..96b5b2a --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/README.md @@ -0,0 +1,149 @@ +# Dequeueable.AmazonSQS + +This project is an **opinionated** framework build for the Amazon (AWS) Simple Queue Service (SQS): +- Build as a Console App +- Being able to use optimized alpine/dotnet images +- Have the freedom to use Keda or any other scalers to retrieve queue messages + +This framework can run as a **listener** or **job**: +- **Listener:** +Highly scalable queue listener that will be invoked automatically when new messages are detected on the SQS. +- **Job:** +Framework that depends on external queue triggers, eg; KEDA. When the host is started, new messages on the SQS are being retrieved and executed. After execution the host will shutdown automatically. + +## Getting started + +Scaffold a new project, you can either use a console or web app. +1. Add a class that implements the `IAmazonSQSFunction`. +2. Add `.AddAmazonSQSServices()` in the DI container. +3. Specify how you want to run your service: + - Add `.RunAsJob()` in the DI container of your app to run the host as a job. + - Add `.RunAsListener()` in the DI container of your app to run the app as a back ground listener. + + +*function.cs*: +```csharp +internal class TestFunction : IAmazonSQSFunction + { + public Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + // Put your magic here! + } + } +``` + +*program.cs*: +```csharp +await Host.CreateDefaultBuilder(args) +.ConfigureServices(services => +{ + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + options.VisibilityTimeoutInSeconds = 300; + options.BatchSize = 4; + }); +}) +.RunConsoleAsync(); + +``` + +### Configurations +You can configure the host via the `appsettings.json` or the `IOptions` pattern during registration. + +**Appsettings** + +Use the `Dequeueable` section to configure the settings: + +```json +"Dequeueable": { + "QueueUrl": "https://sqs..amazonaws.com//" + } +``` + +**Options** + +```csharp +await Host.CreateDefaultBuilder(args) +.ConfigureServices(services => +{ + services + .AddAmazonSQSServices() + // .RunAsListener(options => + .RunAsJob(options => + { + options.VisibilityTimeoutInSeconds = 300; + options.BatchSize = 4; + }); +}) +.RunConsoleAsync(); + +``` + +### Settings +The library uses the `IOptions` pattern to inject the configured app settings. + +#### Job options +Setting | Description | Default | Required +--- | --- | --- | --- | +QueueUrl | The URL of the Amazon SQS queue from which messages are received. | | Yes | +BatchSize | The maximum number of messages processed in parallel. Valid values: 1 to 10. | 4 | No | +MaxDequeueCount | Max dequeue count before moving to the poison queue. | 5 | No | +VisibilityTimeoutInSeconds | The timeout after the queue message is visible again for other services. Valid values: 30 to 43200 (12 hours) seconds. | 300 | No | +AttributeNames | A list of attributes that need to be returned along with each message | [] | No | + +#### Listener options +Setting | Description | Default | Required +--- | --- | --- | --- | +QueueUrl | The URL of the Amazon SQS queue from which messages are received. | | Yes | +BatchSize | The maximum number of messages processed in parallel. Valid values: 1 to 10. | 4 | No | +NewBatchThreshold | The threshold at which a new batch of messages will be fetched. | BatchSize / 2 | No | +MaxDequeueCount | Max dequeue count before moving to the poison queue. | 5 | No | +VisibilityTimeoutInSeconds | The timeout after the queue message is visible again for other services. Valid values: 30 to 43200 (12 hours) seconds. | 300 | No | +MinimumPollingIntervalInMilliseconds | The minimum polling interval to check the queue for new messages. | 5 | No | +VisibilityTimeoutInSeconds | The maximum polling interval to check the queue for new messages. | 10000 | No | +DeltaBackOff | The delta used to randomize the polling interval. | MinimumPollingIntervalInMilliseconds | No | +AttributeNames | A list of attributes that need to be returned along with each message | [] | No | + +## Authentication +The queue client is constructed with the credentials loaded from the application's default configuration, using the `FallbackCredentialsFactory.GetCredentials()`. + +### Custom AmazonSQSClientFactory +There are plenty ways to construct the AmazonSQSClient, and not all are by default supported. You can override the default implementations to retrieve the queue client by implementing the `IAmazonSQSClientFactory`. You still should register your custom factory in your DI container, specific registration order is not needed: + +```csharp +internal class MyCustomQueueFactory : IAmazonSQSClientFactory + { + private AmazonSQSClient? _client; + public AmazonSQSClient Create() => _client ??= new AmazonSQSClient(Amazon.RegionEndpoint.CNNorth1); + } +``` + +## Singleton +The application can run as distributed singleton. The Amazon SQS message group ID is used to processed the messages one by one, in a strict order relative to the message group. Therefore a fifo queue is required. +Both the Job as the Listener services can run as singleton by defining this during registration: + +```csharp +await Host.CreateDefaultBuilder(args) +.ConfigureServices(services => +{ + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + // some options + }) + .AsSingleton(); +}) +.RunConsoleAsync(); +``` + +## Timeouts + +### Visibility Timeout Queue Message +The visibility timeout of the queue messages is automatically updated. It will be updated when the half `VisibilityTimeoutInSeconds` option is reached. Choose this setting wisely to prevent talkative hosts. When renewing the timeout fails, the host cannot guarantee if the message is executed only once. Therefore the CancelationToken is set to Cancelled. It is up to you how to handle this scenario! + +## Sample +- [Job Console app](https://github.com/lenndewolten/Dequeueable/blob/main/samples/Dequeueable.AmazonSQS.SampleJob/README.md) +- [Listener Console app](https://github.com/lenndewolten/Dequeueable/blob/main/samples/Dequeueable.AmazonSQS.SampleListener/README.md) \ No newline at end of file diff --git a/lib/Dequeueable.AmazonSQS/Services/Hosts/IHostExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Hosts/IHostExecutor.cs new file mode 100644 index 0000000..0296c24 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Hosts/IHostExecutor.cs @@ -0,0 +1,17 @@ +namespace Dequeueable.AmazonSQS.Services.Hosts +{ + /// + /// Inteface that will be called when the host is started. This interface can be used for integration testing. + /// + public interface IHostExecutor + { + /// + /// The method that will be called when the host is started. + /// + /// + /// to propagate + /// notifications that the operation should be cancelled. + /// + Task HandleAsync(CancellationToken cancellationToken); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Hosts/JobExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Hosts/JobExecutor.cs new file mode 100644 index 0000000..1440f70 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Hosts/JobExecutor.cs @@ -0,0 +1,48 @@ +using Dequeueable.AmazonSQS.Services.Queues; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.Services.Hosts +{ + internal sealed class JobExecutor : IHostExecutor + { + private readonly IQueueMessageManager _queueMessageManager; + private readonly IQueueMessageHandler _queueMessageHandler; + private readonly ILogger _logger; + + private readonly List _processing = new(); + + public JobExecutor(IQueueMessageManager queueMessageManager, IQueueMessageHandler queueMessageHandler, ILogger logger) + { + _queueMessageManager = queueMessageManager; + _queueMessageHandler = queueMessageHandler; + _logger = logger; + } + + public async Task HandleAsync(CancellationToken cancellationToken) + { + var messages = await _queueMessageManager.RetrieveMessagesAsync(cancellationToken: cancellationToken); + var messagesFound = messages.Length > 0; + if (messagesFound) + { + await HandleMessages(messages!, cancellationToken); + } + else + { + _logger.LogDebug("No messages found"); + } + + return; + } + + private Task HandleMessages(Models.Message[] messages, CancellationToken cancellationToken) + { + foreach (var message in messages) + { + var task = _queueMessageHandler.HandleAsync(message, cancellationToken); + _processing.Add(task); + } + + return Task.WhenAll(_processing); + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Hosts/JobHost.cs b/lib/Dequeueable.AmazonSQS/Services/Hosts/JobHost.cs new file mode 100644 index 0000000..3cd7fc3 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Hosts/JobHost.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.Services.Hosts +{ + internal sealed class JobHost : BackgroundService + { + private readonly IHostExecutor _hostExecutor; + private readonly IHostApplicationLifetime _hostApplicationLifetime; + private readonly ILogger _logger; + + public JobHost(IHostExecutor hostExecutor, IHostApplicationLifetime hostApplicationLifetime, ILogger logger) + { + _hostExecutor = hostExecutor; + _hostApplicationLifetime = hostApplicationLifetime; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + try + { + await _hostExecutor.HandleAsync(stoppingToken); + } + catch (Exception ex) when (ex is not TaskCanceledException) + { + _logger.LogError(ex, "Unhandled exception occurred, shutting down the host"); + throw; + } + finally + { + if (!stoppingToken.IsCancellationRequested) + { + _hostApplicationLifetime.StopApplication(); + } + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerExecutor.cs new file mode 100644 index 0000000..43f6b08 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerExecutor.cs @@ -0,0 +1,77 @@ +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AmazonSQS.Services.Timers; +using Dequeueable.AzureQueueStorage.Services.Timers; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Dequeueable.AmazonSQS.Services.Hosts +{ + internal sealed class QueueListenerExecutor : IHostExecutor + { + private readonly IQueueMessageManager _queueMessageManager; + private readonly IQueueMessageHandler _queueMessageHandler; + private readonly ILogger _logger; + private readonly ListenerHostOptions _options; + private readonly IDelayStrategy _delayStrategy; + private readonly List _processing = new(); + + public QueueListenerExecutor(IQueueMessageManager queueMessageManager, + IQueueMessageHandler queueMessageHandler, + IOptions options, + ILogger logger) + { + _queueMessageManager = queueMessageManager; + _queueMessageHandler = queueMessageHandler; + _logger = logger; + _options = options.Value; + _delayStrategy = new RandomizedExponentialDelayStrategy(TimeSpan.FromMilliseconds(_options.MinimumPollingIntervalInMilliseconds), + TimeSpan.FromMilliseconds(_options.MaximumPollingIntervalInMilliseconds), + _options.DeltaBackOff); + } + + public async Task HandleAsync(CancellationToken cancellationToken) + { + var messages = await _queueMessageManager.RetrieveMessagesAsync(cancellationToken: cancellationToken); + var messagesFound = messages.Length > 0; + if (messagesFound) + { + await HandleMessages(messages!, cancellationToken); + } + else + { + _logger.LogDebug("No messages found"); + } + + await WaitForDelay(messagesFound, cancellationToken); + } + + private Task HandleMessages(Models.Message[] messages, CancellationToken cancellationToken) + { + foreach (var message in messages) + { + var task = _queueMessageHandler.HandleAsync(message, cancellationToken); + _processing.Add(task); + } + + return WaitForNewBatchThreshold(cancellationToken); + } + + private Task WaitForDelay(bool messageFound, CancellationToken cancellationToken) + { + var delay = _delayStrategy.GetNextDelay(executionSucceeded: messageFound); + return Task.Delay(delay, cancellationToken); + } + + private async Task WaitForNewBatchThreshold(CancellationToken cancellationToken) + { + var newBatchThreshold = _options.NewBatchThreshold ?? Convert.ToInt32(Math.Ceiling(_options.BatchSize / (double)2)); + + while (_processing.Count > newBatchThreshold && !cancellationToken.IsCancellationRequested) + { + var processed = await Task.WhenAny(_processing); + _processing.Remove(processed); + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerHost.cs b/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerHost.cs new file mode 100644 index 0000000..cbd3c2d --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Hosts/QueueListenerHost.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.Services.Hosts +{ + internal sealed class QueueListenerHost : BackgroundService + { + private readonly IHostExecutor _hostExecutor; + private readonly IHostApplicationLifetime _hostApplicationLifetime; + private readonly ILogger _logger; + + public QueueListenerHost(IHostExecutor hostExecutor, IHostApplicationLifetime hostApplicationLifetime, ILogger logger) + { + _hostExecutor = hostExecutor; + _hostApplicationLifetime = hostApplicationLifetime; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await _hostExecutor.HandleAsync(stoppingToken); + } + catch (TaskCanceledException) + { + _hostApplicationLifetime.StopApplication(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled exception occurred, shutting down the host"); + _hostApplicationLifetime.StopApplication(); + } + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageExecutor.cs new file mode 100644 index 0000000..d4ce8eb --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageExecutor.cs @@ -0,0 +1,9 @@ +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal interface IQueueMessageExecutor + { + Task ExecuteAsync(Message message, CancellationToken cancellationToken); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageHandler.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageHandler.cs new file mode 100644 index 0000000..d70a0ba --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageHandler.cs @@ -0,0 +1,7 @@ +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal interface IQueueMessageHandler + { + Task HandleAsync(Models.Message message, CancellationToken cancellationToken); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageManager.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageManager.cs new file mode 100644 index 0000000..c7dcca1 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/IQueueMessageManager.cs @@ -0,0 +1,13 @@ + +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal interface IQueueMessageManager + { + Task DeleteMessageAsync(Message message, CancellationToken cancellationToken); + Task EnqueueMessageAsync(Message message, CancellationToken cancellationToken); + Task RetrieveMessagesAsync(CancellationToken cancellationToken = default); + Task UpdateVisibilityTimeOutAsync(Message message, CancellationToken cancellationToken); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageExecutor.cs new file mode 100644 index 0000000..364d1aa --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageExecutor.cs @@ -0,0 +1,19 @@ +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal sealed class QueueMessageExecutor : IQueueMessageExecutor + { + private readonly IAmazonSQSFunction _function; + + public QueueMessageExecutor(IAmazonSQSFunction function) + { + _function = function; + } + + public Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + return _function.ExecuteAsync(message, cancellationToken); + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageHandler.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageHandler.cs new file mode 100644 index 0000000..41c3be5 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageHandler.cs @@ -0,0 +1,74 @@ +using Dequeueable.AmazonSQS.Models; +using Dequeueable.AmazonSQS.Services.Timers; +using Dequeueable.AzureQueueStorage.Services.Timers; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal sealed class QueueMessageHandler : IQueueMessageHandler + { + private readonly ILogger _logger; + private readonly IQueueMessageManager _queueMessageManager; + private readonly IQueueMessageExecutor _executor; + + internal TimeSpan MinimalVisibilityTimeoutDelay { get; set; } = TimeSpan.FromSeconds(15); + + public QueueMessageHandler(IQueueMessageManager queueMessageManager, IQueueMessageExecutor executor, ILogger logger) + { + _logger = logger; + _queueMessageManager = queueMessageManager; + _executor = executor; + } + + public async Task HandleAsync(Message message, CancellationToken cancellationToken) + { + try + { + await HandleMessageAsync(message, cancellationToken); + _logger.LogInformation("Executed message with id '{MessageId}' (Succeeded)", message.MessageId); + await _queueMessageManager.DeleteMessageAsync(message, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "An error occurred while executing the queue message with id '{MessageId}'", message.MessageId); + await HandleException(message, cancellationToken); + } + } + + private Task HandleMessageAsync(Message message, CancellationToken cancellationToken) + { + var taskCompletionSource = new TaskCompletionSource(); + var run = Task.Factory.StartNew(() => ExecuteMessageAsync(message, taskCompletionSource, cancellationToken)); + + return taskCompletionSource.Task; + } + + private async Task ExecuteMessageAsync(Message message, TaskCompletionSource taskCompletionSource, CancellationToken cancellationToken) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + using var timer = new VisibilityTimeoutTimer(_queueMessageManager, new LinearDelayStrategy(MinimalVisibilityTimeoutDelay)); + + timer.Start(message, onFaultedAction: () => + { + cts.Cancel(); + taskCompletionSource.TrySetException(new Exception($"Unable to update the visibilty timeout for message with id '{message.MessageId}'. Invisibility cannot be guaranteed.")); + }); + + try + { + await _executor.ExecuteAsync(message, cts.Token); + timer.Stop(); + taskCompletionSource.TrySetResult(); + } + catch (Exception ex) + { + taskCompletionSource.TrySetException(ex); + } + } + + private Task HandleException(Message message, CancellationToken cancellationToken) + { + return _queueMessageManager.EnqueueMessageAsync(message, cancellationToken); + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageManager.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageManager.cs new file mode 100644 index 0000000..3938b7d --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/QueueMessageManager.cs @@ -0,0 +1,59 @@ +using Amazon.SQS; +using Amazon.SQS.Model; +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Factories; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal sealed class QueueMessageManager : IQueueMessageManager + { + private readonly AmazonSQSClient _client; + private readonly IHostOptions _hostOptions; + + public QueueMessageManager(IAmazonSQSClientFactory amazonSQSClientFactory, + IHostOptions hostOptions) + { + _client = amazonSQSClientFactory.Create(); + _hostOptions = hostOptions; + } + + public async Task RetrieveMessagesAsync(CancellationToken cancellationToken = default) + { + var request = new ReceiveMessageRequest { QueueUrl = _hostOptions.QueueUrl, MaxNumberOfMessages = _hostOptions.BatchSize, VisibilityTimeout = _hostOptions.VisibilityTimeoutInSeconds, AttributeNames = _hostOptions.AttributeNames }; + var res = await _client.ReceiveMessageAsync(request, cancellationToken); + + var nextVisbileOn = NextVisbileOn(); + return res.Messages.Select(m => new Models.Message(m.MessageId, m.ReceiptHandle, nextVisbileOn, BinaryData.FromString(m.Body ?? string.Empty), m.Attributes)).ToArray(); + } + + public async Task DeleteMessageAsync(Models.Message message, CancellationToken cancellationToken) + { + try + { + await _client.DeleteMessageAsync(_hostOptions.QueueUrl, message.ReceiptHandle, cancellationToken); + } + catch (AmazonSQSException ex) when (ex.ErrorCode?.Contains("NonExistentQueue", StringComparison.InvariantCultureIgnoreCase) ?? false || ex.StatusCode == System.Net.HttpStatusCode.NotFound) + { + } + } + + public async Task UpdateVisibilityTimeOutAsync(Models.Message message, CancellationToken cancellationToken) + { + var request = new ChangeMessageVisibilityRequest(_hostOptions.QueueUrl, message.ReceiptHandle, _hostOptions.VisibilityTimeoutInSeconds); + await _client.ChangeMessageVisibilityAsync(request, cancellationToken); + + return NextVisbileOn(); + } + + public async Task EnqueueMessageAsync(Models.Message message, CancellationToken cancellationToken) + { + var request = new ChangeMessageVisibilityRequest(_hostOptions.QueueUrl, message.ReceiptHandle, 0); + await _client.ChangeMessageVisibilityAsync(request, cancellationToken); + } + + private DateTimeOffset NextVisbileOn() + { + return DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(_hostOptions.VisibilityTimeoutInSeconds)); + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonManager.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonManager.cs new file mode 100644 index 0000000..b79581c --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonManager.cs @@ -0,0 +1,28 @@ +using System.Collections.Concurrent; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal sealed class SingletonManager + { + private readonly ConcurrentDictionary _locks = new(); + + public Task WaitAsync(string messageGroupId, CancellationToken cancellationToken) + { + var _lock = _locks.GetOrAdd(messageGroupId, (_) => + { + return new SemaphoreSlim(1, 1); + }); + return _lock.WaitAsync(cancellationToken); + } + + public void Release(string messageGroupId) + { + var _lock = _locks.GetValueOrDefault(messageGroupId); + + if (_lock is not null) + { + _lock.Release(); + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonQueueMessageExecutor.cs b/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonQueueMessageExecutor.cs new file mode 100644 index 0000000..63e5ab7 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Queues/SingletonQueueMessageExecutor.cs @@ -0,0 +1,35 @@ +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.Services.Queues +{ + internal sealed class SingletonQueueMessageExecutor : IQueueMessageExecutor + { + private readonly IQueueMessageExecutor _executor; + private readonly SingletonManager _singletonManager; + + public SingletonQueueMessageExecutor(IQueueMessageExecutor executor, SingletonManager singletonManager) + { + _executor = executor; + _singletonManager = singletonManager; + } + + public async Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + if (string.IsNullOrWhiteSpace(message.MessageGroupId)) + { + await _executor.ExecuteAsync(message, cancellationToken); + return; + } + + await _singletonManager.WaitAsync(message.MessageGroupId, cancellationToken); + try + { + await _executor.ExecuteAsync(message, cancellationToken); + } + finally + { + _singletonManager.Release(message.MessageGroupId); + } + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Timers/IDelayStrategy.cs b/lib/Dequeueable.AmazonSQS/Services/Timers/IDelayStrategy.cs new file mode 100644 index 0000000..6a692ea --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Timers/IDelayStrategy.cs @@ -0,0 +1,8 @@ +namespace Dequeueable.AzureQueueStorage.Services.Timers +{ + internal interface IDelayStrategy + { + TimeSpan MinimalRenewalDelay { get; set; } + TimeSpan GetNextDelay(DateTimeOffset? nextVisibleOn = null, bool? executionSucceeded = null); + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Timers/LinearDelayStrategy.cs b/lib/Dequeueable.AmazonSQS/Services/Timers/LinearDelayStrategy.cs new file mode 100644 index 0000000..908e491 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Timers/LinearDelayStrategy.cs @@ -0,0 +1,24 @@ +namespace Dequeueable.AzureQueueStorage.Services.Timers +{ + internal sealed class LinearDelayStrategy : IDelayStrategy + { + public TimeSpan MinimalRenewalDelay { get; set; } + internal int Divisor { get; set; } = 2; + + public LinearDelayStrategy(TimeSpan minimalRenewalDelay) + { + MinimalRenewalDelay = minimalRenewalDelay; + } + + public TimeSpan GetNextDelay(DateTimeOffset? nextVisibleOn = null, bool? executionSucceeded = null) + { + if (executionSucceeded == false) + { + return MinimalRenewalDelay; + } + + var wait = ((nextVisibleOn?.UtcDateTime ?? DateTimeOffset.UtcNow) - DateTimeOffset.UtcNow) / Divisor; + return wait.Ticks > 0 ? wait : MinimalRenewalDelay; + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Timers/RandomizedExponentialDelayStrategy.cs b/lib/Dequeueable.AmazonSQS/Services/Timers/RandomizedExponentialDelayStrategy.cs new file mode 100644 index 0000000..a143ed5 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Timers/RandomizedExponentialDelayStrategy.cs @@ -0,0 +1,76 @@ +using Dequeueable.AzureQueueStorage.Services.Timers; + +namespace Dequeueable.AmazonSQS.Services.Timers +{ + internal sealed class RandomizedExponentialDelayStrategy : IDelayStrategy + { + private const int _randomizationFactor = 20; + private TimeSpan _minimumInterval; + private readonly TimeSpan _maximumInterval; + private readonly TimeSpan _deltaBackoff; + + private TimeSpan _currentInterval = TimeSpan.Zero; + private uint _backoffExponent; + private Random? _random; + + public RandomizedExponentialDelayStrategy(TimeSpan minimumInterval, TimeSpan maximumInterval, TimeSpan? deltaBackoff = null) + { + if (minimumInterval.Ticks < 0) + { + throw new ArgumentOutOfRangeException(nameof(minimumInterval), $"'{nameof(minimumInterval)}' must not be negative or zero."); + } + + if (maximumInterval.Ticks < 0) + { + throw new ArgumentOutOfRangeException(nameof(maximumInterval), $"'{nameof(maximumInterval)}' must not be negative or zero."); + } + + if (minimumInterval.Ticks > maximumInterval.Ticks) + { + throw new ArgumentException($"The '{nameof(minimumInterval)}' must not be greater than the '{nameof(maximumInterval)}'.", + nameof(minimumInterval)); + } + + _minimumInterval = minimumInterval; + _maximumInterval = maximumInterval; + _deltaBackoff = deltaBackoff ?? minimumInterval; + } + + public TimeSpan MinimalRenewalDelay { get => _minimumInterval; set { _minimumInterval = value; } } + + public TimeSpan GetNextDelay(DateTimeOffset? _ = null, bool? executionSucceeded = null) + { + if (executionSucceeded == true) + { + _currentInterval = _minimumInterval; + _backoffExponent = 1; + } + else if (_currentInterval != _maximumInterval) + { + var backoffInterval = _minimumInterval; + + if (_backoffExponent > 0) + { + _random ??= new Random(); + var randomIncrementMsec = (double)_random.Next(100 - _randomizationFactor, 100 + _randomizationFactor) / 100; + var incrementMsec = randomIncrementMsec * + Math.Pow(2.0, _backoffExponent - 1) * + _deltaBackoff.TotalMilliseconds; + backoffInterval += TimeSpan.FromMilliseconds(incrementMsec); + } + + if (backoffInterval < _maximumInterval) + { + _currentInterval = backoffInterval; + _backoffExponent++; + } + else + { + _currentInterval = _maximumInterval; + } + } + + return _currentInterval; + } + } +} diff --git a/lib/Dequeueable.AmazonSQS/Services/Timers/VisibilityTimeoutTimer.cs b/lib/Dequeueable.AmazonSQS/Services/Timers/VisibilityTimeoutTimer.cs new file mode 100644 index 0000000..ec6ab94 --- /dev/null +++ b/lib/Dequeueable.AmazonSQS/Services/Timers/VisibilityTimeoutTimer.cs @@ -0,0 +1,80 @@ +using Dequeueable.AmazonSQS.Models; +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AzureQueueStorage.Services.Timers; + +namespace Dequeueable.AmazonSQS.Services.Timers +{ + internal sealed class VisibilityTimeoutTimer : IDisposable + { + private readonly CancellationTokenSource _cts; + private readonly IQueueMessageManager _queueMessagesManager; + private readonly IDelayStrategy _delayStrategy; + + private bool _disposed; + + public VisibilityTimeoutTimer(IQueueMessageManager queueMessagesManager, IDelayStrategy delayStrategy) + { + _cts = new CancellationTokenSource(); + _queueMessagesManager = queueMessagesManager; + _delayStrategy = delayStrategy; + } + + public void Start(Message message, Action? onFaultedAction = null) + { + StartAsync(message, _cts.Token) + .ContinueWith(_ => + { + onFaultedAction?.Invoke(); + }, TaskContinuationOptions.OnlyOnFaulted) + .ConfigureAwait(false); + } + + public void Stop() + { + _cts.Cancel(); + } + + private async Task StartAsync(Message message, CancellationToken cancellationToken) + { + await Task.Yield(); + var nextVisibleOn = message.NextVisibleOn; + + TaskCompletionSource cancellationTaskSource = new(); + using (cancellationToken.Register(() => cancellationTaskSource.SetCanceled())) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + nextVisibleOn = await UpdateVisbility(message, nextVisibleOn, cancellationToken); + } + catch (OperationCanceledException) + { + } + catch (Exception ex) when (ex.InnerException is OperationCanceledException) + { + } + } + } + } + + private async Task UpdateVisbility(Message message, DateTimeOffset nextVisibleOn, CancellationToken cancellationToken) + { + var delay = _delayStrategy.GetNextDelay(nextVisibleOn); + await Task.Delay(delay, cancellationToken); + nextVisibleOn = await _queueMessagesManager.UpdateVisibilityTimeOutAsync(message, cancellationToken); + return nextVisibleOn; + } + + public void Dispose() + { + if (!_disposed) + { + _cts.Cancel(); + _cts.Dispose(); + } + + _disposed = true; + } + } +} diff --git a/lib/Dequeueable.AzureQueueStorage/Services/Hosts/QueueListener.cs b/lib/Dequeueable.AzureQueueStorage/Services/Hosts/QueueListener.cs index 3d53a88..4a6146d 100644 --- a/lib/Dequeueable.AzureQueueStorage/Services/Hosts/QueueListener.cs +++ b/lib/Dequeueable.AzureQueueStorage/Services/Hosts/QueueListener.cs @@ -83,4 +83,4 @@ private async Task WaitForNewBatchThreshold(CancellationToken cancellationToken) } } } -} +} \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/Dequeueable.AmazonSQS.SampleJob.csproj b/samples/Dequeueable.AmazonSQS.SampleJob/Dequeueable.AmazonSQS.SampleJob.csproj new file mode 100644 index 0000000..0232770 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/Dequeueable.AmazonSQS.SampleJob.csproj @@ -0,0 +1,23 @@ + + + + Exe + + + + + + + + + + PreserveNewest + + + + + + + + + diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/Functions/TestFunction.cs b/samples/Dequeueable.AmazonSQS.SampleJob/Functions/TestFunction.cs new file mode 100644 index 0000000..c32fbd1 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/Functions/TestFunction.cs @@ -0,0 +1,21 @@ +using Dequeueable.AmazonSQS.Models; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.SampleJob.Functions +{ + internal class TestFunction : IAmazonSQSFunction + { + private readonly ILogger _logger; + + public TestFunction(ILogger logger) + { + _logger = logger; + } + + public Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + _logger.LogInformation("Function called with MessageId {MessageId} and content {MessageBody}", message.MessageId, message.Body.ToString()); + return Task.Delay(TimeSpan.FromMinutes(2), cancellationToken); + } + } +} diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/Program.cs b/samples/Dequeueable.AmazonSQS.SampleJob/Program.cs new file mode 100644 index 0000000..123c87b --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/Program.cs @@ -0,0 +1,17 @@ +using Dequeueable.AmazonSQS.Extentions; +using Dequeueable.AmazonSQS.SampleJob.Functions; +using Microsoft.Extensions.Hosting; + +await Host.CreateDefaultBuilder(args) +.ConfigureServices(services => +{ + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + options.VisibilityTimeoutInSeconds = 600; + options.BatchSize = 4; + }); +}) +.RunConsoleAsync(); + diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/Properties/launchSettings.json b/samples/Dequeueable.AmazonSQS.SampleJob/Properties/launchSettings.json new file mode 100644 index 0000000..7129167 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/Properties/launchSettings.json @@ -0,0 +1,11 @@ +{ + "profiles": { + "Dequeueable.AmazonSQS.SampleJob": { + "commandName": "Project", + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + }, + "hotReloadEnabled": false + } + } +} \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/README.md b/samples/Dequeueable.AmazonSQS.SampleJob/README.md new file mode 100644 index 0000000..407a644 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/README.md @@ -0,0 +1,74 @@ +# Amazon Simple Queue Service Sample job + +## Docker + +### Build +``` +docker build -t -f samples/Dequeueable.AmazonSQS.SampleJob/deployment/Dockerfile . +``` +Image stats: +``` +docker images -f reference=lenndewolten/dequeueable:aws-sqs-samplejob-v1 + +> REPOSITORY TAG IMAGE ID CREATED SIZE +> lenndewolten/dequeueable aws-sqs-samplejob-v1 7cfdf41b4bbb About a minute ago 84.2MB +``` + +``` +docker scan lenndewolten/dequeueable:aws-sqs-samplejob-v1 + +> Testing lenndewolten/dequeueable:aws-sqs-samplejob-v1... +> +> Organization: lenndewolten +> Package manager: apk +> Project name: docker-image|lenndewolten/dequeueable +> Docker image: lenndewolten/dequeueable:aws-sqs-samplejob-v1 +> Platform: linux/amd64 +> Base image: alpine:3.17.3 +> Licenses: enabled +> +> ✔ Tested 25 dependencies for known issues, no vulnerable paths found. +``` + +## Kubernetes + +### Deployment +This sample is using [KEDA](https://keda.sh/) to automatically schedule the jobs based on the messages on the queue + +``` +kubectl apply -f scaledjob.yaml +``` + +#### **Magic!** +After a message is added to the queue: +``` +kubectl get pods + +> NAME READY STATUS RESTARTS AGE +> queuejob-consumer-m8zpl-jpqws 1/1 Running 0 7s +``` + +``` +kubectl get pods + +> NAME READY STATUS RESTARTS AGE +> queuejob-consumer-m8zpl-jpqws 0/1 Completed 0 2m51s +``` + +Logs when when four messages are handled: +``` +kubectl logs pods/queuejob-consumer-m8zpl-jpqws + +> info: Microsoft.Hosting.Lifetime[0] +> Application started. Press Ctrl+C to shut down. +> info: Microsoft.Hosting.Lifetime[0] +> Hosting environment: Production +> info: Microsoft.Hosting.Lifetime[0] +> Content root path: /app +> info: Dequeueable.AmazonSQS.SampleJob.Functions.TestFunction[0] +> Function called with MessageId 7c28f4fe-28d3-4372-84d8-2a116c13520a and content fdfdfdfdfdf +> info: Dequeueable.AmazonSQS.Services.Queues.QueueMessageHandler[0] +> Executed message with id '7c28f4fe-28d3-4372-84d8-2a116c13520a' (Succeeded) +> info: Microsoft.Hosting.Lifetime[0] +> Application is shutting down... +``` \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/appsettings.json b/samples/Dequeueable.AmazonSQS.SampleJob/appsettings.json new file mode 100644 index 0000000..ff907e7 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/appsettings.json @@ -0,0 +1,5 @@ +{ + "Dequeueable": { + "QueueUrl": "" + } +} diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/deployment/Dockerfile b/samples/Dequeueable.AmazonSQS.SampleJob/deployment/Dockerfile new file mode 100644 index 0000000..388cf3b --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/deployment/Dockerfile @@ -0,0 +1,30 @@ +FROM mcr.microsoft.com/dotnet/sdk:6.0-alpine as build +WORKDIR /app + +COPY /samples/Dequeueable.AmazonSQS.SampleJob samples/consoleapp +COPY /lib lib +COPY Directory.Build.props . + +WORKDIR samples/consoleapp +RUN dotnet restore --runtime alpine-x64 +RUN dotnet publish -c Release -o /app/publish \ + --no-restore \ + --runtime alpine-x64 \ + --self-contained true \ + /p:PublishSingleFile=true + +FROM mcr.microsoft.com/dotnet/runtime-deps:6.0-alpine AS runtime +RUN adduser --disabled-password \ + --home /app \ + --gecos '' dotnetuser && chown -R dotnetuser /app + +# upgrade to remove potential vulnerability +RUN apk upgrade musl +RUN apk add openssl>3.1.0 +RUN apk update && apk upgrade +USER dotnetuser + +WORKDIR /app +COPY --from=build /app/publish . +EXPOSE 5000 +ENTRYPOINT ["./Dequeueable.AmazonSQS.SampleJob"] \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleJob/deployment/scaledjob.yaml b/samples/Dequeueable.AmazonSQS.SampleJob/deployment/scaledjob.yaml new file mode 100644 index 0000000..d61840e --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleJob/deployment/scaledjob.yaml @@ -0,0 +1,60 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-config +data: + AWS_REGION: + Dequeueable__QueueUrl: +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secrets +data: + AWS_ACCESS_KEY_ID: + AWS_SECRET_ACCESS_KEY: +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-aws-credentials +spec: + secretTargetRef: + - parameter: awsAccessKeyID + name: test-secrets + key: AWS_ACCESS_KEY_ID + - parameter: awsSecretAccessKey + name: test-secrets + key: AWS_SECRET_ACCESS_KEY +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: queuejob-consumer + namespace: default +spec: + jobTargetRef: + template: + spec: + containers: + - name: queuejob-executor + image: lenndewolten/dequeueable:aws-sqs-samplejob-v1 + imagePullPolicy: Always + envFrom: + - configMapRef: + name: test-config + - secretRef: + name: test-secrets + restartPolicy: Never + backoffLimit: 4 + pollingInterval: 60 + maxReplicaCount: 1 + triggers: + - type: aws-sqs-queue + authenticationRef: + name: keda-trigger-auth-aws-credentials + metadata: + queueURL: + queueLength: "5" + awsRegion: + identityOwner: pod diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/Dequeueable.AmazonSQS.SampleListener.csproj b/samples/Dequeueable.AmazonSQS.SampleListener/Dequeueable.AmazonSQS.SampleListener.csproj new file mode 100644 index 0000000..ac37536 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/Dequeueable.AmazonSQS.SampleListener.csproj @@ -0,0 +1,25 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + PreserveNewest + + + + + + + + diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/Functions/TestFunction.cs b/samples/Dequeueable.AmazonSQS.SampleListener/Functions/TestFunction.cs new file mode 100644 index 0000000..1d67c5a --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/Functions/TestFunction.cs @@ -0,0 +1,21 @@ +using Dequeueable.AmazonSQS.Models; +using Microsoft.Extensions.Logging; + +namespace Dequeueable.AmazonSQS.SampleListener.Functions +{ + internal class TestFunction : IAmazonSQSFunction + { + private readonly ILogger _logger; + + public TestFunction(ILogger logger) + { + _logger = logger; + } + + public Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + _logger.LogInformation("Function called with MessageId {MessageId} and content {MessageBody}", message.MessageId, message.Body.ToString()); + return Task.Delay(TimeSpan.FromMinutes(2), cancellationToken); + } + } +} diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/Program.cs b/samples/Dequeueable.AmazonSQS.SampleListener/Program.cs new file mode 100644 index 0000000..cee103c --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/Program.cs @@ -0,0 +1,16 @@ +using Dequeueable.AmazonSQS.Extentions; +using Dequeueable.AmazonSQS.SampleListener.Functions; +using Microsoft.Extensions.Hosting; + +await Host.CreateDefaultBuilder(args) +.ConfigureServices(services => +{ + services + .AddAmazonSQSServices() + .RunAsListener(options => + { + options.VisibilityTimeoutInSeconds = 300; + options.BatchSize = 4; + }); +}) +.RunConsoleAsync(); \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/README.md b/samples/Dequeueable.AmazonSQS.SampleListener/README.md new file mode 100644 index 0000000..369bad0 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/README.md @@ -0,0 +1,66 @@ +# Amazon Simple Queue Service Sample listener + +## Docker + +### Build +``` +docker build -t -f samples/Dequeueable.AmazonSQS.SampleListener/deployment/Dockerfile . +``` +Image stats: +``` +docker images -f reference=lenndewolten/dequeueable:aws-sqs-samplelistener-v1 + +> REPOSITORY TAG IMAGE ID CREATED SIZE +> lenndewolten/dequeueable aws-sqs-samplelistener-v1 cc5b966e169f 52 seconds ago 90.4MB +``` + +``` +docker scan lenndewolten/dequeueable:aws-sqs-samplelistener-v1 + +> Testing lenndewolten/dequeueable:aws-sqs-samplelistener-v1... +> +> Organization: lenndewolten +> Package manager: apk +> Project name: docker-image|lenndewolten/dequeueable +> Docker image: lenndewolten/dequeueable:aws-sqs-samplelistener-v1 +> Platform: linux/amd64 +> Base image: alpine:3.17.3 +> Licenses: enabled +> +> ✔ Tested 25 dependencies for known issues, no vulnerable paths found. +> +> According to our scan, you are currently using the most secure version of the selected base image +``` + +## Kubernetes + +### Deployment + +``` +kubectl apply -f deployment.yaml +``` + +#### **Magic!** +After a message is added to the queue: +``` +kubectl get pods + +> NAME READY STATUS RESTARTS AGE +> queuelistener-deployment-75bc4b7894-gscdx 1/1 Running 0 44s +``` + +Logs when when four messages are handled: +``` +kubectl logs pods/queuelistener-deployment-75bc4b7894-gscdx + +info: Microsoft.Hosting.Lifetime[0] + Application started. Press Ctrl+C to shut down. +info: Microsoft.Hosting.Lifetime[0] + Hosting environment: Production +info: Microsoft.Hosting.Lifetime[0] + Content root path: /app +info: Dequeueable.AmazonSQS.SampleListener.Functions.TestFunction[0] + Function called with MessageId a365b679-eac7-4a29-b002-cd9032786a47 and content fdfdfdfdfdffdfdf +info: Dequeueable.AmazonSQS.Services.Queues.QueueMessageHandler[0] + Executed message with id 'a365b679-eac7-4a29-b002-cd9032786a47' (Succeeded) +``` \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/appsettings.json b/samples/Dequeueable.AmazonSQS.SampleListener/appsettings.json new file mode 100644 index 0000000..ff907e7 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/appsettings.json @@ -0,0 +1,5 @@ +{ + "Dequeueable": { + "QueueUrl": "" + } +} diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/deployment/Dockerfile b/samples/Dequeueable.AmazonSQS.SampleListener/deployment/Dockerfile new file mode 100644 index 0000000..6f98975 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/deployment/Dockerfile @@ -0,0 +1,30 @@ +FROM mcr.microsoft.com/dotnet/sdk:6.0-alpine as build +WORKDIR /app + +COPY /samples/Dequeueable.AmazonSQS.SampleListener samples/consoleapp +COPY /lib lib +COPY Directory.Build.props . + +WORKDIR samples/consoleapp +RUN dotnet restore --runtime alpine-x64 +RUN dotnet publish -c Release -o /app/publish \ + --no-restore \ + --runtime alpine-x64 \ + --self-contained true \ + /p:PublishSingleFile=true + +FROM mcr.microsoft.com/dotnet/runtime-deps:6.0-alpine AS runtime +RUN adduser --disabled-password \ + --home /app \ + --gecos '' dotnetuser && chown -R dotnetuser /app + +# upgrade to remove potential vulnerability +RUN apk upgrade musl +RUN apk add openssl>3.1.0 +RUN apk update && apk upgrade +USER dotnetuser + +WORKDIR /app +COPY --from=build /app/publish . +EXPOSE 5000 +ENTRYPOINT ["./Dequeueable.AmazonSQS.SampleListener"] \ No newline at end of file diff --git a/samples/Dequeueable.AmazonSQS.SampleListener/deployment/deployment.yaml b/samples/Dequeueable.AmazonSQS.SampleListener/deployment/deployment.yaml new file mode 100644 index 0000000..a49a105 --- /dev/null +++ b/samples/Dequeueable.AmazonSQS.SampleListener/deployment/deployment.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-config +data: + AWS_REGION: + Dequeueable__QueueUrl: +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-secrets +data: + AWS_ACCESS_KEY_ID: + AWS_SECRET_ACCESS_KEY: +apiVersion: apps/v1 +kind: Deployment +metadata: + name: queuelistener-deployment + labels: + app: queuelistener-deployment +spec: + replicas: 1 + selector: + matchLabels: + app: queuelistener-deployment + template: + metadata: + labels: + app: queuelistener-deployment + spec: + containers: + - name: queuelistener-executor + image: lenndewolten/dequeueable:aws-sqs-samplelistener-v1 + imagePullPolicy: Always + envFrom: + - configMapRef: + name: test-config + - secretRef: + name: test-secrets diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/Dequeueable.AmazonSQS.IntegrationTests.csproj b/tests/Dequeueable.AmazonSQS.IntegrationTests/Dequeueable.AmazonSQS.IntegrationTests.csproj new file mode 100644 index 0000000..e869749 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/Dequeueable.AmazonSQS.IntegrationTests.csproj @@ -0,0 +1,31 @@ + + + + net6.0 + enable + enable + + false + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/Fixtures/LocalStackFixture.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/Fixtures/LocalStackFixture.cs new file mode 100644 index 0000000..609db2e --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/Fixtures/LocalStackFixture.cs @@ -0,0 +1,54 @@ +using System.Net.Sockets; +using System.Net; +using DotNet.Testcontainers.Containers; +using DotNet.Testcontainers.Builders; + +namespace Dequeueable.AmazonSQS.IntegrationTests.Fixtures +{ + public class LocalStackFixture : IAsyncLifetime + { + private int? _port; + + public int Port + { + get + { + if (!_port.HasValue) + { + _port = GetAvailablePort(); + } + + return _port.Value; + } + } + + public string SQSURL => $"http://localhost:{Port}"; + + private IDockerContainer _testcontainersBuilder => new TestcontainersBuilder() + .WithImage("localstack/localstack") + .WithPortBinding(Port, 4566) + .WithCleanUp(true) + .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(4566)) + .Build(); + + public async Task InitializeAsync() + { + await _testcontainersBuilder.StartAsync(); + } + + public Task DisposeAsync() + { + return _testcontainersBuilder.DisposeAsync().AsTask(); + } + + private static readonly IPEndPoint _defaultLoopbackEndpoint = new(IPAddress.Loopback, port: 0); + private static int GetAvailablePort() + { + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + socket.Bind(_defaultLoopbackEndpoint); + return ((IPEndPoint)socket.LocalEndPoint!).Port; + } + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/HostTests.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/HostTests.cs new file mode 100644 index 0000000..3fc6ca9 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/HostTests.cs @@ -0,0 +1,136 @@ +using Amazon.SQS; +using Amazon.SQS.Model; +using Dequeueable.AmazonSQS.Factories; +using Dequeueable.AmazonSQS.IntegrationTests.Fixtures; +using Dequeueable.AmazonSQS.IntegrationTests.TestDataBuilders; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Moq; + +namespace Dequeueable.AmazonSQS.IntegrationTests.Functions +{ + public class HostTests : IClassFixture, IAsyncLifetime + { + private readonly LocalStackFixture _localStackFixture; + private readonly AmazonSQSClient _client; + private string _queueUrl = null!; + + public HostTests(LocalStackFixture localStackFixture) + { + _localStackFixture = localStackFixture; + _client = new AmazonSQSClient("dummy", "dummy", new AmazonSQSConfig { ServiceURL = _localStackFixture.SQSURL }); + } + + public async Task InitializeAsync() + { + var res = await _client.CreateQueueAsync("testqueue"); + _queueUrl = res.QueueUrl; + } + + public Task DisposeAsync() + { + return _client.DeleteQueueAsync(_queueUrl); + } + + [Fact] + public async Task Given_a_host_running_as_a_job_when_a_Queue_has_two_messages_then_they_are_handled_correctly() + { + // Arrange + var messages = new List + { + new SendMessageBatchRequestEntry("1", "body1"), + new SendMessageBatchRequestEntry("2", "body2") + }; + await _client.SendMessageBatchAsync(new SendMessageBatchRequest + { + QueueUrl = _queueUrl, + Entries = messages + }); + + var options = new Configurations.HostOptions + { + VisibilityTimeoutInSeconds = 500, + QueueUrl = _queueUrl + }; + var factory = new JobHostFactory(opt => + { + opt.VisibilityTimeoutInSeconds = options.VisibilityTimeoutInSeconds; + opt.QueueUrl = options.QueueUrl; + }); + + var fakeServiceMock = new Mock(); + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(_client); + + factory.ConfigureTestServices(services => + { + services.AddTransient(_ => fakeServiceMock.Object); + services.AddTransient(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = factory.Build(); + await host.HandleAsync(CancellationToken.None); + + // Assert + var queueResult = await _client.ReceiveMessageAsync(new ReceiveMessageRequest { MaxNumberOfMessages = 10, QueueUrl = _queueUrl }); + queueResult.Messages.Should().BeEmpty(); + foreach (var message in messages) + { + fakeServiceMock.Verify(f => f.Execute(It.Is(m => m.Body.ToString() == message.MessageBody)), Times.Once()); + } + } + + [Fact] + public async Task Given_a_host_running_as_a_listener_when_a_Queue_has_two_messages_then_they_are_handled_correctly() + { + // Arrange + var messages = new List + { + new SendMessageBatchRequestEntry("1", "body1"), + new SendMessageBatchRequestEntry("2", "body2") + }; + await _client.SendMessageBatchAsync(new SendMessageBatchRequest + { + QueueUrl = _queueUrl, + Entries = messages + }); + + var options = new Configurations.ListenerHostOptions + { + VisibilityTimeoutInSeconds = 500, + QueueUrl = _queueUrl + }; + var factory = new ListenerHostFactory(opt => + { + opt.VisibilityTimeoutInSeconds = options.VisibilityTimeoutInSeconds; + opt.MinimumPollingIntervalInMilliseconds = 1; + opt.MaximumPollingIntervalInMilliseconds = 2; + opt.QueueUrl = options.QueueUrl; + }); + + var fakeServiceMock = new Mock(); + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(_client); + + factory.ConfigureTestServices(services => + { + services.AddTransient(_ => fakeServiceMock.Object); + services.AddTransient(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = factory.Build(); + await host.HandleAsync(CancellationToken.None); + + // Assert + var queueResult = await _client.ReceiveMessageAsync(new ReceiveMessageRequest { MaxNumberOfMessages = 10, QueueUrl = _queueUrl }); + queueResult.Messages.Should().BeEmpty(); + + foreach (var message in messages) + { + fakeServiceMock.Verify(f => f.Execute(It.Is(m => m.Body.ToString() == message.MessageBody)), Times.Once()); + } + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/SingletonHostTests.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/SingletonHostTests.cs new file mode 100644 index 0000000..5542156 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/Functions/SingletonHostTests.cs @@ -0,0 +1,137 @@ +using Amazon.SQS; +using Amazon.SQS.Model; +using Dequeueable.AmazonSQS.Factories; +using Dequeueable.AmazonSQS.IntegrationTests.Fixtures; +using Dequeueable.AmazonSQS.IntegrationTests.TestDataBuilders; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Moq; + +namespace Dequeueable.AmazonSQS.IntegrationTests.Functions +{ + public class SingletonHostTests : IClassFixture, IAsyncLifetime + { + private readonly LocalStackFixture _localStackFixture; + private readonly AmazonSQSClient _client; + private string _queueUrl = null!; + + public SingletonHostTests(LocalStackFixture localStackFixture) + { + _localStackFixture = localStackFixture; + _client = new AmazonSQSClient("dummy", "dummy", new AmazonSQSConfig { ServiceURL = _localStackFixture.SQSURL }); + } + + public async Task InitializeAsync() + { + var res = await _client.CreateQueueAsync(new CreateQueueRequest { Attributes = new Dictionary { { "FifoQueue", "true" }, { "ContentBasedDeduplication", "true" } }, QueueName = "testqueue.fifo" }); + _queueUrl = res.QueueUrl; + } + + public Task DisposeAsync() + { + return _client.DeleteQueueAsync(_queueUrl); + } + + [Fact] + public async Task Given_a_host_running_as_a_singleton_job_when_a_Queue_has_two_messages_of_the_same_group_then_they_are_handled_correctly() + { + // Arrange + var messages = new List + { + new SendMessageBatchRequestEntry("1", "body1"){MessageGroupId ="1"}, + new SendMessageBatchRequestEntry("2", "body2"){MessageGroupId ="1"} + }; + var response = await _client.SendMessageBatchAsync(new SendMessageBatchRequest + { + QueueUrl = _queueUrl, + Entries = messages + }); + + var options = new Configurations.HostOptions + { + VisibilityTimeoutInSeconds = 500, + QueueUrl = _queueUrl + }; + var factory = new JobHostFactory(opt => + { + opt.VisibilityTimeoutInSeconds = options.VisibilityTimeoutInSeconds; + opt.QueueUrl = options.QueueUrl; + }, runAsSingleton: true); + + var fakeServiceMock = new Mock(); + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(_client); + + factory.ConfigureTestServices(services => + { + services.AddTransient(_ => fakeServiceMock.Object); + services.AddTransient(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = factory.Build(); + await host.HandleAsync(CancellationToken.None); + + // Assert + var queueResult = await _client.ReceiveMessageAsync(new ReceiveMessageRequest { MaxNumberOfMessages = 10, QueueUrl = _queueUrl }); + queueResult.Messages.Should().BeEmpty(); + + foreach (var message in messages) + { + fakeServiceMock.Verify(f => f.Execute(It.Is(m => m.Body.ToString() == message.MessageBody)), Times.Once()); + } + } + + [Fact] + public async Task Given_a_host_running_as_a_singleton_listener_when_a_Queue_has_two_messages_of_the_same_group_then_they_are_handled_correctly() + { + // Arrange + var messages = new List + { + new SendMessageBatchRequestEntry("1", "body1"){MessageGroupId ="1"}, + new SendMessageBatchRequestEntry("2", "body2"){MessageGroupId ="1"} + }; + var response = await _client.SendMessageBatchAsync(new SendMessageBatchRequest + { + QueueUrl = _queueUrl, + Entries = messages + }); + + var options = new Configurations.ListenerHostOptions + { + VisibilityTimeoutInSeconds = 500, + QueueUrl = _queueUrl + }; + var factory = new ListenerHostFactory(opt => + { + opt.VisibilityTimeoutInSeconds = options.VisibilityTimeoutInSeconds; + opt.MinimumPollingIntervalInMilliseconds = 1; + opt.MaximumPollingIntervalInMilliseconds = 1000; + opt.QueueUrl = options.QueueUrl; + }, runAsSingleton: true); + + var fakeServiceMock = new Mock(); + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(_client); + + factory.ConfigureTestServices(services => + { + services.AddTransient(_ => fakeServiceMock.Object); + services.AddTransient(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = factory.Build(); + await host.HandleAsync(CancellationToken.None); + + // Assert + var queueResult = await _client.ReceiveMessageAsync(new ReceiveMessageRequest { MaxNumberOfMessages = 10, QueueUrl = _queueUrl }); + queueResult.Messages.Should().BeEmpty(); + + foreach (var message in messages) + { + fakeServiceMock.Verify(f => f.Execute(It.Is(m => m.Body.ToString() == message.MessageBody)), Times.Once()); + } + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/JobHostFactory.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/JobHostFactory.cs new file mode 100644 index 0000000..5acb47c --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/JobHostFactory.cs @@ -0,0 +1,48 @@ +using Dequeueable.AmazonSQS.Extentions; +using Dequeueable.AmazonSQS.IntegrationTests.TestDataBuilders; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Dequeueable.AmazonSQS.IntegrationTests +{ + public class JobHostFactory + where TFunction : class, IAmazonSQSFunction + { + public readonly IHostBuilder HostBuilder; + private readonly Action? _options; + + public JobHostFactory(Action? overrideOptions = null, bool runAsSingleton = false) + { + if (overrideOptions is not null) + { + _options += overrideOptions; + } + + HostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + var hostBuilder = services.AddAmazonSQSServices() + .RunAsJob(_options); + + if (runAsSingleton) + { + hostBuilder.AsSingleton(); + } + + services.AddTransient(); + }); + } + + public IHostBuilder ConfigureTestServices(Action services) + { + HostBuilder.ConfigureServices(services); + return HostBuilder; + } + + public Services.Hosts.IHostExecutor Build() + { + var host = HostBuilder.Build(); + return host.Services.GetRequiredService(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/ListenerHostFactory.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/ListenerHostFactory.cs new file mode 100644 index 0000000..fcac18f --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/ListenerHostFactory.cs @@ -0,0 +1,48 @@ +using Dequeueable.AmazonSQS.Extentions; +using Dequeueable.AmazonSQS.IntegrationTests.TestDataBuilders; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Dequeueable.AmazonSQS.IntegrationTests +{ + public class ListenerHostFactory + where TFunction : class, IAmazonSQSFunction + { + public readonly IHostBuilder HostBuilder; + private readonly Action? _options; + + public ListenerHostFactory(Action? overrideOptions = null, bool runAsSingleton = false) + { + if (overrideOptions is not null) + { + _options += overrideOptions; + } + + HostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + var hostBuilder = services.AddAmazonSQSServices() + .RunAsListener(_options); + + if (runAsSingleton) + { + hostBuilder.AsSingleton(); + } + + services.AddTransient(); + }); + } + + public IHostBuilder ConfigureTestServices(Action services) + { + HostBuilder.ConfigureServices(services); + return HostBuilder; + } + + public Services.Hosts.IHostExecutor Build() + { + var host = HostBuilder.Build(); + return host.Services.GetRequiredService(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/TestDataBuilders/TestFunctions.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/TestDataBuilders/TestFunctions.cs new file mode 100644 index 0000000..d8ff6cb --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/TestDataBuilders/TestFunctions.cs @@ -0,0 +1,47 @@ +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.IntegrationTests.TestDataBuilders +{ + public class TestFunction : IAmazonSQSFunction + { + private readonly IFakeService _fakeService; + + public TestFunction(IFakeService fakeService) + { + _fakeService = fakeService; + } + + public async Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + await _fakeService.Execute(message); + } + } + + public interface IFakeService + { + Task Execute(Message message); + } + + public class FakeService : IFakeService + { + public Task Execute(Message message) { return Task.CompletedTask; } + } + + public class SingletonFakeService : IFakeService + { + private readonly static SemaphoreSlim _lock = new(1, 1); + + public async Task Execute(Message message) + { + + if (_lock.Wait(TimeSpan.FromMilliseconds(1))) + { + await Task.Delay(10); + _lock.Release(); + return; + } + + throw new InvalidOperationException(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.IntegrationTests/Usings.cs b/tests/Dequeueable.AmazonSQS.IntegrationTests/Usings.cs new file mode 100644 index 0000000..8c927eb --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.IntegrationTests/Usings.cs @@ -0,0 +1 @@ +global using Xunit; \ No newline at end of file diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostBuilderTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostBuilderTests.cs new file mode 100644 index 0000000..9fbbdfd --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostBuilderTests.cs @@ -0,0 +1,213 @@ +using Amazon; +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Extentions; +using Dequeueable.AmazonSQS.Factories; +using Dequeueable.AmazonSQS.Models; +using Dequeueable.AmazonSQS.Services.Hosts; +using Dequeueable.AmazonSQS.Services.Queues; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using Moq; + +namespace Dequeueable.AmazonSQS.UnitTests.Configurations +{ + public class HostBuilderTests + { + private class TestFunction : IAmazonSQSFunction + { + public Task ExecuteAsync(Message message, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } + + [Fact] + public void Given_a_HostBuilder_when_RunAsJob_is_called_then_the_Host_is_registered_correctly() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + options.QueueUrl = "test"; + }); + + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(new Amazon.SQS.AmazonSQSClient("dummy", "dummy", RegionEndpoint.MECentral1)); + services.AddSingleton(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public void Given_a_HostBuilder_when_RunAsJob_is_called_then_IHostOptions_is_registered_correctly() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + options.QueueUrl = "test"; + }); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public void Given_a_HostBuilder_when_RunAsListener_is_called_then_the_Host_is_registered_correctly() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsListener(options => + { + options.QueueUrl = "test"; + }); + + var amazonSQSClientFactoryMock = new Mock(); + amazonSQSClientFactoryMock.Setup(c => c.Create()).Returns(new Amazon.SQS.AmazonSQSClient("dummy", "dummy", RegionEndpoint.MECentral1)); + services.AddSingleton(_ => amazonSQSClientFactoryMock.Object); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public void Given_a_HostBuilder_when_RunAsListener_is_called_then_IHostOptions_is_registered_correctly() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsListener(options => + { + options.QueueUrl = "test"; + }); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public void Given_a_HostBuilder_when_AsSingleton_is_called_then_IQueueMessageExecutor_is_registered_correctly() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .AsSingleton(); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public void Given_a_HostBuilder_when_AsSingleton_is_called_then_HostOptions_AttributeNames_contains_MessageGroupId() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsJob(options => + { + options.QueueUrl = "test"; + options.AttributeNames = new List { "other value" }; + }) + .AsSingleton(); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService>().Value.AttributeNames.Should().Contain("MessageGroupId"); + } + + [Fact] + public void Given_a_HostBuilder_when_AsSingleton_is_called_then_ListenerHostOptions_AttributeNames_contains_MessageGroupId() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsListener(options => + { + options.QueueUrl = "test"; + options.AttributeNames = new List { "other value" }; + }) + .AsSingleton(); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService>().Value.AttributeNames.Should().Contain("MessageGroupId"); + } + + [Fact] + public void Given_a_HostBuilder_when_AsSingleton_is_called_then_ListenerHostOptions_NewBatchThreshold_is_zero() + { + // Arrange + var hostBuilder = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services + .AddAmazonSQSServices() + .RunAsListener(options => + { + options.QueueUrl = "test"; + options.NewBatchThreshold = 7; + }) + .AsSingleton(); + }); + + // Act + var host = hostBuilder.Build(); + + // Assert + host.Services.GetRequiredService>().Value.NewBatchThreshold.Should().Be(0); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostOptionsTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostOptionsTests.cs new file mode 100644 index 0000000..5c13272 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/HostOptionsTests.cs @@ -0,0 +1,147 @@ +using Dequeueable.AmazonSQS.Configurations; +using FluentAssertions; +using System.ComponentModel.DataAnnotations; + +namespace Dequeueable.AmazonSQS.UnitTests.Configurations +{ + public class HostOptionsTests + { + [Fact] + public void Given_a_HostOptions_when_QueueUrl_is_empty_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new HostOptions + { + QueueUrl = string.Empty + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("QueueUrl cannot be empty.")); + } + + [Fact] + public void Given_a_HostOptions_when_QueueUrl_is_not_empty_then_the_validation_result_is_empty() + { + // Arrange + var sut = new HostOptions + { + QueueUrl = "my url" + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().NotContain(e => e.MemberNames!.Contains("QueueUrl")); + } + + [Fact] + public void Given_a_HostOptions_when_BatchSize_is_zero_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new HostOptions + { + BatchSize = 0 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for BatchSize must be between 1 and 10.")); + } + + [Fact] + public void Given_a_HostOptions_when_BatchSize_is_eleven_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new HostOptions + { + BatchSize = 11 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for BatchSize must be between 1 and 10.")); + } + + [Fact] + public void Given_a_HostOptions_when_BatchSize_is_within_range_then_the_validation_result_are_empty() + { + // Arrange + var sut = new HostOptions + { + BatchSize = 5 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().NotContain(e => e.MemberNames!.Contains("BatchSize")); + } + + + + [Fact] + public void Given_a_HostOptions_when_VisibilityTimeoutInSeconds_is_zero_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new HostOptions + { + VisibilityTimeoutInSeconds = 29 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for VisibilityTimeoutInSeconds must be between 30 and 43200.")); + } + + [Fact] + public void Given_a_HostOptions_when_VisibilityTimeoutInSeconds_is_43201_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new HostOptions + { + VisibilityTimeoutInSeconds = 43201 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for VisibilityTimeoutInSeconds must be between 30 and 43200.")); + } + + [Fact] + public void Given_a_HostOptions_when_VisibilityTimeoutInSeconds_is_within_range_then_the_validation_result_are_empty() + { + // Arrange + var sut = new HostOptions + { + VisibilityTimeoutInSeconds = 30 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().NotContain(e => e.MemberNames!.Contains("VisibilityTimeoutInSeconds")); + } + + private IList ValidateModel(object model) + { + var validationResults = new List(); + var ctx = new ValidationContext(model, null, null); + Validator.TryValidateObject(model, ctx, validationResults, true); + return validationResults; + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/ListenerHostOptionsTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/ListenerHostOptionsTests.cs new file mode 100644 index 0000000..a551d80 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Configurations/ListenerHostOptionsTests.cs @@ -0,0 +1,149 @@ +using Dequeueable.AmazonSQS.Configurations; +using FluentAssertions; +using System.ComponentModel.DataAnnotations; + +namespace Dequeueable.AmazonSQS.UnitTests.Configurations +{ + public class ListenerHostOptionsTests + { + [Fact] + public void Given_a_ListenerHostOptions_when_MinimumPollingIntervalInMilliseconds_is_zero_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new ListenerHostOptions + { + MinimumPollingIntervalInMilliseconds = 0 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for MinimumPollingIntervalInMilliseconds must be lower than 1.")); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_MinimumPollingIntervalInMilliseconds_is_within_range_then_the_validation_result_are_empty() + { + // Arrange + var sut = new ListenerHostOptions + { + MinimumPollingIntervalInMilliseconds = 5 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().NotContain(e => e.MemberNames!.Contains("MinimumPollingIntervalInMilliseconds")); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_MaximumPollingIntervalInMilliseconds_is_zero_then_the_validation_result_contains_the_correct_error_message() + { + // Arrange + var sut = new ListenerHostOptions + { + MaximumPollingIntervalInMilliseconds = 0 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().Contain(e => e.ErrorMessage!.Contains("Value for MaximumPollingIntervalInMilliseconds must be lower than 1.")); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_MaximumPollingIntervalInMilliseconds_is_within_range_then_the_validation_result_are_empty() + { + // Arrange + var sut = new ListenerHostOptions + { + MaximumPollingIntervalInMilliseconds = 5 + }; + + // Act + var result = ValidateModel(sut); + + // Assert + result.Should().NotContain(e => e.MemberNames!.Contains("MaximumPollingIntervalInMilliseconds")); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_MinimumPollingIntervalInMilliseconds_is_higher_than_MaximumPollingIntervalInMilliseconds_then_ValidatePollingInterval_returns_false() + { + // Arrange + var sut = new ListenerHostOptions + { + MaximumPollingIntervalInMilliseconds = 5, + MinimumPollingIntervalInMilliseconds = 6 + }; + + // Act + var result = ListenerHostOptions.ValidatePollingInterval(sut); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_MinimumPollingIntervalInMilliseconds_is_lower_than_MaximumPollingIntervalInMilliseconds_then_ValidatePollingInterval_returns_true() + { + // Arrange + var sut = new ListenerHostOptions + { + MaximumPollingIntervalInMilliseconds = 6, + MinimumPollingIntervalInMilliseconds = 5 + }; + + // Act + var result = ListenerHostOptions.ValidatePollingInterval(sut); + + // Assert + result.Should().BeTrue(); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_NewBatchThreshold_is_higher_than_BatchSize_then_ValidateNewBatchThreshold_returns_false() + { + // Arrange + var sut = new ListenerHostOptions + { + NewBatchThreshold = 6, + BatchSize = 4 + }; + + // Act + var result = ListenerHostOptions.ValidateNewBatchThreshold(sut); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void Given_a_ListenerHostOptions_when_NewBatchThreshold_is_lower_than_BatchSize_then_ValidateNewBatchThreshold_returns_true() + { + // Arrange + var sut = new ListenerHostOptions + { + NewBatchThreshold = 5, + BatchSize = 5 + }; + + // Act + var result = ListenerHostOptions.ValidateNewBatchThreshold(sut); + + // Assert + result.Should().BeTrue(); + } + + private static IList ValidateModel(object model) + { + var validationResults = new List(); + var ctx = new ValidationContext(model, null, null); + Validator.TryValidateObject(model, ctx, validationResults, true); + return validationResults; + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Dequeueable.AmazonSQS.UnitTests.csproj b/tests/Dequeueable.AmazonSQS.UnitTests/Dequeueable.AmazonSQS.UnitTests.csproj new file mode 100644 index 0000000..c0a38e4 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Dequeueable.AmazonSQS.UnitTests.csproj @@ -0,0 +1,30 @@ + + + + net6.0 + enable + enable + + false + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Models/MessageTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Models/MessageTests.cs new file mode 100644 index 0000000..e9e345d --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Models/MessageTests.cs @@ -0,0 +1,38 @@ +using Dequeueable.AmazonSQS.UnitTests.TestDataBuilders; +using FluentAssertions; + +namespace Dequeueable.AmazonSQS.UnitTests.Models +{ + public class MessageTests + { + [Fact] + public void Given_a_Message_when_Attibues_has_a_MessageGroupId_then_MessageGroupId_contains_the_correct_value() + { + // Arrange + var attributes = new Dictionary { + {"MessageGroupId", "value" } + }; + var sut = new MessageTestDataBuilder().WithAttributes(attributes).Build(); + + // Act + var result = sut.MessageGroupId; + + // Assert + result.Should().Be(attributes["MessageGroupId"]); + } + + [Fact] + public void Given_a_Message_when_Attibues_has_a_no_MessageGroupId_then_MessageGroupId_is_null() + { + // Arrange + var attributes = new Dictionary { }; + var sut = new MessageTestDataBuilder().WithAttributes(attributes).Build(); + + // Act + var result = sut.MessageGroupId; + + // Assert + result.Should().BeNull(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/JobExecutorTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/JobExecutorTests.cs new file mode 100644 index 0000000..58d7865 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/JobExecutorTests.cs @@ -0,0 +1,60 @@ +using Dequeueable.AmazonSQS.Models; +using Dequeueable.AmazonSQS.Services.Hosts; +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AmazonSQS.UnitTests.TestDataBuilders; +using Microsoft.Extensions.Logging; +using Moq; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Hosts +{ + public class JobExecutorTests + { + [Fact] + public async Task Given_a_JobExecutor_when_HandleAsync_is_called_but_no_messages_are_retrieved_then_the_handler_is_not_called() + { + // Arrange + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageHandlerMock = new Mock(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageManagerMock.Setup(m => m.RetrieveMessagesAsync(It.IsAny())).ReturnsAsync(Array.Empty()); + + loggerMock.Setup( + x => x.Log( + It.Is(l => l == LogLevel.Debug), + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("No messages found")), + null, + It.Is>((v, t) => true))); + + var sut = new JobExecutor(queueMessageManagerMock.Object, queueMessageHandlerMock.Object, loggerMock.Object); + + // Act + await sut.HandleAsync(CancellationToken.None); + + // Assert + queueMessageHandlerMock.VerifyNoOtherCalls(); + } + + [Fact] + public async Task Given_a_QueueListener_when_HandleAsync_is_called_and_messages_are_retrieved_then_the_handler_is_called_correctly() + { + // Arrange + var messages = new[] { new MessageTestDataBuilder().WithmessageId("1").Build(), new MessageTestDataBuilder().WithmessageId("2").Build() }; + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageHandlerMock = new Mock(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageManagerMock.Setup(m => m.RetrieveMessagesAsync(It.IsAny())).ReturnsAsync(messages); + queueMessageHandlerMock.Setup(h => h.HandleAsync(It.Is(m => messages.Any(ma => ma.MessageId == m.MessageId)), CancellationToken.None)).Returns(Task.CompletedTask); + + var sut = new JobExecutor(queueMessageManagerMock.Object, queueMessageHandlerMock.Object, loggerMock.Object); + + // Act + await sut.HandleAsync(CancellationToken.None); + + // Assert + queueMessageHandlerMock.Verify(e => e.HandleAsync(It.Is(m => messages.Any(ma => ma.MessageId == m.MessageId)), It.IsAny()), Times.Exactly(messages.Length)); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/QueueListenerExecutorTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/QueueListenerExecutorTests.cs new file mode 100644 index 0000000..55a006c --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Hosts/QueueListenerExecutorTests.cs @@ -0,0 +1,69 @@ +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Models; +using Dequeueable.AmazonSQS.Services.Hosts; +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AmazonSQS.UnitTests.TestDataBuilders; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Hosts +{ + public class QueueListenerExecutorExecutorTests + { + [Fact] + public async Task Given_a_QueueListenerExecutor_when_HandleAsync_is_called_but_no_messages_are_retrieved_then_the_handler_is_not_called() + { + // Arrange + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageHandlerMock = new Mock(MockBehavior.Strict); + var options = new ListenerHostOptions { MinimumPollingIntervalInMilliseconds = 0, MaximumPollingIntervalInMilliseconds = 1, QueueUrl = "testurl" }; + var optionsMock = new Mock>(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageManagerMock.Setup(m => m.RetrieveMessagesAsync(It.IsAny())).ReturnsAsync(Array.Empty()); + optionsMock.SetupGet(o => o.Value).Returns(options); + + loggerMock.Setup( + x => x.Log( + It.Is(l => l == LogLevel.Debug), + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("No messages found")), + null, + It.Is>((v, t) => true))); + + var sut = new QueueListenerExecutor(queueMessageManagerMock.Object, queueMessageHandlerMock.Object, optionsMock.Object, loggerMock.Object); + + // Act + await sut.HandleAsync(CancellationToken.None); + + // Assert + queueMessageHandlerMock.VerifyNoOtherCalls(); + } + + [Fact] + public async Task Given_a_QueueListenerExecutor_when_HandleAsync_is_called_and_messages_are_retrieved_then_the_handler_is_called_correctly() + { + // Arrange + var messages = new[] { new MessageTestDataBuilder().WithmessageId("1").Build(), new MessageTestDataBuilder().WithmessageId("2").Build() }; + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageHandlerMock = new Mock(MockBehavior.Strict); + var options = new ListenerHostOptions { MinimumPollingIntervalInMilliseconds = 0, MaximumPollingIntervalInMilliseconds = 1, QueueUrl = "testurl" }; + var optionsMock = new Mock>(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageManagerMock.Setup(m => m.RetrieveMessagesAsync(It.IsAny())).ReturnsAsync(messages); + optionsMock.SetupGet(o => o.Value).Returns(options); + + queueMessageHandlerMock.Setup(h => h.HandleAsync(It.Is(m => messages.Any(ma => ma.MessageId == m.MessageId)), CancellationToken.None)).Returns(Task.CompletedTask); + + var sut = new QueueListenerExecutor(queueMessageManagerMock.Object, queueMessageHandlerMock.Object, optionsMock.Object, loggerMock.Object); + + // Act + await sut.HandleAsync(CancellationToken.None); + + // Assert + queueMessageHandlerMock.Verify(e => e.HandleAsync(It.Is(m => messages.Any(ma => ma.MessageId == m.MessageId)), It.IsAny()), Times.Exactly(messages.Length)); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageHandlerTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageHandlerTests.cs new file mode 100644 index 0000000..971b3c4 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageHandlerTests.cs @@ -0,0 +1,75 @@ +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AmazonSQS.UnitTests.TestDataBuilders; +using Microsoft.Extensions.Logging; +using Moq; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Queues +{ + public class QueueMessageHandlerTests + { + [Fact] + public async Task Given_a_QueueMessageHandler_when_HandleAsync_is_called_then_message_is_handled_correctly() + { + // Arrange + var message = new MessageTestDataBuilder().Build(); + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageExecutorMock = new Mock(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageExecutorMock.Setup(e => e.ExecuteAsync(message, It.IsAny())).Returns(Task.CompletedTask).Verifiable(); + queueMessageManagerMock.Setup(m => m.DeleteMessageAsync(message, It.IsAny())).Returns(Task.CompletedTask).Verifiable(); + + loggerMock.Setup( + x => x.Log( + It.Is(l => l == LogLevel.Information), + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains($"Executed message with id '{message.MessageId}' (Succeeded)")), + null, + It.Is>((v, t) => true))); + + var sut = new QueueMessageHandler(queueMessageManagerMock.Object, queueMessageExecutorMock.Object, loggerMock.Object); + + // Act + await sut.HandleAsync(message, CancellationToken.None); + + // Assert + queueMessageExecutorMock.Verify(); + queueMessageManagerMock.Verify(); + } + + [Fact] + public async Task Given_a_QueueMessageHandler_when_HandleAsync_is_called_but_updating_the_visibility_timeout_goes_wrong_then_it_is_handled_correctly() + { + // Arrange + var exception = new Exception("test"); + var message = new MessageTestDataBuilder().WithNextVisibileOn(DateTimeOffset.UtcNow.AddSeconds(2)).Build(); + var queueMessageManagerMock = new Mock(MockBehavior.Strict); + var queueMessageExecutorMock = new Mock(MockBehavior.Strict); + var loggerMock = new Mock>(MockBehavior.Strict); + + queueMessageExecutorMock.Setup(e => e.ExecuteAsync(message, It.IsAny())).Returns(Task.Delay(TimeSpan.FromSeconds(60))); + queueMessageManagerMock.Setup(m => m.UpdateVisibilityTimeOutAsync(message, It.IsAny())).ThrowsAsync(exception); + queueMessageManagerMock.Setup(m => m.EnqueueMessageAsync(message, It.IsAny())).Returns(Task.CompletedTask).Verifiable(); + + loggerMock.Setup( + x => x.Log( + It.Is(l => l == LogLevel.Error), + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains($"An error occurred while executing the queue message with id '{message.MessageId}'")), + It.IsAny(), + It.Is>((v, t) => true))); + + var sut = new QueueMessageHandler(queueMessageManagerMock.Object, queueMessageExecutorMock.Object, loggerMock.Object) + { + MinimalVisibilityTimeoutDelay = TimeSpan.Zero + }; + + // Act + await sut.HandleAsync(message, CancellationToken.None); + + // Assert + queueMessageExecutorMock.Verify(); + queueMessageManagerMock.Verify(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageManagerTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageManagerTests.cs new file mode 100644 index 0000000..76d8c76 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Queues/QueueMessageManagerTests.cs @@ -0,0 +1,162 @@ +using Amazon.SQS; +using Amazon.SQS.Model; +using Dequeueable.AmazonSQS.Configurations; +using Dequeueable.AmazonSQS.Factories; +using Dequeueable.AmazonSQS.Services.Queues; +using Dequeueable.AmazonSQS.UnitTests.TestDataBuilders; +using FluentAssertions; +using Moq; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Queues +{ + public class QueueMessageManagerTests + { + [Fact] + public async Task Given_a_QueueMessageManager_when_RetrieveMessagesAsync_is_called_then_messages_are_retrieved_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var fakeResponse = new ReceiveMessageResponse + { + Messages = new List { new Message() } + }; + + clientFake.Setup(r => r.ReceiveMessageAsync(It.IsAny(), It.IsAny())).ReturnsAsync(fakeResponse); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + var messages = await sut.RetrieveMessagesAsync(CancellationToken.None); + + // Assert + messages.Should().HaveSameCount(fakeResponse.Messages); + messages.Should().AllSatisfy(m => m.NextVisibleOn.Should().BeCloseTo(DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(options.VisibilityTimeoutInSeconds)), TimeSpan.FromMilliseconds(100))); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_DeleteMessageAsync_is_called_then_the_message_is_deleted_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + clientFake.Setup(r => r.DeleteMessageAsync(options.QueueUrl, message.ReceiptHandle, It.IsAny())).ReturnsAsync(new DeleteMessageResponse()).Verifiable(); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + await sut.DeleteMessageAsync(message, CancellationToken.None); + + // Assert + clientFake.Verify(); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_DeleteMessageAsync_is_called_when_the_queue_doesnt_exsit_then_it_is_handled_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + clientFake.Setup(r => r.DeleteMessageAsync(options.QueueUrl, message.ReceiptHandle, It.IsAny())).ThrowsAsync(new AmazonSQSException("testfail") { ErrorCode = "NonExistentQueue" }); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + await sut.DeleteMessageAsync(message, CancellationToken.None); + + // Assert + clientFake.Verify(); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_DeleteMessageAsync_is_called_when_the_message_doesnt_exsit_then_it_is_handled_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + clientFake.Setup(r => r.DeleteMessageAsync(options.QueueUrl, message.ReceiptHandle, It.IsAny())).ThrowsAsync(new AmazonSQSException("testfail") { StatusCode = System.Net.HttpStatusCode.NotFound }); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + await sut.DeleteMessageAsync(message, CancellationToken.None); + + // Assert + clientFake.Verify(); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_DeleteMessageAsync_is_called_when_a_different_exception_occures_then_it_is_thrown() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + clientFake.Setup(r => r.DeleteMessageAsync(options.QueueUrl, message.ReceiptHandle, It.IsAny())).ThrowsAsync(new AmazonSQSException("testfail") { StatusCode = System.Net.HttpStatusCode.BadGateway }); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + Func act = () => sut.DeleteMessageAsync(message, CancellationToken.None); + + // Assert + await act.Should().ThrowExactlyAsync(); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_UpdateVisibilityTimeOutAsync_is_called_then_messages_are_retrieved_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + + clientFake.Setup(r => r.ChangeMessageVisibilityAsync(It.Is(o => o.VisibilityTimeout == options.VisibilityTimeoutInSeconds), It.IsAny())).ReturnsAsync(new ChangeMessageVisibilityResponse()).Verifiable(); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + var nextVisbileOn = await sut.UpdateVisibilityTimeOutAsync(message, CancellationToken.None); + + // Assert + clientFake.Verify(); + nextVisbileOn.Should().BeCloseTo(DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(options.VisibilityTimeoutInSeconds)), TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public async Task Given_a_QueueMessageManager_when_EnqueueMessageAsync_is_called_then_messages_are_retrieved_correctly() + { + // Arrange + var options = new HostOptions(); + var amazonSQSClientFactoryMock = new Mock(MockBehavior.Strict); + var clientFake = new Mock("TESTKEY", "TESTSECRET", Amazon.RegionEndpoint.EUCentral1); ; + var message = new MessageTestDataBuilder().Build(); + + clientFake.Setup(r => r.ChangeMessageVisibilityAsync(It.Is(o => o.VisibilityTimeout == 0), It.IsAny())).ReturnsAsync(new ChangeMessageVisibilityResponse()).Verifiable(); + + amazonSQSClientFactoryMock.Setup(e => e.Create()).Returns(clientFake.Object); + var sut = new QueueMessageManager(amazonSQSClientFactoryMock.Object, options); + + // Act + await sut.EnqueueMessageAsync(message, CancellationToken.None); + + // Assert + clientFake.Verify(); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/LinearDelayStrategyTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/LinearDelayStrategyTests.cs new file mode 100644 index 0000000..473a973 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/LinearDelayStrategyTests.cs @@ -0,0 +1,57 @@ +using Dequeueable.AzureQueueStorage.Services.Timers; +using FluentAssertions; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Timers +{ + public class LinearDelayStrategyTests + { + [Fact] + public void Given_a_LinearDelayStrategy_when_GetNextDelay_is_called_with_executionSucceeded_false_then_the_MinimalRenewalDelay_is_returned() + { + // Arrange + var executionSucceeded = false; + var minimalRenewalDelay = TimeSpan.FromSeconds(1); + + var sut = new LinearDelayStrategy(minimalRenewalDelay); + + // Act + var delay = sut.GetNextDelay(executionSucceeded: executionSucceeded); + + // Assert + delay.Should().Be(minimalRenewalDelay); + } + + [Fact] + public void Given_a_LinearDelayStrategy_when_GetNextDelay_is_called_with_nextVisibleOn_null_then_the_MinimalRenewalDelay_is_returned() + { + // Arrange + var minimalRenewalDelay = TimeSpan.FromSeconds(1); + + var sut = new LinearDelayStrategy(minimalRenewalDelay); + + // Act + var delay = sut.GetNextDelay(); + + // Assert + delay.Should().Be(minimalRenewalDelay); + } + + [Fact] + public void Given_a_LinearDelayStrategy_when_GetNextDelay_is_called_with_a_positive_nextVisibleOn_then_the_MinimalRenewalDelay_is_returned() + { + // Arrange + var minimalRenewalDelay = TimeSpan.FromSeconds(1); + + var sut = new LinearDelayStrategy(minimalRenewalDelay) + { + Divisor = 2 + }; + + // Act + var delay = sut.GetNextDelay(nextVisibleOn: DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(60))); + + // Assert + delay.Should().BeCloseTo(TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(6)); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/RandomizedExponentialDelayStrategyTests.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/RandomizedExponentialDelayStrategyTests.cs new file mode 100644 index 0000000..9db25ae --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Services/Timers/RandomizedExponentialDelayStrategyTests.cs @@ -0,0 +1,104 @@ +using Dequeueable.AmazonSQS.Services.Timers; +using FluentAssertions; + +namespace Dequeueable.AmazonSQS.UnitTests.Services.Timers +{ + public class RandomizedExponentialDelayStrategyTests + { + [Fact] + public void Given_a_RandomizedExponentialDelayStrategy_when_constructed_with_a_minimumInterval_lower_than_zero_then_an_ArgumentOutOfRangeException_is_thrown() + { + // Arrange + var minimumPollingInterval = TimeSpan.FromMilliseconds(-1); + var maximumPollingInterval = TimeSpan.FromMilliseconds(2); + + // Act + Action act = () => { var _ = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); }; + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void Given_a_RandomizedExponentialDelayStrategy_when_constructed_with_a_minimumInterval_higer_than_the_maximumInterval_then_an_ArgumentException_is_thrown() + { + // Arrange + var minimumPollingInterval = TimeSpan.FromMilliseconds(2); + var maximumPollingInterval = TimeSpan.FromMilliseconds(1); + + // Act + Action act = () => { var _ = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); }; + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void Given_a_RandomizedExponentialDelayStrategy_when_constructed_with_a_maximumInterval_lower_than_zero_then_an_ArgumentOutOfRangeException_is_thrown() + { + // Arrange + var minimumPollingInterval = TimeSpan.FromMilliseconds(1); + var maximumPollingInterval = TimeSpan.FromMilliseconds(-2); + + // Act + Action act = () => { var _ = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); }; + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void Given_a_RandomizedExponentialDelayStrategy_when_constructed_with_a_maximumInterval_lower_than_the_minimumInterval_then_an_ArgumentException_is_thrown() + { + // Arrange + var minimumPollingInterval = TimeSpan.FromMilliseconds(2); + var maximumPollingInterval = TimeSpan.FromMilliseconds(1); + + // Act + Action act = () => { var _ = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); }; + + // Assert + act.Should().ThrowExactly(); + } + + [Fact] + public void Given_a_RandomizedExponentialDelayStrategy_when_GetNextDelay_is_called_with_executionSucceeded_true_then_the_correct_result_TimeSpan_is_returned() + { + // Arrange + var executionSucceeded = true; + var minimumPollingInterval = TimeSpan.FromMilliseconds(1); + var maximumPollingInterval = TimeSpan.FromMilliseconds(2); + + var sut = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); + + // Act + var actual = sut.GetNextDelay(executionSucceeded: executionSucceeded); + + // Assert + actual.Should().Be(minimumPollingInterval); + } + + [Fact] + public void Given_a_RandomizedExponentialBackoffStrategy_when_NextDelay_is_called_multiple_times_with_executionSucceeded_false_then_the_TimeSpan_increment_correctly() + { + // Arrange + var executionSucceeded = false; + var minimumPollingInterval = TimeSpan.FromMilliseconds(1); + var maximumPollingInterval = TimeSpan.FromMilliseconds(500); + + var sut = new RandomizedExponentialDelayStrategy(minimumPollingInterval, maximumPollingInterval); + + // Act & Assert + var currentInterval = TimeSpan.Zero; + while (currentInterval != maximumPollingInterval) + { + var actual = sut.GetNextDelay(executionSucceeded: executionSucceeded); + actual.Should().BeGreaterThan(currentInterval); + + currentInterval = actual; + } + + currentInterval.Should().Be(maximumPollingInterval); + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/TestDataBuilders/MessageTestDataBuilder.cs b/tests/Dequeueable.AmazonSQS.UnitTests/TestDataBuilders/MessageTestDataBuilder.cs new file mode 100644 index 0000000..0b00395 --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/TestDataBuilders/MessageTestDataBuilder.cs @@ -0,0 +1,48 @@ +using Dequeueable.AmazonSQS.Models; + +namespace Dequeueable.AmazonSQS.UnitTests.TestDataBuilders +{ + public class MessageTestDataBuilder + { + private string _messageId = "some id"; + private readonly string _receiptHandle = "some pop"; + private DateTimeOffset _nextVisibileOn = DateTimeOffset.UtcNow.AddMinutes(1); + private BinaryData _body = BinaryData.FromString("test body"); + private Dictionary _attributes = new(); + + public Message Build() + { + return new Message(_messageId, _receiptHandle, _nextVisibileOn, _body, _attributes); + } + + public MessageTestDataBuilder WithmessageId(string messageId) + { + _messageId = messageId; + return this; + } + + public MessageTestDataBuilder WithNextVisibileOn(DateTimeOffset nextVisibileOn) + { + _nextVisibileOn = nextVisibileOn; + return this; + } + + public MessageTestDataBuilder WithBody(string body) + { + _body = BinaryData.FromString(body); + return this; + } + + public MessageTestDataBuilder WithBody(BinaryData body) + { + _body = body; + return this; + } + + public MessageTestDataBuilder WithAttributes(Dictionary attributes) + { + _attributes = attributes; + return this; + } + } +} diff --git a/tests/Dequeueable.AmazonSQS.UnitTests/Usings.cs b/tests/Dequeueable.AmazonSQS.UnitTests/Usings.cs new file mode 100644 index 0000000..8c927eb --- /dev/null +++ b/tests/Dequeueable.AmazonSQS.UnitTests/Usings.cs @@ -0,0 +1 @@ +global using Xunit; \ No newline at end of file