Skip to content
This repository has been archived by the owner on Feb 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2 from kbeaugrand-org/feature/batch_messages_in_o…
Browse files Browse the repository at this point in the history
…ther_thread

Transform and batch messages in separate thread
  • Loading branch information
kbeaugrand committed Sep 19, 2022
2 parents 475a463 + 5c7b714 commit b423c31
Showing 1 changed file with 74 additions and 21 deletions.
95 changes: 74 additions & 21 deletions src/PlcPublisher/modules/PLC-Publisher/PLCPublisherModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ namespace PLCPublisher
using libplctag;
using PLCPublisher.Commands.ListPrograms;
using JUST;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json.Linq;

public class PLCPublisherModule : IHostedService
{
Expand All @@ -34,6 +37,7 @@ public class PLCPublisherModule : IHostedService
private readonly ICommandHandler listUdtTypesCommandHandler;
private readonly ICommandHandler listProgramsCommandHandler;
private readonly ConcurrentBag<object> runningTasks = new ConcurrentBag<object>();
private readonly ConcurrentQueue<EnqueuedMessage> messagesQueue = new ConcurrentQueue<EnqueuedMessage>();
private CancellationTokenSource runningTaskCancellationTokenSource;

public PLCPublisherModule(
Expand Down Expand Up @@ -75,6 +79,9 @@ public async Task StartAsync(CancellationToken cancellationToken)

this.logger.LogInformation("Method handlers registered");

StartPublisherThread();
this.logger.LogInformation("Publisher thread started");

await RegisterTags(twin.Properties.Desired);

this.logger.LogInformation("Tags registered");
Expand All @@ -88,6 +95,67 @@ private async Task OnDesiredPropertyUpdate(TwinCollection desiredProperties, obj
await RegisterTags(desiredProperties);
}


public class EnqueuedMessage
{
public DateTimeOffset EnqueuedTime { get; set; } = DateTimeOffset.UtcNow;

public TagTwinDefinition Tag { get; set; }

public object Data { get; set; }
}

private void StartPublisherThread()
{
ThreadPool.QueueUserWorkItem(async (state) =>
{
do
{
List<EnqueuedMessage> currentMessages = new List<EnqueuedMessage>();
do
{
if (!this.messagesQueue.TryDequeue(out var item))
{
break;
}
currentMessages.Add(item);
} while (true);
var messages = currentMessages.Select(item =>
{
var jsonTagValue = JsonConvert.SerializeObject(item.Data);
this.logger.LogTrace($"handling message for tag {item.Tag.Name} with value {jsonTagValue}");
if (item.Tag.Transform != null)
{
this.logger.LogTrace("Transforming message with {0}", item.Tag.Transform.ToString());
JObject inputMessage = new JObject();
inputMessage.Add("input", JToken.FromObject(item.Data));
jsonTagValue = JsonTransformer.Transform(item.Tag.Transform.ToString(), inputMessage.ToString()).ToString();
}
this.logger.LogTrace("Tag {0} value: {1}", item.Tag.TagName, jsonTagValue);
string jsonData = $"{{\"timestamp\":\"{item.EnqueuedTime.ToString("o")}\",\"{item.Tag.Name}\":{jsonTagValue}}}";
return new Message(Encoding.UTF8.GetBytes(jsonData))
{
CreationTimeUtc = item.EnqueuedTime.UtcDateTime,
};
}).ToArray();
if (!messages.Any())
continue;
this.logger.LogDebug("Sending {0} messages", messages.Count());
await this.moduleClient.SendEventBatchAsync("tag", messages);
} while (true);
});
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await UnregisterTags();
Expand Down Expand Up @@ -131,7 +199,7 @@ private async Task UnregisterTags()

private void StartTagPolling(TagTwinDefinition tag, CancellationToken cancellationToken)
{
this.logger.LogDebug($"Tag: {tag.Name}, PollingInterval: {tag.PollingInterval}");
this.logger.LogDebug($"Tag: {tag.TagName}, PollingInterval: {tag.PollingInterval}");

ThreadPool.QueueUserWorkItem(new WaitCallback(async c =>
{
Expand All @@ -154,28 +222,13 @@ private void StartTagPolling(TagTwinDefinition tag, CancellationToken cancellati
await plcTag.ReadAsync(cancellationToken);
this.logger.LogTrace("Polling tag {0} completed ({1})", tag.TagName, plcTag.GetStatus());
var newJsonTagValue = JsonConvert.SerializeObject(plcTag.Value);
if (string.Equals(newJsonTagValue, jsonTagValue))
{
continue;
}
jsonTagValue = newJsonTagValue;
if (tag.Transform != null)
this.messagesQueue.Enqueue(new EnqueuedMessage
{
jsonTagValue = JsonTransformer.Transform(tag.Transform, JObject.Parse(jsonTagValue)).ToString();
}
this.logger.LogDebug("Tag {0} value: {1}", tag.TagName, jsonTagValue);
string jsonData = $"{{\"timestamp\":\"{DateTime.UtcNow.ToString("o")}\",\"{tag.Name}\":{jsonTagValue}}}";
var message = new Message(Encoding.UTF8.GetBytes(jsonData));
Data = plcTag.Value,
Tag = tag
});
this.logger.LogTrace("Sending message {0}", jsonData);
await this.moduleClient.SendEventAsync("tag", message);
this.logger.LogTrace("Message queued for tag {0}", tag.TagName);
}
catch (Exception e)
when (e is TaskCanceledException || e is OperationCanceledException)
Expand Down

0 comments on commit b423c31

Please sign in to comment.