Skip to content

Commit

Permalink
Fix: Delay frequent twin pulls on reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
vipeller committed Jun 29, 2021
1 parent 8df00f5 commit 6dd7275
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core;
Expand All @@ -20,9 +20,12 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub

public class EdgeAgentConnection : IEdgeAgentConnection
{
const int PullFrequencyThreshold = 10;

internal static readonly Version ExpectedSchemaVersion = new Version("1.1.0");
static readonly TimeSpan DefaultConfigRefreshFrequency = TimeSpan.FromHours(1);
static readonly TimeSpan DeviceClientInitializationWaitTime = TimeSpan.FromSeconds(5);
static readonly TimeSpan DefaultTwinPullOnConnectThrottleTime = TimeSpan.FromSeconds(30);

static readonly ITransientErrorDetectionStrategy AllButFatalErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.IsFatal() == false);

Expand All @@ -38,18 +41,23 @@ public class EdgeAgentConnection : IEdgeAgentConnection
readonly bool pullOnReconnect;
readonly IDeviceManager deviceManager;
readonly IDeploymentMetrics deploymentMetrics;
readonly TimeSpan twinPullOnConnectThrottleTime;

Option<TwinCollection> desiredProperties;
Option<TwinCollection> reportedProperties;
Option<DeploymentConfigInfo> deploymentConfigInfo;

DateTime lastTwinPullOnConnect = DateTime.MinValue;
AtomicBoolean isDelayedTwinPullInProgress = new AtomicBoolean(false);
int pullRequestCounter = 0;

/// <summary>
/// Convenience constructor that forwards to a more detailed overload, supplying
/// <c>true</c> for the flag argument together with <c>DefaultConfigRefreshFrequency</c>,
/// <c>TransientRetryStrategy</c> and <c>DefaultTwinPullOnConnectThrottleTime</c>.
/// </summary>
/// <param name="moduleClientProvider">Passed through unchanged to the chained constructor.</param>
/// <param name="desiredPropertiesSerDe">Passed through unchanged to the chained constructor.</param>
/// <param name="requestManager">Passed through unchanged to the chained constructor.</param>
/// <param name="deviceManager">Passed through unchanged to the chained constructor.</param>
/// <param name="deploymentMetrics">Passed through unchanged to the chained constructor.</param>
// NOTE(review): the literal 'true' appears to map to the 'enableSubscriptions' parameter of the
// chained overload - confirm against that overload's full parameter list.
public EdgeAgentConnection(
IModuleClientProvider moduleClientProvider,
ISerde<DeploymentConfig> desiredPropertiesSerDe,
IRequestManager requestManager,
IDeviceManager deviceManager,
IDeploymentMetrics deploymentMetrics)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, true, DefaultConfigRefreshFrequency, TransientRetryStrategy, deploymentMetrics)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, true, DefaultConfigRefreshFrequency, TransientRetryStrategy, deploymentMetrics, DefaultTwinPullOnConnectThrottleTime)
{
}

Expand All @@ -61,7 +69,7 @@ public class EdgeAgentConnection : IEdgeAgentConnection
bool enableSubscriptions,
TimeSpan configRefreshFrequency,
IDeploymentMetrics deploymentMetrics)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, enableSubscriptions, configRefreshFrequency, TransientRetryStrategy, deploymentMetrics)
: this(moduleClientProvider, desiredPropertiesSerDe, requestManager, deviceManager, enableSubscriptions, configRefreshFrequency, TransientRetryStrategy, deploymentMetrics, DefaultTwinPullOnConnectThrottleTime)
{
}

Expand All @@ -73,7 +81,8 @@ public class EdgeAgentConnection : IEdgeAgentConnection
bool enableSubscriptions,
TimeSpan refreshConfigFrequency,
RetryStrategy retryStrategy,
IDeploymentMetrics deploymentMetrics)
IDeploymentMetrics deploymentMetrics,
TimeSpan twinPullOnConnectThrottleTime)
{
this.desiredPropertiesSerDe = Preconditions.CheckNotNull(desiredPropertiesSerDe, nameof(desiredPropertiesSerDe));
this.deploymentConfigInfo = Option.None<DeploymentConfigInfo>();
Expand All @@ -86,6 +95,7 @@ public class EdgeAgentConnection : IEdgeAgentConnection
Events.TwinRefreshInit(refreshConfigFrequency);
this.deploymentMetrics = Preconditions.CheckNotNull(deploymentMetrics, nameof(deploymentMetrics));
this.initTask = this.ForceRefreshTwin();
this.twinPullOnConnectThrottleTime = twinPullOnConnectThrottleTime;
}

