From 7611d2d116112c1945da30a1c74473e3086718f6 Mon Sep 17 00:00:00 2001 From: "quang.le" Date: Mon, 17 Jan 2022 15:43:03 +0700 Subject: [PATCH 1/5] Update Queue.cs --- Src/Coravel/Queuing/Queue.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Src/Coravel/Queuing/Queue.cs b/Src/Coravel/Queuing/Queue.cs index fd23a228..d36de200 100644 --- a/Src/Coravel/Queuing/Queue.cs +++ b/Src/Coravel/Queuing/Queue.cs @@ -47,10 +47,16 @@ public Guid QueueInvocable() where T : IInvocable public Guid QueueInvocableWithPayload(TParams payload) where T : IInvocable, IInvocableWithPayload { + var tokenSource = new CancellationTokenSource(); var job = this.EnqueueInvocable(invocable => { - IInvocableWithPayload invocableWithParams = (IInvocableWithPayload) invocable; + var invocableWithParams = (IInvocableWithPayload) invocable; invocableWithParams.Payload = payload; + if (invocableWithParams is ICancellableTask) + { + ((ICancellableTask) invocable).Token = tokenSource.Token; + } }); + this._tokens.TryAdd(job.Guid, tokenSource); return job.Guid; } From 005214fd889c0eef08d03b2e60b95ef5abef1bc3 Mon Sep 17 00:00:00 2001 From: "quang.le" Date: Mon, 17 Jan 2022 16:10:01 +0700 Subject: [PATCH 2/5] Add CanCancelInvocableWithPayload Test --- .../CancellableInvocableForQueueTests.cs | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs index 0dd98247..5414e54b 100644 --- a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs +++ b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs @@ -12,6 +12,25 @@ namespace UnitTests.Queuing { public class CancellableInvocableForQueueTests { + [Fact] + public async Task CanCancelInvocableWithPayload() + { + var services = new ServiceCollection(); + services.AddTransient(); + services.AddTransient(); + var provider = services.BuildServiceProvider(); + + var queue = new Queue(provider.GetRequiredService(), new DispatcherStub()); + var payload = new TestPayload() + { + Code = "test" + }; + queue.QueueInvocableWithPayload(payload); + TestCancellableInvocable.TokensCancelled = 0; + await queue.ConsumeQueueOnShutdown(); + Assert.Equal(1, TestCancellableInvocableWithPayload.TokensCancelled); + } + [Fact] public async Task CanCancelInvocable() { @@ -82,9 +101,37 @@ public Task Invoke() { Interlocked.Increment(ref TokensCancelled); } + return Task.CompletedTask; + } + } + + private class TestCancellableInvocableWithPayload : IInvocable, IInvocableWithPayload, ICancellableTask + { + /// + /// Static fields keeps track of all cancelled tokens count. + /// + public static int TokensCancelled = 0; + public TestPayload Payload { get; set; } + public TestCancellableInvocableWithPayload() {} + + public CancellationToken Token { get; set; } + + public Task Invoke() + { + if(this.Token.IsCancellationRequested) + { + Interlocked.Increment(ref TokensCancelled); + } + Thread.Sleep(10000); return Task.CompletedTask; } + + } + + private class TestPayload + { + public string Code { get; set; } } } } \ No newline at end of file From 593c6aa45411fdc1863c8366b8afcc81ec6953c7 Mon Sep 17 00:00:00 2001 From: lengockyquang Date: Sun, 15 May 2022 17:34:03 +0700 Subject: [PATCH 3/5] Add try cancel by cancel token id --- Src/Coravel/Queuing/Interfaces/IQueue.cs | 7 +++++++ Src/Coravel/Queuing/Queue.cs | 12 ++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/Src/Coravel/Queuing/Interfaces/IQueue.cs b/Src/Coravel/Queuing/Interfaces/IQueue.cs index a57dd74c..0d35026d 100644 --- a/Src/Coravel/Queuing/Interfaces/IQueue.cs +++ b/Src/Coravel/Queuing/Interfaces/IQueue.cs @@ -51,5 +51,12 @@ public interface IQueue /// /// QueueMetrics GetMetrics(); + + /// + /// Try to cancel an invocable by its token id return from QueueInvocableWithPayload + /// + /// Token id return from QueueInvocableWithPayload + /// + bool TryCancelInvocable(Guid tokenId); } } \ No newline at end of file diff --git a/Src/Coravel/Queuing/Queue.cs b/Src/Coravel/Queuing/Queue.cs index 5678b15d..ffc4ab0e 100644 --- a/Src/Coravel/Queuing/Queue.cs +++ b/Src/Coravel/Queuing/Queue.cs @@ -51,9 +51,9 @@ public Guid QueueInvocableWithPayload(TParams payload) where T : IIn var job = this.EnqueueInvocable(invocable => { var invocableWithParams = (IInvocableWithPayload) invocable; invocableWithParams.Payload = payload; - if (invocableWithParams is ICancellableTask) + if (invocableWithParams is ICancellableTask task) { - ((ICancellableTask) invocable).Token = tokenSource.Token; + task.Token = tokenSource.Token; } }); this._tokens.TryAdd(job.Guid, tokenSource); @@ -138,6 +138,14 @@ public QueueMetrics GetMetrics() return new QueueMetrics(this._tasksRunningCount, waitingCount); } + public bool TryCancelInvocable(Guid tokenId) + { + if (!_tokens.TryGetValue(tokenId, out var tokenNeedCancel)) return false; // token does not exist + if (tokenNeedCancel.IsCancellationRequested) return false; // token is already canceled + tokenNeedCancel.Cancel(); + return true; + } + private void CancelAllTokens() { foreach(var kv in this._tokens.AsEnumerable()) From 3664e888dff8ff26d7052b116f78a70c689cae24 Mon Sep 17 00:00:00 2001 From: lengockyquang Date: Sun, 15 May 2022 17:35:52 +0700 Subject: [PATCH 4/5] Add test for IQueue.TryCancelInvocable - Test IQueue TryCancelInvocable - Minor refactor on CanCancelInvocableWithPayloadOnQueueShutdown --- .../CancellableInvocableForQueueTests.cs | 46 +++++++++++++++++-- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs index 5414e54b..dbe0255f 100644 --- a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs +++ b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs @@ -13,22 +13,58 @@ namespace UnitTests.Queuing public class CancellableInvocableForQueueTests { [Fact] - public async Task CanCancelInvocableWithPayload() + public async Task CanCancelSpecificInvocableWithPayload() { + // init var services = new ServiceCollection(); services.AddTransient(); services.AddTransient(); var provider = services.BuildServiceProvider(); - var queue = new Queue(provider.GetRequiredService(), new DispatcherStub()); + + //exec var payload = new TestPayload() { Code = "test" }; - queue.QueueInvocableWithPayload(payload); - TestCancellableInvocable.TokensCancelled = 0; + var firstItem = queue.QueueInvocableWithPayload(payload); + var secondItem = queue.QueueInvocableWithPayload(payload); + var thirdItem = queue.QueueInvocableWithPayload(payload); + + var cancelResultFirstItem = queue.TryCancelInvocable(firstItem); + var cancelResultThirdItem = queue.TryCancelInvocable(thirdItem); + + //assert + TestCancellableInvocableWithPayload.TokensCancelled = 0; + await queue.ConsumeQueueAsync(); + Assert.True(cancelResultFirstItem); + Assert.True(cancelResultThirdItem); + Assert.Equal(2, TestCancellableInvocableWithPayload.TokensCancelled); + } + + [Fact] + public async Task CanCancelInvocableWithPayloadOnQueueShutdown() + { + // init + var services = new ServiceCollection(); + services.AddTransient(); + services.AddTransient(); + var provider = services.BuildServiceProvider(); + var queue = new Queue(provider.GetRequiredService(), new DispatcherStub()); + + //exec + var payload = new TestPayload() + { + Code = "test" + }; + var firstItem = queue.QueueInvocableWithPayload(payload); + var secondItem = queue.QueueInvocableWithPayload(payload); + var thirdItem = queue.QueueInvocableWithPayload(payload); + + // assert + TestCancellableInvocableWithPayload.TokensCancelled = 0; await queue.ConsumeQueueOnShutdown(); - Assert.Equal(1, TestCancellableInvocableWithPayload.TokensCancelled); + Assert.Equal(3, TestCancellableInvocableWithPayload.TokensCancelled); } [Fact] From 07a12f74ebbbcaa1520516de1454cd6b840c1820 Mon Sep 17 00:00:00 2001 From: lengockyquang Date: Sun, 22 May 2022 17:02:17 +0700 Subject: [PATCH 5/5] Remove useless thread sleep --- .../Queuing/CancellableInvocableForQueueTests.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs index dbe0255f..8a9d6e19 100644 --- a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs +++ b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs @@ -158,8 +158,6 @@ public Task Invoke() { Interlocked.Increment(ref TokensCancelled); } - Thread.Sleep(10000); - return Task.CompletedTask; }