Skip to content

Commit

Permalink
Support processing context's SendToDataSource in RealTimeWorkbench. M…
Browse files Browse the repository at this point in the history
…essages available on new DataSourceMessageReceived event.
  • Loading branch information
markwaterman committed Mar 14, 2024
1 parent 6d60da4 commit d44be7a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 4 deletions.
21 changes: 21 additions & 0 deletions src/Workbench.UnitTests/RealTimeBasics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,26 @@ public void SendMessage()
var instances = wb.GetInstances<RealTimeCarModel>(nameof(RealTimeCar));
Assert.True(instances.ContainsKey("Car1"));
}

[Fact]
public void SendToDataSource()
{
RealTimeWorkbench wb = new RealTimeWorkbench();
bool msgReceived = false;
wb.DataSourceMessageReceived += (sender, e) =>
{
msgReceived = true;
Assert.Equal("Too Fast", ((StatusMessage)e.Message).Payload);
Assert.Equal("Car1", e.DigitalTwinId);
Assert.Equal(nameof(RealTimeCar), e.ModelName);
};

var endpoint = wb.AddRealTimeModel(nameof(RealTimeCar), new RealTimeCarMessageProcessor2());

endpoint.Send("Car1", new CarMessage { Speed = 22 });


Assert.True(msgReceived);
}
}
}
19 changes: 15 additions & 4 deletions src/Workbench/RealTimeProcessingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Scaleout.Streaming.DigitalTwin.Core;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;

Expand Down Expand Up @@ -85,22 +86,32 @@ public override SendingResult SendAlert(string providerName, AlertMessage alertM

public override SendingResult SendToDataSource(byte[] message)
{
throw new NotSupportedException();
_env.SendToDataSouce(InstanceId, DigitalTwinModel, message);
return SendingResult.Handled;
}

public override SendingResult SendToDataSource(object message)
{
throw new NotSupportedException();
_env.SendToDataSouce(InstanceId, DigitalTwinModel, message);
return SendingResult.Handled;
}

public override SendingResult SendToDataSource(IEnumerable<byte[]> messages)
{
throw new NotSupportedException();
foreach (var msg in messages)
{
SendToDataSource(msg);
}
return SendingResult.Handled;
}

public override SendingResult SendToDataSource(IEnumerable<object> messages)
{
throw new NotSupportedException();
foreach (var msg in messages)
{
SendToDataSource(msg);
}
return SendingResult.Handled;
}

public override SendingResult SendToTwin(string targetTwinModel, string targetTwinId, byte[] message)
Expand Down
21 changes: 21 additions & 0 deletions src/Workbench/RealTimeWorkbench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public class RealTimeWorkbench : IDisposable
private ILogger _logger;
private bool _disposed;

/// <summary>
/// Event raised when a message processor implementation sends a message back to its
/// data source using <see cref="ProcessingContext.SendToDataSource(object)"/>.
/// </summary>
public event EventHandler<SendToDataSourceEventArgs>? DataSourceMessageReceived;

/// <summary>
/// Constructor
/// </summary>
Expand Down Expand Up @@ -218,6 +224,21 @@ internal void RecordAlert(string providerName, AlertMessage alertMessage)
return new InstanceDictionary<TDigitalTwin>(instances);
}

/// <summary>
/// Raises the <see cref="DataSourceMessageReceived"/> event.
/// </summary>
/// <param name="e">Event arguments containing information about the message sent back to the data source.</param>
protected virtual void OnDataSourceMessageReceived(SendToDataSourceEventArgs e)
{
DataSourceMessageReceived?.Invoke(this, e);
}

internal void SendToDataSouce(string digitalTwinId, string modelName, object message)
{
SendToDataSourceEventArgs e = new SendToDataSourceEventArgs(digitalTwinId, modelName, message);
OnDataSourceMessageReceived(e);
}

/// <summary>
/// When overridden in a derived class, releases the unmanaged resources used by the environment,
/// and optionally releases the managed resources.
Expand Down
34 changes: 34 additions & 0 deletions src/Workbench/SendToDataSourceEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Scaleout.DigitalTwin.Workbench
{
/// <summary>
///
/// </summary>
public class SendToDataSourceEventArgs : EventArgs
{
internal SendToDataSourceEventArgs(string digitalTwinId, string modelName, object message)
{
DigitalTwinId = digitalTwinId;
Message = message;
ModelName = modelName;
}
/// <summary>
/// Message sent to the digital twin's data source.
/// </summary>
public object Message { get; set; }

/// <summary>
/// ID of the digital twin instance.
/// </summary>
public string DigitalTwinId { get; set; }


/// <summary>
/// Name of the model.
/// </summary>
public string ModelName { get; set; }
}
}

0 comments on commit d44be7a

Please sign in to comment.