Skip to content

Commit

Permalink
[Event Hubs] Geo DR - Preview v2 (Azure#43973)
Browse files Browse the repository at this point in the history
* primary changes

* fix build

* run scripts

* fix unit tests

* additional test tweaks

* changelog

* Update sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobCheckpointStoreInternal.cs

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>

* Update sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>

* Update sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>

* Update sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/CheckpointPosition.cs

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>

* Update sdk/eventhub/Azure.Messaging.EventHubs/src/Processor/CheckpointPosition.cs

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>

* forgot to bring some things from the other preview pr

---------

Co-authored-by: Jesse Squire <jesse.squire@gmail.com>
  • Loading branch information
m-redding and jsquire committed May 16, 2024
1 parent 07e8615 commit ebdab81
Show file tree
Hide file tree
Showing 66 changed files with 771 additions and 507 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public partial class EventProcessorClient : Azure.Messaging.EventHubs.Primitives
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, string offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class EventProcessorClientOptions
Expand Down Expand Up @@ -71,7 +72,8 @@ public partial class BlobCheckpointStore : Azure.Messaging.EventHubs.Primitives.
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ EventData eventData = EventHubsModelFactory.EventData(
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offset: "1500",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

// This creates a new instance of ProcessEventArgs to pass into the handler directly.
Expand Down Expand Up @@ -110,7 +110,7 @@ TimerCallback dispatchEvent = async _ =>
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offset: "1500",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

ProcessEventArgs eventArgs = new(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.12.0-beta.1</Version>
Expand All @@ -9,12 +9,17 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<!--TEMP REMOVE BEFORE RELEASE -->
<ProjectReference Include="..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!--<PackageReference Include="Azure.Messaging.EventHubs" />-->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with the checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with the checkpoint.</param>
/// <param name="offset">The offset associated with the checkpoint.</param>
/// <param name="exception">The exception that occurred.</param>
///
Expand All @@ -106,10 +105,9 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset,
Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand All @@ -121,7 +119,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointComplete(string partitionId,
Expand All @@ -130,9 +127,8 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to create/update a checkpoint has started.
Expand All @@ -144,7 +140,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointStart(string partitionId,
Expand All @@ -153,9 +148,8 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to retrieve claim partition ownership has completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,22 +252,20 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
[Event(32, Level = EventLevel.Verbose, Message = "Starting to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' ReplicationSegment: '{6}' Offset: '{7}'.")]
[Event(32, Level = EventLevel.Verbose, Message = "Starting to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' Offset: '{6}'.")]
public virtual void UpdateCheckpointStart(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(32, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(32, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand All @@ -281,22 +279,20 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
[Event(33, Level = EventLevel.Verbose, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' ReplicationSegment: '{6}' Offset: '{7}'.")]
[Event(33, Level = EventLevel.Verbose, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' Offset: '{6}'.")]
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(33, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(33, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand All @@ -310,24 +306,22 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the processor that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
///
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{5}'; at SequenceNumber: '{6}' ReplicationSegment '{7}' Offset '{8}'. ErrorMessage: '{4}'.")]
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ErrorMessage: '{4}'; ClientIdentifier: '{5}'; at SequenceNumber: '{6}' Offset '{7}'.")]
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string errorMessage,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(34, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(34, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand Down

0 comments on commit ebdab81

Please sign in to comment.