diff --git a/nanoFramework.System.Threading/System/Runtime/CompilerServices/AsyncTaskMethodBuilder.cs b/nanoFramework.System.Threading/System/Runtime/CompilerServices/AsyncTaskMethodBuilder.cs index efa58a8..83f7b1f 100644 --- a/nanoFramework.System.Threading/System/Runtime/CompilerServices/AsyncTaskMethodBuilder.cs +++ b/nanoFramework.System.Threading/System/Runtime/CompilerServices/AsyncTaskMethodBuilder.cs @@ -9,6 +9,109 @@ namespace System.Runtime.CompilerServices { + /// + /// Represents a builder for asynchronous methods that return a task. + /// + public struct AsyncTaskMethodBuilder + { + /// + /// Creates an instance of the class. + /// + /// A new instance of the builder. + public static AsyncTaskMethodBuilder Create() + { + Debug.WriteLine($"AsyncTaskMethodBuilder:Create"); + return new AsyncTaskMethodBuilder(); + } + + Task task; + /// + /// Gets the task for this builder + /// + /// + /// The task for this builder. + /// + public Task Task => task; + + /// + /// Schedules the state machine to proceed to the next action when the specified awaiter completes. + /// + /// + /// + /// + /// + public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : INotifyCompletion + where TStateMachine : IAsyncStateMachine + { + Debug.WriteLine($"AsyncTaskMethodBuilder:AwaitOnCompleted"); + var _stateMachine = stateMachine; + awaiter.OnCompleted(() => + { + Debug.WriteLine($"AsyncTaskMethodBuilder:OnCompleted"); + _stateMachine.MoveNext(); + }); + } + + /// + /// Schedules the state machine to proceed to the next action when the specified awaiter completes. This method can be called from partially trusted code. + /// + /// + /// + /// + /// + public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : ICriticalNotifyCompletion + where TStateMachine : IAsyncStateMachine + { + Debug.WriteLine($"AsyncTaskMethodBuilder:AwaitUnsafeOnCompleted"); + var _stateMachine = stateMachine; + awaiter.OnCompleted(() => + { + Debug.WriteLine($"AsyncTaskMethodBuilder:OnCompleted"); + _stateMachine.MoveNext(); + }); + } + + /// + /// Marks the task as failed and binds the specified exception to the task. + /// + /// + public void SetException(Exception exception) + { + Debug.WriteLine($"AsyncTaskMethodBuilder:SetException"); + task.CompleteWithException(exception); + } + /// + /// Marks the task as successfully completed. + /// + public void SetResult() + { + task.Complete(); + Debug.WriteLine($"AsyncTaskMethodBuilder:SetResult"); + } + /// + /// Associates the builder with the specified state machine. + /// + /// + public void SetStateMachine(IAsyncStateMachine stateMachine) + { + Debug.WriteLine($"AsyncTaskMethodBuilder:SetStateMachine"); + } + + /// + /// Begins running the builder with the associated state machine. + /// + /// + /// + public void Start(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine + { + Debug.WriteLine($"AsyncTaskMethodBuilder:Start"); + task = new Task(); + stateMachine.MoveNext(); + } + } + /// /// Represents a builder for asynchronous methods that return a task. /// @@ -18,7 +121,7 @@ public struct AsyncTaskMethodBuilder Task task; /// - /// Gets the task for this bu + /// Gets the task for this builder /// /// /// The task for this builder. diff --git a/nanoFramework.System.Threading/Threading/CancellationToken.cs b/nanoFramework.System.Threading/Threading/CancellationToken.cs new file mode 100644 index 0000000..6de1ca7 --- /dev/null +++ b/nanoFramework.System.Threading/Threading/CancellationToken.cs @@ -0,0 +1,189 @@ +using System; + +namespace System.Threading +{ + /// + /// Propagates notification that operation should be cancelled. + /// + public struct CancellationToken + { + CancellationTokenSource source; + /// + /// Initializes the System.Threading.CancellationToken. + /// + /// The canceled state for the token. + internal CancellationToken(CancellationTokenSource source) + { + this.source = source; + } + + /// + /// Initializes the System.Threading.CancellationToken. + /// + /// The canceled state for the token. + public CancellationToken(bool canceled) + { + this.source = null; + } + + static readonly CancellationToken none = new CancellationToken(); + + /// + /// Returns an empty System.Threading.CancellationToken value. + /// + /// An empty cancellation token. + public static CancellationToken None => none; + + /// + /// Gets whether cancellation has been requested for this token. + /// + /// true if cancellation has been requested for this token; otherwise, false. + public bool IsCancellationRequested => source.IsCancellationRequested; + + /// + /// Gets whether this token is capable of being in the canceled state. + /// + /// true if this token is capable of being in the canceled state; otherwise, false. + public bool CanBeCanceled => !source.IsCancellationRequested; + + /// + /// Gets a System.Threading.WaitHandle that is signaled when the token is canceled. + /// + /// A System.Threading.WaitHandle that is signaled when the token is canceled. + public WaitHandle WaitHandle => source.WaitHandle; + + /// + /// Determines whether the current System.Threading.CancellationToken instance is equal to the specified token. + /// + /// + /// + /// true if the instances are equal; otherwise, false. Two tokens are equal if they + /// are associated with the same System.Threading.CancellationTokenSource or if they + /// were both constructed from public System.Threading.CancellationToken constructors + /// and their System.Threading.CancellationToken.IsCancellationRequested values are + /// equal. + /// + /// An associated System.Threading.CancellationTokenSource has been disposed. + public bool Equals(CancellationToken other) + { + if (this.source == other.source) + return true; + return false; + } + /// + /// Determines whether the current System.Threading.CancellationToken instance is equal to the specified System.Object. + /// + /// The other object to which to compare this instance. + /// + /// true if other is a System.Threading.CancellationToken and if the two instances + /// are equal; otherwise, false. Two tokens are equal if they are associated with + /// the same System.Threading.CancellationTokenSource or if they were both constructed + /// from public System.Threading.CancellationToken constructors and their System.Threading.CancellationToken.IsCancellationRequested + /// values are equal. + /// + /// An associated System.Threading.CancellationTokenSource has been disposed. + public override bool Equals(object other) + { + if (other is CancellationToken token) + { + return Equals(token); + } + return false; + } + + /// + /// Serves as a hash function for a System.Threading.CancellationToken. + /// + /// A hash code for the current System.Threading.CancellationToken instance. + public override int GetHashCode() + { + return source.GetHashCode(); + } + + /// + /// Registers a delegate that will be called when this System.Threading.CancellationToken is canceled. + /// + /// The delegate to be executed when the System.Threading.CancellationToken is canceled. + /// The System.Threading.CancellationTokenRegistration instance that can be used to deregister the callback. + /// callback is null + public CancellationTokenRegistration Register(Action callback) + { + return Register(callback, true); + } + + /// + /// Registers a delegate that will be called when this System.Threading.CancellationToken is canceled. + /// + /// The delegate to be executed when the System.Threading.CancellationToken is canceled. + /// A value that indicates whether to capture the current System.Threading.SynchronizationContext and use it when invoking the callback. + /// The System.Threading.CancellationTokenRegistration instance that can be used to deregister the callback. + /// callback is null + public CancellationTokenRegistration Register(Action callback, bool useSynchronizationContext) + { + callback = callback ?? throw new ArgumentNullException("callback"); + if (source.IsCancellationRequested) + { + callback(); + } + else if (source != null) + { + return source.NotifyOnCancelled(callback, useSynchronizationContext); + } + return new CancellationTokenRegistration(); + } + /// + /// Registers a delegate that will be called when this System.Threading.CancellationToken is canceled. + /// + /// The delegate to be executed when the System.Threading.CancellationToken is canceled. + /// The state to pass to the callback when the delegate is invoked. This may be null. + /// The System.Threading.CancellationTokenRegistration instance that can be used to deregister the callback. + /// callback is null + public CancellationTokenRegistration Register(Action callback, object state) + { + return Register(() => callback(state), true); + } + /// + /// Registers a delegate that will be called when this System.Threading.CancellationToken is canceled. + /// + /// The delegate to be executed when the System.Threading.CancellationToken is canceled. + /// The state to pass to the callback when the delegate is invoked. This may be null. + /// A Boolean value that indicates whether to capture the current System.Threading.SynchronizationContext and use it when invoking the callback. + /// The System.Threading.CancellationTokenRegistration instance that can be used to deregister the callback. + /// callback is null + public CancellationTokenRegistration Register(Action callback, object state, bool useSynchronizationContext) + { + return Register(() => callback(state), useSynchronizationContext); + } + + /// + /// Throws a System.OperationCanceledException if this token has had cancellation requested + /// + public void ThrowIfCancellationRequested() + { + if (source.IsCancellationRequested) + throw new SystemException();//TODO: throw OperationCanceledException, not defined yet + } + + /// + /// Determines whether two System.Threading.CancellationToken instances are equal. + /// + /// The first instance. + /// The second instance. + /// true if the instances are equal; otherwise, false. + public static bool operator ==(CancellationToken left, CancellationToken right) + { + return left.Equals(right); + } + + /// + /// Determines whether two System.Threading.CancellationToken instances are not equal. + /// + /// The first instance. + /// The second instance. + /// true if the instances are not equal; otherwise, false. + public static bool operator !=(CancellationToken left, CancellationToken right) + { + return !left.Equals(right); + } + } +} diff --git a/nanoFramework.System.Threading/Threading/CancellationTokenRegistration.cs b/nanoFramework.System.Threading/Threading/CancellationTokenRegistration.cs new file mode 100644 index 0000000..511a923 --- /dev/null +++ b/nanoFramework.System.Threading/Threading/CancellationTokenRegistration.cs @@ -0,0 +1,89 @@ +using System; + +namespace System.Threading +{ + /// + /// Represents a callback delegate that has been registered with a System.Threading.CancellationToken. + /// + public struct CancellationTokenRegistration : IDisposable + { + CancellationTokenSource source; + Action onDisposed; + internal CancellationTokenRegistration(CancellationTokenSource source, Action onDisposed) + { + this.source = source; + this.onDisposed = onDisposed; + } + + /// + /// Releases all resources used by the current instance of the System.Threading.CancellationTokenRegistration + /// + public void Dispose() + { + onDisposed?.Invoke(); + } + + /// + /// Determines whether the current System.Threading.CancellationTokenRegistration instance is equal to the specified System.Threading.CancellationTokenRegistration. + /// + /// The other System.Threading.CancellationTokenRegistration to which to compare this instance. + /// + /// True, if both this and other are equal. False, otherwise. Two System.Threading.CancellationTokenRegistration + /// instances are equal if they both refer to the output of a single call to the + /// same Register method of a System.Threading.CancellationToken. + /// + public override bool Equals(object obj) + { + if (obj is CancellationTokenRegistration other) + { + return source == other.source; + } + return false; + } + + /// + /// Determines whether the current System.Threading.CancellationTokenRegistration instance is equal to the specified System.Threading.CancellationTokenRegistration. + /// + /// The other System.Threading.CancellationTokenRegistration to which to compare this instance. + /// + /// True, if both this and other are equal. False, otherwise. Two System.Threading.CancellationTokenRegistration + /// instances are equal if they both refer to the output of a single call to the + /// same Register method of a System.Threading.CancellationToken. + /// + public bool Equals(CancellationTokenRegistration other) + { + return source == other.source; + } + + /// + /// Serves as a hash function for a System.Threading.CancellationTokenRegistration. + /// + /// A hash code for the current System.Threading.CancellationTokenRegistration instance. + public override int GetHashCode() + { + return source.GetHashCode(); + } + + /// + /// Determines whether two System.Threading.CancellationTokenRegistration instances are equal. + /// + /// The first instance. + /// The second instance. + /// True if the instances are not equal; otherwise, false. + public static bool operator ==(CancellationTokenRegistration left, CancellationTokenRegistration right) + { + return left.Equals(right); + } + + /// + /// Determines whether two System.Threading.CancellationTokenRegistration instances are not equal. + /// + /// The first instance. + /// The second instance. + /// True if the instances are not equal; otherwise, false. + public static bool operator !=(CancellationTokenRegistration left, CancellationTokenRegistration right) + { + return !left.Equals(right); + } + } +} diff --git a/nanoFramework.System.Threading/Threading/CancellationTokenSource.cs b/nanoFramework.System.Threading/Threading/CancellationTokenSource.cs new file mode 100644 index 0000000..a64a92d --- /dev/null +++ b/nanoFramework.System.Threading/Threading/CancellationTokenSource.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections; + +namespace System.Threading +{ + /// + /// Signals to a System.Threading.CancellationToken that it should be canceled. + /// + public class CancellationTokenSource : IDisposable + { + /// + /// Initializes a new instance of the System.Threading.CancellationTokenSource class. + /// + public CancellationTokenSource() + { + } + + /// + /// Initializes a new instance of the System.Threading.CancellationTokenSource class + // that will be canceled after the specified time span. + /// + /// The time interval to wait before canceling this System.Threading.CancellationTokenSource. + public CancellationTokenSource(TimeSpan delay) + { + CancelAfter((int)delay.TotalMilliseconds); + } + + /// + /// Initializes a new instance of the System.Threading.CancellationTokenSource class + // that will be canceled after the specified delay in milliseconds. + /// + /// The time interval in milliseconds to wait before canceling this System.Threading.CancellationTokenSource. + public CancellationTokenSource(int millisecondsDelay) + { + CancelAfter(millisecondsDelay); + } + + /// + /// Gets whether cancellation has been requested for this System.Threading.CancellationTokenSource. + /// + /// + /// true if cancellation has been requested for this System.Threading.CancellationTokenSource; + /// otherwise, false. + /// + public bool IsCancellationRequested { get; private set; } + + /// + /// Gets the System.Threading.CancellationToken associated with this System.Threading.CancellationTokenSource. + /// + /// + /// The System.Threading.CancellationToken associated with this System.Threading.CancellationTokenSource. + /// + public CancellationToken Token => new CancellationToken(this); + + /// + /// Communicates a request for cancellation. + /// + /// This System.Threading.CancellationTokenSource has been disposed + public void Cancel() + { + DoCancellation(); + } + /// + /// true if exceptions should immediately propagate; otherwise, false. + /// + /// + /// This System.Threading.CancellationTokenSource has been disposed. + public void Cancel(bool throwOnFirstException) + { + DoCancellation(); + } + /// + /// Schedules a cancel operation on this System.Threading.CancellationTokenSource + /// after the specified time span. + /// + /// The time span to wait before canceling this System.Threading.CancellationTokenSource. + /// This System.Threading.CancellationTokenSource has been disposed. + public void CancelAfter(TimeSpan delay) + { + CancelAfter((int)delay.TotalMilliseconds); + } + + /// + /// Schedules a cancel operation on this System.Threading.CancellationTokenSource + /// after the specified number of milliseconds. + /// + /// The time span to wait before canceling this System.Threading.CancellationTokenSource. + /// This System.Threading.CancellationTokenSource has been disposed. + public void CancelAfter(int millisecondsDelay) + { + if (isDisposed) + throw new ObjectDisposedException(); + new Thread(() => + { + Thread.Sleep(millisecondsDelay); + DoCancellation(); + }).Start(); + } + + struct CancellationNotificationInfo + { + public Action Action; + public SynchronizationContext SynchronizationContext; + } + + AutoResetEvent cancelSignal; + internal WaitHandle WaitHandle + { + get + { + if (cancelSignal == null) + cancelSignal = new AutoResetEvent(false); + return cancelSignal; + } + } + + ArrayList cancelNotifications = new ArrayList(); + void DoCancellation() + { + if (isDisposed) + throw new ObjectDisposedException(); + if (IsCancellationRequested) + { + throw new Exception("Already cancelled"); + } + IsCancellationRequested = true; + if (cancelSignal != null) + { + cancelSignal.Set(); + } + foreach(var item in cancelNotifications) + { + var kv = (DictionaryEntry)item; + var notification = (CancellationNotificationInfo)kv.Value; + if (notification.SynchronizationContext != null) + { + notification.SynchronizationContext.Post((_) => notification.Action(), null); + } + else + { + notification.Action(); + } + } + cancelNotifications.Clear(); + } + + internal CancellationTokenRegistration NotifyOnCancelled(Action action, bool useSynchronizationContext) + { + int id = action.GetHashCode(); + var entry = new DictionaryEntry(id, new CancellationNotificationInfo() + { + Action = action, + SynchronizationContext = useSynchronizationContext ? SynchronizationContext.Current : null + }); + CancellationTokenRegistration registration = new CancellationTokenRegistration(this, () => + { + cancelNotifications.Remove(id); + }); + cancelNotifications.Add(entry); + return registration; + } + + bool isDisposed; + /// + /// Releases all resources used by the current instance of the System.Threading.CancellationTokenSource + /// class. + /// + public void Dispose() + { + isDisposed = true; + } + } +} diff --git a/nanoFramework.System.Threading/Threading/Internal/CircularQueue.cs b/nanoFramework.System.Threading/Threading/Internal/CircularQueue.cs index 0642feb..4f5c972 100644 --- a/nanoFramework.System.Threading/Threading/Internal/CircularQueue.cs +++ b/nanoFramework.System.Threading/Threading/Internal/CircularQueue.cs @@ -22,11 +22,11 @@ public CircularQueue(int depth) put = 0; get = 0; this.depth = depth; - data = new T[(long)depth]; + data = new T[depth]; size = 0; } - public bool Get(ref T data) + public bool Dequeue(ref T data) { if (size == 0) return false; @@ -46,14 +46,11 @@ public bool Peek(ref T data) return true; } - public bool Put(T data) + public bool Enqueue(T data) { - Debug.WriteLine("Put"); if (size >= depth) return false; - Debug.WriteLine("Putt"); this.data[put] = data; - Debug.WriteLine("Puttt"); put++; size++; if (put >= depth) diff --git a/nanoFramework.System.Threading/Threading/Internal/ThreadWorker.cs b/nanoFramework.System.Threading/Threading/Internal/ThreadWorker.cs index c9f835e..e57e83b 100644 --- a/nanoFramework.System.Threading/Threading/Internal/ThreadWorker.cs +++ b/nanoFramework.System.Threading/Threading/Internal/ThreadWorker.cs @@ -9,7 +9,7 @@ namespace System.Threading.Internal { - internal class ThreadWorker// : IDisposable + internal class ThreadWorker { Thread thread; WaitCallback callback; @@ -17,7 +17,7 @@ internal class ThreadWorker// : IDisposable internal bool IsFree => thread == null || callback == null || thread.ThreadState == ThreadState.Suspended; internal int Id => thread != null ? thread.ManagedThreadId : -1; internal SynchronizationContext SynchronizationContext { get; private set; } - static bool cantSuspend = true;//false; + AutoResetEvent run = new AutoResetEvent(false); void Start() { @@ -38,28 +38,7 @@ void Start() if (callback != null) continue; Debug.WriteLine($"Thread {Id} exited"); - if (cantSuspend)//if platform doesnt support suspend, use polling - { - while (callback == null) - { - Thread.Sleep(100); - } - } - else - { - try - { - Thread.CurrentThread.Suspend(); //if platform doesnt support suspend, use polling - } - catch - { - cantSuspend = true; - while (callback == null) - { - Thread.Sleep(100); - } - } - } + run.WaitOne(); } }); SynchronizationContext = new NanoFrameworkSynchronizationContext(thread.ManagedThreadId); @@ -76,15 +55,9 @@ public void Post(WaitCallback callback, object state) } else { - if (!cantSuspend) - thread.Resume(); + run.Set(); } } - - //public void Dispose() - //{ - // thread?.Abort(); - //} } } diff --git a/nanoFramework.System.Threading/Threading/Tasks/Task.cs b/nanoFramework.System.Threading/Threading/Tasks/Task.cs index 3a066f0..4433217 100644 --- a/nanoFramework.System.Threading/Threading/Tasks/Task.cs +++ b/nanoFramework.System.Threading/Threading/Tasks/Task.cs @@ -18,14 +18,24 @@ public class Task TaskAwaiter awaiter; ArrayList _continuations = new ArrayList(); + protected AutoResetEvent resultWaitHandle = new AutoResetEvent(false); + bool isCompleted; /// /// Gets a value that indicates whether the task has completed. /// /// /// if the task has completed (that is, the task is in one of the three final states: RanToCompletion, Faulted, or Canceled); otherwise, . /// - public bool IsCompleted { get; protected set; } + public bool IsCompleted + { + get => isCompleted; + set + { + isCompleted = true; + resultWaitHandle.Set(); + } + } /// /// Gets the AggregateException that caused the to end prematurely. If the completed successfully or has not yet thrown any exceptions, this will return . @@ -43,7 +53,22 @@ public class Task /// public bool IsCompletedSuccessfully => IsCompleted && Exception == null; -// TODO this is not in the .NET API + /// + /// Control flags for a task + /// + protected enum Flags + { + /// + /// Run the task continuations on the same thread that start the task + /// + ContinueOnSameContext = 1 + } + /// + /// Internal control paramters for this task + /// + protected Flags flags = Flags.ContinueOnSameContext; + + // TODO this is not in the .NET API public static Task FromEvent(EventHandler callback) { var tcs = new TaskCompletionSource(); @@ -179,7 +204,14 @@ public Task(Action action):this() { action(); Debug.WriteLine("Calling Complete"); - syncContext.Post((__) => Complete(), null); + if (flags.HasFlag(Flags.ContinueOnSameContext)) + { + syncContext.Post((__) => Complete(), null); + } + else + { + Complete(); + } Debug.WriteLine("Called Complete"); } catch (Exception e) @@ -231,6 +263,20 @@ public Task ContinueWith(Action continuation) return task; } + /// + /// Make task continuation not neccessarily run on the same context where it is started. Gives better performance + /// + /// + /// Same instance of task + public Task ConfigureAwait(bool continueOnSameContext = true) + { + if (continueOnSameContext == false) + { + flags &= ~Flags.ContinueOnSameContext; + } + return this; + } + internal void OnCompleted(Action continuation) { if (IsCompleted) @@ -239,12 +285,6 @@ internal void OnCompleted(Action continuation) _continuations.Add(continuation); } - //public Task ContinueWith(Action continuation) - //{ - // var task = new Task(continuation); - // return task; - //} - internal void Complete() { Debug.WriteLine("Completing"); @@ -260,11 +300,18 @@ protected internal void CompleteWithException(Exception e) } // TODO check .NET + /// + /// Get the result of the async operation. Block the calling thread until task completes + /// public void GetResult() { - while (!IsCompleted) ; + resultWaitHandle.WaitOne(); + //use it as one shot + resultWaitHandle.Set(); if (Exception != null) + { throw Exception; + } } } @@ -287,8 +334,10 @@ public TResult Result { get { - // TODO check loop - while (!IsCompleted); + resultWaitHandle.WaitOne(); + resultWaitHandle.Set(); //use it as one shot + //// TODO check loop + //while (!IsCompleted); if (Exception != null) { @@ -326,7 +375,14 @@ public Task(Func function) :this() try { var result = function(); - syncContext.Post((r) => Complete((TResult)r), result); + if (flags.HasFlag(Flags.ContinueOnSameContext)) + { + syncContext.Post((r) => Complete((TResult)r), result); + } + else + { + Complete(result); + } } catch (Exception e) { @@ -353,12 +409,29 @@ public Task ContinueWith(Func conti return task; } + /// + /// Make task continuation not neccessarily run on the same context where it is started. Gives better performance + /// + /// + /// same instance of task + public new Task ConfigureAwait(bool continueOnSameContext = true) + { + if (continueOnSameContext == false) + { + flags &= ~Flags.ContinueOnSameContext; + } + return this; + } + internal void OnCompleted(Action continuation) { OnCompleted(() => continuation(GetResult())); } // TODO check .NET + /// + /// Get the result of the async operation. Block the calling thread until task completes + /// public new TResult GetResult() { return Result; @@ -366,10 +439,9 @@ internal void OnCompleted(Action continuation) internal void Complete(TResult result) { - IsCompleted = true; Result = result; RunContinuations(); - Debug.WriteLine("Complete Dome"); + Debug.WriteLine("Complete Done"); } /// diff --git a/nanoFramework.System.Threading/Threading/ThreadPool.cs b/nanoFramework.System.Threading/Threading/ThreadPool.cs index 6e111d5..d626a82 100644 --- a/nanoFramework.System.Threading/Threading/ThreadPool.cs +++ b/nanoFramework.System.Threading/Threading/ThreadPool.cs @@ -4,6 +4,7 @@ // See LICENSE file in the project root for full license information. // +using System.Diagnostics; using System.Threading.Internal; namespace System.Threading @@ -13,8 +14,10 @@ namespace System.Threading /// public static class ThreadPool { - const int Workers = 12; - const int WorkItems = 13; + const int Workers = 64; //maximum number of workers + //maximum numbers of queued works. works are queued when all workes are already working + //So we san have a maximum number of (Workers+WorkItems) posted concurrently + const int WorkItems = 64; //using fixed array for performance static CircularQueue workers = new CircularQueue(Workers); static CircularQueue pendingWorks = new CircularQueue(WorkItems); @@ -25,12 +28,15 @@ static internal void RunPendingWorkItems(ThreadWorker callingWorker) { lock (mlock) { - //first find the first workitem that was requested to be run on this calling pool and start it + //first find the first workitem that was requested to be run on this calling worker and start it for (int i = 0; i < pendingWorks.Count; i++) { - WorkItem work = pendingWorks[i]; //TODO: remove this workitem from queue - if (work.workerId == callingWorker.Id) //if the work must be run on this thread + //TODO: remove this workitem from queue, figured it will be removed below + WorkItem work = pendingWorks[i]; + //if the work must be run on this thread + if (work.workerId == callingWorker.Id) { + //post the job back to the callingWorker callingWorker.Post(work.callBack, work.state); } } @@ -44,9 +50,10 @@ static internal void RunPendingWorkItems(ThreadWorker callingWorker) do { WorkItem work = default; - if (pendingWorks.Get(ref work)) + if (pendingWorks.Dequeue(ref work)) { - if (work.workerId < 0) //if the work can be run on any thread + //if the work can be run on any thread + if (work.workerId < 0) { pool.Post(work.callBack, work.state); } @@ -68,32 +75,37 @@ static ThreadWorker GetWorkerById(int id) { pool = workers[i]; if (pool.Id == id) + { return pool; + } } return null; } static ThreadWorker GetFreeWorker() { - ThreadWorker pool = null; + ThreadWorker worker = null; for (int i = 0; i < workers.Count; i++) { - pool = workers[i]; - if (pool.IsFree) - return pool; + worker = workers[i]; + if (worker.IsFree) + { + return worker; + } } return null; } static ThreadWorker GetOrCreateFreeWorker() { - ThreadWorker pool = GetFreeWorker(); - if (pool == null) + ThreadWorker worker = GetFreeWorker(); + if (worker == null) { - pool = new ThreadWorker(); - workers.Put(pool); + worker = new ThreadWorker(); + workers.Enqueue(worker); + Debug.WriteLine($"{workers.Count} workers started"); } - return pool; + return worker; } /// @@ -170,13 +182,14 @@ public static void GetMinThreads(out int workerThreads, out int completionPortTh /// true if the method is successfully queued; System.NotSupportedException is thrown if the work item could not be queued. public static bool QueueUserWorkItem(WaitCallback callBack) { - ThreadWorker pool = GetOrCreateFreeWorker(); - if (pool != null) + ThreadWorker worker = GetOrCreateFreeWorker(); + if (worker != null) { - pool.Post(callBack, null); + worker.Post(callBack, null); return true; } - return pendingWorks.Put(new WorkItem(callBack, null)); + //queue a work item that is not bound to a specific thread context + return pendingWorks.Enqueue(new WorkItem(callBack, null)); } /// @@ -187,13 +200,14 @@ public static bool QueueUserWorkItem(WaitCallback callBack) /// if the method is successfully queued; is thrown if the work item could not be queued. public static bool QueueUserWorkItem(WaitCallback callBack, object state) { - ThreadWorker pool = GetOrCreateFreeWorker(); - if (pool != null) + ThreadWorker worker = GetOrCreateFreeWorker(); + if (worker != null) { - pool.Post(callBack, state); + worker.Post(callBack, state); return true; } - return pendingWorks.Put(new WorkItem(callBack, state)); + //queue a work item that is not bound to a specific thread context + return pendingWorks.Enqueue(new WorkItem(callBack, state)); } /// @@ -211,35 +225,36 @@ public static bool QueueUserWorkItem(Action callBack, TState sta throw new Exception("PreferLocal:true not supported"); } - ThreadWorker pool = GetOrCreateFreeWorker(); + ThreadWorker worker = GetOrCreateFreeWorker(); - if (pool != null) + if (worker != null) { - pool.Post((_) => callBack(state), null); + worker.Post((_) => callBack(state), null); return true; } - return pendingWorks.Put(new WorkItem((_) => callBack(state), state)); + return pendingWorks.Enqueue(new WorkItem((_) => callBack(state), state)); } internal static bool QueueUserWorkItemOnSpecificWorker(int threadId, WaitCallback callBack, object state) { - ThreadWorker pool = GetWorkerById(threadId); + ThreadWorker worker = GetWorkerById(threadId); - if (pool == null) + if (worker == null) { throw new Exception($"No such worker with id {threadId}"); } - if (pool.IsFree) + if (worker.IsFree) { - pool.Post(callBack, state); + worker.Post(callBack, state); return true; } else { - return pendingWorks.Put(new WorkItem((_) => callBack(state), state, threadId)); + //queue a work item that is bound to a specific thread context(threadId) + return pendingWorks.Enqueue(new WorkItem((_) => callBack(state), state, threadId)); } } diff --git a/nanoFramework.System.Threading/nanoFramework.System.Threading.nfproj b/nanoFramework.System.Threading/nanoFramework.System.Threading.nfproj index 2100764..4e8688b 100644 --- a/nanoFramework.System.Threading/nanoFramework.System.Threading.nfproj +++ b/nanoFramework.System.Threading/nanoFramework.System.Threading.nfproj @@ -1,8 +1,8 @@  - - - + + + $(MSBuildToolsPath)..\..\..\nanoFramework\v1.0\ @@ -62,6 +62,9 @@ + + + @@ -84,6 +87,11 @@ ..\packages\nanoFramework.CoreLibrary.1.10.1-preview.7\lib\mscorlib.dll True + + ..\packages\nanoFramework.System.Collections.1.2.0-preview.22\lib\nanoFramework.System.Collections.dll + True + True + @@ -97,14 +105,14 @@ This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105.The missing file is {0}. - - - - - - + + + + + + - - - - + + + + \ No newline at end of file diff --git a/nanoFramework.System.Threading/packages.config b/nanoFramework.System.Threading/packages.config index 955c683..1be7e18 100644 --- a/nanoFramework.System.Threading/packages.config +++ b/nanoFramework.System.Threading/packages.config @@ -1,8 +1,9 @@  - - - + + + + \ No newline at end of file