From 81fc6e47304702755054e556eaaf4141dc56fd0d Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 31 Oct 2024 19:39:46 +0900 Subject: [PATCH 1/3] Add failing test coverage of exception handling --- .../BatchProcessorTests.cs | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs index d777fd8..6522e98 100644 --- a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs +++ b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs @@ -195,6 +195,40 @@ public void SendThenErrorDoesRetry() Assert.Equal(obj, receivedObject); } + [Fact] + public void MultipleErrorsAttachedToCorrectItems() + { + var cts = new CancellationTokenSource(10000); + + var obj1 = FakeData.New(); + var obj2 = FakeData.New(); + + bool gotCorrectExceptionForItem1 = false; + bool gotCorrectExceptionForItem2 = false; + + processor.Error += (exception, item) => + { + Assert.NotNull(exception); + + gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1"; + gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2"; + }; + + processor.PushToQueue(new[] { obj1, obj2 }); + + processor.Received += o => + { + if (Equals(o.Data, obj1.Data)) throw new Exception("1"); + if (Equals(o.Data, obj2.Data)) throw new Exception("2"); + }; + + processor.Run(cts.Token); + + Assert.Equal(0, processor.GetQueueSize()); + Assert.True(gotCorrectExceptionForItem1); + Assert.True(gotCorrectExceptionForItem2); + } + [Fact] public void SendThenErrorForeverDoesDrop() { From 563a5ad7c402a2247121d6370d2cfe06d18dfd30 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 31 Oct 2024 19:32:53 +0900 Subject: [PATCH 2/3] Change how exceptions are handled to better relate them to individual items --- .../BatchProcessorTests.cs | 1 + .../TestBatchProcessor.cs | 9 +++++- osu.Server.QueueProcessor/QueueItem.cs | 12 ++++++- osu.Server.QueueProcessor/QueueProcessor.cs | 31 +++++++++++++------ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs index 6522e98..71a4381 100644 --- a/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs +++ b/osu.Server.QueueProcessor.Tests/BatchProcessorTests.cs @@ -209,6 +209,7 @@ public void MultipleErrorsAttachedToCorrectItems() processor.Error += (exception, item) => { Assert.NotNull(exception); + Assert.Equal(exception, item.Exception); gotCorrectExceptionForItem1 |= Equals(item.Data, obj1.Data) && exception.Message == "1"; gotCorrectExceptionForItem2 |= Equals(item.Data, obj2.Data) && exception.Message == "2"; diff --git a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs index 63d5cc9..db9c80b 100644 --- a/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs +++ b/osu.Server.QueueProcessor.Tests/TestBatchProcessor.cs @@ -21,7 +21,14 @@ protected override void ProcessResults(IEnumerable items) { foreach (var item in items) { - Received?.Invoke(item); + try + { + Received?.Invoke(item); + } + catch (Exception e) + { + item.Exception = e; + } } } diff --git a/osu.Server.QueueProcessor/QueueItem.cs b/osu.Server.QueueProcessor/QueueItem.cs index 9e622fb..823a28e 100644 --- a/osu.Server.QueueProcessor/QueueItem.cs +++ b/osu.Server.QueueProcessor/QueueItem.cs @@ -12,11 +12,21 @@ namespace osu.Server.QueueProcessor [Serializable] public abstract class QueueItem { + [IgnoreDataMember] + private bool failed; + /// /// Set to true to mark this item is failed. This will cause it to be retried. /// [IgnoreDataMember] - public bool Failed { get; set; } + public bool Failed + { + get => failed || Exception != null; + set => failed = value; + } + + [IgnoreDataMember] + public Exception? Exception { get; set; } /// /// The number of times processing this item has been retried. Handled internally by . diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 8fd2df0..0136590 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -132,11 +132,11 @@ public void Run(CancellationToken cancellation = default) // individual processing should not be cancelled as we have already grabbed from the queue. Task.Factory.StartNew(() => { ProcessResults(items); }, CancellationToken.None, TaskCreationOptions.HideScheduler, threadPool) - .ContinueWith(t => + .ContinueWith(_ => { foreach (var item in items) { - if (t.Exception != null || item.Failed) + if (item.Failed) { Interlocked.Increment(ref totalErrors); @@ -145,12 +145,12 @@ public void Run(CancellationToken cancellation = default) Interlocked.Increment(ref consecutiveErrors); - Error?.Invoke(t.Exception, item); + Error?.Invoke(item.Exception, item); - if (t.Exception != null) - SentrySdk.CaptureException(t.Exception); + if (item.Exception != null) + SentrySdk.CaptureException(item.Exception); - Console.WriteLine($"Error processing {item}: {t.Exception}"); + Console.WriteLine($"Error processing {item}: {item.Exception}"); attemptRetry(item); } else @@ -197,8 +197,6 @@ private void setupSentry(SentryOptions options) private void attemptRetry(T item) { - item.Failed = false; - if (item.TotalRetries++ < config.MaxRetries) { Console.WriteLine($"Re-queueing for attempt {item.TotalRetries} / {config.MaxRetries}"); @@ -274,11 +272,26 @@ protected virtual void ProcessResult(T item) /// /// Implement to process batches of items from the queue. /// + /// + /// In most cases, you should only need to override and implement . + /// Only override this if you need more efficient batch processing. + /// + /// If overriding this method, you should try-catch for exceptions, and set any exception against + /// the relevant . If this is not done, failures will not be handled correctly. /// The items to process. protected virtual void ProcessResults(IEnumerable items) { foreach (var item in items) - ProcessResult(item); + { + try + { + ProcessResult(item); + } + catch (Exception e) + { + item.Exception = e; + } + } } } } From b4e84856d70a1a7422537989075000bfbd074fc5 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Thu, 31 Oct 2024 19:46:23 +0900 Subject: [PATCH 3/3] Error handle exception handler for safety --- osu.Server.QueueProcessor/QueueProcessor.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 0136590..4cfe21c 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -145,7 +145,13 @@ public void Run(CancellationToken cancellation = default) Interlocked.Increment(ref consecutiveErrors); - Error?.Invoke(item.Exception, item); + try + { + Error?.Invoke(item.Exception, item); + } + catch + { + } if (item.Exception != null) SentrySdk.CaptureException(item.Exception);