From 3b400d4a8684e90dbc709f0965a11eeb60f5f8eb Mon Sep 17 00:00:00 2001 From: William Yochum Date: Fri, 28 Aug 2020 15:50:54 -0700 Subject: [PATCH 1/2] method to prefix metric name --- src/lib/Microsoft.Health.Common/Telemetry/Metric.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/lib/Microsoft.Health.Common/Telemetry/Metric.cs b/src/lib/Microsoft.Health.Common/Telemetry/Metric.cs index 9bd0fe60..63e505da 100644 --- a/src/lib/Microsoft.Health.Common/Telemetry/Metric.cs +++ b/src/lib/Microsoft.Health.Common/Telemetry/Metric.cs @@ -15,8 +15,16 @@ public Metric(string name, IDictionary dimensions) Dimensions = dimensions; } - public string Name { get; } + public string Name { get; set; } public IDictionary Dimensions { get; } + + public void AddPrefixToName(string prefix) + { + if (!Name.StartsWith(prefix, System.StringComparison.CurrentCulture)) + { + Name = $"{prefix}{Name}"; + } + } } } From 34773f3d5db36d3e8d3b1ea3e97cebd33468950a Mon Sep 17 00:00:00 2001 From: Will Yochum Date: Thu, 18 Mar 2021 16:46:53 -0700 Subject: [PATCH 2/2] fix checkpointing sequencing issue --- .../Service/EventBatchingService.cs | 19 ++++++++----------- .../EventHubProcessor/EventProcessor.cs | 6 ++---- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs index 9354a40e..28284a6d 100644 --- a/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs @@ -61,7 +61,7 @@ private EventPartition CreatePartitionIfMissing(string partitionId, DateTime ini return _eventPartitions.GetOrAdd(partitionId, new EventPartition(partitionId, initTime, flushTimespan, _logger)); } - public Task ConsumeEvent(IEventMessage eventArg) + public async Task ConsumeEvent(IEventMessage eventArg) { EnsureArg.IsNotNull(eventArg); @@ -73,7 +73,7 @@ public Task ConsumeEvent(IEventMessage eventArg) if (EventPartitionExists(partitionId)) { var windowThresholdTime = GetPartition(partitionId).GetPartitionWindow(); - ThresholdWaitReached(partitionId, windowThresholdTime); + await ThresholdWaitReached(partitionId, windowThresholdTime); } } else @@ -85,21 +85,17 @@ public Task ConsumeEvent(IEventMessage eventArg) var windowThresholdTime = partition.GetPartitionWindow(); if (eventEnqueuedTime > windowThresholdTime) { - ThresholdTimeReached(partitionId, eventArg, windowThresholdTime); - return Task.CompletedTask; + await ThresholdTimeReached(partitionId, eventArg, windowThresholdTime); } if (partition.GetPartitionBatchCount() >= _maxEvents) { - ThresholdCountReached(partitionId); + await ThresholdCountReached(partitionId); } } - - return Task.CompletedTask; } - // todo: fix -"Collection was modified; enumeration operation may not execute." - private async void ThresholdCountReached(string partitionId) + private async Task ThresholdCountReached(string partitionId) { _logger.LogTrace($"Partition {partitionId} threshold count {_maxEvents} was reached."); var events = await GetPartition(partitionId).Flush(_maxEvents); @@ -107,9 +103,10 @@ private async void ThresholdCountReached(string partitionId) await UpdateCheckpoint(events); } - private async void ThresholdTimeReached(string partitionId, IEventMessage eventArg, DateTime windowEnd) + private async Task ThresholdTimeReached(string partitionId, IEventMessage eventArg, DateTime windowEnd) { _logger.LogTrace($"Partition {partitionId} threshold time {_eventPartitions[partitionId].GetPartitionWindow()} was reached."); + var queue = GetPartition(partitionId); var events = await queue.Flush(windowEnd); queue.IncrementPartitionWindow(eventArg.EnqueuedTime.UtcDateTime); @@ -117,7 +114,7 @@ private async void ThresholdTimeReached(string partitionId, IEventMessage eventA await UpdateCheckpoint(events); } - private async void ThresholdWaitReached(string partitionId, DateTime windowEnd) + private async Task ThresholdWaitReached(string partitionId, DateTime windowEnd) { if (windowEnd < DateTime.UtcNow.AddSeconds(_timeBuffer)) { diff --git a/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs index a256d1b0..345d00c2 100644 --- a/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs +++ b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs @@ -39,7 +39,7 @@ public async Task RunAsync(EventProcessorClient processor, CancellationToken ct) // 1) Event hub events // 2) Maximum wait events. These are generated when we have not received an event hub // event for a certain time period and this event is used to flush events in the current window. - Task ProcessEventHandler(ProcessEventArgs eventArgs) + async Task ProcessEventHandler(ProcessEventArgs eventArgs) { IEventMessage evt; if (eventArgs.HasEvent) @@ -51,9 +51,7 @@ Task ProcessEventHandler(ProcessEventArgs eventArgs) evt = new MaximumWaitEvent(eventArgs.Partition.PartitionId, DateTime.UtcNow); } - _eventConsumerService.ConsumeEvent(evt); - - return Task.CompletedTask; + await _eventConsumerService.ConsumeEvent(evt); } // todo: consider retry