Skip to content

Commit

Permalink
Send connection device Id information on twin change notifications Az…
Browse files Browse the repository at this point in the history
…ure#4… (Azure#5145)

…762 (Azure#4888)

Cherry-pick: Azure@fa60e52

Issue: EdgeHub supports routing twin change notifications, which basically includes routing RP updates to other modules. The message that gets routed should have information about the device that sent the RP update.
But because of a recent change in EH, this information got dropped.

Fix: Fix is to route the information about the device/module that sent the RP update to the receiving module.
  • Loading branch information
varunpuranik committed Jun 23, 2021
1 parent 9c2051f commit acc3e1f
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,20 @@ public AmqpMessage FromMessage(IMessage message)
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsInputNameKey] = inputName;
}

if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionDeviceId, out string connectionDeviceId))
if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.RpConnectionDeviceIdInternal, out string rpConnectionDeviceId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId] = rpConnectionDeviceId;
}
else if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionDeviceId, out string connectionDeviceId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId] = connectionDeviceId;
}

if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionModuleId, out string connectionModuleId))
if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.RpConnectionModuleIdInternal, out string rpConnectionModuleId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId] = rpConnectionModuleId;
}
else if (message.SystemProperties.TryGetNonEmptyValue(SystemProperties.ConnectionModuleId, out string connectionModuleId))
{
amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId] = connectionModuleId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Constants
public const string InternalOriginInterface = "internal";
public const string DownstreamOriginInterface = "downstream";

public const string EdgeHubModuleId = "$edgeHub";
public const string IotEdgeIdentityCapability = "iotEdge";
public const string ServiceIdentityRefreshMethodName = "RefreshDeviceScopeIdentityCache";
public const string IoTEdgeProductInfoIdentifier = "EdgeHub";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core
/// </summary>
public interface IEdgeHub : IDisposable
{
string GetEdgeDeviceId();

Task ProcessDeviceMessage(IIdentity identity, IMessage message);

Task ProcessDeviceMessageBatch(IIdentity identity, IEnumerable<IMessage> message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public static class SystemProperties
public const string InterfaceId = "iothub-interface-id";
public const string ModelId = "modelId";

public const string RpConnectionDeviceIdInternal = "rpSenderDeviceId";
public const string RpConnectionModuleIdInternal = "rpSenderModuleId";

public static readonly Dictionary<string, string> IncomingSystemPropertiesMap = new Dictionary<string, string>
{
{ OnTheWireSystemPropertyNames.ExpiryTimeUtcOnTheWireName, ExpiryTimeUtc },
Expand Down Expand Up @@ -67,7 +70,9 @@ public static class SystemProperties
{ Operation, OnTheWireSystemPropertyNames.OperationOnTheWireName },
{ CreationTime, OnTheWireSystemPropertyNames.CreationTimeOnTheWireName },
{ ConnectionDeviceId, OnTheWireSystemPropertyNames.ConnectionDeviceIdOnTheWireName },
{ ConnectionModuleId, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName }
{ ConnectionModuleId, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName },
{ RpConnectionDeviceIdInternal, OnTheWireSystemPropertyNames.ConnectionDeviceIdOnTheWireName },
{ RpConnectionModuleIdInternal, OnTheWireSystemPropertyNames.ConnectionModuleIdOnTheWireName }
};

static class OnTheWireSystemPropertyNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,17 @@ public async Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessa
switch (this.Identity)
{
case IModuleIdentity moduleIdentity:
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = moduleIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = moduleIdentity.ModuleId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] = moduleIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionModuleIdInternal] = moduleIdentity.ModuleId;
break;
case IDeviceIdentity deviceIdentity:
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = deviceIdentity.DeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] = deviceIdentity.DeviceId;
break;
}

reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = this.edgeHub.GetEdgeDeviceId();
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = Constants.EdgeHubModuleId;