public Option<TwinCollection> ReportedProperties => this.reportedProperties;
Expand Down Expand Up @@ -227,9 +237,28 @@ async void OnConnectionStatusChanged(ConnectionStatus status, ConnectionStatusCh

if (this.pullOnReconnect && this.initTask.IsCompleted && status == ConnectionStatus.Connected)
{
var delayedTwinPull = true;
using (await this.twinLock.LockAsync())
{
await this.RefreshTwinAsync();
var now = DateTime.Now;
if (now - this.lastTwinPullOnConnect > this.twinPullOnConnectThrottleTime && !this.isDelayedTwinPullInProgress.Get())
{
this.lastTwinPullOnConnect = now;
await this.RefreshTwinAsync();
delayedTwinPull = false;
}
}

if (delayedTwinPull)
{
if (this.isDelayedTwinPullInProgress.GetAndSet(true))
{
Interlocked.Increment(ref this.pullRequestCounter);
}
else
{
_ = this.DelayedRefreshTwinAsync();
}
}
}
}
Expand All @@ -239,6 +268,38 @@ async void OnConnectionStatusChanged(ConnectionStatus status, ConnectionStatusCh
}
}

/// <summary>
/// Performs a single deferred twin pull: waits for the configured throttle interval,
/// then refreshes the twin while holding <c>twinLock</c>.
/// </summary>
/// <remarks>
/// While the lock is held the method stamps <c>lastTwinPullOnConnect</c>, attempts the
/// refresh (any exception is discarded), atomically reads and zeroes
/// <c>pullRequestCounter</c>, and finally clears <c>isDelayedTwinPullInProgress</c>.
/// After the lock is released a warning is logged when the counter value read
/// exceeded <c>PullFrequencyThreshold</c>.
/// </remarks>
/// <returns>A task that completes once the deferred pull and its logging are done.</returns>
async Task DelayedRefreshTwinAsync()
{
    Events.StartedDelayedTwinPull();

    // Sit out the throttle window before touching the twin.
    await Task.Delay(this.twinPullOnConnectThrottleTime);

    int coalescedRequests = 0;
    using (await this.twinLock.LockAsync())
    {
        // Stamp the pull time before the refresh attempt, regardless of its outcome.
        this.lastTwinPullOnConnect = DateTime.Now;

        try
        {
            await this.RefreshTwinAsync();
        }
        catch
        {
            // Deliberately ignored: a failed refresh must not prevent the bookkeeping below.
        }

        // Read-and-reset the counter in one atomic step, then clear the in-progress
        // flag; both happen while the lock is still held.
        coalescedRequests = Interlocked.Exchange(ref this.pullRequestCounter, 0);
        this.isDelayedTwinPullInProgress.Set(false);
    }

    bool triggeredTooOften = coalescedRequests > PullFrequencyThreshold;
    if (triggeredTooOften)
    {
        int throttleSeconds = Convert.ToInt32(this.twinPullOnConnectThrottleTime.TotalSeconds);
        Events.PullingTwinHasBeenTriggeredFrequently(coalescedRequests, throttleSeconds);
    }

    Events.FinishedDelayedTwinPull();
}

async Task OnDesiredPropertiesUpdated(TwinCollection desiredPropertiesPatch, object userContext)
{
Events.DesiredPropertiesUpdated();
Expand Down Expand Up @@ -422,6 +483,9 @@ enum EventIds
SendEventClientEmpty,
ErrorSendingEvent,
ErrorClosingModuleClient,
PullingTwinHasBeenTriggeredFrequently,
StartedDelayedTwinPull,
FinishedDelayedTwinPull
}

public static void DesiredPropertiesPatchFailed(Exception exception)
Expand Down Expand Up @@ -550,6 +614,21 @@ public static void ErrorClosingModuleClientForRetry(Exception e)
{
Log.LogWarning((int)EventIds.ErrorClosingModuleClient, e, "Error closing module client for retry");
}

