From 391caa8b60079a73b42ea4eaee1aa0d398841517 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Tue, 28 Mar 2023 16:45:13 -0700 Subject: [PATCH] Ordered IO completion option in CompletePending --- cs/src/core/Async/CompletePendingAsync.cs | 4 +- cs/src/core/ClientSession/BasicContext.cs | 16 ++--- cs/src/core/ClientSession/ClientSession.cs | 40 ++++++------- cs/src/core/ClientSession/IFasterContext.cs | 10 ++-- cs/src/core/ClientSession/LockableContext.cs | 20 +++---- .../ClientSession/LockableUnsafeContext.cs | 16 ++--- cs/src/core/ClientSession/UnsafeContext.cs | 16 ++--- cs/src/core/Index/Common/Contexts.cs | 18 +++++- cs/src/core/Index/FASTER/FASTERThread.cs | 59 +++++++++++++++++-- .../core/Index/Interfaces/IFasterSession.cs | 2 +- 10 files changed, 132 insertions(+), 69 deletions(-) diff --git a/cs/src/core/Async/CompletePendingAsync.cs b/cs/src/core/Async/CompletePendingAsync.cs index 237b67e58..b789c169f 100644 --- a/cs/src/core/Async/CompletePendingAsync.cs +++ b/cs/src/core/Async/CompletePendingAsync.cs @@ -28,7 +28,7 @@ public partial class FasterKV : FasterBase, IFasterKV /// /// internal async ValueTask CompletePendingAsync(FasterSession fasterSession, - CancellationToken token, CompletedOutputIterator completedOutputs) + CancellationToken token, CompletedOutputIterator completedOutputs, bool orderedResponses) where FasterSession : IFasterSession { while (true) @@ -36,7 +36,7 @@ public partial class FasterKV : FasterBase, IFasterKV fasterSession.UnsafeResumeThread(); try { - InternalCompletePendingRequests(fasterSession, completedOutputs); + InternalCompletePendingRequests(fasterSession, completedOutputs, orderedResponses); } finally { diff --git a/cs/src/core/ClientSession/BasicContext.cs b/cs/src/core/ClientSession/BasicContext.cs index 911d26d51..bd370f5e4 100644 --- a/cs/src/core/ClientSession/BasicContext.cs +++ b/cs/src/core/ClientSession/BasicContext.cs @@ -36,20 +36,20 @@ public void UnsafeSuspendThread() #region IFasterContext /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) - => clientSession.CompletePending(wait, spinWaitForCommit); + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) + => clientSession.CompletePending(wait, spinWaitForCommit, orderedResponses); /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - => clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) + => clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses); /// - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => clientSession.CompletePendingAsync(waitForCommit, token); + public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token); /// - public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) - => clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token); /// [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 83aad5410..ae873b343 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -610,14 +610,14 @@ public IEnumerable GetPendingRequests() } /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) - => CompletePending(false, wait, spinWaitForCommit); + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) + => CompletePending(false, wait, spinWaitForCommit, orderedResponses); /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { InitializeCompletedOutputs(); - var result = CompletePending(true, wait, spinWaitForCommit); + var result = CompletePending(true, wait, spinWaitForCommit, orderedResponses); completedOutputs = this.completedOutputs; return result; } @@ -626,11 +626,11 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator - internal bool UnsafeCompletePendingWithOutputs(FasterSession fasterSession, out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + internal bool UnsafeCompletePendingWithOutputs(FasterSession fasterSession, out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) where FasterSession : IFasterSession { InitializeCompletedOutputs(); - var result = UnsafeCompletePending(fasterSession, true, wait, spinWaitForCommit); + var result = UnsafeCompletePending(fasterSession, true, wait, spinWaitForCommit, orderedResponses); completedOutputs = this.completedOutputs; return result; } @@ -643,12 +643,12 @@ private void InitializeCompletedOutputs() this.completedOutputs.Dispose(); } - internal bool CompletePending(bool getOutputs, bool wait, bool spinWaitForCommit) + internal bool CompletePending(bool getOutputs, bool wait, bool spinWaitForCommit, bool orderedResponses) { UnsafeResumeThread(); try { - return UnsafeCompletePending(FasterSession, getOutputs, wait, spinWaitForCommit); + return UnsafeCompletePending(FasterSession, getOutputs, wait, spinWaitForCommit, orderedResponses); } finally { @@ -656,21 +656,21 @@ internal bool CompletePending(bool getOutputs, bool wait, bool spinWaitForCommit } } - internal bool UnsafeCompletePending(FasterSession fasterSession, bool getOutputs, bool wait, bool spinWaitForCommit) + internal bool UnsafeCompletePending(FasterSession fasterSession, bool getOutputs, bool wait, bool spinWaitForCommit, bool orderedResponses) where FasterSession : IFasterSession { var requestedOutputs = getOutputs ? this.completedOutputs : default; - var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs); + var result = fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses); if (spinWaitForCommit) { if (!wait) throw new FasterException("Can spin-wait for commit (checkpoint completion) only if wait is true"); do { - fht.InternalCompletePending(fasterSession, wait, requestedOutputs); + fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses); if (fht.InRestPhase()) { - fht.InternalCompletePending(fasterSession, wait, requestedOutputs); + fht.InternalCompletePending(fasterSession, wait, requestedOutputs, orderedResponses); return true; } } while (wait); @@ -679,18 +679,18 @@ internal bool UnsafeCompletePending(FasterSession fasterSession, } /// - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => CompletePendingAsync(false, waitForCommit, token); + public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => CompletePendingAsync(false, waitForCommit, orderedResponses, token); /// - public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) + public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) { InitializeCompletedOutputs(); - await CompletePendingAsync(true, waitForCommit, token).ConfigureAwait(false); + await CompletePendingAsync(true, waitForCommit, orderedResponses, token).ConfigureAwait(false); return this.completedOutputs; } - private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit = false, CancellationToken token = default) + private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) { token.ThrowIfCancellationRequested(); @@ -698,7 +698,7 @@ private async ValueTask CompletePendingAsync(bool getOutputs, bool waitForCommit throw new NotSupportedException("Async operations not supported over protected epoch"); // Complete all pending operations on session - await fht.CompletePendingAsync(this.FasterSession, token, getOutputs ? this.completedOutputs : null).ConfigureAwait(false); + await fht.CompletePendingAsync(this.FasterSession, token, getOutputs ? this.completedOutputs : null, orderedResponses).ConfigureAwait(false); // Wait for commit if necessary if (waitForCommit) @@ -1381,8 +1381,8 @@ public IHeapContainer GetHeapContainer(ref Input input) public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread(); - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - => _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) + => _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses); public FasterKV.FasterExecutionContext Ctx => this._clientSession.ctx; #endregion Internal utilities diff --git a/cs/src/core/ClientSession/IFasterContext.cs b/cs/src/core/ClientSession/IFasterContext.cs index 164cf3c4f..64cec7384 100644 --- a/cs/src/core/ClientSession/IFasterContext.cs +++ b/cs/src/core/ClientSession/IFasterContext.cs @@ -17,8 +17,9 @@ public interface IFasterContext /// /// Wait for all pending operations on session to complete /// Spin-wait until ongoing commit/checkpoint, if any, completes + /// Whether responses are process in the order of IO issue /// True if all pending operations have completed, false otherwise - bool CompletePending(bool wait = false, bool spinWaitForCommit = false); + bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false); /// /// Synchronously complete outstanding pending synchronous operations, returning outputs for the completed operations. @@ -27,22 +28,23 @@ public interface IFasterContext /// Outputs completed by this operation /// Wait for all pending operations on session to complete /// Spin-wait until ongoing commit/checkpoint, if any, completes + /// Whether responses are process in the order of IO issue /// True if all pending operations have completed, false otherwise - bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); + bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false); /// /// Complete all pending synchronous FASTER operations. /// Async operations must be completed individually. /// /// - ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default); + ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default); /// /// Complete all pending synchronous FASTER operations, returning outputs for the completed operations. /// Async operations must be completed individually. /// /// Outputs completed by this operation - ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default); + ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default); /// /// Read operation diff --git a/cs/src/core/ClientSession/LockableContext.cs b/cs/src/core/ClientSession/LockableContext.cs index 39a2c34fa..871fb4343 100644 --- a/cs/src/core/ClientSession/LockableContext.cs +++ b/cs/src/core/ClientSession/LockableContext.cs @@ -170,13 +170,13 @@ public void Unlock(TLockableKey[] keys, int start, int count) #region IFasterContext /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); clientSession.UnsafeResumeThread(); try { - return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit, orderedResponses); } finally { @@ -185,13 +185,13 @@ public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) } /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(!clientSession.fht.epoch.ThisInstanceProtected()); clientSession.UnsafeResumeThread(); try { - return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit, orderedResponses); } finally { @@ -200,12 +200,12 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingAsync(waitForCommit, token); + public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token); /// - public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token); /// [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -750,8 +750,8 @@ public IHeapContainer GetHeapContainer(ref Input input) public void UnsafeSuspendThread() => _clientSession.UnsafeSuspendThread(); - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - => _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) + => _clientSession.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit, orderedResponses); public FasterKV.FasterExecutionContext Ctx => this._clientSession.ctx; #endregion Internal utilities diff --git a/cs/src/core/ClientSession/LockableUnsafeContext.cs b/cs/src/core/ClientSession/LockableUnsafeContext.cs index 4ffc23493..490c6dccc 100644 --- a/cs/src/core/ClientSession/LockableUnsafeContext.cs +++ b/cs/src/core/ClientSession/LockableUnsafeContext.cs @@ -112,26 +112,26 @@ public void Unlock(TLockableKey[] keys, int start, int count) #region IFasterContext /// - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit, orderedResponses); } /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit, orderedResponses); } /// - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingAsync(waitForCommit, token); + public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token); /// - public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token); /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(ref Key key, ref Input input, ref Output output, Context userContext = default, long serialNo = 0) diff --git a/cs/src/core/ClientSession/UnsafeContext.cs b/cs/src/core/ClientSession/UnsafeContext.cs index d35872d2b..76741fa1a 100644 --- a/cs/src/core/ClientSession/UnsafeContext.cs +++ b/cs/src/core/ClientSession/UnsafeContext.cs @@ -41,26 +41,26 @@ internal UnsafeContext(ClientSession - public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) + public bool CompletePending(bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePending(this.FasterSession, false, wait, spinWaitForCommit, orderedResponses); } /// - public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) + public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false) { Debug.Assert(clientSession.fht.epoch.ThisInstanceProtected()); - return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit); + return this.clientSession.UnsafeCompletePendingWithOutputs(this.FasterSession, out completedOutputs, wait, spinWaitForCommit, orderedResponses); } /// - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingAsync(waitForCommit, token); + public ValueTask CompletePendingAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingAsync(waitForCommit, orderedResponses, token); /// - public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) - => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, token); + public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, bool orderedResponses = false, CancellationToken token = default) + => this.clientSession.CompletePendingWithOutputsAsync(waitForCommit, orderedResponses, token); /// [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 180717503..906737397 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -64,8 +64,8 @@ internal enum OperationStatus RETRY_LATER, /// - /// I/O has been enqueued and the caller must go through or - /// , + /// I/O has been enqueued and the caller must go through or + /// , /// or one of the Async forms. /// RECORD_ON_DISK, @@ -238,6 +238,14 @@ public void Dispose() } } + internal class AsyncIoContextComparer : IComparer> + { + public int Compare(AsyncIOContext x, AsyncIOContext y) + { + return x.id.CompareTo(y.id); + } + } + internal sealed class FasterExecutionContext { internal int sessionID; @@ -255,6 +263,12 @@ internal sealed class FasterExecutionContext public Dictionary> ioPendingRequests; public AsyncCountDown pendingReads; public AsyncQueue> readyResponses; +#if NET5_0_OR_GREATER + public PriorityQueue, long> orderedResponses; +#else + public SortedSet> orderedResponses; + public static readonly AsyncIoContextComparer asyncIoContextComparer = new(); +#endif public List excludedSerialNos; public int asyncPendingCount; public ISynchronizationStateMachine threadStateMachine; diff --git a/cs/src/core/Index/FASTER/FASTERThread.cs b/cs/src/core/Index/FASTER/FASTERThread.cs index 69d61c6c4..1de5c54c3 100644 --- a/cs/src/core/Index/FASTER/FASTERThread.cs +++ b/cs/src/core/Index/FASTER/FASTERThread.cs @@ -111,12 +111,13 @@ public partial class FasterKV : FasterBase, IFasterKV } internal bool InternalCompletePending(FasterSession fasterSession, bool wait = false, - CompletedOutputIterator completedOutputs = null) + CompletedOutputIterator completedOutputs = null, + bool orderedResponses = false) where FasterSession : IFasterSession { while (true) { - InternalCompletePendingRequests(fasterSession, completedOutputs); + InternalCompletePendingRequests(fasterSession, completedOutputs, orderedResponses); if (wait) fasterSession.Ctx.WaitPending(epoch); if (fasterSession.Ctx.HasNoPendingRequests) return true; @@ -132,16 +133,62 @@ public partial class FasterKV : FasterBase, IFasterKV #region Complete Pending Requests internal void InternalCompletePendingRequests(FasterSession fasterSession, - CompletedOutputIterator completedOutputs) + CompletedOutputIterator completedOutputs, + bool orderedResponses) where FasterSession : IFasterSession { hlog.TryComplete(); if (fasterSession.Ctx.readyResponses.Count == 0) return; - while (fasterSession.Ctx.readyResponses.TryDequeue(out AsyncIOContext request)) + if (orderedResponses) { - InternalCompletePendingRequest(fasterSession, request, completedOutputs); + long firstId = fasterSession.Ctx.totalPending - fasterSession.Ctx.ioPendingRequests.Count; + + while (fasterSession.Ctx.readyResponses.TryDequeue(out AsyncIOContext request)) + { + if (request.id == firstId) + { + InternalCompletePendingRequest(fasterSession, request, completedOutputs); + firstId++; + } + else + { +#if NET5_0_OR_GREATER + if (fasterSession.Ctx.orderedResponses == null) + fasterSession.Ctx.orderedResponses = new PriorityQueue, long>(); + fasterSession.Ctx.orderedResponses.Enqueue(request, request.id); + while (fasterSession.Ctx.orderedResponses != null && + fasterSession.Ctx.orderedResponses.TryPeek(out request, out _) && + request.id == firstId) + { + var success = fasterSession.Ctx.orderedResponses.TryDequeue(out _, out _); + Debug.Assert(success); + InternalCompletePendingRequest(fasterSession, request, completedOutputs); + firstId++; + } +#else + if (fasterSession.Ctx.orderedResponses == null) + fasterSession.Ctx.orderedResponses = new SortedSet>(FasterExecutionContext.asyncIoContextComparer); + fasterSession.Ctx.orderedResponses.Add(request); + while (fasterSession.Ctx.orderedResponses != null && + fasterSession.Ctx.orderedResponses.Min.id == firstId) + { + var success = fasterSession.Ctx.orderedResponses.Remove(fasterSession.Ctx.orderedResponses.Min); + Debug.Assert(success); + InternalCompletePendingRequest(fasterSession, request, completedOutputs); + firstId++; + } +#endif + } + } + } + else + { + while (fasterSession.Ctx.readyResponses.TryDequeue(out AsyncIOContext request)) + { + InternalCompletePendingRequest(fasterSession, request, completedOutputs); + } } } @@ -221,6 +268,6 @@ public partial class FasterKV : FasterBase, IFasterKV request.Dispose(); return status; } - #endregion +#endregion } } diff --git a/cs/src/core/Index/Interfaces/IFasterSession.cs b/cs/src/core/Index/Interfaces/IFasterSession.cs index 758d4432c..f07c83b44 100644 --- a/cs/src/core/Index/Interfaces/IFasterSession.cs +++ b/cs/src/core/Index/Interfaces/IFasterSession.cs @@ -81,7 +81,7 @@ internal interface IFasterSession : IFasterS void UnlockTransientShared(ref Key key, ref OperationStackContext stackCtx); #endregion - bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); + bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false, bool orderedResponses = false); public FasterKV.FasterExecutionContext Ctx { get; }