Skip to content

Commit

Permalink
Handle concurrency issues for azure integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lennart ten Wolde authored and Lennart ten Wolde committed Apr 28, 2023
1 parent 601897e commit ebd7af5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,35 @@

namespace Dequeueable.AzureQueueStorage.IntegrationTests.Functions
{
public class JobTests : IClassFixture<AzuriteFixture>
public class JobTests : IClassFixture<AzuriteFixture>, IAsyncLifetime
{
private readonly QueueClientOptions _queueClientOptions = new() { MessageEncoding = QueueMessageEncoding.Base64 };
private readonly AzuriteFixture _azuriteFixture;
private readonly string _queueName;
private readonly QueueClient _queueClient;

public JobTests(AzuriteFixture azuriteFixture)
{
_azuriteFixture = azuriteFixture;
_queueName = "jobqueue";
_queueClient = new QueueClient(_azuriteFixture.ConnectionString, _queueName, _queueClientOptions);
}

public Task InitializeAsync()
{
return _queueClient.CreateAsync();
}

public Task DisposeAsync()
{
return _queueClient.DeleteAsync();
}

[Fact]
public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_correctly()
{
// Arrange
var queueName = "testqueue1";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);

var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName);
var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName);

var fakeServiceMock = new Mock<IFakeService>();

Expand All @@ -37,19 +48,17 @@ public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_c
services.AddTransient(_ => fakeServiceMock.Object);
});

await queueClient.CreateAsync();

foreach (var message in messages)
{
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);
}

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

foreach (var message in messages)
Expand All @@ -62,10 +71,7 @@ public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_c
public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurred_while_executing_the_function_and_the_MaxDequeueCount_is_not_yet_reached_then_the_message_is_enqueued_correctly()
{
// Arrange
var queueName = "testqueue2";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);

var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName, options =>
var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName, options =>
{
options.MaxDequeueCount = 5;
});
Expand All @@ -79,15 +85,14 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
});

var message = "message1";
await queueClient.CreateAsync();
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().NotBeNull();
peekedMessage.Value.Body.ToString().Should().Be(message);
peekedMessage.Value.DequeueCount.Should().Be(1);
Expand All @@ -97,11 +102,9 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurred_while_executing_the_function_and_the_MaxDequeueCount_is_reached_then_the_message_is_moved_to_the_poisen_queue()
{
// Arrange
var queueName = "testqueue3";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);
var poisenQueueSuffix = "poison";

var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName, options =>
var factory = new JobHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName, options =>
{
options.MaxDequeueCount = 1;
options.PoisonQueueSuffix = poisenQueueSuffix;
Expand All @@ -116,18 +119,17 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
});

var message = "message1";
await queueClient.CreateAsync();
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

var poisenQueueClient = new QueueClient(_azuriteFixture.ConnectionString, $"{queueName}-{poisenQueueSuffix}", _queueClientOptions);
var poisenQueueClient = new QueueClient(_azuriteFixture.ConnectionString, $"{_queueName}-{poisenQueueSuffix}", _queueClientOptions);

var peekedPoisonQueueMessage = await poisenQueueClient.PeekMessageAsync();
peekedPoisonQueueMessage.Value.Should().NotBeNull();
Expand All @@ -138,9 +140,8 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_two_messages_then_they_are_handled_correctly()
{
// Arrange
var queueName = "singletontestsqueue";
var scope = "testscope";
var factory = new JobHostFactory<SingletonFunction>(_azuriteFixture.ConnectionString, queueName);
var factory = new JobHostFactory<SingletonFunction>(_azuriteFixture.ConnectionString, _queueName);

var fakeServiceMock = new Mock<IFakeService>();

Expand All @@ -149,15 +150,12 @@ public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_t
services.AddTransient(_ => fakeServiceMock.Object);
});

var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);
var blobContainerClient = new BlobContainerClient(_azuriteFixture.ConnectionString, SingletonFunction.ContainerName);

await queueClient.CreateAsync();

var messages = new[] { new { Id = scope }, new { Id = scope } };
foreach (var message in messages)
{
await queueClient.SendMessageAsync(BinaryData.FromObjectAsJson(message));
await _queueClient.SendMessageAsync(BinaryData.FromObjectAsJson(message));
}

// Act
Expand All @@ -166,7 +164,7 @@ public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_t

// Assert
fakeServiceMock.Verify(f => f.Execute(It.IsAny<Message>()), Times.Exactly(messages.Length));
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

