diff --git a/src/CsharpClient/Quix.Sdk.Streaming/Configuration/SecurityOptions.cs b/src/CsharpClient/Quix.Sdk.Streaming/Configuration/SecurityOptions.cs
index d78d3f0fb..17447f5ff 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/Configuration/SecurityOptions.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/Configuration/SecurityOptions.cs
@@ -1,4 +1,5 @@
-using System;
+using Quix.Sdk.Streaming.QuixApi.Portal;
+using System;
namespace Quix.Sdk.Streaming.Configuration
{
@@ -10,7 +11,7 @@ public class SecurityOptions
///
/// The Sasl mechanism to use
///
- public SaslMechanism SaslMechanism { get; set; }
+ public SaslMechanism? SaslMechanism { get; set; }
///
/// SASL username.
@@ -27,6 +28,16 @@ public class SecurityOptions
///
public string SslCertificates { get; set; }
+ ///
+ /// Use SSL
+ ///
+ public bool UseSsl { get; set; }
+
+ ///
+ /// Use authentication
+ ///
+ public bool UseSasl { get; set; }
+
///
/// For deserialization when binding to Configurations like Appsettings
///
@@ -47,6 +58,12 @@ public SecurityOptions(string sslCertificates, string username, string password,
this.Username = username;
this.Password = password;
this.SaslMechanism = saslMechanism;
+
+ // Assume that if we get sslCertificates it's because we will use ssl
+ this.UseSsl = !string.IsNullOrEmpty(this.SslCertificates);
+
+ // Assume that if we have username, we will use Sasl
+ this.UseSasl = !string.IsNullOrEmpty(this.Username);
}
}
}
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamEventsWriter.cs b/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamEventsWriter.cs
index 9c9b8d923..1fb7452e9 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamEventsWriter.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamEventsWriter.cs
@@ -6,6 +6,7 @@
using Quix.Sdk.Process.Models.Utility;
using Quix.Sdk.Process.Managers;
using System.Linq;
+using Quix.Sdk.Streaming.Exceptions;
namespace Quix.Sdk.Streaming.Models.StreamWriter
{
@@ -306,7 +307,18 @@ private void ResetFlushDefinitionsTimer()
private void OnFlushDefinitionsTimerEvent(object state)
{
if (!timerEnabled) return;
- this.FlushDefinitions();
+ try
+ {
+ this.FlushDefinitions();
+ }
+ catch (StreamClosedException exception) when (this.isDisposed)
+ {
+ // Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
+ }
+ catch (Exception ex)
+ {
+ this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush event definition buffer.");
+ }
}
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamParametersWriter.cs b/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamParametersWriter.cs
index e3cf58d7d..8b00b9edc 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamParametersWriter.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/Models/StreamWriter/StreamParametersWriter.cs
@@ -4,6 +4,7 @@
using Quix.Sdk.Process.Managers;
using Microsoft.Extensions.Logging;
using Quix.Sdk.Process.Models.Utility;
+using Quix.Sdk.Streaming.Exceptions;
namespace Quix.Sdk.Streaming.Models.StreamWriter
{
@@ -22,6 +23,7 @@ public class StreamParametersWriter : IDisposable
private bool timerEnabled = false; // Here because every now and then reseting its due time to never doesn't work
private bool isDisposed;
private const int TimerInterval = 20;
+ private readonly object flushLock = new object();
///
/// Initializes a new instance of
@@ -218,8 +220,19 @@ private void Flush(bool force)
{
throw new ObjectDisposedException(nameof(StreamParametersWriter));
}
- this.FlushDefinitions();
- this.Buffer.Flush();
+
+ try
+ {
+ lock (flushLock)
+ {
+ this.FlushDefinitions();
+ this.Buffer.Flush();
+ }
+ }
+ catch (Exception ex)
+ {
+ this.logger.LogError(ex, "Exception occurred while trying to flush parameter data buffer.");
+ }
}
private void ResetFlushDefinitionsTimer()
@@ -231,11 +244,15 @@ private void ResetFlushDefinitionsTimer()
private void OnFlushDefinitionsTimerEvent(object state)
{
- if (!timerEnabled) return;
+ if (!this.timerEnabled) return;
try
{
this.FlushDefinitions();
}
+ catch (StreamClosedException exception) when (this.isDisposed)
+ {
+ // Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
+ }
catch (Exception ex)
{
this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush parameter definition buffer.");
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Topic.cs b/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Topic.cs
index 39912ae81..d10e32526 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Topic.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Topic.cs
@@ -18,7 +18,7 @@ public string Id
set
{
if (value == null) throw new ArgumentNullException(nameof(Id));
- this.id = value.ToLowerInvariant();
+ this.id = value;
}
}
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs b/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs
index 9c0826097..365e0bbbd 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs
@@ -93,6 +93,16 @@ internal enum BrokerSecurityMode
/// SSL secured ACL role system.
///
SaslSsl,
+
+ ///
+ /// Plain Text mode
+ ///
+ PlainText,
+
+ ///
+ /// ACL role system
+ ///
+ Sasl
}
///
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs b/src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs
index 631197a4e..393f70d53 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs
@@ -28,6 +28,7 @@
using Quix.Sdk.Streaming.Raw;
using Quix.Sdk.Streaming.Utils;
using Exception = System.Exception;
+using Quix.Sdk.Process.Configuration;
namespace Quix.Sdk.Streaming
{
@@ -208,7 +209,7 @@ public IOutputTopic OpenOutputTopic(string topicIdOrName)
private async Task<(StreamingClient client, string topicId)> ValidateTopicAndCreateClient(string topicIdOrName)
{
CheckToken(token);
- topicIdOrName = topicIdOrName.ToLowerInvariant().Trim();;
+ topicIdOrName = topicIdOrName.Trim();
var sw = Stopwatch.StartNew();
var ws = await GetWorkspaceFromConfiguration(topicIdOrName);
var client = await this.CreateStreamingClientForWorkspace(ws);
@@ -232,7 +233,7 @@ private async Task ValidateTopicExistence(Workspace workspace, string to
{
this.logger.LogTrace("Checking if topic {0} is already created.", topicIdOrName);
var topics = await this.GetTopics(workspace, true);
- var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)); // id prio
+ var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCulture)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCulture)); // id prio
var topicName = existingTopic?.Name;
if (topicName == null)
{
@@ -388,13 +389,36 @@ private async Task CreateStreamingClientForWorkspace(Workspace
logger.LogWarning("Workspace {0} is in state {1} instead of {2}.", ws.WorkspaceId, ws.Status, WorkspaceStatus.Ready);
}
- var certPath = await GetWorkspaceCertificatePath(ws);
- if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
+ var securityOptions = new SecurityOptions();
+
+ if (ws.Broker.SecurityMode == BrokerSecurityMode.Ssl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
+ {
+ securityOptions.UseSsl = true;
+ securityOptions.SslCertificates = await GetWorkspaceCertificatePath(ws);
+ }
+ else
+ {
+ securityOptions.UseSsl = false;
+ }
+
+ if (ws.Broker.SecurityMode == BrokerSecurityMode.Sasl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
+ {
+ if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
+ {
+ throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
+ }
+
+ securityOptions.UseSasl = true;
+ securityOptions.SaslMechanism = parsed;
+ securityOptions.Username = ws.Broker.Username;
+ securityOptions.Password = ws.Broker.Password;
+ }
+ else
{
- throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
+ securityOptions.UseSasl = false;
}
- var client = new StreamingClient(ws.Broker.Address, new SecurityOptions(certPath, ws.Broker.Username, ws.Broker.Password, parsed), brokerProperties, debug);
+ var client = new StreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
}
diff --git a/src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs b/src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs
index 19f0c649c..8e783a6cd 100644
--- a/src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs
+++ b/src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs
@@ -13,6 +13,7 @@
using Quix.Sdk.Streaming.Raw;
using Quix.Sdk.Streaming.Utils;
using SaslMechanism = Confluent.Kafka.SaslMechanism;
+using Quix.Sdk.Streaming.QuixApi.Portal;
namespace Quix.Sdk.Streaming
{
@@ -41,12 +42,34 @@ public StreamingClient(string brokerAddress, SecurityOptions securityOptions = n
}
else
{
- if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out SaslMechanism parsed))
+ var securityOptionsBuilder = new SecurityOptionsBuilder();
+
+ if (securityOptions.UseSsl)
+ {
+ securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
+ }
+ else
+ {
+ securityOptionsBuilder.SetNoEncryption();
+ }
+
+ if (securityOptions.UseSasl)
{
- throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
+ if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out Confluent.Kafka.SaslMechanism parsed))
+ {
+ throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
+ }
+
+ securityOptionsBuilder.SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed);
+ }
+ else
+ {
+ securityOptionsBuilder.SetNoAuthentication();
}
- this.brokerProperties = new SecurityOptionsBuilder().SetSslEncryption(securityOptions.SslCertificates).SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed).Build();
+
+ this.brokerProperties = securityOptionsBuilder.Build();
}
+
if (properties != null)
{
foreach (var property in properties)