Skip to content
Merged
22 changes: 21 additions & 1 deletion ReleaseNotes/version2.2.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
# Release Notes #

## 2.2.9-D2

Deployment:
* updated proxy-with-sidecar deployment scripts

Proxy:
* Add back-pressure delay when blob writer queue starts to fill up
* Bug fix: race conditions during shutdown in async mode
* Bug fix: string matching not working for disposed objects
* Bug fix: fix error handling of cancelled jobs
* Bug fix: dont log task cancelled during user profile shutdown
* Buf Fix: reset the dos after sending 202 for async request
* Bug fix: cleanup shutdown sequence, make sure health probe stops last
* Bug fix: dont sent status updates before writing the blob

## 2.2.9-D1

Proxy:

* Bugfix for non-sidecar mode startup
* Soft-delete user profile
* Soft-delete user profile
* Bug fix: apim backend host not recognised
* Bug fix: Poller failures during startup incorrectly measured
* Bug fix: Health status incorrectly reporting eventhub client status

## 2.2.9

Expand Down
115 changes: 84 additions & 31 deletions src/SimpleL7Proxy/BlobStorage/BlobWriteQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class BlobWriteQueue : IHostedService, IDisposable
private readonly Channel<BlobWriteOperation>[] _workerChannels;
private readonly List<Task> _workers;
private readonly CancellationTokenSource _shutdownCts;
private readonly CancellationTokenSource _metricsLoopCts;
private readonly ILogger<BlobWriteQueue> _logger;
private readonly BlobWriteQueueOptions _options;
private readonly BlobWriter _blobWriter;
Expand All @@ -106,6 +107,7 @@ public class BlobWriteQueue : IHostedService, IDisposable
private long _batchesExecuted = 0;
private long _totalQueueTimeMs = 0;
private long _totalProcessTimeMs = 0;
private volatile bool _isShuttingDown = false;

public BlobWriteQueue(
BlobWriter blobWriter,
Expand All @@ -116,6 +118,7 @@ public BlobWriteQueue(
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_shutdownCts = new CancellationTokenSource();
_metricsLoopCts = new CancellationTokenSource();
_workers = new List<Task>();

// Create per-worker channels for worker affinity
Expand Down Expand Up @@ -222,7 +225,7 @@ public Task StartAsync(CancellationToken cancellationToken)
_workers.Add(Task.Run(() => WorkerLoop(workerId, _shutdownCts.Token), _shutdownCts.Token));
}

_workers.Add(Task.Run(() => MetricsLoop(_shutdownCts.Token), _shutdownCts.Token));
_workers.Add(Task.Run(() => MetricsLoop(_metricsLoopCts.Token), _metricsLoopCts.Token));

return Task.CompletedTask;
}
Expand All @@ -231,31 +234,38 @@ public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("[BlobWriteQueue] Stopping...");

// Signal no more writes will come - workers will exit after draining
// Signal shutdown to MetricsLoop (will increase frequency)
_isShuttingDown = true;

// Complete the channels - no more writes will be accepted
// Safe because CoordinatedShutdownService ensures all producers
// (proxy workers, async workers) are done before calling this
foreach (var channel in _workerChannels)
{
channel.Writer.Complete();
}

// Wait for workers to finish processing remaining items
// DON'T cancel the shutdown token - let in-flight operations complete
// DO NOT cancel _shutdownCts - let ALL blob operations complete
var shutdownTimeout = TimeSpan.FromSeconds(60); // Allow time for blob operations to complete

_logger.LogInformation("[BlobWriteQueue] Waiting for workers to complete (timeout: {Timeout}s)...", shutdownTimeout.TotalSeconds);