try
{
using (Metrics.TimeReportedPropertiesUpdate(this.Identity.Id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class RoutingEdgeHub : IEdgeHub
this.subscriptionProcessor = Preconditions.CheckNotNull(subscriptionProcessor, nameof(subscriptionProcessor));
}

public string GetEdgeDeviceId() => this.edgeDeviceId;

public Task ProcessDeviceMessage(IIdentity identity, IMessage message)
{
Preconditions.CheckNotNull(message, nameof(message));
Expand Down Expand Up @@ -100,9 +102,6 @@ public Task UpdateReportedPropertiesAsync(IIdentity identity, IMessage reportedP
Events.UpdateReportedPropertiesReceived(identity);
Task cloudSendMessageTask = this.twinManager.UpdateReportedPropertiesAsync(identity.Id, reportedPropertiesMessage);

reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionDeviceId] = this.edgeDeviceId;
reportedPropertiesMessage.SystemProperties[SystemProperties.ConnectionModuleId] = this.edgeModuleId;

IRoutingMessage routingMessage = this.ProcessMessageInternal(reportedPropertiesMessage, false);
Task routingSendMessageTask = this.router.RouteAsync(routingMessage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ public IProtocolGatewayMessage FromMessage(IMessage message)
}
}

if (message.SystemProperties.TryGetValue(SystemProperties.RpConnectionDeviceIdInternal, out string rpDeviceId))
{
properties[SystemProperties.OutgoingSystemPropertiesMap[SystemProperties.RpConnectionDeviceIdInternal]] = rpDeviceId;
}

if (message.SystemProperties.TryGetValue(SystemProperties.RpConnectionModuleIdInternal, out string rpModuleId))
{
properties[SystemProperties.OutgoingSystemPropertiesMap[SystemProperties.RpConnectionModuleIdInternal]] = rpModuleId;
}

if (!this.addressConvertor.TryBuildProtocolAddressFromEdgeHubMessage(uriTemplateKey, message, properties, out string address))
{
throw new InvalidOperationException("Could not derive destination address using message system properties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,5 +343,31 @@ byte[] GetMessageBody(AmqpMessage sourceMessage)
Assert.Equal("Value2", amqpMessage.ApplicationProperties.Map["Prop2"].ToString());
}
}

[Fact]
public void TestConnectionDeviceIdTest()
{
// Setup
var systemProperties = new Dictionary<string, string>
{
[SystemProperties.ConnectionDeviceId] = "edgeDeviceId",
[SystemProperties.ConnectionModuleId] = "$edgeHub",
[SystemProperties.RpConnectionDeviceIdInternal] = "leafDevice1",
[SystemProperties.RpConnectionModuleIdInternal] = "leafModule1",
};

byte[] bytes = { 1, 2, 3, 4 };

var message = new EdgeMessage(bytes, new Dictionary<string, string>(), systemProperties);
var messageConverter = new AmqpMessageConverter();

// Act
using (AmqpMessage amqpMessage = messageConverter.FromMessage(message))
{
// Assert
Assert.Equal("leafDevice1", amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionDeviceId]);
Assert.Equal("leafModule1", amqpMessage.MessageAnnotations.Map[Constants.MessageAnnotationsConnectionModuleId]);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public async Task ForwardsTwinPatchOperationToTheCloudProxy()
edgeHub.Setup(e => e.UpdateReportedPropertiesAsync(It.IsAny<IIdentity>(), It.IsAny<IMessage>()))
.Callback<IIdentity, IMessage>((id, m) => receivedMessage = m)
.Returns(Task.CompletedTask);
edgeHub.Setup(e => e.GetEdgeDeviceId()).Returns("edgeDeviceId1");
Mock.Get(connMgr).Setup(c => c.GetCloudConnection(It.IsAny<string>())).Returns(Task.FromResult(Option.Some(cloudProxy)));
var listener = new DeviceMessageHandler(identity, edgeHub.Object, connMgr, DefaultMessageAckTimeout, Option.None<string>());
var underlyingDeviceProxy = new Mock<IDeviceProxy>();
Expand All @@ -126,8 +127,10 @@ public async Task ForwardsTwinPatchOperationToTheCloudProxy()
Assert.NotNull(receivedMessage);
Assert.Equal(Constants.TwinChangeNotificationMessageSchema, receivedMessage.SystemProperties[SystemProperties.MessageSchema]);
Assert.Equal(Constants.TwinChangeNotificationMessageType, receivedMessage.SystemProperties[SystemProperties.MessageType]);
Assert.Equal("device1", receivedMessage.SystemProperties[SystemProperties.ConnectionDeviceId]);
Assert.Equal("module1", receivedMessage.SystemProperties[SystemProperties.ConnectionModuleId]);
Assert.Equal("edgeDeviceId1", receivedMessage.SystemProperties[SystemProperties.ConnectionDeviceId]);
Assert.Equal("$edgeHub", receivedMessage.SystemProperties[SystemProperties.ConnectionModuleId]);
Assert.Equal("device1", receivedMessage.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal]);
Assert.Equal("module1", receivedMessage.SystemProperties[SystemProperties.RpConnectionModuleIdInternal]);
Assert.True(receivedMessage.SystemProperties.ContainsKey(SystemProperties.EnqueuedTime));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class RoutingTest
{
static readonly TimeSpan DefaultMessageAckTimeout = TimeSpan.FromSeconds(30);
static readonly Random Rand = new Random();
static readonly string edgeHubModuleId = "$edgeHub";

[Fact]
public async Task RouteToCloudTest()
Expand Down Expand Up @@ -379,7 +378,7 @@ public async Task ReportedPropertyUpdatesAsTelemetryTest()
await device1.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());

Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
}

[Fact(Skip = "Flaky test, bug #2494150")]
Expand All @@ -400,8 +399,8 @@ public async Task TestRoutingTwinChangeNotificationFromDevice()
IMessage message = GetReportedPropertiesMessage();
await device1.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification("device1", null));
}

[Fact(Skip = "Flaky test, bug #2494150")]
Expand All @@ -422,8 +421,8 @@ public async Task TestRoutingTwinChangeNotificationFromModule()
IMessage message = GetReportedPropertiesMessage();
await module2.UpdateReportedProperties(message);
await Task.Delay(GetSleepTime());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, edgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification());
Assert.True(iotHub.HasReceivedTwinChangeNotification(edgeDeviceId, Constants.EdgeHubModuleId));
Assert.True(module1.HasReceivedTwinChangeNotification(edgeDeviceId, "mod2"));
}

