Skip to content
This repository has been archived by the owner on Feb 4, 2023. It is now read-only.

Commit

Permalink
Run polling thread in high priority + trigger send thread when data a…
Browse files Browse the repository at this point in the history
…rrive (#10)
  • Loading branch information
kbeaugrand committed Oct 5, 2022
1 parent ff881c1 commit 65f1a08
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions src/PlcPublisher/modules/PLC-Publisher/PLCPublisherModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task StartAsync(CancellationToken cancellationToken)

this.logger.LogInformation("Method handlers registered");

StartPublisherThread();
ThreadPool.QueueUserWorkItem(async (state) => await SendBatchedEvent());
this.logger.LogInformation("Publisher thread started");

await RegisterTags(twin.Properties.Desired);
Expand All @@ -104,62 +104,58 @@ public class EnqueuedMessage
public object Data { get; set; }
}

private void StartPublisherThread()
private async Task SendBatchedEvent()
{
ThreadPool.QueueUserWorkItem(async (state) =>
List<EnqueuedMessage> currentMessages = new List<EnqueuedMessage>();

do
{
do
if (!this.messagesQueue.TryDequeue(out var item))
{
List<EnqueuedMessage> currentMessages = new List<EnqueuedMessage>();
break;
}

do
{
if (!this.messagesQueue.TryDequeue(out var item))
{
break;
}
currentMessages.Add(item);
} while (true);

currentMessages.Add(item);
} while (true);
var messages = currentMessages.Select(item =>
{
var jsonTagValue = JsonConvert.SerializeObject(item.Data);
this.logger.LogTrace($"handling message for tag {item.Tag.Name} with value {jsonTagValue}");
var messages = currentMessages.Select(item =>
{
var jsonTagValue = JsonConvert.SerializeObject(item.Data);
this.logger.LogTrace($"handling message for tag {item.Tag.Name} with value {jsonTagValue}");
JObject jsonEvent = null;
JObject jsonEvent = null;
if (item.Tag.Transform != null)
{
this.logger.LogTrace("Transforming message with {0}", item.Tag.Transform.ToString());
JObject inputMessage = new JObject();
inputMessage.Add("input", JToken.FromObject(item.Data));
if (item.Tag.Transform != null)
{
this.logger.LogTrace("Transforming message with {0}", item.Tag.Transform.ToString());
JObject inputMessage = new JObject();
inputMessage.Add("input", JToken.FromObject(item.Data));
jsonEvent = JObject.Parse(JsonTransformer.Transform(item.Tag.Transform.ToString(), inputMessage.ToString()));
}
else
{
jsonEvent = JObject.Parse($"{{\"{item.Tag.Name}\":{jsonTagValue}}}");
}
jsonEvent = JObject.Parse(JsonTransformer.Transform(item.Tag.Transform.ToString(), inputMessage.ToString()));
}
else
{
jsonEvent = JObject.Parse($"{{\"{item.Tag.Name}\":{jsonTagValue}}}");
}
jsonEvent.Add("timestamp", item.EnqueuedTime.ToString("o"));
var jsonData = jsonEvent.ToString();
jsonEvent.Add("timestamp", item.EnqueuedTime.ToString("o"));
var jsonData = jsonEvent.ToString();
this.logger.LogTrace("{1}", jsonData);
this.logger.LogTrace("{1}", jsonData);
return new Message(Encoding.UTF8.GetBytes(jsonData))
{
CreationTimeUtc = item.EnqueuedTime.UtcDateTime,
};
}).ToArray();

return new Message(Encoding.UTF8.GetBytes(jsonData))
{
CreationTimeUtc = item.EnqueuedTime.UtcDateTime,
};
}).ToArray();
if (!messages.Any())
return;

if (!messages.Any())
continue;
this.logger.LogDebug("Sending {0} messages", messages.Count());
await this.moduleClient.SendEventBatchAsync("tag", messages);

this.logger.LogDebug("Sending {0} messages", messages.Count());
await this.moduleClient.SendEventBatchAsync("tag", messages);
} while (true);
});
Thread.Sleep(100);
}

public async Task StopAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -207,7 +203,7 @@ private void StartTagPolling(TagTwinDefinition tag, CancellationToken cancellati
{
this.logger.LogDebug($"Tag: {tag.TagName}, PollingInterval: {tag.PollingInterval}");

ThreadPool.QueueUserWorkItem(new WaitCallback(async c =>
var thread = new Thread(async c =>
{
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(tag.PollingInterval));
this.runningTasks.Add(new object());
Expand Down Expand Up @@ -258,7 +254,10 @@ private void StartTagPolling(TagTwinDefinition tag, CancellationToken cancellati
}
}
while (!cancellationToken.IsCancellationRequested);
}));
});

thread.Priority = ThreadPriority.Highest;
thread.Start();
}
}
}

0 comments on commit 65f1a08

Please sign in to comment.