Skip to content

Commit

Permalink
Updates EventHubs dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
cwe1ss committed Feb 22, 2019
1 parent a864d32 commit c5b986e
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 22 deletions.
27 changes: 9 additions & 18 deletions src/Meceqs.AzureEventHubs/FileFake/FileFakeEventDataSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using Meceqs.Transport;
using Microsoft.Azure.EventHubs;
Expand All @@ -14,13 +12,8 @@ public static class FileFakeEventDataSerializer
{
// From https://github.com/Azure/azure-event-hubs-dotnet/blob/dev/src/Microsoft.Azure.EventHubs/Primitives/ClientConstants.cs
private const string EnqueuedTimeUtcName = "x-opt-enqueued-time";
private const string SequenceNumberName = "x-opt-sequence-number";
private const string OffsetName = "x-opt-offset";

private static readonly PropertyInfo SystemPropertiesSetter = typeof(EventData).GetProperty("SystemProperties");
private static readonly ConstructorInfo SystemPropertiesCtor = typeof(EventData.SystemPropertiesCollection).GetTypeInfo().DeclaredConstructors.First();

public static string Serialize(EventData eventData)
public static string Serialize(EventData eventData, string partitionKey)
{
string serializedEnvelope = Encoding.UTF8.GetString(eventData.Body.Array);

Expand All @@ -37,6 +30,9 @@ public static string Serialize(EventData eventData)
writer.WriteValue(kvp.Value);
}

writer.WritePropertyName("PartitionKey");
writer.WriteValue(partitionKey);

writer.WritePropertyName(EnqueuedTimeUtcName);
writer.WriteValue(DateTime.UtcNow);

Expand All @@ -62,16 +58,11 @@ public static EventData Deserialize(string serializedEventData, long sequenceNum
eventData.Properties[header] = value;
}

// System properties are internal - that's why we need reflection :(
// Header names: https://github.com/Azure/azure-event-hubs-dotnet/blob/dev/src/Microsoft.Azure.EventHubs/Primitives/ClientConstants.cs

var systemProperties = (EventData.SystemPropertiesCollection)SystemPropertiesCtor.Invoke(new object[] { });

systemProperties[EnqueuedTimeUtcName] = (DateTime)jsonEventData.GetValue(EnqueuedTimeUtcName).ToObject(typeof(DateTime));
systemProperties[SequenceNumberName] = sequenceNumber;
systemProperties[OffsetName] = sequenceNumber.ToString();

SystemPropertiesSetter.SetValue(eventData, systemProperties);
eventData.SystemProperties = new EventData.SystemPropertiesCollection(
sequenceNumber: sequenceNumber,
enqueuedTimeUtc: (DateTime)jsonEventData.GetValue(EnqueuedTimeUtcName).ToObject(typeof(DateTime)),
offset: sequenceNumber.ToString(),
partitionKey: jsonEventData.GetValue("PartitionKey").ToString());

return eventData;
}
Expand Down
4 changes: 1 addition & 3 deletions src/Meceqs.AzureEventHubs/FileFake/FileFakeEventHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ public FileFakeEventHubClient(string fileName, ILoggerFactory loggerFactory)

public Task SendAsync(EventData data, string partitionKey)
{
// TODO @cweiss partition algorithm?

string serializedEventData = FileFakeEventDataSerializer.Serialize(data);
string serializedEventData = FileFakeEventDataSerializer.Serialize(data, partitionKey);

InvokeWithRetry(3, () =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/Meceqs.AzureEventHubs/Meceqs.AzureEventHubs.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
<ProjectReference Include="..\Meceqs\Meceqs.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.EventHubs" Version="2.2.1" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="3.0.0" />
</ItemGroup>
</Project>

0 comments on commit c5b986e

Please sign in to comment.