// Need longer sleep when run tests in parallel
Expand Down Expand Up @@ -467,7 +466,7 @@ static async Task<(IEdgeHub, IConnectionManager)> SetupEdgeHub(IEnumerable<strin
ITwinManager twinManager = new TwinManager(connectionManager, new TwinCollectionMessageConverter(), new TwinMessageConverter(), Option.None<IEntityStore<string, TwinInfo>>());
var invokeMethodHandler = Mock.Of<IInvokeMethodHandler>();
var subscriptionProcessor = new SubscriptionProcessor(connectionManager, invokeMethodHandler, deviceConnectivityManager);
IEdgeHub edgeHub = new RoutingEdgeHub(router, routingMessageConverter, connectionManager, twinManager, edgeDeviceId, edgeHubModuleId, invokeMethodHandler, subscriptionProcessor);
IEdgeHub edgeHub = new RoutingEdgeHub(router, routingMessageConverter, connectionManager, twinManager, edgeDeviceId, Constants.EdgeHubModuleId, invokeMethodHandler, subscriptionProcessor);
return (edgeHub, connectionManager);
}

Expand Down Expand Up @@ -638,9 +637,11 @@ public Task SendMessage(IMessage message)
public Task UpdateReportedProperties(IMessage reportedPropertiesMessage) =>
this.deviceListener.UpdateReportedPropertiesAsync(reportedPropertiesMessage, Guid.NewGuid().ToString());

public bool HasReceivedTwinChangeNotification() => this.receivedMessages.Any(
public bool HasReceivedTwinChangeNotification(string connDeviceId, string connModuleId) => this.receivedMessages.Any(
m =>
m.SystemProperties[SystemProperties.MessageType] == Constants.TwinChangeNotificationMessageType);
m.SystemProperties[SystemProperties.MessageType] == Constants.TwinChangeNotificationMessageType
&& m.SystemProperties[SystemProperties.RpConnectionDeviceIdInternal] == connDeviceId
&& (string.IsNullOrWhiteSpace(connModuleId) || m.SystemProperties[SystemProperties.RpConnectionModuleIdInternal] == connModuleId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,62 @@ public void TestToMessage_NoTopicMatch()
var protocolGatewayMessageConverter = new ProtocolGatewayMessageConverter(converter, ByteBufferConverter);
Assert.Throws<InvalidOperationException>(() => protocolGatewayMessageConverter.ToMessage(protocolGatewayMessage));
}

[Fact]
public void RpSenderTest()
{
// Setup
const string DeviceId = "Device1";
const string ModuleId = "Module1";
const string Input = "input1";
var outputTemplates = new Dictionary<string, string>
{
["ModuleEndpoint"] = "devices/{deviceId}/modules/{moduleId}/inputs/{inputName}"
};
var inputTemplates = new List<string>
{
"devices/{deviceId}/messages/events/{params}/"
};
var config = new MessageAddressConversionConfiguration(
inputTemplates,
outputTemplates);
var converter = new MessageAddressConverter(config);

var properties = new Dictionary<string, string>();

var systemProperties = new Dictionary<string, string>
{
[SystemProperties.OutboundUri] = Constants.OutboundUriModuleEndpoint,
[SystemProperties.LockToken] = Guid.NewGuid().ToString(),
[TemplateParameters.DeviceIdTemplateParam] = DeviceId,
[Constants.ModuleIdTemplateParameter] = ModuleId,
[SystemProperties.InputName] = Input,
[SystemProperties.OutputName] = "output",
[SystemProperties.ContentEncoding] = "utf-8",
[SystemProperties.ContentType] = "application/json",

[SystemProperties.ConnectionDeviceId] = "edgeDevice1",
[SystemProperties.ConnectionModuleId] = "$edgeHub",
[SystemProperties.RpConnectionDeviceIdInternal] = "leafDevice1",
[SystemProperties.RpConnectionModuleIdInternal] = "leafModule1",
};

var message = Mock.Of<IMessage>(
m =>
m.Body == new byte[] { 1, 2, 3 } &&
m.Properties == properties &&
m.SystemProperties == systemProperties);

var protocolGatewayMessageConverter = new ProtocolGatewayMessageConverter(converter, ByteBufferConverter);

// Act
IProtocolGatewayMessage pgMessage = protocolGatewayMessageConverter.FromMessage(message);

// Verify
Assert.NotNull(pgMessage);
Assert.Equal(@"devices/Device1/modules/Module1/inputs/input1/%24.ce=utf-8&%24.ct=application%2Fjson&%24.cdid=leafDevice1&%24.cmid=leafModule1", pgMessage.Address);
Assert.Equal("leafDevice1", pgMessage.Properties["$.cdid"]);
Assert.Equal("leafModule1", pgMessage.Properties["$.cmid"]);
}
}
}

0 comments on commit acc3e1f

Please sign in to comment.