From 9a0c41811637edf4afe0e265e08fdd1cb18109ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Jare=C5=A1?= Date: Wed, 2 Aug 2023 09:39:59 +0200 Subject: [PATCH] Fix timing issue in parallel execution (#4629) --- .../Parallel/ParallelOperationManager.cs | 57 +++++++++-- .../Parallel/ParallelProxyDiscoveryManager.cs | 6 ++ .../Parallel/ParallelProxyExecutionManager.cs | 98 +++++++++++-------- .../Parallel/ParallelRunEventsHandler.cs | 6 ++ 4 files changed, 118 insertions(+), 49 deletions(-) diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelOperationManager.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelOperationManager.cs index f61c535771..df2ef9133a 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelOperationManager.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelOperationManager.cs @@ -9,6 +9,7 @@ using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client.Parallel; using Microsoft.VisualStudio.TestPlatform.ObjectModel; +using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client; namespace Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Client; @@ -63,6 +64,8 @@ public ParallelOperationManager(Func !s.HasWork); OccupiedSlotCount = _managerSlots.Count - AvailableSlotCount; + + if (EqtTrace.IsVerboseEnabled) + { + EqtTrace.Verbose($"ParallelOperationManager.SetOccupiedSlotCount: Setting slot counts AvailableSlotCount = {AvailableSlotCount}, OccupiedSlotCount = {OccupiedSlotCount}."); + EqtTrace.Verbose($"Occupied slots:\n{(string.Join("\n", _managerSlots.Where(s => s.HasWork).Select((slot) => $"{slot.Index}: {GetSourcesForSlotExpensive(slot)}").ToArray()))}"); + + } } public void StartWork( @@ -91,6 +101,7 @@ private void SetOccupiedSlotCount() _initializeWorkload = initializeWorkload ?? throw new ArgumentNullException(nameof(initializeWorkload)); _runWorkload = runWorkload ?? throw new ArgumentNullException(nameof(runWorkload)); + EqtTrace.Verbose($"ParallelOperationManager.StartWork: Starting adding {workloads.Count} workloads."); _workloads.AddRange(workloads); ClearSlots(acceptMoreWork: true); @@ -123,7 +134,10 @@ private bool RunWorkInParallel() // so when it is allowed to enter it will try to add more work, but we already cancelled, // so we should not start more work. if (!_acceptMoreWork) + { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We don't accept more work, returning false."); return false; + } // We grab all empty slots. var availableSlots = _managerSlots.Where(slot => !slot.HasWork).ToImmutableArray(); @@ -136,11 +150,10 @@ private bool RunWorkInParallel() var workloadsToAdd = availableWorkloads.Take(amount).ToImmutableArray(); // We associate each workload to a slot, if we reached the max parallel - // level, then we will run only initalize step of the given workload. + // level, then we will run only initialize step of the given workload. for (int i = 0; i < amount; i++) { var slot = availableSlots[i]; - slot.HasWork = true; var workload = workloadsToAdd[i]; slot.ShouldPreStart = occupiedSlots + i + 1 > MaxParallelLevel; @@ -152,6 +165,13 @@ private bool RunWorkInParallel() slot.Work = workload.Work; _workloads.Remove(workload); + + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Adding 1 workload to slot, remaining workloads {_workloads.Count}."); + + // This must be set last, every loop below looks at this property, + // and they can do so from a different thread. So if we mark it as HasWork before actually assigning the work + // we can pick up the slot, but it has no associated work yet. + slot.HasWork = true; } slots = _managerSlots.ToArray(); @@ -172,12 +192,16 @@ private bool RunWorkInParallel() { startedWork++; slot.IsRunning = true; - EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}"); + if (EqtTrace.IsVerboseEnabled) + { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Running on pre-started host for work (source) {GetSourcesForSlotExpensive(slot)}: {(DateTime.Now.TimeOfDay - slot.PreStartTime).TotalMilliseconds}ms {slot.InitTask?.Status}"); + } _runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask); // We already started as many as we were allowed, jump out; if (startedWork == MaxParallelLevel) { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work."); break; } } @@ -194,7 +218,10 @@ private bool RunWorkInParallel() { startedWork++; slot.IsRunning = true; - EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Started work on a host."); + if (EqtTrace.IsVerboseEnabled) + { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Started host in slot number {slot.Index} for work (source): {GetSourcesForSlotExpensive(slot)}."); + } _runWorkload(slot.Manager!, slot.EventHandler!, slot.Work!, slot.IsPreStarted, slot.InitTask); } } @@ -202,6 +229,7 @@ private bool RunWorkInParallel() // We already started as many as we were allowed, jump out; if (startedWork == MaxParallelLevel) { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {startedWork} work items, which is the max parallel level. Won't start more work."); break; } } @@ -215,14 +243,19 @@ private bool RunWorkInParallel() preStartedWork++; slot.PreStartTime = DateTime.Now.TimeOfDay; slot.IsPreStarted = true; - EqtTrace.Verbose("ParallelOperationManager.RunWorkInParallel: Pre-starting a host."); + if (EqtTrace.IsVerboseEnabled) + { + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: Pre-starting a host for work (source): {GetSourcesForSlotExpensive(slot)}."); + } slot.InitTask = _initializeWorkload!(slot.Manager!, slot.EventHandler!, slot.Work!); } } // Return true when we started more work. Or false, when there was nothing more to do. // This will propagate to handling of partial discovery or partial run. - return preStartedWork + startedWork > 0; + var weAddedMoreWork = preStartedWork + startedWork > 0; + EqtTrace.Verbose($"ParallelOperationManager.RunWorkInParallel: We started {preStartedWork + startedWork} work items in here, returning {weAddedMoreWork}."); + return weAddedMoreWork; } public bool RunNextWork(TManager completedManager) @@ -258,6 +291,10 @@ private void ClearCompletedSlot(TManager completedManager) throw new InvalidOperationException("The provided manager was found in multiple slots."); } + if (EqtTrace.IsVerboseEnabled) + { + EqtTrace.Verbose($"ParallelOperationManager.ClearCompletedSlot: Clearing slot number {completedSlot[0].Index} with work (source): {GetSourcesForSlotExpensive(completedSlot[0])}."); + } var slot = completedSlot[0]; slot.PreStartTime = TimeSpan.Zero; slot.Work = default(TWorkload); @@ -273,8 +310,14 @@ private void ClearCompletedSlot(TManager completedManager) } } + private static string GetSourcesForSlotExpensive(ParallelOperationManager.Slot slot) + { + return string.Join(", ", (slot.Work as DiscoveryCriteria)?.Sources ?? (slot.Work as TestRunCriteria)?.Sources ?? Array.Empty()); + } + public void DoActionOnAllManagers(Action action, bool doActionsInParallel = false) { + EqtTrace.Verbose($"ParallelOperationManager.DoActionOnAllManagers: Calling an action on all managers."); // We don't need to lock here, we just grab the current list of // slots that are occupied (have managers) and run action on each one of them. var managers = _managerSlots.Where(slot => slot.HasWork).Select(slot => slot.Manager).ToImmutableArray(); @@ -320,11 +363,13 @@ private static void DoManagerAction(Action action) internal void StopAllManagers() { + EqtTrace.Verbose($"ParallelOperationManager.StopAllManagers: Stopping all managers."); ClearSlots(acceptMoreWork: false); } public void Dispose() { + EqtTrace.Verbose($"ParallelOperationManager.Dispose: Disposing all managers."); ClearSlots(acceptMoreWork: false); } diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyDiscoveryManager.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyDiscoveryManager.cs index bb2179e792..7b871e4e91 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyDiscoveryManager.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyDiscoveryManager.cs @@ -287,6 +287,12 @@ private Task InitializeDiscoverTestsOnConcurrentManager(IProxyDiscoveryManager p bool initialized, Task? task) { + // If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before. + if (discoveryCriteria == null) + { + throw new ArgumentNullException(nameof(discoveryCriteria)); + } + // Kick off another discovery task for the next source Task.Run(() => { diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyExecutionManager.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyExecutionManager.cs index ba99581863..b586cfe934 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyExecutionManager.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelProxyExecutionManager.cs @@ -159,11 +159,12 @@ public void Close() ? _runCompletedClients == _runStartedClients : _runCompletedClients == _availableWorkloads; - EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total completed clients = {0}, Run complete = {1}, Run canceled: {2}.", _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled); + EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Total workloads = {0}, Total started clients = {1} Total completed clients = {2}, Run complete = {3}, Run canceled: {4}.", _availableWorkloads, _runStartedClients, _runCompletedClients, allRunsCompleted, testRunCompleteArgs.IsCanceled); } if (allRunsCompleted) { + EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: All runs completed stopping all managers."); _parallelOperationManager.StopAllManagers(); return true; } @@ -185,8 +186,13 @@ public void Close() // { // return true; // } + EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Not cancelled or aborted, running next work."); var _ = _parallelOperationManager.RunNextWork(proxyExecutionManager); } + else + { + EqtTrace.Verbose("ParallelProxyExecutionManager: HandlePartialRunComplete: Cancelled or aborted, not running next work."); + } return false; } @@ -403,7 +409,7 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu // clients to be done running their workloads when aborting/cancelling and that doesn't // happen with an initialized workload that is never run. // - // Interlocked.Increment(ref _runStartedClients); + // Interlocked.Increment(ref _runStartedClients); <- BUG: Is this a bug waiting to happen for pre-started hosts? proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler); }); } @@ -421,54 +427,60 @@ private Task PrepareTestRunOnConcurrentManager(IProxyExecutionManager proxyExecu bool initialized, Task? initTask) { - if (testRunCriteria != null) + // If we do the scheduling incorrectly this will get null. It should not happen, but it has happened before. + if (testRunCriteria == null) { - Task.Run(() => - { - if (!initialized) - { - if (!proxyExecutionManager.IsInitialized) - { - proxyExecutionManager.Initialize(_skipDefaultAdapters); - } + throw new ArgumentNullException(nameof(testRunCriteria)); + } - Interlocked.Increment(ref _runStartedClients); - proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler); - } - else + Task.Run(() => + { + if (!initialized) + { + if (!proxyExecutionManager.IsInitialized) { - initTask!.Wait(); + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing uninitialized client. Started clients: " + _runStartedClients); + proxyExecutionManager.Initialize(_skipDefaultAdapters); } - EqtTrace.Verbose("ParallelProxyExecutionManager: Execution started. Started clients: " + _runStartedClients); + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Initializing test run. Started clients: " + _runStartedClients); + Interlocked.Increment(ref _runStartedClients); + proxyExecutionManager.InitializeTestRun(testRunCriteria, eventHandler); + } + else + { + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Waiting for pre-initialized client to finish initialization. Started clients: " + _runStartedClients); + initTask!.Wait(); + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Pre-initialized client finished initialization. Started clients: " + _runStartedClients); + } - proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler); - }) - .ContinueWith(t => - { - // Just in case, the actual execution couldn't start for an instance. Ensure that - // we call execution complete since we have already fetched a source. Otherwise - // execution will not terminate - EqtTrace.Error("ParallelProxyExecutionManager: Failed to trigger execution. Exception: " + t.Exception); - - var handler = eventHandler; - var exceptionToString = t.Exception?.ToString(); - var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString }; - handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload)); - handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString); - - // Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun - // Differences: - // Aborted is sent to allow the current execution manager replaced with another instance - // Ensure that the test run aggregator in parallel run events handler doesn't add these statistics - // (since the test run didn't even start) - var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection(), new Collection(), TimeSpan.Zero); - handler.HandleTestRunComplete(completeArgs, null, null, null); - }, - TaskContinuationOptions.OnlyOnFaulted); - } + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution starting. Started clients: " + _runStartedClients); - EqtTrace.Verbose("ProxyParallelExecutionManager: No sources available for execution."); + proxyExecutionManager.StartTestRun(testRunCriteria, eventHandler); + EqtTrace.Verbose("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager: Execution started. Started clients: " + _runStartedClients); + }) + .ContinueWith(t => + { + // Just in case, the actual execution couldn't start for an instance. Ensure that + // we call execution complete since we have already fetched a source. Otherwise + // execution will not terminate + EqtTrace.Error("ParallelProxyExecutionManager.StartTestRunOnConcurrentManager(continuation): Failed to trigger execution. Exception: " + t.Exception); + + var handler = eventHandler; + var exceptionToString = t.Exception?.ToString(); + var testMessagePayload = new TestMessagePayload { MessageLevel = TestMessageLevel.Error, Message = exceptionToString }; + handler.HandleRawMessage(_dataSerializer.SerializePayload(MessageType.TestMessage, testMessagePayload)); + handler.HandleLogMessage(TestMessageLevel.Error, exceptionToString); + + // Send a run complete to caller. Similar logic is also used in ProxyExecutionManager.StartTestRun + // Differences: + // Aborted is sent to allow the current execution manager replaced with another instance + // Ensure that the test run aggregator in parallel run events handler doesn't add these statistics + // (since the test run didn't even start) + var completeArgs = new TestRunCompleteEventArgs(null, false, true, null, new Collection(), new Collection(), TimeSpan.Zero); + handler.HandleTestRunComplete(completeArgs, null, null, null); + }, + TaskContinuationOptions.OnlyOnFaulted); } public void InitializeTestRun(TestRunCriteria testRunCriteria, IInternalTestRunEventsHandler eventHandler) diff --git a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelRunEventsHandler.cs b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelRunEventsHandler.cs index 1ea36cfd62..9890cf5586 100644 --- a/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelRunEventsHandler.cs +++ b/src/Microsoft.TestPlatform.CrossPlatEngine/Client/Parallel/ParallelRunEventsHandler.cs @@ -66,10 +66,12 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler ICollection? runContextAttachments, ICollection? executorUris) { + EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Handling a run completion, this can be either one part of parallel run completing, or the whole parallel run completing."); var parallelRunComplete = HandleSingleTestRunComplete(testRunCompleteArgs, lastChunkArgs, runContextAttachments, executorUris); if (parallelRunComplete) { + EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Whole parallel run completed."); var completedArgs = new TestRunCompleteEventArgs(_runDataAggregator.GetAggregatedRunStats(), _runDataAggregator.IsCanceled, _runDataAggregator.IsAborted, @@ -96,6 +98,10 @@ internal class ParallelRunEventsHandler : IInternalTestRunEventsHandler HandleParallelTestRunComplete(completedArgs); } + else + { + EqtTrace.Verbose($"ParallelRunEventsHandler.HandleTestRunComplete: Single part of parallel run completed, but whole run is not complete yet."); + } } protected bool HandleSingleTestRunComplete(TestRunCompleteEventArgs testRunCompleteArgs,