From d44be7a6075958de07295d6cfa12f8b4c5f04a71 Mon Sep 17 00:00:00 2001 From: Mark Waterman Date: Thu, 14 Mar 2024 11:10:13 -0700 Subject: [PATCH] Support processing context's SendToDataSource in RealTimeWorkbench. Messages available on new DataSourceMessageReceived event. --- src/Workbench.UnitTests/RealTimeBasics.cs | 21 +++++++++++++ src/Workbench/RealTimeProcessingContext.cs | 19 +++++++++--- src/Workbench/RealTimeWorkbench.cs | 21 +++++++++++++ src/Workbench/SendToDataSourceEventArgs.cs | 34 ++++++++++++++++++++++ 4 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 src/Workbench/SendToDataSourceEventArgs.cs diff --git a/src/Workbench.UnitTests/RealTimeBasics.cs b/src/Workbench.UnitTests/RealTimeBasics.cs index 8e803a5..b668cfb 100644 --- a/src/Workbench.UnitTests/RealTimeBasics.cs +++ b/src/Workbench.UnitTests/RealTimeBasics.cs @@ -40,5 +40,26 @@ public void SendMessage() var instances = wb.GetInstances(nameof(RealTimeCar)); Assert.True(instances.ContainsKey("Car1")); } + + [Fact] + public void SendToDataSource() + { + RealTimeWorkbench wb = new RealTimeWorkbench(); + bool msgReceived = false; + wb.DataSourceMessageReceived += (sender, e) => + { + msgReceived = true; + Assert.Equal("Too Fast", ((StatusMessage)e.Message).Payload); + Assert.Equal("Car1", e.DigitalTwinId); + Assert.Equal(nameof(RealTimeCar), e.ModelName); + }; + + var endpoint = wb.AddRealTimeModel(nameof(RealTimeCar), new RealTimeCarMessageProcessor2()); + + endpoint.Send("Car1", new CarMessage { Speed = 22 }); + + + Assert.True(msgReceived); + } } } diff --git a/src/Workbench/RealTimeProcessingContext.cs b/src/Workbench/RealTimeProcessingContext.cs index 7d92ca9..751c1c1 100644 --- a/src/Workbench/RealTimeProcessingContext.cs +++ b/src/Workbench/RealTimeProcessingContext.cs @@ -21,6 +21,7 @@ using Scaleout.Streaming.DigitalTwin.Core; using System; using System.Collections.Generic; +using System.Data; using System.Linq; using System.Text; @@ -85,22 +86,32 @@ public override SendingResult SendAlert(string providerName, AlertMessage alertM public override SendingResult SendToDataSource(byte[] message) { - throw new NotSupportedException(); + _env.SendToDataSouce(InstanceId, DigitalTwinModel, message); + return SendingResult.Handled; } public override SendingResult SendToDataSource(object message) { - throw new NotSupportedException(); + _env.SendToDataSouce(InstanceId, DigitalTwinModel, message); + return SendingResult.Handled; } public override SendingResult SendToDataSource(IEnumerable messages) { - throw new NotSupportedException(); + foreach (var msg in messages) + { + SendToDataSource(msg); + } + return SendingResult.Handled; } public override SendingResult SendToDataSource(IEnumerable messages) { - throw new NotSupportedException(); + foreach (var msg in messages) + { + SendToDataSource(msg); + } + return SendingResult.Handled; } public override SendingResult SendToTwin(string targetTwinModel, string targetTwinId, byte[] message) diff --git a/src/Workbench/RealTimeWorkbench.cs b/src/Workbench/RealTimeWorkbench.cs index af772eb..84e3f28 100644 --- a/src/Workbench/RealTimeWorkbench.cs +++ b/src/Workbench/RealTimeWorkbench.cs @@ -42,6 +42,12 @@ public class RealTimeWorkbench : IDisposable private ILogger _logger; private bool _disposed; + /// + /// Event raised when a message processor implementation sends a message back to its + /// data source using . + /// + public event EventHandler? DataSourceMessageReceived; + /// /// Constructor /// @@ -218,6 +224,21 @@ internal void RecordAlert(string providerName, AlertMessage alertMessage) return new InstanceDictionary(instances); } + /// + /// Raises the event. + /// + /// Event arguments containing information about the message sent back to the data source. + protected virtual void OnDataSourceMessageReceived(SendToDataSourceEventArgs e) + { + DataSourceMessageReceived?.Invoke(this, e); + } + + internal void SendToDataSouce(string digitalTwinId, string modelName, object message) + { + SendToDataSourceEventArgs e = new SendToDataSourceEventArgs(digitalTwinId, modelName, message); + OnDataSourceMessageReceived(e); + } + /// /// When overridden in a derived class, releases the unmanaged resources used by the environment, /// and optionally releases the managed resources. diff --git a/src/Workbench/SendToDataSourceEventArgs.cs b/src/Workbench/SendToDataSourceEventArgs.cs new file mode 100644 index 0000000..2e75988 --- /dev/null +++ b/src/Workbench/SendToDataSourceEventArgs.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Scaleout.DigitalTwin.Workbench +{ + /// + /// + /// + public class SendToDataSourceEventArgs : EventArgs + { + internal SendToDataSourceEventArgs(string digitalTwinId, string modelName, object message) + { + DigitalTwinId = digitalTwinId; + Message = message; + ModelName = modelName; + } + /// + /// Message sent to the digital twin's data source. + /// + public object Message { get; set; } + + /// + /// ID of the digital twin instance. + /// + public string DigitalTwinId { get; set; } + + + /// + /// Name of the model. + /// + public string ModelName { get; set; } + } +}