Skip to content

Commit

Permalink
Revert "EdgeHub: Upstream perf improvements (Azure#1006)"
Browse files Browse the repository at this point in the history
This reverts commit 864b33d.
  • Loading branch information
richma-ms committed Sep 30, 2019
1 parent c978cc6 commit 354917b
Show file tree
Hide file tree
Showing 20 changed files with 126 additions and 722 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,23 @@ public class EndpointFactory : IEndpointFactory
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly string edgeDeviceId;
readonly ConcurrentDictionary<string, Endpoint> cache;
readonly int maxBatchSize;
readonly int upstreamFanOutFactor;

public EndpointFactory(
IConnectionManager connectionManager,
Core.IMessageConverter<IRoutingMessage> messageConverter,
string edgeDeviceId,
int maxBatchSize,
int upstreamFanOutFactor)
string edgeDeviceId)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.cache = new ConcurrentDictionary<string, Endpoint>();
this.maxBatchSize = maxBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
}

public Endpoint CreateSystemEndpoint(string endpoint)
{
if (CloudEndpointName.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
{
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter, this.maxBatchSize, this.upstreamFanOutFactor));
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing

public class RoutingEdgeHub : IEdgeHub
{
const long MaxMessageSize = 256 * 1024; // matches IoTHub
readonly Router router;
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly IConnectionManager connectionManager;
Expand Down Expand Up @@ -160,9 +161,9 @@ internal void AddEdgeSystemProperties(IMessage message)
static void ValidateMessageSize(IRoutingMessage messageToBeValidated)
{
long messageSize = messageToBeValidated.Size();
if (messageSize > Constants.MaxMessageSize)
if (messageSize > MaxMessageSize)
{
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {Constants.MaxMessageSize} bytes allowed");
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {MaxMessageSize} bytes allowed");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ void RegisterRoutingModule(
bool useV1TwinManager = this.GetConfigurationValueIfExists<string>("TwinManagerVersion")
.Map(v => v.Equals("v1", StringComparison.OrdinalIgnoreCase))
.GetOrElse(false);
int maxUpstreamBatchSize = this.configuration.GetValue("MaxUpstreamBatchSize", 10);
int upstreamFanOutFactor = this.configuration.GetValue("UpstreamFanOutFactor", 10);
bool encryptTwinStore = this.configuration.GetValue("EncryptTwinStore", false);
int configUpdateFrequencySecs = this.configuration.GetValue("ConfigRefreshFrequencySecs", 3600);
TimeSpan configUpdateFrequency = TimeSpan.FromSeconds(configUpdateFrequencySecs);
Expand All @@ -163,8 +161,6 @@ void RegisterRoutingModule(
minTwinSyncPeriod,
reportedPropertiesSyncFrequency,
useV1TwinManager,
maxUpstreamBatchSize,
upstreamFanOutFactor,
encryptTwinStore,
configUpdateFrequency,
experimentalFeatures));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public class RoutingModule : Module
readonly Option<TimeSpan> minTwinSyncPeriod;
readonly Option<TimeSpan> reportedPropertiesSyncFrequency;
readonly bool useV1TwinManager;
readonly int maxUpstreamBatchSize;
readonly int upstreamFanOutFactor;
readonly bool encryptTwinStore;
readonly TimeSpan configUpdateFrequency;
readonly ExperimentalFeatures experimentalFeatures;
Expand All @@ -72,8 +70,6 @@ public RoutingModule(
Option<TimeSpan> minTwinSyncPeriod,
Option<TimeSpan> reportedPropertiesSyncFrequency,
bool useV1TwinManager,
int maxUpstreamBatchSize,
int upstreamFanOutFactor,
bool encryptTwinStore,
TimeSpan configUpdateFrequency,
ExperimentalFeatures experimentalFeatures)
Expand All @@ -97,8 +93,6 @@ public RoutingModule(
this.minTwinSyncPeriod = minTwinSyncPeriod;
this.reportedPropertiesSyncFrequency = reportedPropertiesSyncFrequency;
this.useV1TwinManager = useV1TwinManager;
this.maxUpstreamBatchSize = maxUpstreamBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
this.encryptTwinStore = encryptTwinStore;
this.configUpdateFrequency = configUpdateFrequency;
this.experimentalFeatures = experimentalFeatures;
Expand Down Expand Up @@ -258,7 +252,7 @@ protected override void Load(ContainerBuilder builder)
{
var messageConverter = c.Resolve<Core.IMessageConverter<IRoutingMessage>>();
IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId, this.maxUpstreamBatchSize, this.upstreamFanOutFactor) as IEndpointFactory;
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId) as IEndpointFactory;
})
.As<Task<IEndpointFactory>>()
.SingleInstance();
Expand Down
2 changes: 0 additions & 2 deletions edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ protected Endpoint(string id, string name, string iotHubName)

public abstract void LogUserMetrics(long messageCount, long latencyInMs);

public virtual int FanOutFactor => 1;

public bool Equals(Endpoint other)
{
if (ReferenceEquals(null, other))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
using Microsoft.Azure.Devices.Routing.Core.Util;
using Microsoft.Azure.Devices.Routing.Core.Util.Concurrency;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using static System.FormattableString;
using AsyncLock = Microsoft.Azure.Devices.Routing.Core.Util.Concurrency.AsyncLock;

public class StoringAsyncEndpointExecutor : IEndpointExecutor
{
readonly AtomicBoolean closed = new AtomicBoolean();
readonly IMessageStore messageStore;
readonly Task sendMessageTask;
readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true);
readonly ManualResetEvent hasMessagesInQueue = new ManualResetEvent(true);
readonly ICheckpointer checkpointer;
readonly AsyncEndpointExecutorOptions options;
readonly EndpointExecutorFsm machine;
Expand Down Expand Up @@ -128,23 +126,19 @@ async Task SendMessagesPump()
{
Events.StartSendMessagesPump(this);
IMessageIterator iterator = this.messageStore.GetMessageIterator(this.Endpoint.Id, this.checkpointer.Offset + 1);
int batchSize = this.options.BatchSize * this.Endpoint.FanOutFactor;
var storeMessagesProvider = new StoreMessagesProvider(iterator, batchSize);
while (!this.cts.IsCancellationRequested)
{
try
{
await this.hasMessagesInQueue.WaitAsync(this.options.BatchTimeout);
IMessage[] messages = await storeMessagesProvider.GetMessages();
if (messages.Length > 0)
this.hasMessagesInQueue.WaitOne(this.options.BatchTimeout);
IMessage[] messages = (await iterator.GetNext(this.options.BatchSize)).ToArray();
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);

// If store has no messages, then reset the hasMessagesInQueue flag.
if (messages.Length == 0)
{
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
}
else
{
// If store has no messages, then reset the hasMessagesInQueue flag.
this.hasMessagesInQueue.Reset();
}
}
Expand Down Expand Up @@ -178,60 +172,6 @@ void Dispose(bool disposing)
}
}

// This class is used to prefetch messages from the store before they are needed.
// As soon as the previous batch is consumed, the next batch is fetched.
// A pump is started as soon as the object is created, and it keeps the messages list populated.
internal class StoreMessagesProvider
{
readonly IMessageIterator iterator;
readonly int batchSize;
readonly AsyncLock stateLock = new AsyncLock();
Task<IList<IMessage>> getMessagesTask;

public StoreMessagesProvider(IMessageIterator iterator, int batchSize)
{
this.iterator = iterator;
this.batchSize = batchSize;
this.getMessagesTask = Task.Run(this.GetMessagesFromStore);
}

public async Task<IMessage[]> GetMessages()
{
using (await this.stateLock.LockAsync())
{
var messages = await this.getMessagesTask;
if (messages.Count == 0)
{
messages = await this.GetMessagesFromStore();
}
else
{
this.getMessagesTask = Task.Run(this.GetMessagesFromStore);
}

return messages.ToArray();
}
}

async Task<IList<IMessage>> GetMessagesFromStore()
{
var messagesList = new List<IMessage>();
while (messagesList.Count < this.batchSize)
{
int curBatchSize = this.batchSize - messagesList.Count;
IList<IMessage> messages = (await this.iterator.GetNext(curBatchSize)).ToList();
if (!messages.Any())
{
break;
}

messagesList.AddRange(messages);
}

return messagesList;
}
}

static class Events
{
const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor;
Expand All @@ -251,7 +191,6 @@ enum EventIds
Close,
CloseSuccess,
CloseFailure,
ErrorInPopulatePump
}

public static void AddMessageSuccess(StoringAsyncEndpointExecutor executor, long offset)
Expand Down Expand Up @@ -324,11 +263,6 @@ public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception
{
Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name);
}

