Skip to content

Commit

Permalink
Merge pull request #5002 from peppy/sync-context-run-inline
Browse files Browse the repository at this point in the history
Refactor `SynchronizationContext` to provide order and execution guarantees
  • Loading branch information
smoogipoo committed Jan 26, 2022
2 parents c689797 + e3da77d commit 5ffc25f
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using osu.Framework.Allocation;
using osu.Framework.Development;
using osu.Framework.Graphics;
using osu.Framework.Graphics.Shapes;
using osu.Framework.Platform;
using osu.Framework.Threading;
using osuTK;

namespace osu.Framework.Tests.Visual.Drawables
Expand All @@ -29,26 +33,78 @@ public void TestAsyncLoadComplete()
AddUntilStep("has spun", () => box.Rotation == 180);
}

private GameThreadSynchronizationContext syncContext => SynchronizationContext.Current as GameThreadSynchronizationContext;

[Test]
public void TestNoAsyncDoesntUseScheduler()
{
int initialTasksRun = 0;
AddStep("get initial run count", () => initialTasksRun = host.UpdateThread.Scheduler.TotalTasksRun);
AddStep("get initial run count", () => initialTasksRun = syncContext.TotalTasksRun);
AddStep("add box", () => Child = box = new AsyncPerformingBox(false));
AddAssert("no tasks run", () => host.UpdateThread.Scheduler.TotalTasksRun == initialTasksRun);
AddAssert("no tasks run", () => syncContext.TotalTasksRun == initialTasksRun);
AddStep("trigger", () => box.ReleaseAsyncLoadCompleteLock());
AddAssert("no tasks run", () => host.UpdateThread.Scheduler.TotalTasksRun == initialTasksRun);
AddAssert("no tasks run", () => syncContext.TotalTasksRun == initialTasksRun);
}

[Test]
public void TestAsyncUsesScheduler()
{
int initialTasksRun = 0;
AddStep("get initial run count", () => initialTasksRun = host.UpdateThread.Scheduler.TotalTasksRun);
AddStep("get initial run count", () => initialTasksRun = syncContext.TotalTasksRun);
AddStep("add box", () => Child = box = new AsyncPerformingBox(true));
AddAssert("no tasks run", () => host.UpdateThread.Scheduler.TotalTasksRun == initialTasksRun);
AddAssert("no tasks run", () => syncContext.TotalTasksRun == initialTasksRun);
AddStep("trigger", () => box.ReleaseAsyncLoadCompleteLock());
AddUntilStep("one new task run", () => host.UpdateThread.Scheduler.TotalTasksRun == initialTasksRun + 1);
AddUntilStep("one new task run", () => syncContext.TotalTasksRun == initialTasksRun + 1);
}

[Test]
public void TestOrderOfExecutionFlushing()
{
List<int> ran = new List<int>();

AddStep("queue items", () =>
{
SynchronizationContext.Current?.Post(_ => ran.Add(1), null);
SynchronizationContext.Current?.Post(_ => ran.Add(2), null);
SynchronizationContext.Current?.Post(_ => ran.Add(3), null);
Assert.That(ran, Is.Empty);
SynchronizationContext.Current?.Send(_ => ran.Add(4), null);
Assert.That(ran, Is.EqualTo(new[] { 1, 2, 3, 4 }));
});
}

[Test]
public void TestOrderOfExecutionFlushingAsyncThread()
{
ManualResetEventSlim finished = new ManualResetEventSlim();
List<int> ran = new List<int>();

AddStep("queue items", () =>
{
var updateContext = SynchronizationContext.Current;
Debug.Assert(updateContext != null);
updateContext.Post(_ => ran.Add(1), null);
updateContext.Post(_ => ran.Add(2), null);
updateContext.Post(_ => ran.Add(3), null);
Assert.That(ran, Is.Empty);
Task.Factory.StartNew(() =>
{
updateContext.Send(_ => ran.Add(4), null);
Assert.That(ran, Is.EqualTo(new[] { 1, 2, 3, 4 }));
finished.Set();
}, TaskCreationOptions.LongRunning);
});

AddUntilStep("wait for completion", () => finished.IsSet);
}

[Test]
Expand All @@ -62,8 +118,12 @@ public void TestAsyncThrows()
// ReSharper disable once AsyncVoidLambda
host.UpdateThread.Scheduler.Add(async () =>
{
Assert.That(ThreadSafety.IsUpdateThread);
await Task.Delay(100).ConfigureAwait(true);
Assert.That(ThreadSafety.IsUpdateThread);
throw new InvalidOperationException();
});
});
Expand Down
15 changes: 5 additions & 10 deletions osu.Framework/Threading/GameThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public double InactiveHz

