Skip to content

Commit

Permalink
Some ugly hack, because why not?
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Mar 3, 2022
1 parent 63d98d3 commit c24da62
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 48 deletions.
30 changes: 14 additions & 16 deletions Core.Streaming.Kafka/Consumers/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,24 @@ public async Task StartAsync(CancellationToken cancellationToken)
logger.LogInformation("Kafka consumer started");

// create consumer
using (var consumer = new ConsumerBuilder<string, string>(config.ConsumerConfig).Build())
using var consumer = new ConsumerBuilder<string, string>(config.ConsumerConfig).Build();
// subscribe to Kafka topics (taken from config)
consumer.Subscribe(config.Topics);
try
{
// subscribe to Kafka topics (taken from config)
consumer.Subscribe(config.Topics);
try
// keep consumer working until it get signal that it should be shuted down
while (!cancellationToken.IsCancellationRequested)
{
// keep consumer working until it get signal that it should be shuted down
while (!cancellationToken.IsCancellationRequested)
{
// consume event from Kafka
await ConsumeNextEvent(consumer, cancellationToken);
}
// consume event from Kafka
await ConsumeNextEvent(consumer, cancellationToken);
}
catch (Exception e)
{
logger.LogInformation("Error consuming message: " + e.Message + e.StackTrace);
}
catch (Exception e)
{
logger.LogInformation("Error consuming message: " + e.Message + e.StackTrace);

// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
}

Expand Down
75 changes: 43 additions & 32 deletions Core/BackgroundWorkers/BackgroundWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

namespace Core.BackgroundWorkers;

public class BackgroundWorker: IHostedService, IDisposable
public class BackgroundWorker: BackgroundService
{
private Task? executingTask;
private CancellationTokenSource? cts;
// private Task? executingTask;
// private CancellationTokenSource? cts;
private readonly ILogger<BackgroundWorker> logger;
private readonly Func<CancellationToken, Task> perform;

Expand All @@ -22,36 +22,47 @@ public class BackgroundWorker: IHostedService, IDisposable
this.perform = perform;
}

public Task StartAsync(CancellationToken cancellationToken)
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// Create a linked token so we can trigger cancellation outside of this token's cancellation
cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

executingTask = Task.Run(() => perform(cts.Token), cancellationToken);

return executingTask;
return Task.Run(async () =>
{
await Task.Yield();
logger.LogInformation("Background worker stopped");
await perform(stoppingToken);
logger.LogInformation("Background worker stopped");
}, stoppingToken);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
// Stop called without start
if (executingTask == null)
return;

// Signal cancellation to the executing method
cts?.Cancel();

// Wait until the issue completes or the stop token triggers
await Task.WhenAny(executingTask, Task.Delay(-1, cancellationToken));

// Throw if cancellation triggered
cancellationToken.ThrowIfCancellationRequested();

logger.LogInformation("Background worker stopped");
}

public void Dispose()
{
cts?.Dispose();
}
// public Task StartAsync(CancellationToken cancellationToken)
// {
// // Create a linked token so we can trigger cancellation outside of this token's cancellation
// cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//
// executingTask = Task.Run(() => perform(cts.Token), cancellationToken);
//
// return executingTask;
// }
//
// public async Task StopAsync(CancellationToken cancellationToken)
// {
// // Stop called without start
// if (executingTask == null)
// return;
//
// // Signal cancellation to the executing method
// cts?.Cancel();
//
// // Wait until the issue completes or the stop token triggers
// await Task.WhenAny(executingTask, Task.Delay(-1, cancellationToken));
//
// // Throw if cancellation triggered
// cancellationToken.ThrowIfCancellationRequested();
//
// logger.LogInformation("Background worker stopped");
// }
//
// public void Dispose()
// {
// cts?.Dispose();
// }
}

0 comments on commit c24da62

Please sign in to comment.