diff --git a/Core.Streaming.Kafka/Consumers/KafkaConsumer.cs b/Core.Streaming.Kafka/Consumers/KafkaConsumer.cs index 7b108282e..b41b151c0 100644 --- a/Core.Streaming.Kafka/Consumers/KafkaConsumer.cs +++ b/Core.Streaming.Kafka/Consumers/KafkaConsumer.cs @@ -40,26 +40,24 @@ public async Task StartAsync(CancellationToken cancellationToken) logger.LogInformation("Kafka consumer started"); // create consumer - using (var consumer = new ConsumerBuilder(config.ConsumerConfig).Build()) + using var consumer = new ConsumerBuilder(config.ConsumerConfig).Build(); + // subscribe to Kafka topics (taken from config) + consumer.Subscribe(config.Topics); + try { - // subscribe to Kafka topics (taken from config) - consumer.Subscribe(config.Topics); - try + // keep consumer working until it get signal that it should be shuted down + while (!cancellationToken.IsCancellationRequested) { - // keep consumer working until it get signal that it should be shuted down - while (!cancellationToken.IsCancellationRequested) - { - // consume event from Kafka - await ConsumeNextEvent(consumer, cancellationToken); - } + // consume event from Kafka + await ConsumeNextEvent(consumer, cancellationToken); } - catch (Exception e) - { - logger.LogInformation("Error consuming message: " + e.Message + e.StackTrace); + } + catch (Exception e) + { + logger.LogInformation("Error consuming message: " + e.Message + e.StackTrace); - // Ensure the consumer leaves the group cleanly and final offsets are committed. - consumer.Close(); - } + // Ensure the consumer leaves the group cleanly and final offsets are committed. + consumer.Close(); } } diff --git a/Core/BackgroundWorkers/BackgroundWorker.cs b/Core/BackgroundWorkers/BackgroundWorker.cs index 5904fed91..95ba8a18d 100644 --- a/Core/BackgroundWorkers/BackgroundWorker.cs +++ b/Core/BackgroundWorkers/BackgroundWorker.cs @@ -6,10 +6,10 @@ namespace Core.BackgroundWorkers; -public class BackgroundWorker: IHostedService, IDisposable +public class BackgroundWorker: BackgroundService { - private Task? executingTask; - private CancellationTokenSource? cts; +// private Task? executingTask; +// private CancellationTokenSource? cts; private readonly ILogger logger; private readonly Func perform; @@ -22,36 +22,47 @@ Func perform this.perform = perform; } - public Task StartAsync(CancellationToken cancellationToken) + protected override Task ExecuteAsync(CancellationToken stoppingToken) { - // Create a linked token so we can trigger cancellation outside of this token's cancellation - cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - - executingTask = Task.Run(() => perform(cts.Token), cancellationToken); - - return executingTask; + return Task.Run(async () => + { + await Task.Yield(); + logger.LogInformation("Background worker stopped"); + await perform(stoppingToken); + logger.LogInformation("Background worker stopped"); + }, stoppingToken); } - public async Task StopAsync(CancellationToken cancellationToken) - { - // Stop called without start - if (executingTask == null) - return; - - // Signal cancellation to the executing method - cts?.Cancel(); - - // Wait until the issue completes or the stop token triggers - await Task.WhenAny(executingTask, Task.Delay(-1, cancellationToken)); - - // Throw if cancellation triggered - cancellationToken.ThrowIfCancellationRequested(); - - logger.LogInformation("Background worker stopped"); - } - - public void Dispose() - { - cts?.Dispose(); - } + // public Task StartAsync(CancellationToken cancellationToken) + // { + // // Create a linked token so we can trigger cancellation outside of this token's cancellation + // cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + // + // executingTask = Task.Run(() => perform(cts.Token), cancellationToken); + // + // return executingTask; + // } + // + // public async Task StopAsync(CancellationToken cancellationToken) + // { + // // Stop called without start + // if (executingTask == null) + // return; + // + // // Signal cancellation to the executing method + // cts?.Cancel(); + // + // // Wait until the issue completes or the stop token triggers + // await Task.WhenAny(executingTask, Task.Delay(-1, cancellationToken)); + // + // // Throw if cancellation triggered + // cancellationToken.ThrowIfCancellationRequested(); + // + // logger.LogInformation("Background worker stopped"); + // } + // + // public void Dispose() + // { + // cts?.Dispose(); + // } }