Skip to content

Commit

Permalink
Initial state
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Dec 7, 2023
1 parent 850022e commit 570af17
Show file tree
Hide file tree
Showing 11 changed files with 8 additions and 975 deletions.
51 changes: 0 additions & 51 deletions Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,4 @@

namespace Helpdesk.Api.Core.Kafka;

public class KafkaProducer: IProjection
{
private const string DefaultConfigKey = "KafkaProducer";

private readonly KafkaProducerConfig config;

public KafkaProducer(IConfiguration configuration)
{
config = configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();
}

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
{
await Publish(@event.Data, ct);
}
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");

private async Task Publish(object @event, CancellationToken ct)
{
try
{
using var producer = new ProducerBuilder<string, string>(config.ProducerConfig).Build();

await producer.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonSerializer.Serialize(@event)
}, ct).ConfigureAwait(false);
}
catch (Exception exc)
{
Console.WriteLine(exc.Message);
throw;
}
}
}

public class KafkaProducerConfig
{
public ProducerConfig? ProducerConfig { get; set; }
public string? Topic { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,3 @@

namespace Helpdesk.Api.Core.Marten;

public static class DocumentSessionExtensions
{
public static Task Add<T>(this IDocumentSession documentSession, Guid id, object @event, CancellationToken ct)
where T : class
{
documentSession.Events.StartStream<T>(id, @event);
return documentSession.SaveChangesAsync(token: ct);
}

public static Task GetAndUpdate<T>(
this IDocumentSession documentSession,
Guid id,
int version,
Func<T, object> handle,
CancellationToken ct
) where T : class =>
documentSession.Events.WriteToAggregate<T>(id, version, stream =>
stream.AppendOne(handle(stream.Aggregate)), ct);
}
18 changes: 0 additions & 18 deletions Sample/Helpdesk/Helpdesk.Api/Core/SignalR/SignalRProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,5 @@

namespace Helpdesk.Api.Core.SignalR;

public class SignalRProducer: IProjection
{
private readonly IHubContext hubContext;

public SignalRProducer(IHubContext hubContext) =>
this.hubContext = hubContext;

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
{
await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct);
}
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");
}

6 changes: 6 additions & 0 deletions Sample/Helpdesk/Helpdesk.Api/Helpdesk.Api.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@
<PackageReference Include="Confluent.Kafka" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
</ItemGroup>

<ItemGroup>
<Folder Include="Core\Marten\" />
<Folder Include="Core\SignalR\" />
<Folder Include="Incidents\" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,3 @@

namespace Helpdesk.Api.Incidents.GetCustomerIncidentsSummary;

public class CustomerIncidentsSummary
{
public Guid Id { get; set; }
public int Pending { get; set; }
public int Resolved { get; set; }
public int Acknowledged { get; set; }
public int Closed { get; set; }
}

public class CustomerIncidentsSummaryProjection: MultiStreamProjection<CustomerIncidentsSummary, Guid>
{
public CustomerIncidentsSummaryProjection()
{
Identity<IncidentLogged>(e => e.CustomerId);
CustomGrouping(new CustomerIncidentsSummaryGrouper());
}

public void Apply(IncidentLogged logged, CustomerIncidentsSummary current)
{
current.Pending++;
}

public void Apply(IncidentResolved resolved, CustomerIncidentsSummary current)
{
current.Pending--;
current.Resolved++;
}

public void Apply(ResolutionAcknowledgedByCustomer acknowledged, CustomerIncidentsSummary current)
{
current.Resolved--;
current.Acknowledged++;
}

public void Apply(IncidentClosed closed, CustomerIncidentsSummary current)
{
current.Acknowledged--;
current.Closed++;
}
}

public class CustomerIncidentsSummaryGrouper: IAggregateGrouper<Guid>
{
private readonly Type[] eventTypes =
{
typeof(IncidentResolved), typeof(ResolutionAcknowledgedByCustomer),
typeof(IncidentClosed)
};

public async Task Group(IQuerySession session, IEnumerable<IEvent> events, ITenantSliceGroup<Guid> grouping)
{
var filteredEvents = events
.Where(ev => eventTypes.Contains(ev.EventType))
.ToList();

if (!filteredEvents.Any())
return;

var incidentIds = filteredEvents.Select(e => e.StreamId).ToList();

var result = await session.Events.QueryRawEventDataOnly<IncidentLogged>()
.Where(e => incidentIds.Contains(e.IncidentId))
.Select(x => new { x.IncidentId, x.CustomerId })
.ToListAsync();

foreach (var group in result.Select(g =>
new { g.CustomerId, Events = filteredEvents.Where(ev => ev.StreamId == g.IncidentId) }))
{
grouping.AddEvents(group.CustomerId, group.Events);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,3 @@

namespace Helpdesk.Api.Incidents.GetIncidentDetails;

public record IncidentDetails(
Guid Id,
Guid CustomerId,
IncidentStatus Status,
IncidentNote[] Notes,
IncidentCategory? Category = null,
IncidentPriority? Priority = null,
Guid? AgentId = null,
int Version = 1
);

public record IncidentNote(
IncidentNoteType Type,
Guid From,
string Content,
bool VisibleToCustomer
);

public enum IncidentNoteType
{
FromAgent,
FromCustomer
}

public class IncidentDetailsProjection: SingleStreamProjection<IncidentDetails>
{
public static IncidentDetails Create(IncidentLogged logged) =>
new(logged.IncidentId, logged.CustomerId, IncidentStatus.Pending, Array.Empty<IncidentNote>());

public IncidentDetails Apply(IncidentCategorised categorised, IncidentDetails current) =>
current with { Category = categorised.Category };

public IncidentDetails Apply(IncidentPrioritised prioritised, IncidentDetails current) =>
current with { Priority = prioritised.Priority };

public IncidentDetails Apply(AgentAssignedToIncident prioritised, IncidentDetails current) =>
current with { AgentId = prioritised.AgentId };

public IncidentDetails Apply(AgentRespondedToIncident agentResponded, IncidentDetails current) =>
current with
{
Notes = current.Notes.Union(
new[]
{
new IncidentNote(
IncidentNoteType.FromAgent,
agentResponded.Response.AgentId,
agentResponded.Response.Content,
agentResponded.Response.VisibleToCustomer
)
}).ToArray()
};

public IncidentDetails Apply(CustomerRespondedToIncident customerResponded, IncidentDetails current) =>
current with
{
Notes = current.Notes.Union(
new[]
{
new IncidentNote(
IncidentNoteType.FromCustomer,
customerResponded.Response.CustomerId,
customerResponded.Response.Content,
true
)
}).ToArray()
};

public IncidentDetails Apply(IncidentResolved resolved, IncidentDetails current) =>
current with { Status = IncidentStatus.Resolved };

public IncidentDetails Apply(ResolutionAcknowledgedByCustomer acknowledged, IncidentDetails current) =>
current with { Status = IncidentStatus.ResolutionAcknowledgedByCustomer };

public IncidentDetails Apply(IncidentClosed closed, IncidentDetails current) =>
current with { Status = IncidentStatus.Closed };
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,113 +3,3 @@
using Marten.Schema.Identity;

namespace Helpdesk.Api.Incidents.GetIncidentHistory;

public record IncidentHistory(
Guid Id,
Guid IncidentId,
string Description
);

public class IncidentHistoryTransformation: EventProjection
{
public IncidentHistory Transform(IEvent<IncidentLogged> input)
{
var (incidentId, customerId, contact, description, loggedBy, loggedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"['{loggedAt}'] Logged Incident with id: '{incidentId}' for customer '{customerId}' and description `{description}' through {contact} by '{loggedBy}'"
);
}

public IncidentHistory Transform(IEvent<IncidentCategorised> input)
{
var (incidentId, category, categorisedBy, categorisedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{categorisedAt}] Categorised Incident with id: '{incidentId}' as {category} by {categorisedBy}"
);
}

public IncidentHistory Transform(IEvent<IncidentPrioritised> input)
{
var (incidentId, priority, prioritisedBy, prioritisedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{prioritisedAt}] Prioritised Incident with id: '{incidentId}' as '{priority}' by {prioritisedBy}"
);
}

public IncidentHistory Transform(IEvent<AgentAssignedToIncident> input)
{
var (incidentId, agentId, assignedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{assignedAt}] Assigned agent `{agentId} to incident with id: '{incidentId}'"
);
}

public IncidentHistory Transform(IEvent<CustomerRespondedToIncident> input)
{
var (incidentId, response, respondedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{respondedAt}] Agent '{response.CustomerId}' responded with response '{response.Content}' to Incident with id: '{incidentId}'"
);
}

public IncidentHistory Transform(IEvent<AgentRespondedToIncident> input)
{
var (incidentId, response, respondedAt) = input.Data;

var responseVisibility = response.VisibleToCustomer ? "public" : "private";

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{respondedAt}] Agent '{response.AgentId}' responded with {responseVisibility} response '{response.Content}' to Incident with id: '{incidentId}'"
);
}

public IncidentHistory Transform(IEvent<IncidentResolved> input)
{
var (incidentId, resolution, resolvedBy, resolvedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{resolvedAt}] Resolved Incident with id: '{incidentId}' with resolution `{resolution} by '{resolvedBy}'"
);
}

public IncidentHistory Transform(IEvent<ResolutionAcknowledgedByCustomer> input)
{
var (incidentId, acknowledgedBy, acknowledgedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{acknowledgedAt}] Customer '{acknowledgedBy}' acknowledged resolution of Incident with id: '{incidentId}'"
);
}

public IncidentHistory Transform(IEvent<IncidentClosed> input)
{
var (incidentId, closedBy, closedAt) = input.Data;

return new IncidentHistory(
CombGuidIdGeneration.NewGuid(),
incidentId,
$"[{closedAt}] Agent '{closedBy}' closed Incident with id: '{incidentId}'"
);
}
}
Loading

0 comments on commit 570af17

Please sign in to comment.