Skip to content

Commit

Permalink
Option to skip broker verification and set KafkaMessage for StreamPac…
Browse files Browse the repository at this point in the history
…kage (#41)

Option to skip broker verification and set KafkaMessage for StreamPackage
  • Loading branch information
peter-quix committed Apr 2, 2024
1 parent a55a9ca commit a0e5cae
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 33 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.6.4.0"
informal_version = "0.6.4.0-dev1"
nuget_version = "0.6.4.0-dev1"
informal_version = "0.6.4.0-dev5"
nuget_version = "0.6.4.0-dev5"


def updatecsproj(projfilepath):
Expand Down
23 changes: 16 additions & 7 deletions src/QuixStreams.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public class KafkaConsumer : IKafkaConsumer
/// </summary>
public event EventHandler<RevokedEventArgs> OnRevoked;

/// <summary>
/// If set to false it wont wait for the broker verification
/// </summary>
public bool VerifyBrokerConnection { get; set; } = true;

/// <summary>
/// Initializes a new instance of <see cref="KafkaConsumer"/>
/// </summary>
Expand Down Expand Up @@ -321,15 +326,19 @@ void NullLoggerForAdminLogs(IAdminClient adminClient, LogMessage logMessage)
connectionEstablishedEvent.Reset();
var connectSw = Stopwatch.StartNew();
this.StartWorkerThread();
if (connectionEstablishedEvent.Wait(TimeSpan.FromSeconds(5)))
{
connectSw.Stop();
this.logger.LogTrace("[{0}] Connected to broker in {1}", this.configId, connectSw.Elapsed);
}
else
if (VerifyBrokerConnection)
{
this.logger.LogDebug("[{0}] Connection to broker was not verified in {1}", this.configId, connectSw.Elapsed);
if (connectionEstablishedEvent.Wait(TimeSpan.FromSeconds(5)))
{
connectSw.Stop();
this.logger.LogTrace("[{0}] Connected to broker in {1}", this.configId, connectSw.Elapsed);
}
else
{
this.logger.LogDebug("[{0}] Connection to broker was not verified in {1}", this.configId, connectSw.Elapsed);
}
}

this.logger.LogTrace("[{0}] Open finished", this.configId);
}
}
Expand Down
44 changes: 21 additions & 23 deletions src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ namespace QuixStreams.Telemetry.Kafka
/// KafkaReader initializes transport layer classes and sets up a <see cref="StreamPipelineFactory"/> to detect new streams in Kafka topic
/// and start new <see cref="StreamPipeline"/> instances where all the messages of the stream are going to be sent.
/// </summary>
public class TelemetryKafkaConsumer: IDisposable
public class TelemetryKafkaConsumer : IDisposable
{
/// <summary>
/// The topic the kafka consumer is created for
/// </summary>
public readonly string Topic = "Unknown";

private readonly ILogger logger = Logging.CreateLogger<TelemetryKafkaConsumer>();
private IKafkaTransportConsumer kafkaTransportConsumer;
private bool isDisposed = false;
Expand All @@ -33,7 +33,7 @@ public class TelemetryKafkaConsumer: IDisposable
/// Event raised when an exception occurs during the Reading processes
/// </summary>
public event EventHandler<Exception> OnReceiveException;

/// <summary>
/// Event raised with streams belonging to kafka partition(s) revoked
/// </summary>
Expand All @@ -43,12 +43,12 @@ public class TelemetryKafkaConsumer: IDisposable
/// Raised when the kafka topic is about to become unavailable, but it is still available at this point
/// </summary>
public event EventHandler OnRevoking;

/// <summary>
/// Raised when commit occurs
/// </summary>
public event EventHandler OnCommitted;

/// <summary>
/// Raised before commit
/// </summary>
Expand All @@ -57,11 +57,7 @@ public class TelemetryKafkaConsumer: IDisposable
/// <summary>
/// Stream Context cache for all the streams of the topic
/// </summary>
public IStreamContextCache ContextCache
{
get;
private set;
}
public IStreamContextCache ContextCache { get; private set; }

/// <summary>
/// Group id
Expand All @@ -74,7 +70,8 @@ public IStreamContextCache ContextCache
/// <param name="telemetryKafkaConsumerConfiguration">Kafka broker configuration for <see cref="TelemetryKafkaConsumer"/></param>
/// <param name="topic">Topic name to read from</param>
/// <param name="partitionOffset">The partition offset to start reading from</param>
public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafkaConsumerConfiguration, string topic, PartitionOffset partitionOffset = null)
/// <param name="verifyBrokerConnection">If set to false, it won't wait for the broker connection to be verified</param>
public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafkaConsumerConfiguration, string topic, PartitionOffset partitionOffset = null, bool verifyBrokerConnection = true)
{
Topic = topic;
// Kafka Transport layer -> Transport layer
Expand All @@ -92,12 +89,15 @@ public TelemetryKafkaConsumer(TelemetryKafkaConsumerConfiguration telemetryKafka
o.CommitInterval = commitOptions.CommitInterval;
o.AutoCommitEnabled = commitOptions.AutoCommitEnabled;
};

var topicConfig = new ConsumerTopicConfiguration(topic, partitionOffset);
this.kafkaConsumer = new KafkaConsumer(subConfig, topicConfig);
this.kafkaConsumer = new KafkaConsumer(subConfig, topicConfig)
{
VerifyBrokerConnection = verifyBrokerConnection
};
this.GroupId = subConfig.GroupId;
}