/// <summary>
/// Logs a warning stating how many times a twin pull was triggered by the 'Connected'
/// event within the given number of seconds.
/// </summary>
/// <param name="count">Number of triggers to report in the message.</param>
/// <param name="seconds">Length of the reported time window, in seconds.</param>
internal static void PullingTwinHasBeenTriggeredFrequently(int count, int seconds) =>
    Log.LogWarning(
        (int)EventIds.PullingTwinHasBeenTriggeredFrequently,
        $"Pulling twin by 'Connected' event has been triggered frequently, {count} times in the last {seconds} seconds. This can be a sign when more edge devices use the same identity and they keep getting disconnected.");

/// <summary>
/// Logs a debug message marking the start of a delayed twin pull.
/// </summary>
internal static void StartedDelayedTwinPull() =>
    Log.LogDebug((int)EventIds.StartedDelayedTwinPull, "Started delayed twin-pull");

/// <summary>
/// Logs a debug message marking the end of a delayed twin pull.
/// </summary>
internal static void FinishedDelayedTwinPull() =>
    Log.LogDebug((int)EventIds.FinishedDelayedTwinPull, "Finished delayed twin-pull");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Common.Exceptions;
Expand Down Expand Up @@ -741,6 +742,103 @@ public async Task GetDeploymentConfigInfoAsyncIncludesExceptionWhenDeserializeTh
deviceManager.Verify(x => x.ReprovisionDeviceAsync(), Times.Exactly(shouldReprovision ? 1 : 0));
}

/// <summary>
/// Verifies that 'Connected' notifications arriving inside the throttle window do not
/// each trigger an immediate twin pull, and that exactly one delayed pull follows them.
/// </summary>
/// <remarks>
/// The test is timing based: it uses a 3 second throttle window and real delays.
/// Every wait on the milestone semaphore is asserted, so a missing or unexpected
/// twin pull is reported where it happens rather than as a later counter mismatch.
/// </remarks>
[Fact]
[Unit]
public async Task FrequentTwinPullsOnConnectionAreThrottledAsync()
{
    // Arrange
    var deviceClient = new Mock<IModuleClient>();
    deviceClient.Setup(x => x.UpstreamProtocol).Returns(UpstreamProtocol.Amqp);
    deviceClient.Setup(x => x.IsActive).Returns(true);
    var serde = new Mock<ISerde<DeploymentConfig>>();
    ConnectionStatusChangesHandler connectionStatusChangesHandler = null;
    var twin = new Twin
    {
        Properties = new TwinProperties
        {
            Desired = new TwinCollection(
                JObject.FromObject(
                    new Dictionary<string, object>
                    {
                        { "$version", 10 },
                        { "MoreStuff", "MoreStuffHereToo" }
                    }).ToString()),
            Reported = new TwinCollection()
        }
    };

    // Capture the status-change handler so the test can raise 'Connected' events itself.
    var moduleClientProvider = new Mock<IModuleClientProvider>();
    moduleClientProvider.Setup(d => d.Create(It.IsAny<ConnectionStatusChangesHandler>()))
        .Callback<ConnectionStatusChangesHandler>(statusChanges => connectionStatusChangesHandler = statusChanges)
        .ReturnsAsync(deviceClient.Object);

    var retryStrategy = new Mock<RetryStrategy>(new object[] { false });
    retryStrategy.Setup(rs => rs.GetShouldRetry())
        .Returns(
            (int retryCount, Exception lastException, out TimeSpan delay) =>
            {
                delay = TimeSpan.Zero;
                return false;
            });

    var counter = 0;
    var milestone = new SemaphoreSlim(0, 1);

    // Every twin pull bumps the counter and signals the milestone.
    deviceClient.Setup(d => d.GetTwinAsync())
        .ReturnsAsync(
            () =>
            {
                counter++;
                milestone.Release();
                return twin;
            });

    serde.Setup(s => s.Deserialize(It.IsAny<string>())).Returns(() => DeploymentConfig.Empty);

    IEnumerable<IRequestHandler> requestHandlers = new List<IRequestHandler> { new PingRequestHandler() };
    var deviceManager = new Mock<IDeviceManager>();
    deviceManager.Setup(x => x.ReprovisionDeviceAsync()).Returns(Task.CompletedTask);

    // The connection is disposable; release it even when an assertion below fails.
    using (var connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(3)))
    {
        // There is a twin pull during init, wait for that
        Assert.True(await milestone.WaitAsync(TimeSpan.FromSeconds(2)), "The initial twin pull did not happen in time");

        // A first time call should just go through
        counter = 0;
        connectionStatusChangesHandler.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

        Assert.True(await milestone.WaitAsync(TimeSpan.FromSeconds(2)), "The first 'Connected' event did not trigger a twin pull");

        Assert.Equal(1, counter);

        // get out of the 3 sec window
        await Task.Delay(3500);

        // The second call out of the window should go through
        connectionStatusChangesHandler.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

        Assert.True(await milestone.WaitAsync(TimeSpan.FromSeconds(2)), "The 'Connected' event outside the throttle window did not trigger a twin pull");

        Assert.Equal(2, counter);

        // Still in the window, so these should not go through. However, a delayed pull gets started
        connectionStatusChangesHandler.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
        connectionStatusChangesHandler.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
        connectionStatusChangesHandler.Invoke(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);

        // The delayed pull is due only after 3 seconds, so this 2 second wait is expected to time out.
        Assert.False(await milestone.WaitAsync(TimeSpan.FromSeconds(2)), "A twin pull happened inside the throttle window");

        await Task.Delay(500); // wait a bit more, so there is time to pull twin more if the throttling does not work

        Assert.Equal(2, counter);

        // get out of the 3 sec window, the delayed pull should finish by then
        await Task.Delay(3500);
        Assert.Equal(3, counter);
    }
}

