From a0e5cae67a104b4c23f70faaed12306e81d7a241 Mon Sep 17 00:00:00 2001 From: PeterN <79838809+peter-quix@users.noreply.github.com> Date: Tue, 2 Apr 2024 15:57:58 +0100 Subject: [PATCH] Option to skip broker verification and set KafkaMessage for StreamPackage (#41) Option to skip broker verification and set KafkaMessage for StreamPackage --- builds/csharp/nuget/build_nugets.py | 4 +- src/QuixStreams.Kafka/KafkaConsumer.cs | 23 +++++++--- .../Kafka/TelemetryKafkaConsumer.cs | 44 +++++++++---------- .../Models/Package/StreamPackage.cs | 13 +++++- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/builds/csharp/nuget/build_nugets.py b/builds/csharp/nuget/build_nugets.py index 2c47d989..6b4f2854 100644 --- a/builds/csharp/nuget/build_nugets.py +++ b/builds/csharp/nuget/build_nugets.py @@ -7,8 +7,8 @@ from typing import List version = "0.6.4.0" -informal_version = "0.6.4.0-dev1" -nuget_version = "0.6.4.0-dev1" +informal_version = "0.6.4.0-dev5" +nuget_version = "0.6.4.0-dev5" def updatecsproj(projfilepath): diff --git a/src/QuixStreams.Kafka/KafkaConsumer.cs b/src/QuixStreams.Kafka/KafkaConsumer.cs index 2a99a984..57f9a33f 100644 --- a/src/QuixStreams.Kafka/KafkaConsumer.cs +++ b/src/QuixStreams.Kafka/KafkaConsumer.cs @@ -82,6 +82,11 @@ public class KafkaConsumer : IKafkaConsumer /// public event EventHandler OnRevoked; + /// + /// If set to false it won't wait for the broker verification + /// + public bool VerifyBrokerConnection { get; set; } = true; + /// /// Initializes a new instance of /// @@ -321,15 +326,19 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage) connectionEstablishedEvent.Reset(); var connectSw = Stopwatch.StartNew(); this.StartWorkerThread(); - if (connectionEstablishedEvent.Wait(TimeSpan.FromSeconds(5))) - { - connectSw.Stop(); - this.logger.LogTrace("[{0}] Connected to broker in {1}", this.configId, connectSw.Elapsed); - } - else + if (VerifyBrokerConnection) { - this.logger.LogDebug("[{0}] Connection to 
broker was not verified in {1}", this.configId, connectSw.Elapsed); + if (connectionEstablishedEvent.Wait(TimeSpan.FromSeconds(5))) + { + connectSw.Stop(); + this.logger.LogTrace("[{0}] Connected to broker in {1}", this.configId, connectSw.Elapsed); + } + else + { + this.logger.LogDebug("[{0}] Connection to broker was not verified in {1}", this.configId, connectSw.Elapsed); + } } + this.logger.LogTrace("[{0}] Open finished", this.configId); } } diff --git a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs index adc48397..893afb45 100644 --- a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs +++ b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs @@ -13,13 +13,13 @@ namespace QuixStreams.Telemetry.Kafka /// KafkaReader initializes transport layer classes and sets up a to detect new streams in Kafka topic /// and start new instances where all the messages of the stream are going to be sent. /// - public class TelemetryKafkaConsumer: IDisposable + public class TelemetryKafkaConsumer : IDisposable { /// /// The topic the kafka consumer is created for /// public readonly string Topic = "Unknown"; - + private readonly ILogger logger = Logging.CreateLogger(); private IKafkaTransportConsumer kafkaTransportConsumer; private bool isDisposed = false; @@ -33,7 +33,7 @@ public class TelemetryKafkaConsumer: IDisposable /// Event raised when an exception occurs during the Reading processes /// public event EventHandler OnReceiveException; - + /// /// Event raised with streams belonging to kafka partition(s) revoked /// @@ -43,12 +43,12 @@ public class TelemetryKafkaConsumer: IDisposable /// Raised when the kafka topic will became unavailable, but it is still possible at this point /// public event EventHandler OnRevoking; - + /// /// Raised when commit occurs /// public event EventHandler OnCommitted; - + /// /// Raised before commit /// @@ -57,11 +57,7 @@ public class TelemetryKafkaConsumer: 
IDisposable /// /// Stream Context cache for all the streams of the topic /// - public IStreamContextCache ContextCache - { - get; - private set; - } + public IStreamContextCache ContextCache { get; private set; } /// /// Group id @@ -74,7 +70,8 @@ public IStreamContextCache ContextCache /// Kafka broker configuration for /// Topic name to read from /// The partition offset to start reading from - public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafkaConsumerConfiguration, string topic, PartitionOffset partitionOffset = null) + /// If set to false it won't wait for the broker verification + public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafkaConsumerConfiguration, string topic, PartitionOffset partitionOffset = null, bool verifyBrokerConnection = true) { Topic = topic; // Kafka Transport layer -> Transport layer @@ -92,12 +89,15 @@ public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafka o.CommitInterval = commitOptions.CommitInterval; o.AutoCommitEnabled = commitOptions.AutoCommitEnabled; }; - + var topicConfig = new ConsumerTopicConfiguration(topic, partitionOffset); - this.kafkaConsumer = new KafkaConsumer(subConfig, topicConfig); + this.kafkaConsumer = new KafkaConsumer(subConfig, topicConfig) + { + VerifyBrokerConnection = verifyBrokerConnection + }; this.GroupId = subConfig.GroupId; } - /// /// Initializes a new instance of /// @@ -122,10 +122,7 @@ private bool InitializeTransport() return false; this.kafkaConsumer.OnErrorOccurred += ReadingExceptionHandler; - this.kafkaTransportConsumer = new KafkaTransportConsumer(kafkaConsumer, (o) => - { - this.configureCommitOptions?.Invoke(o.CommitOptions); - }); + this.kafkaTransportConsumer = new KafkaTransportConsumer(kafkaConsumer, (o) => { this.configureCommitOptions?.Invoke(o.CommitOptions); }); return true; } @@ -135,7 +132,7 @@ private void ReadingExceptionHandler(object sender, Exception e) { if (e is KafkaException) { - 
this.logger.LogError(e, "Exception receiving package from Kafka"); + this.logger.LogError(e, "Exception receiving package from Kafka"); } else { @@ -162,7 +159,7 @@ public void Start() this.kafkaTransportConsumer.OnRevoking += RevokingHandler; this.kafkaTransportConsumer.OnCommitted += CommittedHandler; this.kafkaTransportConsumer.OnCommitting += CommitingHandler; - this.streamPipelineFactory.Open(); + this.streamPipelineFactory.Open(); this.kafkaConsumer.Open(); } @@ -170,7 +167,7 @@ private void CommittedHandler(object sender, CommittedEventArgs e) { this.OnCommitted?.Invoke(this, EventArgs.Empty); } - + private void CommitingHandler(object sender, CommittingEventArgs e) { this.OnCommitting?.Invoke(this, EventArgs.Empty); @@ -230,7 +227,7 @@ private void StopHelper() /// public void Dispose() - { + { if (isDisposed) return; isDisposed = true; this.StopHelper(); @@ -258,9 +255,10 @@ public void Commit() this.logger.LogTrace("Finished manual commit (nothing to commit)"); return; // there is nothing to commit } + this.kafkaTransportConsumer.Commit(contexts); this.logger.LogTrace("Finished manual commit"); } } } -} +} \ No newline at end of file diff --git a/src/QuixStreams.Telemetry/Models/Package/StreamPackage.cs b/src/QuixStreams.Telemetry/Models/Package/StreamPackage.cs index f526ee94..a85f5530 100644 --- a/src/QuixStreams.Telemetry/Models/Package/StreamPackage.cs +++ b/src/QuixStreams.Telemetry/Models/Package/StreamPackage.cs @@ -33,6 +33,17 @@ public StreamPackage(Type type, object value) this.Value = value; } + /// + /// Initializes a new instance of + /// + /// The type of the content value + /// The content value of the package + /// The kafka message the stream package is derived from + public StreamPackage(Type type, object value, KafkaMessage kafkaMessage) : this(type, value) + { + this.KafkaMessage = kafkaMessage; + } + /// /// Type of the content value /// @@ -47,7 +58,7 @@ public StreamPackage(Type type, object value) /// The Kafka message this stream 
package derives from /// Can be null if not consumed from broker /// - public KafkaMessage KafkaMessage { get; } + public KafkaMessage KafkaMessage { get; protected set; } /// /// Serialize the package into Json