private double inactiveHz = DEFAULT_INACTIVE_HZ;

private readonly SchedulerSynchronizationContext synchronizationContext;
private readonly GameThreadSynchronizationContext synchronizationContext;

internal PerformanceMonitor Monitor { get; }

Expand Down Expand Up @@ -167,7 +167,7 @@ internal GameThread(Action onNewFrame = null, string name = "unknown", bool moni
Monitor = new PerformanceMonitor(this, StatisticsCounters);

Scheduler = new GameThreadScheduler(this);
synchronizationContext = new SchedulerSynchronizationContext(Scheduler);
synchronizationContext = new GameThreadSynchronizationContext(this);

IsActive.BindValueChanged(_ => updateMaximumHz(), true);
}
Expand Down Expand Up @@ -436,7 +436,10 @@ void runWork()
Monitor?.NewFrame();

using (Monitor?.BeginCollecting(PerformanceCollectionType.Scheduler))
{
Scheduler.Update();
synchronizationContext.RunWork();
}

using (Monitor?.BeginCollecting(PerformanceCollectionType.Work))
OnNewFrame?.Invoke();
Expand Down Expand Up @@ -489,13 +492,5 @@ private void setExitState(GameThreadState exitState)
state.Value = exitState;
}
}

private class GameThreadScheduler : Scheduler
{
public GameThreadScheduler(GameThread thread)
: base(() => thread.IsCurrent, thread.Clock)
{
}
}
}
}
13 changes: 13 additions & 0 deletions osu.Framework/Threading/GameThreadScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

namespace osu.Framework.Threading
{
internal class GameThreadScheduler : Scheduler
{
public GameThreadScheduler(GameThread thread)
: base(() => thread.IsCurrent, thread.Clock)
{
}
}
}
58 changes: 58 additions & 0 deletions osu.Framework/Threading/GameThreadSynchronizationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System.Diagnostics;
using System.Threading;

#nullable enable

namespace osu.Framework.Threading
{
/// <summary>
/// A synchronisation context which posts all continuations to an isolated scheduler instance.
/// </summary>
/// <remarks>
/// This implementation roughly follows the expectations set out for winforms/WPF as per
/// https://docs.microsoft.com/en-us/archive/msdn-magazine/2011/february/msdn-magazine-parallel-computing-it-s-all-about-the-synchronizationcontext.
/// - Calls to <see cref="Post"/> are guaranteed to run asynchronously.
/// - Calls to <see cref="Send"/> will run inline when they can.
/// - Order of execution is guaranteed (in our case, it is guaranteed over <see cref="Send"/> and <see cref="Post"/> calls alike).
/// - To enforce the above, calling <see cref="Send"/> will flush any pending work until the newly queued item has been completed.
/// </remarks>
internal class GameThreadSynchronizationContext : SynchronizationContext
{
/// <summary>
/// The total tasks this synchronization context has run.
/// </summary>
public int TotalTasksRun => scheduler.TotalTasksRun;

private readonly Scheduler scheduler;

public GameThreadSynchronizationContext(GameThread gameThread)
{
scheduler = new GameThreadScheduler(gameThread);
}

public override void Send(SendOrPostCallback callback, object? state)
{
var scheduledDelegate = scheduler.Add(() => callback(state));

Debug.Assert(scheduledDelegate != null);

while (scheduledDelegate.State < ScheduledDelegate.RunState.Complete)
{
if (scheduler.IsMainThread)
scheduler.Update();
else
Thread.Sleep(1);
}
}

public override void Post(SendOrPostCallback callback, object? state) => scheduler.Add(() => callback(state));

/// <summary>
/// Run any pending work queued against this synchronization context.
/// </summary>
public void RunWork() => scheduler.Update();
}
}
2 changes: 1 addition & 1 deletion osu.Framework/Threading/Scheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void UpdateClock(IClock newClock)
/// <summary>
/// Returns whether we are on the main thread or not.
/// </summary>
protected bool IsMainThread => isCurrentThread?.Invoke() ?? true;
internal bool IsMainThread => isCurrentThread?.Invoke() ?? true;

private readonly List<ScheduledDelegate> tasksToSchedule = new List<ScheduledDelegate>();
private readonly List<ScheduledDelegate> tasksToRemove = new List<ScheduledDelegate>();
Expand Down
24 changes: 0 additions & 24 deletions osu.Framework/Threading/SchedulerSynchronizationContext.cs

This file was deleted.

0 comments on commit 5ffc25f

Please sign in to comment.