/
EventHubClientExtensions.cs
136 lines (126 loc) · 6.5 KB
/
EventHubClientExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team
//
// This sample is supplemental to the technical guidance published on the community
// blog at http://blogs.msdn.com/b/paolos/.
//
// Author: Paolo Salvatori
//=======================================================================================
// Copyright © 2015 Microsoft Corporation. All rights reserved.
//
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
// EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. YOU BEAR THE RISK OF USING IT.
//=======================================================================================
#endregion
#region Using Directives
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.ServiceBus.Messaging;
using System.Threading.Tasks;
#endregion
namespace Microsoft.AzureCat.ServiceBusExtensions
{
/// <summary>
/// This class contains extension methods for the EventHubClient class.
/// </summary>
public static class EventHubClientExtensions
{
    #region Private Constants
    //*******************************
    // Parameter Names
    //*******************************
    private const string EventHubClientParameterName = "eventHubClient";
    private const string EventDataEnumerableParameterName = "eventDataEnumerable";

    //*******************************
    // Messages and Formats
    //*******************************
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    #endregion

    #region Private Types
    /// <summary>
    /// A group of event data instances that is sent with a single SendBatch call,
    /// together with the accumulated serialized size of its members.
    /// </summary>
    private sealed class EventDataBatch
    {
        private readonly List<EventData> events = new List<EventData>();

        /// <summary>
        /// Gets the event data instances that belong to this batch.
        /// </summary>
        public List<EventData> Events
        {
            get { return events; }
        }

        /// <summary>
        /// Gets the sum of the serialized sizes of the events added so far.
        /// </summary>
        public long SizeInBytes { get; private set; }

        /// <summary>
        /// Adds an event to the batch and accounts for its size.
        /// </summary>
        /// <param name="eventData">The event to add.</param>
        /// <param name="sizeInBytes">The serialized size of the event.</param>
        public void Add(EventData eventData, long sizeInBytes)
        {
            events.Add(eventData);
            SizeInBytes += sizeInBytes;
        }
    }
    #endregion

    #region Public Methods
    /// <summary>
    /// Asynchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size,
    /// the method partitions the original batch into multiple batches,
    /// each no larger than the maximum batch size, and sends them in order.
    /// A single event that exceeds the maximum batch size on its own cannot be split
    /// and is sent alone in its own batch.
    /// </summary>
    /// <param name="eventHubClient">The current EventHubClient object.</param>
    /// <param name="eventDataEnumerable">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    /// <exception cref="ArgumentNullException">
    /// eventHubClient is null, or eventDataEnumerable is null or empty.
    /// Because the method is async, the exception is surfaced through the returned task.
    /// </exception>
    public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> eventDataEnumerable, bool trace = false)
    {
        var eventDataList = ValidateArguments(eventHubClient, eventDataEnumerable);
        foreach (var batch in SplitIntoBatches(eventDataList))
        {
            // Library code does not need to resume on the caller's synchronization context.
            await eventHubClient.SendBatchAsync(batch.Events).ConfigureAwait(false);
            if (trace)
            {
                // The message is formatted only when tracing was requested.
                Trace.WriteLine(string.Format(SendPartitionedBatchAsyncFormat, batch.SizeInBytes, batch.Events.Count));
            }
        }
    }

    /// <summary>
    /// Synchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size,
    /// the method partitions the original batch into multiple batches,
    /// each no larger than the maximum batch size, and sends them in order.
    /// A single event that exceeds the maximum batch size on its own cannot be split
    /// and is sent alone in its own batch.
    /// </summary>
    /// <param name="eventHubClient">The current EventHubClient object.</param>
    /// <param name="eventDataEnumerable">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <exception cref="ArgumentNullException">
    /// eventHubClient is null, or eventDataEnumerable is null or empty.
    /// </exception>
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> eventDataEnumerable, bool trace = false)
    {
        var eventDataList = ValidateArguments(eventHubClient, eventDataEnumerable);
        foreach (var batch in SplitIntoBatches(eventDataList))
        {
            eventHubClient.SendBatch(batch.Events);
            if (trace)
            {
                // The message is formatted only when tracing was requested.
                Trace.WriteLine(string.Format(SendPartitionedBatchFormat, batch.SizeInBytes, batch.Events.Count));
            }
        }
    }
    #endregion

    #region Private Methods
    /// <summary>
    /// Validates the arguments shared by both send methods and materializes the event sequence.
    /// </summary>
    /// <param name="eventHubClient">The client the batches will be sent with.</param>
    /// <param name="eventDataEnumerable">The events supplied by the caller.</param>
    /// <returns>The events as a list, so that the sequence is enumerated only once.</returns>
    /// <exception cref="ArgumentNullException">
    /// eventHubClient is null, or eventDataEnumerable is null or empty.
    /// </exception>
    private static IList<EventData> ValidateArguments(EventHubClient eventHubClient, IEnumerable<EventData> eventDataEnumerable)
    {
        if (eventHubClient == null)
        {
            throw new ArgumentNullException(EventHubClientParameterName);
        }
        // The null check must come before ToList(), which would otherwise throw first
        // with an unrelated parameter name.
        if (eventDataEnumerable == null)
        {
            throw new ArgumentNullException(EventDataEnumerableParameterName, EventDataListCannotBeNullOrEmpty);
        }
        var eventDataList = eventDataEnumerable as IList<EventData> ?? eventDataEnumerable.ToList();
        if (eventDataList.Count == 0)
        {
            // ArgumentNullException is kept for the empty case so that callers
            // catching the exception type thrown by earlier versions keep working.
            throw new ArgumentNullException(EventDataEnumerableParameterName, EventDataListCannotBeNullOrEmpty);
        }
        return eventDataList;
    }

    /// <summary>
    /// Lazily groups the events, in their original order, into batches whose accumulated
    /// serialized size does not exceed Constants.MaxBathSizeInBytes.
    /// A batch is closed as soon as adding the next event would exceed the limit.
    /// Empty batches are never produced: an event that is larger than the limit
    /// on its own ends up alone in its batch.
    /// </summary>
    /// <param name="eventDataList">The validated, non-empty list of events.</param>
    /// <returns>The sequence of batches to send.</returns>
    private static IEnumerable<EventDataBatch> SplitIntoBatches(IEnumerable<EventData> eventDataList)
    {
        var currentBatch = new EventDataBatch();
        foreach (var eventData in eventDataList)
        {
            long eventSize = eventData.SerializedSizeInBytes;
            // Constants.MaxBathSizeInBytes is declared elsewhere in the project; the name is used as is.
            // The Count check prevents emitting an empty batch when the very first event
            // of a batch is already larger than the limit.
            if (currentBatch.Events.Count > 0 &&
                (currentBatch.SizeInBytes + eventSize) > Constants.MaxBathSizeInBytes)
            {
                yield return currentBatch;
                currentBatch = new EventDataBatch();
            }
            currentBatch.Add(eventData, eventSize);
        }
        // The final batch is produced after the loop.
        if (currentBatch.Events.Count > 0)
        {
            yield return currentBatch;
        }
    }
    #endregion
}
}