Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Quix.Sdk.Streaming.QuixApi.Portal;
using System;

namespace Quix.Sdk.Streaming.Configuration
{
Expand All @@ -10,7 +11,7 @@ public class SecurityOptions
/// <summary>
/// The Sasl mechanism to use
/// </summary>
public SaslMechanism SaslMechanism { get; set; }
public SaslMechanism? SaslMechanism { get; set; }

/// <summary>
/// SASL username.
Expand All @@ -27,6 +28,16 @@ public class SecurityOptions
/// </summary>
public string SslCertificates { get; set; }

/// <summary>
/// Use SSL
/// </summary>
public bool UseSsl { get; set; }

/// <summary>
/// Use authentication
/// </summary>
public bool UseSasl { get; set; }

/// <summary>
/// For deserialization when binding to Configurations like Appsettings
/// </summary>
Expand All @@ -47,6 +58,12 @@ public SecurityOptions(string sslCertificates, string username, string password,
this.Username = username;
this.Password = password;
this.SaslMechanism = saslMechanism;

// Assume that if we get sslCertificates it's because we will use ssl
this.UseSsl = !string.IsNullOrEmpty(this.SslCertificates);

// Assume that if we have username, we will use Sasl
this.UseSasl = !string.IsNullOrEmpty(this.Username);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Quix.Sdk.Process.Models.Utility;
using Quix.Sdk.Process.Managers;
using System.Linq;
using Quix.Sdk.Streaming.Exceptions;

namespace Quix.Sdk.Streaming.Models.StreamWriter
{
Expand Down Expand Up @@ -306,7 +307,18 @@ private void ResetFlushDefinitionsTimer()
private void OnFlushDefinitionsTimerEvent(object state)
{
if (!timerEnabled) return;
this.FlushDefinitions();
try
{
this.FlushDefinitions();
}
catch (StreamClosedException exception) when (this.isDisposed)
{
// Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
}
catch (Exception ex)
{
this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush event definition buffer.");
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Quix.Sdk.Process.Managers;
using Microsoft.Extensions.Logging;
using Quix.Sdk.Process.Models.Utility;
using Quix.Sdk.Streaming.Exceptions;

namespace Quix.Sdk.Streaming.Models.StreamWriter
{
Expand All @@ -22,6 +23,7 @@ public class StreamParametersWriter : IDisposable
private bool timerEnabled = false; // Here because every now and then reseting its due time to never doesn't work
private bool isDisposed;
private const int TimerInterval = 20;
private readonly object flushLock = new object();

/// <summary>
/// Initializes a new instance of <see cref="StreamParametersWriter"/>
Expand Down Expand Up @@ -218,8 +220,19 @@ private void Flush(bool force)
{
throw new ObjectDisposedException(nameof(StreamParametersWriter));
}
this.FlushDefinitions();
this.Buffer.Flush();

try
{
lock (flushLock)
{
this.FlushDefinitions();
this.Buffer.Flush();
}
}
catch (Exception ex)
{
this.logger.LogError(ex, "Exception occurred while trying to flush parameter data buffer.");
}
}

private void ResetFlushDefinitionsTimer()
Expand All @@ -231,11 +244,15 @@ private void ResetFlushDefinitionsTimer()

private void OnFlushDefinitionsTimerEvent(object state)
{
if (!timerEnabled) return;
if (!this.timerEnabled) return;
try
{
this.FlushDefinitions();
}
catch (StreamClosedException exception) when (this.isDisposed)
{
// Ignore exception because the timer flush definition may finish executing only after closure due to how close lock works in streamWriter
}
catch (Exception ex)
{
this.logger.Log(LogLevel.Error, ex, "Exception occurred while trying to flush parameter definition buffer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public string Id
set
{
if (value == null) throw new ArgumentNullException(nameof(Id));
this.id = value.ToLowerInvariant();
this.id = value;
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/CsharpClient/Quix.Sdk.Streaming/QuixApi/Portal/Workspace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ internal enum BrokerSecurityMode
/// SSL secured ACL role system.
/// </summary>
SaslSsl,

/// <summary>
/// Plain Text mode
/// </summary>
PlainText,

/// <summary>
/// ACL role system
/// </summary>
Sasl
}

/// <summary>
Expand Down
36 changes: 30 additions & 6 deletions src/CsharpClient/Quix.Sdk.Streaming/QuixStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using Quix.Sdk.Streaming.Raw;
using Quix.Sdk.Streaming.Utils;
using Exception = System.Exception;
using Quix.Sdk.Process.Configuration;

namespace Quix.Sdk.Streaming
{
Expand Down Expand Up @@ -208,7 +209,7 @@ public IOutputTopic OpenOutputTopic(string topicIdOrName)
private async Task<(StreamingClient client, string topicId)> ValidateTopicAndCreateClient(string topicIdOrName)
{
CheckToken(token);
topicIdOrName = topicIdOrName.ToLowerInvariant().Trim();;
topicIdOrName = topicIdOrName.Trim();
var sw = Stopwatch.StartNew();
var ws = await GetWorkspaceFromConfiguration(topicIdOrName);
var client = await this.CreateStreamingClientForWorkspace(ws);
Expand All @@ -232,7 +233,7 @@ private async Task<string> ValidateTopicExistence(Workspace workspace, string to
{
this.logger.LogTrace("Checking if topic {0} is already created.", topicIdOrName);
var topics = await this.GetTopics(workspace, true);
var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCultureIgnoreCase)); // id prio
var existingTopic = topics.FirstOrDefault(y => y.Id.Equals(topicIdOrName, StringComparison.InvariantCulture)) ?? topics.FirstOrDefault(y=> y.Name.Equals(topicIdOrName, StringComparison.InvariantCulture)); // id prio
var topicName = existingTopic?.Name;
if (topicName == null)
{
Expand Down Expand Up @@ -388,13 +389,36 @@ private async Task<StreamingClient> CreateStreamingClientForWorkspace(Workspace
logger.LogWarning("Workspace {0} is in state {1} instead of {2}.", ws.WorkspaceId, ws.Status, WorkspaceStatus.Ready);
}

var certPath = await GetWorkspaceCertificatePath(ws);
if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
var securityOptions = new SecurityOptions();

if (ws.Broker.SecurityMode == BrokerSecurityMode.Ssl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
{
securityOptions.UseSsl = true;
securityOptions.SslCertificates = await GetWorkspaceCertificatePath(ws);
}
else
{
securityOptions.UseSsl = false;
}

if (ws.Broker.SecurityMode == BrokerSecurityMode.Sasl || ws.Broker.SecurityMode == BrokerSecurityMode.SaslSsl)
{
if (!Enum.TryParse(ws.Broker.SaslMechanism.ToString(), true, out SaslMechanism parsed))
{
throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
}

securityOptions.UseSasl = true;
securityOptions.SaslMechanism = parsed;
securityOptions.Username = ws.Broker.Username;
securityOptions.Password = ws.Broker.Password;
}
else
{
throw new ArgumentOutOfRangeException(nameof(ws.Broker.SaslMechanism), "Unsupported sasl mechanism " + ws.Broker.SaslMechanism);
securityOptions.UseSasl = false;
}

var client = new StreamingClient(ws.Broker.Address, new SecurityOptions(certPath, ws.Broker.Username, ws.Broker.Password, parsed), brokerProperties, debug);
var client = new StreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
}

Expand Down
29 changes: 26 additions & 3 deletions src/CsharpClient/Quix.Sdk.Streaming/StreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Quix.Sdk.Streaming.Raw;
using Quix.Sdk.Streaming.Utils;
using SaslMechanism = Confluent.Kafka.SaslMechanism;
using Quix.Sdk.Streaming.QuixApi.Portal;

namespace Quix.Sdk.Streaming
{
Expand Down Expand Up @@ -41,12 +42,34 @@ public StreamingClient(string brokerAddress, SecurityOptions securityOptions = n
}
else
{
if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out SaslMechanism parsed))
var securityOptionsBuilder = new SecurityOptionsBuilder();

if (securityOptions.UseSsl)
{
securityOptionsBuilder.SetSslEncryption(securityOptions.SslCertificates);
}
else
{
securityOptionsBuilder.SetNoEncryption();
}

if (securityOptions.UseSasl)
{
throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out Confluent.Kafka.SaslMechanism parsed))
{
throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism);
}

securityOptionsBuilder.SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed);
}
else
{
securityOptionsBuilder.SetNoAuthentication();
}
this.brokerProperties = new SecurityOptionsBuilder().SetSslEncryption(securityOptions.SslCertificates).SetSaslAuthentication(securityOptions.Username, securityOptions.Password, parsed).Build();

this.brokerProperties = securityOptionsBuilder.Build();
}

if (properties != null)
{
foreach (var property in properties)
Expand Down