Permalink
Browse files

WorkerThread now allows you to abort the thread by returning from the…

… worker function. The thread will be recreated the next time the worker is woken up.

TaskScheduler and ConnectionWrapper now time out their worker threads after a set period of inactivity
TaskScheduler and WorkerThread are a little better about handling disposal now
Stop using Thread.Join in WorkerThread.Dispose since it could introduce pauses into UI threads

git-svn-id: http://fracture.googlecode.com/svn/trunk@428 1953d6fe-3836-0410-a91e-07494a8b0067
  • Loading branch information...
1 parent f734fdc commit 21ae17a0a1de8f66640ed5efac346d8b6e50f24b kevin.gadd@gmail.com committed Mar 24, 2011
Showing with 105 additions and 18 deletions.
  1. +2 −2 Squared/Squared.gallio
  2. +6 −1 Squared/TaskLib/Data/Connection.cs
  3. +51 −3 Squared/TaskLib/TaskScheduler.cs
  4. +46 −12 Squared/TaskLib/WorkerThread.cs
View
@@ -10,8 +10,8 @@
<properties />
</testPackage>
<testFilters>
- <testFilter filterName="LastRun" filterExpr="Id: 6d60e1acda533b2f" />
- <testFilter filterName="AutoSave" filterExpr="Id: 6d60e1acda533b2f" />
+ <testFilter filterName="LastRun" filterExpr="" />
+ <testFilter filterName="AutoSave" filterExpr="" />
</testFilters>
<extensionSpecifications />
<reportDirectory>..\..\..\..\AppData\Roaming\Gallio\Icarus\Reports</reportDirectory>
@@ -17,6 +17,8 @@ public class ConnectionDisposedException : Exception {
}
public class ConnectionWrapper : IDisposable {
+ public static readonly int WorkerThreadTimeoutMs = 10000;
+
protected struct PendingQuery {
public readonly Action ExecuteFunc;
public readonly IFuture Future;
@@ -148,6 +150,7 @@ public ConnectionWrapper (TaskScheduler scheduler, IDbConnection connection)
protected void QueryThreadFunc (ConcurrentQueue<PendingQuery> workItems, ManualResetEvent newWorkItemEvent) {
while (true) {
PendingQuery item;
+
while ((_ActiveQuery == null) && workItems.TryDequeue(out item)) {
NotifyQueryBegan(item.Future);
try {
@@ -158,7 +161,9 @@ public ConnectionWrapper (TaskScheduler scheduler, IDbConnection connection)
}
}
- newWorkItemEvent.WaitOne();
+ if (!newWorkItemEvent.WaitOne(WorkerThreadTimeoutMs))
+ return;
+
newWorkItemEvent.Reset();
}
}
@@ -139,13 +139,16 @@ struct SleepItem : IComparable<SleepItem> {
public delegate bool BackgroundTaskErrorHandler (Exception error);
public class TaskScheduler : IDisposable {
+ public static int SleepThreadTimeoutMs = 10000;
+
const long SleepFudgeFactor = 100;
const long SleepSpinThreshold = 1000;
const long MinimumSleepLength = 10000;
const long MaximumSleepLength = Time.SecondInTicks * 60;
public BackgroundTaskErrorHandler ErrorHandler = null;
+ private bool _IsDisposed = false;
private IJobQueue _JobQueue = null;
private ConcurrentQueue<Action> _StepListeners = new ConcurrentQueue<Action>();
private Internal.WorkerThread<PriorityQueue<SleepItem>> _SleepWorker;
@@ -164,6 +167,9 @@ public TaskScheduler ()
}
public bool WaitForWorkItems (double timeout) {
+ if (_IsDisposed)
+ throw new ObjectDisposedException("TaskScheduler");
+
return _JobQueue.WaitForWorkItems(timeout);
}
@@ -174,6 +180,9 @@ public TaskScheduler ()
}
public void OnTaskError (Exception exception) {
+ if (_IsDisposed)
+ throw new TaskException("Unhandled exception in background task", exception);
+
this.QueueWorkItem(() => {
if (ErrorHandler != null)
if (ErrorHandler(exception))
@@ -214,10 +223,16 @@ public TaskScheduler ()
}
public void QueueWorkItem (Action workItem) {
+ if (_IsDisposed)
+ throw new ObjectDisposedException("TaskScheduler");
+
_JobQueue.QueueWorkItem(workItem);
}
internal void AddStepListener (Action listener) {
+ if (_IsDisposed)
+ return;
+
_StepListeners.Enqueue(listener);
}
@@ -238,10 +253,12 @@ public TaskScheduler ()
} else {
Monitor.Exit(pendingSleeps);
#if XBOX
- newSleepEvent.WaitOne(-1);
+ if (!newSleepEvent.WaitOne(SleepThreadTimeoutMs))
#else
- newSleepEvent.WaitOne(-1, false);
+ if (!newSleepEvent.WaitOne(SleepThreadTimeoutMs))
#endif
+ return;
+
newSleepEvent.Reset();
continue;
}
@@ -286,6 +303,9 @@ public TaskScheduler ()
}
internal void QueueSleep (long completeWhen, IFuture future) {
+ if (_IsDisposed)
+ return;
+
long now = Time.Ticks;
if (now > completeWhen) {
future.Complete();
@@ -307,6 +327,9 @@ public TaskScheduler ()
}
public void Step () {
+ if (_IsDisposed)
+ return;
+
BeforeStep();
_JobQueue.Step();
@@ -323,6 +346,9 @@ public TaskScheduler ()
}
public T WaitFor<T> (Future<T> future) {
+ if (_IsDisposed)
+ throw new ObjectDisposedException("TaskScheduler");
+
while (true) {
BeforeStep();
@@ -332,6 +358,9 @@ public TaskScheduler ()
}
public object WaitFor (IFuture future) {
+ if (_IsDisposed)
+ throw new ObjectDisposedException("TaskScheduler");
+
while (true) {
BeforeStep();
@@ -346,9 +375,28 @@ public TaskScheduler ()
}
}
+ public bool IsDisposed {
+ get {
+ return _IsDisposed;
+ }
+ }
+
public void Dispose () {
- _JobQueue.Dispose();
+ if (_IsDisposed)
+ return;
+
+ _IsDisposed = true;
+ Thread.MemoryBarrier();
+
+ lock (_SleepWorker.WorkItems) {
+ while (_SleepWorker.WorkItems.Count > 0) {
+ var item = _SleepWorker.WorkItems.Dequeue();
+ item.Future.Dispose();
+ }
+ }
+
_SleepWorker.Dispose();
+ _JobQueue.Dispose();
}
}
}
@@ -45,6 +45,7 @@ public class WorkerThread<Container> : IDisposable
private ManualResetEvent _WakeEvent = new ManualResetEvent(false);
private Container _WorkItems = new Container();
private ThreadPriority _Priority;
+ private bool _IsDisposed = false;
public WorkerThread (WorkerThreadFunc<Container> threadFunc, ThreadPriority priority) {
_ThreadFunc = threadFunc;
@@ -58,39 +59,72 @@ public class WorkerThread<Container> : IDisposable
}
public void Wake () {
+ if (_IsDisposed)
+ return;
+
if (_WakeEvent != null)
_WakeEvent.Set();
if (_Thread == null) {
- _Thread = new Thread(() => {
+ var newThread = new Thread(() => {
try {
- _ThreadFunc(_WorkItems, _WakeEvent);
+ var wi = _WorkItems;
+ var we = _WakeEvent;
+
+ // If either of these fields are null, we've probably been disposed
+ if ((wi != null) && (we != null))
+ _ThreadFunc(wi, we);
#if !XBOX
} catch (ThreadInterruptedException) {
#endif
} catch (ThreadAbortException) {
}
+
+ var me = Interlocked.Exchange(ref _Thread, null);
+ if (me == Thread.CurrentThread)
+ OnThreadTerminated(Thread.CurrentThread);
+
});
#if !XBOX
- _Thread.Priority = _Priority;
+ newThread.Priority = _Priority;
#endif
- _Thread.IsBackground = true;
- _Thread.Name = String.Format("{0}_{1}", _ThreadFunc.Method.Name, this.GetHashCode());
- _Thread.Start();
+ newThread.IsBackground = true;
+ newThread.Name = String.Format("{0}_{1}", _ThreadFunc.ToString(), this.GetHashCode());
+
+ if (Interlocked.CompareExchange(ref _Thread, newThread, null) == null)
+ newThread.Start();
}
}
+ private void OnThreadTerminated (Thread theThread) {
+ if (_IsDisposed)
+ return;
+
+ var we = _WakeEvent;
+ if ((we != null) && we.WaitOne(1))
+ Wake();
+ }
+
public void Dispose () {
- if (_Thread != null) {
+ if (_IsDisposed)
+ return;
+
+ _IsDisposed = true;
+ Thread.MemoryBarrier();
+
+ var wakeEvent = Interlocked.Exchange(ref _WakeEvent, null);
+
+ var thread = Interlocked.Exchange(ref _Thread, null);
+ if (thread != null) {
#if !XBOX
- _Thread.Interrupt();
+ thread.Interrupt();
+#else
+ thread.Abort();
#endif
- _Thread.Join(10);
- _Thread.Abort();
- _Thread = null;
}
- _WakeEvent = null;
+ if (wakeEvent != null)
+ wakeEvent.Set();
}
}
}

0 comments on commit 21ae17a

Please sign in to comment.