/// <summary>
/// Initializes a new instance of <see cref="TelemetryKafkaConsumer"/>
/// </summary>
Expand All @@ -122,10 +122,7 @@ private bool InitializeTransport()
return false;

this.kafkaConsumer.OnErrorOccurred += ReadingExceptionHandler;
this.kafkaTransportConsumer = new KafkaTransportConsumer(kafkaConsumer, (o) =>
{
this.configureCommitOptions?.Invoke(o.CommitOptions);
});
this.kafkaTransportConsumer = new KafkaTransportConsumer(kafkaConsumer, (o) => { this.configureCommitOptions?.Invoke(o.CommitOptions); });
return true;
}

Expand All @@ -135,7 +132,7 @@ private void ReadingExceptionHandler(object sender, Exception e)
{
if (e is KafkaException)
{
this.logger.LogError(e, "Exception receiving package from Kafka");
this.logger.LogError(e, "Exception receiving package from Kafka");
}
else
{
Expand All @@ -162,15 +159,15 @@ public void Start()
this.kafkaTransportConsumer.OnRevoking += RevokingHandler;
this.kafkaTransportConsumer.OnCommitted += CommittedHandler;
this.kafkaTransportConsumer.OnCommitting += CommitingHandler;
this.streamPipelineFactory.Open();
this.streamPipelineFactory.Open();
this.kafkaConsumer.Open();
}

private void CommittedHandler(object sender, CommittedEventArgs e)
{
this.OnCommitted?.Invoke(this, EventArgs.Empty);
}

private void CommitingHandler(object sender, CommittingEventArgs e)
{
this.OnCommitting?.Invoke(this, EventArgs.Empty);
Expand Down Expand Up @@ -230,7 +227,7 @@ private void StopHelper()

/// <inheritdoc />
public void Dispose()
{
{
if (isDisposed) return;
isDisposed = true;
this.StopHelper();
Expand Down Expand Up @@ -258,9 +255,10 @@ public void Commit()
this.logger.LogTrace("Finished manual commit (nothing to commit)");
return; // there is nothing to commit
}

this.kafkaTransportConsumer.Commit(contexts);
this.logger.LogTrace("Finished manual commit");
}
}
}
}
}
13 changes: 12 additions & 1 deletion src/QuixStreams.Telemetry/Models/Package/StreamPackage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ public StreamPackage(Type type, object value)
this.Value = value;
}

/// <summary>
/// Initializes a new instance of <see cref="StreamPackage"/>
/// </summary>
/// <param name="type">The type of the content value</param>
/// <param name="value">The content value of the package</param>
/// <param name="kafkaMessage">The kafka message the stream package is derived from</param>
public StreamPackage(Type type, object value, KafkaMessage kafkaMessage) : this(type, value)
{
this.KafkaMessage = kafkaMessage;
}

/// <summary>
/// Type of the content value
/// </summary>
Expand All @@ -47,7 +58,7 @@ public StreamPackage(Type type, object value)
/// The Kafka message this stream package derives from
/// Can be null if not consumed from broker
/// </summary>
public KafkaMessage KafkaMessage { get; }
public KafkaMessage KafkaMessage { get; protected set; }

/// <summary>
/// Serialize the package into Json
Expand Down

0 comments on commit a0e5cae

Please sign in to comment.