Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for CloudEvent schema #117

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
![GitHub all releases](https://img.shields.io/github/downloads/pmcilreavy/AzureEventGridSimulator/total)
![Docker Pulls](https://img.shields.io/docker/pulls/pmcilreavy/azureeventgridsimulator)

A simulator that provides endpoints to mimic the functionality of [Azure Event Grid](https://azure.microsoft.com/en-au/services/event-grid/) topics and subscribers and is compatible with the `Microsoft.Azure.EventGrid` client library. NOTE: Currently only the `EventGrid` event schema is supported. Support for the `CloudEvent` schema may be added at a future date.
A simulator that provides endpoints to mimic the functionality of [Azure Event Grid](https://azure.microsoft.com/en-au/services/event-grid/) topics and subscribers and is compatible with the `Microsoft.Azure.EventGrid` client library. NOTE: Currently only the `EventGrid` event schema is supported.

## Configuration

Expand Down Expand Up @@ -51,7 +51,9 @@ An example of one topic with one subscriber is shown below.

#### Subscription Validation

When a subscription is added to Azure Event Grid it first sends a validation event to the subscription endpoint. The validation event contains a `validationCode` which the subscription endpoint must echo back. If this does not occur then Azure Event Grid will not enable the subscription.
When a subscription is added to Azure Event Grid it first sends a validation event to the subscription endpoint. The validation event contains a `validationCode` which the subscription endpoint must echo back. If this does not occur then Azure Event Grid will not enable the subscription.

Validation is not supported for the cloudevent schema.

More information about subscription validation can be found at [https://docs.microsoft.com/en-us/azure/event-grid/webhook-event-delivery](https://docs.microsoft.com/en-us/azure/event-grid/webhook-event-delivery).

Expand Down Expand Up @@ -163,17 +165,22 @@ docker-compose up --build `
--detach
```

## Using the Simulator
## Using the Simulator

Once configured and running, requests are `posted` to a topic endpoint. There are two endpoints, one for each supported schemas.

EventGridEvent (Default) : The endpoint of a topic will be in the form: `https://localhost:<configured-port>/api/events?api-version=2018-01-01`.
CloudEvent : The endpoint of a topic will be in the form: `https://localhost:<configured-port>/api/events/cloudevent?api-version=2018-01-01`.


Once configured and running, requests are `posted` to a topic endpoint. The endpoint of a topic will be in the form: `https://localhost:<configured-port>/api/events?api-version=2018-01-01`.

#### cURL Example

```bash
curl -k -H "Content-Type: application/json" -H "aeg-sas-key: TheLocal+DevelopmentKey=" -X POST "https://localhost:60101/api/events?api-version=2018-01-01" -d @Data.json
curl -k -H "Content-Type: application/json" -H "aeg-sas-key: TheLocal+DevelopmentKey=" -X POST "https://localhost:60101/api/events?api-version=2018-01-01" -d @Event_Grid_Data.json
```

_Data.json_
Event_Grid_Data.json_

```json
[
Expand All @@ -188,7 +195,23 @@ _Data.json_
"dataVersion": "1"
}
]

```
_CloudEvent_Data.json_

```json
[{
"Data": "some data",
"Id": "8727823",
"Source": "https://awesomesource.com/somestuff",
"Type": "Example.DataType",
"Time": "2022-09-06T15:15:36.927736+00:00",
"DataSchema": "https://awesomeschema.com/someuri",
"DataContentType": "application/json",
"Subject": "/example/subject"
}
```


#### Postman

Expand Down Expand Up @@ -262,7 +285,6 @@ It posts the payload to https://host:port and drops the query uri. All of the ex

Some features that could be added if there was a need for them: -

- `CloudEvent` schema support.
- Subscriber retries & dead lettering. https://docs.microsoft.com/en-us/azure/event-grid/delivery-and-retry
- Certificate configuration in `appsettings.json`.
- Subscriber token auth
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Shouldly;
using Xunit;
using EventGridEvent = Azure.Messaging.EventGrid.EventGridEvent;

namespace AzureEventGridSimulator.Tests.ActualSimulatorTests;

Expand All @@ -19,7 +22,9 @@ namespace AzureEventGridSimulator.Tests.ActualSimulatorTests;
public class AzureMessagingEventGridTest
{
// ReSharper disable once NotAccessedField.Local
private readonly ActualSimulatorFixture _actualSimulatorFixture;
private readonly ActualSimulatorFixture _actualSimulatorFixture;

private BinaryData _data = new BinaryData(Encoding.UTF8.GetBytes("##This is treated as binary data##"));

public AzureMessagingEventGridTest(ActualSimulatorFixture actualSimulatorFixture)
{
Expand All @@ -40,6 +45,23 @@ public async Task GivenValidEvent_WhenUriContainsNonStandardPort_ThenItShouldBeA
response.Status.ShouldBe((int)HttpStatusCode.OK);
}


[Fact]
public async Task GivenValidCloudEvent_WhenUriContainsNonStandardPort_ThenItShouldBeAccepted()
{
var client = new EventGridPublisherClient(
new Uri("https://localhost:60101/api/events/cloudevent"),
new AzureKeyCredential("TheLocal+DevelopmentKey="),
new EventGridPublisherClientOptions
{ Retry = { Mode = RetryMode.Fixed, MaxRetries = 0, NetworkTimeout = TimeSpan.FromSeconds(5) } });

var myEvent = new CloudEvent("https://awesomesource.com/somestuff", "The.Event.Type", _data, "application/json");

var response = await client.SendEventAsync(myEvent);

response.Status.ShouldBe((int)HttpStatusCode.OK);
}

[Fact]
public async Task GivenValidEvents_WhenUriContainsNonStandardPort_TheyShouldBeAccepted()
{
Expand All @@ -60,6 +82,26 @@ public async Task GivenValidEvents_WhenUriContainsNonStandardPort_TheyShouldBeAc
response.Status.ShouldBe((int)HttpStatusCode.OK);
}

[Fact]
public async Task GivenValidCloudEvents_WhenUriContainsNonStandardPort_TheyShouldBeAccepted()
{
var client = new EventGridPublisherClient(
new Uri("https://localhost:60101/api/events/cloudevent"),
new AzureKeyCredential("TheLocal+DevelopmentKey="),
new EventGridPublisherClientOptions
{ Retry = { Mode = RetryMode.Fixed, MaxRetries = 0, NetworkTimeout = TimeSpan.FromSeconds(5) } });

var events = new[]
{
new CloudEvent("https://awesomesource.com/somestuff1", "The.Event.Type1", _data, "application/json"),
new CloudEvent("https://awesomesource.com/somestuff2", "The.Event.Type2", _data, "application/json")
};

var response = await client.SendEventsAsync(events);

response.Status.ShouldBe((int)HttpStatusCode.OK);
}

[Fact]
public async Task GivenValidEvent_WhenUriContainsNonExistentPort_ThenItShouldNotBeAccepted()
{
Expand Down
37 changes: 37 additions & 0 deletions src/AzureEventGridSimulator.Tests/IntegrationTests/BasicTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,41 @@ public async Task GivenAValidEvent_WhenPublished_ThenItShouldBeAccepted()
response.EnsureSuccessStatusCode();
response.StatusCode.ShouldBe(HttpStatusCode.OK);
}

[Fact]
public async Task GivenAValidCloudEvent_WhenPublished_ThenItShouldBeAccepted()
{
// Arrange
var client = _factory.CreateClient(new WebApplicationFactoryClientOptions
{
BaseAddress = new Uri("https://localhost:60101")
});

client.DefaultRequestHeaders.Add(Constants.AegSasKeyHeader, "TheLocal+DevelopmentKey=");
client.DefaultRequestHeaders.Add(Constants.AegEventTypeHeader, Constants.NotificationEventType);

var data = new BinaryData(Encoding.UTF8.GetBytes("##This is treated as binary data##"));

var testEvent = new Domain.Entities.CloudEventGridEvent()
{
Data_Base64 = Convert.ToBase64String(data),
Id = "1232",
Source = "https://awesomesource.com/somestuff",
Type = "The.Event.Type",
Time = DateTimeOffset.UtcNow,
DataSchema = "https://awesomeschema.com/someuri",
DataContentType = "application/json",
Subject = "/the/subject",
};

var json = JsonConvert.SerializeObject(new[] { testEvent }, Formatting.Indented);

// Act
var jsonContent = new StringContent(json, Encoding.UTF8, "application/json");
var response = await client.PostAsync("/api/events/cloudevent", jsonContent);

// Assert
response.EnsureSuccessStatusCode();
response.StatusCode.ShouldBe(HttpStatusCode.OK);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Linq;
using System.Linq;
using System.Threading.Tasks;
using AzureEventGridSimulator.Domain;
using AzureEventGridSimulator.Domain.Commands;
Expand Down Expand Up @@ -30,10 +30,25 @@ public class NotificationController : ControllerBase
public async Task<IActionResult> Post()
{
var topicSettingsForCurrentRequestPort = _simulatorSettings.Topics.First(t => t.Port == HttpContext.Request.Host.Port);

var eventsFromCurrentRequestBody = JsonConvert.DeserializeObject<EventGridEvent[]>(await HttpContext.RequestBody());

await _mediator.Send(new SendNotificationEventsToSubscriberCommand(eventsFromCurrentRequestBody, topicSettingsForCurrentRequestPort));

return Ok();
}


[Route("cloudevent")]
[HttpPost]
public async Task<IActionResult> PostCloudEvent()
{
var topicSettingsForCurrentRequestPort = _simulatorSettings.Topics.First(t => t.Port == HttpContext.Request.Host.Port);

var eventsFromCurrentRequestBody = JsonConvert.DeserializeObject<CloudEventGridEvent[]>(await HttpContext.RequestBody());

await _mediator.Send(new SendNotificationCloudEventsToSubscriberCommand(eventsFromCurrentRequestBody, topicSettingsForCurrentRequestPort));

return Ok();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using AzureEventGridSimulator.Domain.Entities;
using AzureEventGridSimulator.Infrastructure.Settings;
using MediatR;

namespace AzureEventGridSimulator.Domain.Commands;

public class SendNotificationEventsToSubscriberCommand : IRequest
{
public SendNotificationEventsToSubscriberCommand(EventGridEvent[] events, TopicSettings topic)
{
Events = events;
Topic = topic;
}

public TopicSettings Topic { get; }

public EventGridEvent[] Events { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AzureEventGridSimulator.Domain.Entities;
using AzureEventGridSimulator.Infrastructure.Extensions;
using AzureEventGridSimulator.Infrastructure.Settings;
using MediatR;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace AzureEventGridSimulator.Domain.Commands;

// ReSharper disable once UnusedMember.Global
public class SendNotificationCloudEventsToSubscriberCommandHandler : AsyncRequestHandler<SendNotificationCloudEventsToSubscriberCommand>
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<SendNotificationCloudEventsToSubscriberCommand> _logger;

public SendNotificationCloudEventsToSubscriberCommandHandler(IHttpClientFactory httpClientFactory, ILogger<SendNotificationCloudEventsToSubscriberCommand> logger)
{
_httpClientFactory = httpClientFactory;
_logger = logger;
}

protected override Task Handle(SendNotificationCloudEventsToSubscriberCommand request, CancellationToken cancellationToken)
{
_logger.LogInformation("{EventCount} event(s) received on topic '{TopicName}'", request.Events.Length, request.Topic.Name);

if (!request.Topic.Subscribers.Any())
{
_logger.LogWarning("'{TopicName}' has no subscribers so {EventCount} event(s) could not be forwarded", request.Topic.Name, request.Events.Length);
}
else if (request.Topic.Subscribers.All(o => o.Disabled))
{
_logger.LogWarning("'{TopicName}' has no enabled subscribers so {EventCount} event(s) could not be forwarded", request.Topic.Name, request.Events.Length);
}
else
{
var eventsFilteredOutByAllSubscribers = request.Events
.Where(e => request.Topic.Subscribers.All(s => !s.Filter.AcceptsEvent(e)))
.ToArray();

if (eventsFilteredOutByAllSubscribers.Any())
{
foreach (var eventFilteredOutByAllSubscribers in eventsFilteredOutByAllSubscribers)
{
_logger.LogWarning("All subscribers of topic '{TopicName}' filtered out event {EventId}",
request.Topic.Name,
eventFilteredOutByAllSubscribers.Id);
}
}
else
{
foreach (var subscription in request.Topic.Subscribers)
{
#pragma warning disable 4014
SendToSubscriber(subscription, request.Events, request.Topic.Name);
#pragma warning restore 4014
}
}
}

return Task.CompletedTask;
}

private async Task SendToSubscriber(SubscriptionSettings subscription, IEnumerable<CloudEventGridEvent> events, string topicName)
{
try
{
if (subscription.Disabled)
{
_logger.LogWarning("Subscription '{SubscriberName}' on topic '{TopicName}' is disabled and so Notification was skipped", subscription.Name, topicName);
return;
}

if (!subscription.DisableValidation &&
subscription.ValidationStatus != SubscriptionValidationStatus.ValidationSuccessful)
{
_logger.LogWarning("Subscription '{SubscriberName}' on topic '{TopicName}' can't receive events. It's still pending validation", subscription.Name, topicName);
return;
}

_logger.LogDebug("Sending to subscriber '{SubscriberName}' on topic '{TopicName}'", subscription.Name, topicName);

// "Event Grid sends the events to subscribers in an array that has a single event. This behaviour may change in the future."
// https://docs.microsoft.com/en-us/azure/event-grid/event-schema
foreach (var evt in events)
{
if (subscription.Filter.AcceptsEvent(evt))
{
var json = JsonConvert.SerializeObject(new[] { evt }, Formatting.Indented);
using var content = new StringContent(json, Encoding.UTF8, "application/json");
var httpClient = _httpClientFactory.CreateClient();
httpClient.DefaultRequestHeaders.Add(Constants.AegEventTypeHeader, Constants.NotificationEventType);
httpClient.DefaultRequestHeaders.Add(Constants.AegSubscriptionNameHeader, subscription.Name.ToUpperInvariant());
httpClient.DefaultRequestHeaders.Add(Constants.AegDeliveryCountHeader, "0"); // TODO implement re-tries
httpClient.Timeout = TimeSpan.FromSeconds(60);

await httpClient.PostAsync(subscription.Endpoint, content)
.ContinueWith(t => LogResult(t, evt, subscription, topicName));
}
else
{
_logger.LogDebug("Event {EventId} filtered out for subscriber '{SubscriberName}'", evt.Id, subscription.Name);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send to subscriber '{SubscriberName}'", subscription.Name);
}
}

private void LogResult(Task<HttpResponseMessage> task, CloudEventGridEvent evt, SubscriptionSettings subscription, string topicName)
{
if (task.IsCompletedSuccessfully && task.Result.IsSuccessStatusCode)
{
_logger.LogDebug("Event {EventId} sent to subscriber '{SubscriberName}' on topic '{TopicName}' successfully", evt.Id, subscription.Name, topicName);
}
else
{
_logger.LogError(task.Exception?.GetBaseException(),
"Failed to send event {EventId} to subscriber '{SubscriberName}', '{TaskStatus}', '{Reason}'",
evt.Id,
subscription.Name,
task.Status.ToString(),
task.Result?.ReasonPhrase);
}
}
}
Loading