public static void ErrorInPopulatePump(Exception ex)
{
Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump");
}
}

static class Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
TimeSpan retryAfter;
ICollection<IMessage> messages = EmptyMessages;
Stopwatch stopwatch = Stopwatch.StartNew();
TimeSpan endpointTimeout = TimeSpan.FromMilliseconds(thisPtr.config.Timeout.TotalMilliseconds * thisPtr.Endpoint.FanOutFactor);

try
{
Preconditions.CheckNotNull(thisPtr.currentSendCommand);
Expand All @@ -294,8 +294,7 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
{
ISinkResult<IMessage> result;
Events.Send(thisPtr, thisPtr.currentSendCommand.Messages, messages);

using (var cts = new CancellationTokenSource(endpointTimeout))
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
{
result = await thisPtr.processor.ProcessAsync(messages, cts.Token);
}
Expand Down Expand Up @@ -891,16 +890,16 @@ static void SetProcessingInternalCounters(EndpointExecutorFsm fsm, string status
Log.LogError((int)EventIds.CounterFailure, "[LogEventsProcessedCounterFailed] {0}", error);
}

double totalTimeMSecs = messages.Select(m => m.DequeuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);
TimeSpan totalTime = messages.Select(m => m.DequeuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);

if (!Routing.PerfCounter.LogEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageLatencyInMs, out error))
{
Log.LogError((int)EventIds.CounterFailure, "[LogEventProcessingLatencyCounterFailed] {0}", error);
}

double messageE2EProcessingLatencyTotalMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotalMSecs < 0 ? 0L : (long)(messageE2EProcessingLatencyTotalMSecs / messages.Count);
TimeSpan messageE2EProcessingLatencyTotal = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotal < TimeSpan.Zero ? 0L : (long)(messageE2EProcessingLatencyTotal.TotalMilliseconds / messages.Count);

if (!Routing.PerfCounter.LogE2EEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageE2ELatencyInMs, out error))
{
Expand All @@ -922,8 +921,8 @@ static void SetSuccessfulEgressUserMetricCounter(EndpointExecutorFsm fsm, IColle
}

// calculate average latency
double totalTimeMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);
TimeSpan totalTime = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);

fsm.Endpoint.LogUserMetrics(messages.Count, averageLatencyInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void CloudMessageProcessor_CloseAsyncTest()

IProcessor moduleMessageProcessor = cloudEndpoint.CreateProcessor();
Task result = moduleMessageProcessor.CloseAsync(CancellationToken.None);
Assert.Equal(Task.CompletedTask, result);
Assert.Equal(TaskEx.Done, result);
}

[Fact]
Expand Down
Loading

0 comments on commit 354917b

Please sign in to comment.