try
{
var workerTask = Task.WhenAll(_workers);
var completedTask = await Task.WhenAny(workerTask, Task.Delay(shutdownTimeout, cancellationToken))
// Get worker tasks (exclude MetricsLoop which is last)
var workerTasks = _workers.Take(_workers.Count - 1).ToList();
var workerTask = Task.WhenAll(workerTasks);

// Don't use host's cancellationToken for timeout - it may fire earlier than our timeout
var completedTask = await Task.WhenAny(workerTask, Task.Delay(shutdownTimeout))
.ConfigureAwait(false);

if (completedTask != workerTask)
{
_logger.LogWarning("[BlobWriteQueue] Shutdown timeout - cancelling remaining operations");
// Only cancel as a last resort after timeout
_shutdownCts.Cancel();

// Give cancelled operations a moment to clean up
_logger.LogWarning("[BlobWriteQueue] Shutdown timeout reached - {Timeout}s", shutdownTimeout.TotalSeconds);
// DO NOT cancel _shutdownCts - we need blob operations to complete
// Just wait a bit more for cleanup
try
{
await Task.WhenAny(workerTask, Task.Delay(5000)).ConfigureAwait(false);
Expand All @@ -268,6 +278,16 @@ public async Task StopAsync(CancellationToken cancellationToken)
await workerTask.ConfigureAwait(false);
_logger.LogDebug("[BlobWriteQueue] All workers completed gracefully");
}

// NOW stop MetricsLoop (it was last to run)
_metricsLoopCts.Cancel();

// Wait for MetricsLoop to finish
try
{
await Task.WhenAny(_workers.Last(), Task.Delay(2000)).ConfigureAwait(false);
}
catch { }
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -603,35 +623,67 @@ private async Task ExecuteBatchAsync(
}
}

static string lastLog = "";
// Snapshot counters for delta calculation between metrics intervals
private long _lastQueued = 0;
private long _lastCompleted = 0;
private long _lastFailed = 0;
private long _lastBatches = 0;

private async Task MetricsLoop(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(_options.MetricsIntervalSeconds), cancellationToken)
.ConfigureAwait(false);

var queueDepth = _workerChannels.Sum(ch => ch.Reader.Count);
var avgQueueTime = _operationsCompleted > 0 ? _totalQueueTimeMs / _operationsCompleted : 0;
var avgProcessTime = _operationsCompleted > 0 ? _totalProcessTimeMs / _operationsCompleted : 0;
var successRate = _operationsQueued > 0 ? (double)_operationsCompleted / _operationsQueued : 0;

var logline = $"[BlobWriteQueue] Metrics - Queued: {_operationsQueued}, Completed: {_operationsCompleted}, " +
$"Failed: {_operationsFailed}, Batches: {_batchesExecuted}, Depth: {queueDepth}, " +
$"SuccessRate: {successRate:P2}, AvgQueue: {avgQueueTime}ms, AvgProcess: {avgProcessTime}ms";

if (logline != lastLog)
{
lastLog = logline;
// During shutdown, log more frequently to show progress
var delay = _isShuttingDown
? TimeSpan.FromSeconds(2)
: TimeSpan.FromSeconds(_options.MetricsIntervalSeconds);

await Task.Delay(delay, cancellationToken)
.ConfigureAwait(false);

// Snapshot current totals
var queued = Interlocked.Read(ref _operationsQueued);
var completed = Interlocked.Read(ref _operationsCompleted);
var failed = Interlocked.Read(ref _operationsFailed);
var batches = Interlocked.Read(ref _batchesExecuted);

// Calculate deltas since last report
var deltaQueued = queued - _lastQueued;
var deltaCompleted = completed - _lastCompleted;
var deltaFailed = failed - _lastFailed;
var deltaBatches = batches - _lastBatches;

// Skip if nothing happened since last report
if (deltaQueued == 0 && deltaCompleted == 0 && deltaFailed == 0)
continue;

// Update snapshots
_lastQueued = queued;
_lastCompleted = completed;
_lastFailed = failed;
_lastBatches = batches;

var remaining = _workerChannels.Sum(ch => ch.Reader.Count);
var avgQueueTime = completed > 0 ? _totalQueueTimeMs / completed : 0;
var avgProcessTime = completed > 0 ? _totalProcessTimeMs / completed : 0;

if (failed > 0)
{
_logger.LogWarning(
"[BlobWriteQueue] Δ Queued: +{DeltaQueued}, Completed: +{DeltaCompleted}, Failed: +{DeltaFailed} (total: {TotalFailed}), " +
"Batches: +{DeltaBatches} | Remaining: {Remaining}, AvgQueue: {AvgQueue}ms, AvgProcess: {AvgProcess}ms",
deltaQueued, deltaCompleted, deltaFailed, failed,
deltaBatches, remaining, avgQueueTime, avgProcessTime);
}
else
{
_logger.LogInformation(
"[BlobWriteQueue] Metrics - Queued: {Queued}, Completed: {Completed}, Failed: {Failed}, " +
"Batches: {Batches}, Depth: {Depth}, SuccessRate: {SuccessRate:P2}, " +
"AvgQueue: {AvgQueue}ms, AvgProcess: {AvgProcess}ms",
_operationsQueued, _operationsCompleted, _operationsFailed, _batchesExecuted,
queueDepth, successRate, avgQueueTime, avgProcessTime);
"[BlobWriteQueue] Δ Queued: +{DeltaQueued}, Completed: +{DeltaCompleted}, " +
"Batches: +{DeltaBatches} | Remaining: {Remaining}, AvgQueue: {AvgQueue}ms, AvgProcess: {AvgProcess}ms",
deltaQueued, deltaCompleted,
deltaBatches, remaining, avgQueueTime, avgProcessTime);
}
}
catch (OperationCanceledException)
Expand All @@ -644,6 +696,7 @@ await Task.Delay(TimeSpan.FromSeconds(_options.MetricsIntervalSeconds), cancella
public void Dispose()
{
_shutdownCts?.Dispose();
_metricsLoopCts?.Dispose();
GC.SuppressFinalize(this);
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/SimpleL7Proxy/BlobStorage/QueuedBlobWriter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Uncomment to test blob shutdown behavior with 100 copies per write
//#define TEST_BLOB_SHUTDOWN

using System;
using System.IO;
using System.Threading;
Expand All @@ -18,6 +21,7 @@ internal class QueuedBlobStream : Stream
private readonly string _blobName;
private readonly ILogger _logger;
private bool _disposed;
private readonly List<Task<BlobWriteResult>> _pendingWrites = new();

public QueuedBlobStream(
BlobWriteQueue queue,
Expand Down Expand Up @@ -61,6 +65,26 @@ public override async Task FlushAsync(CancellationToken cancellationToken)
_buffer.SetLength(0);
_buffer.Position = 0;

#if TEST_BLOB_SHUTDOWN
// TEST: Enqueue 100 copies to test shutdown flushing behavior
for (int i = 0; i < 100; i++)
{
var operation = new BlobWriteOperation
{
ContainerName = _containerName,
BlobName = $"{_blobName}-{i}",
Data = new ReadOnlyMemory<byte>(data),
Priority = 0
};

await _queue.EnqueueAsync(operation, cancellationToken).ConfigureAwait(false);
_pendingWrites.Add(operation.GetResultAsync());
}

_logger.LogTrace(
"[QueuedBlobStream] Enqueued 100 copies ({Size}B each) for {Container}/{Blob}",
data.Length, _containerName, _blobName);
#else
var operation = new BlobWriteOperation
{
ContainerName = _containerName,
Expand All @@ -70,10 +94,12 @@ public override async Task FlushAsync(CancellationToken cancellationToken)
};

await _queue.EnqueueAsync(operation, cancellationToken).ConfigureAwait(false);
_pendingWrites.Add(operation.GetResultAsync());

_logger.LogTrace(
"[QueuedBlobStream] Enqueued {Size}B for {Container}/{Blob}",
data.Length, _containerName, _blobName);
#endif
}

public override void Write(byte[] buffer, int offset, int count)
Expand All @@ -100,6 +126,24 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, Cancella
await _buffer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Waits for all enqueued blob write operations to complete.
/// Call this before sending a "Completed" status so the client
/// does not try to read the blob before it exists.
/// </summary>
public async Task WaitForPendingWritesAsync(CancellationToken cancellationToken = default)
{
if (_pendingWrites.Count == 0)
return;

_logger.LogDebug(
"[QueuedBlobStream] Waiting for {Count} pending writes for {Container}/{Blob}",
_pendingWrites.Count, _containerName, _blobName);

await Task.WhenAll(_pendingWrites).WaitAsync(cancellationToken).ConfigureAwait(false);
_pendingWrites.Clear();
}

public override int Read(byte[] buffer, int offset, int count) =>
throw new NotSupportedException("QueuedBlobStream does not support reading.");

Expand Down
2 changes: 1 addition & 1 deletion src/SimpleL7Proxy/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static class Constants
public const string RoundRobin = "roundrobin";
public const string Random = "random";
public const string Server = "simplel7proxy";
public const string VERSION = "2.2.9-d1";
public const string VERSION = "2.2.9-d2";

public const int AnyPriority = -1;

Expand Down
30 changes: 29 additions & 1 deletion src/SimpleL7Proxy/CoordinatedShutdownService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.Options;

using SimpleL7Proxy.Backend;
using SimpleL7Proxy.BlobStorage;
using SimpleL7Proxy.Config;
using SimpleL7Proxy.Events;
using SimpleL7Proxy.Proxy;
Expand All @@ -28,6 +29,8 @@ public class CoordinatedShutdownService : IHostedService
private readonly IBackendService _backends;
private readonly IAsyncFeeder _asyncFeeder;
private readonly IRequeueWorker _requeueWorker;
private readonly BlobWriteQueue _blobWriteQueue;
private readonly ProbeServer _probeServer;


public CoordinatedShutdownService(IHostApplicationLifetime appLifetime,
Expand All @@ -40,6 +43,8 @@ public CoordinatedShutdownService(IHostApplicationLifetime appLifetime,
IAsyncFeeder asyncFeeder,
IBackupAPIService backupAPIService,
IRequeueWorker requeueWorker,
BlobWriteQueue blobWriteQueue,
ProbeServer probeServer,
ILogger<CoordinatedShutdownService> logger,
Server server)
{
Expand All @@ -54,10 +59,21 @@ public CoordinatedShutdownService(IHostApplicationLifetime appLifetime,
_asyncFeeder = asyncFeeder;
_backendTokenProvider = backendTokenProvider;
_requeueWorker = requeueWorker;
_blobWriteQueue = blobWriteQueue;
_probeServer = probeServer;
_options = backendOptions.Value;
}

public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public async Task StartAsync(CancellationToken cancellationToken)
{
// Start services explicitly since they're no longer registered as IHostedService
// (we control their shutdown ordering in StopAsync)
await _blobWriteQueue.StartAsync(cancellationToken).ConfigureAwait(false);
await _probeServer.StartAsync(cancellationToken).ConfigureAwait(false);

if (_serviceBusRequestService is IHostedService sbHosted)
await sbHosted.StartAsync(cancellationToken).ConfigureAwait(false);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -129,7 +145,19 @@ public async Task StopAsync(CancellationToken cancellationToken)
if (_serviceBusRequestService != null)
await _serviceBusRequestService.StopAsync(cancellationToken).ConfigureAwait(false);

// BlobWriteQueue is stopped LAST before probes - all producers (proxy workers, async workers, backup service)
// are guaranteed to be done at this point, so no more enqueues will happen
_logger.LogInformation("[SHUTDOWN] ⏹ Stopping BlobWriteQueue (final flush)");
await _blobWriteQueue.StopAsync(CancellationToken.None).ConfigureAwait(false);

_eventClient?.StopTimer();

// Health probes are stopped at the VERY END so the container orchestrator
// (e.g. Kubernetes, Container Apps) continues to see healthy probes while
// other services drain. If probes fail early, the orchestrator may kill the pod.
_logger.LogInformation("[SHUTDOWN] ⏹ Stopping health probes");
await _probeServer.StopAsync().ConfigureAwait(false);
await _server.StopProbes(CancellationToken.None).ConfigureAwait(false);
}

}
7 changes: 7 additions & 0 deletions src/SimpleL7Proxy/DTO/RequestDataBackupService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public async Task RestoreIntoAsync(RequestData rdata)
rdata.setBody(Encoding.UTF8.GetBytes(datastr));
//_logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: ReadBody-Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}");
}
else if (rdata.Body == null)
{
// No body blob exists — client never sent a request body.
// Set an empty body so ProxyToBackEndAsync doesn't throw ArgumentNullException.
rdata.setBody(Array.Empty<byte>());
_logger.LogInformation("No body blob found for {Guid} - client did not send a request body", rdata.Guid);
}

//_logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}");
return;
Expand Down
Loading