var blobclient = blobContainerClient.GetBlobClient(scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,35 @@

namespace Dequeueable.AzureQueueStorage.IntegrationTests.Functions
{
public class ListenerTests : IClassFixture<AzuriteFixture>
public class ListenerTests : IClassFixture<AzuriteFixture>, IAsyncLifetime
{
private readonly QueueClientOptions _queueClientOptions = new() { MessageEncoding = QueueMessageEncoding.Base64 };
private readonly AzuriteFixture _azuriteFixture;
private readonly string _queueName;
private readonly QueueClient _queueClient;

public ListenerTests(AzuriteFixture azuriteFixture)
{
_azuriteFixture = azuriteFixture;
_queueName = "jobqueue";
_queueClient = new QueueClient(_azuriteFixture.ConnectionString, _queueName, _queueClientOptions);
}

public Task InitializeAsync()
{
return _queueClient.CreateAsync();
}

public Task DisposeAsync()
{
return _queueClient.DeleteAsync();
}

[Fact]
public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_correctly()
{
// Arrange
var queueName = "testqueue1";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);

var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName);
var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName);

var fakeServiceMock = new Mock<IFakeService>();

Expand All @@ -37,19 +48,17 @@ public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_c
services.AddTransient(_ => fakeServiceMock.Object);
});

await queueClient.CreateAsync();

foreach (var message in messages)
{
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);
}

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

foreach (var message in messages)
Expand All @@ -62,10 +71,7 @@ public async Task Given_a_Queue_when_is_has_two_messages_then_they_are_handled_c
public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurred_while_executing_the_function_and_the_MaxDequeueCount_is_not_yet_reached_then_the_message_is_enqueued_correctly()
{
// Arrange
var queueName = "testqueue2";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);

var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName, options =>
var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName, options =>
{
options.MaxDequeueCount = 5;
});
Expand All @@ -79,15 +85,14 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
});

var message = "message1";
await queueClient.CreateAsync();
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().NotBeNull();
peekedMessage.Value.Body.ToString().Should().Be(message);
peekedMessage.Value.DequeueCount.Should().Be(1);
Expand All @@ -97,11 +102,9 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurred_while_executing_the_function_and_the_MaxDequeueCount_is_reached_then_the_message_is_moved_to_the_poisen_queue()
{
// Arrange
var queueName = "testqueue3";
var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);
var poisenQueueSuffix = "poison";

var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, queueName, options =>
var factory = new ListenerHostFactory<TestFunction>(_azuriteFixture.ConnectionString, _queueName, options =>
{
options.MaxDequeueCount = 1;
options.PoisonQueueSuffix = poisenQueueSuffix;
Expand All @@ -116,18 +119,17 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
});

var message = "message1";
await queueClient.CreateAsync();
await queueClient.SendMessageAsync(message);
await _queueClient.SendMessageAsync(message);

// Act
var host = factory.Build();
await host.HandleAsync(CancellationToken.None);

// Assert
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

var poisenQueueClient = new QueueClient(_azuriteFixture.ConnectionString, $"{queueName}-{poisenQueueSuffix}", _queueClientOptions);
var poisenQueueClient = new QueueClient(_azuriteFixture.ConnectionString, $"{_queueName}-{poisenQueueSuffix}", _queueClientOptions);

var peekedPoisonQueueMessage = await poisenQueueClient.PeekMessageAsync();
peekedPoisonQueueMessage.Value.Should().NotBeNull();
Expand All @@ -138,9 +140,8 @@ public async Task Given_a_QueueMessage_with_DequeueCount_1_when_an_error_occurre
public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_two_messages_then_they_are_handled_correctly()
{
// Arrange
var queueName = "scopedsingletontestsqueue";
var scope = "testscope";
var factory = new ListenerHostFactory<SingletonFunction>(_azuriteFixture.ConnectionString, queueName);
var factory = new ListenerHostFactory<SingletonFunction>(_azuriteFixture.ConnectionString, _queueName);

var fakeServiceMock = new Mock<IFakeService>();

Expand All @@ -149,15 +150,12 @@ public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_t
services.AddTransient(_ => fakeServiceMock.Object);
});

var queueClient = new QueueClient(_azuriteFixture.ConnectionString, queueName, _queueClientOptions);
var blobContainerClient = new BlobContainerClient(_azuriteFixture.ConnectionString, SingletonFunction.ContainerName);

await queueClient.CreateAsync();

var messages = new[] { new { Id = scope }, new { Id = scope } };
foreach (var message in messages)
{
await queueClient.SendMessageAsync(BinaryData.FromObjectAsJson(message));
await _queueClient.SendMessageAsync(BinaryData.FromObjectAsJson(message));
}

// Act
Expand All @@ -166,7 +164,7 @@ public async Task Given_a_Function_with_a_singleton_attribute_when_a_queue_has_t

// Assert
fakeServiceMock.Verify(f => f.Execute(It.IsAny<Message>()), Times.Exactly(messages.Length));
var peekedMessage = await queueClient.PeekMessageAsync();
var peekedMessage = await _queueClient.PeekMessageAsync();
peekedMessage.Value.Should().BeNull();

var blobclient = blobContainerClient.GetBlobClient(scope);
Expand Down

0 comments on commit ebd7af5

Please sign in to comment.