[Fact]
[Unit]
public async Task GetDeploymentConfigInfoIncludesExceptionWhenSchemaVersionDoesNotMatch()
Expand Down Expand Up @@ -859,7 +957,7 @@ public async Task GetDeploymentConfigInfoAsyncIDoesNotIncludeExceptionWhenGetTwi
var deviceManager = new Mock<IDeviceManager>();

// Act
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>());
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(30));

// Assert
// The connection hasn't been created yet. So wait for it.
Expand Down Expand Up @@ -942,7 +1040,7 @@ public async Task GetDeploymentConfigInfoAsyncRetriesWhenGetTwinThrows()
var deviceManager = new Mock<IDeviceManager>();

// Act
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>());
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(30));

// Assert
// The connection hasn't been created yet. So wait for it.
Expand Down Expand Up @@ -1388,7 +1486,7 @@ public async Task GetTwinFailureDoesNotUpdateState()
var deviceManager = new Mock<IDeviceManager>();

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromSeconds(10), retryStrategy, Mock.Of<IDeploymentMetrics>()))
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromSeconds(10), retryStrategy, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(30)))
{
await Task.Delay(TimeSpan.FromSeconds(3));
Option<DeploymentConfigInfo> receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();
Expand Down Expand Up @@ -1499,7 +1597,7 @@ public async Task GetTwinRetryLogicGetsNewClient()
var deviceManager = new Mock<IDeviceManager>();

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromSeconds(10), retryStrategy, Mock.Of<IDeploymentMetrics>()))
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromSeconds(10), retryStrategy, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(30)))
{
await Task.Delay(TimeSpan.FromSeconds(3));
Option<DeploymentConfigInfo> receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();
Expand Down Expand Up @@ -1602,7 +1700,7 @@ public async Task GetDeploymentConfigInfoAsync_CreateNewModuleClientWhenGetTwinT
var deviceManager = new Mock<IDeviceManager>();

// Act
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>());
IEdgeAgentConnection connection = new EdgeAgentConnection(moduleClientProvider.Object, serde.Object, new RequestManager(requestHandlers, DefaultRequestTimeout), deviceManager.Object, true, TimeSpan.FromHours(1), retryStrategy.Object, Mock.Of<IDeploymentMetrics>(), TimeSpan.FromSeconds(30));

// Assert
// The connection hasn't been created yet. So wait for it.
Expand Down

0 comments on commit 6dd7275

Please sign in to comment.