forked from andrewkirillov/AForge.NET
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathParallel.cs
237 lines (209 loc) · 7.83 KB
/
Parallel.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// AForge Core Library
// AForge.NET framework
// http://www.aforgenet.com/framework/
//
// Copyright © Andrew Kirillov, 2007-2009
// andrew.kirillov@aforgenet.com
//
// Copyright © Israel Lot, 2008
// israel.lot@gmail.com
//
namespace AForge
{
using System;
using System.Threading;
/// <summary>
/// The class provides support for parallel computations, paralleling loop's iterations.
/// </summary>
///
/// <remarks><para>The class allows to parallel loop's iteration computing them in separate threads,
/// what allows their simultaneous execution on multiple CPUs/cores.
/// </para></remarks>
///
public sealed class Parallel
{
/// <summary>
/// Delegate defining for-loop's body.
/// </summary>
///
/// <param name="index">Loop's index.</param>
///
public delegate void ForLoopBody( int index );
// number of threads for parallel computations
private static int threadsCount = System.Environment.ProcessorCount;
// object used for synchronization
private static object sync = new Object( );
// single instance of the class to implement singleton pattern
private static volatile Parallel instance = null;
// background threads for parallel computation
private Thread[] threads = null;
// events to signal about job availability and thread availability
private AutoResetEvent[] jobAvailable = null;
private ManualResetEvent[] threadIdle = null;
// loop's body and its current and stop index
private int currentIndex;
private int stopIndex;
private ForLoopBody loopBody;
/// <summary>
/// Number of threads used for parallel computations.
/// </summary>
///
/// <remarks><para>The property sets how many worker threads are created for paralleling
/// loops' computations.</para>
///
/// <para>By default the property is set to number of CPU's in the system
/// (see <see cref="System.Environment.ProcessorCount"/>).</para>
/// </remarks>
///
public static int ThreadsCount
{
get { return threadsCount; }
set
{
lock ( sync )
{
threadsCount = Math.Max( 1, value );
}
}
}
/// <summary>
/// Executes a for-loop in which iterations may run in parallel.
/// </summary>
///
/// <param name="start">Loop's start index.</param>
/// <param name="stop">Loop's stop index.</param>
/// <param name="loopBody">Loop's body.</param>
///
/// <remarks><para>The method is used to parallel for-loop running its iterations in
/// different threads. The <b>start</b> and <b>stop</b> parameters define loop's
/// starting and ending loop's indexes. The number of iterations is equal to <b>stop - start</b>.
/// </para>
///
/// <para>Sample usage:</para>
/// <code>
/// Parallel.For( 0, 20, delegate( int i )
/// // which is equivalent to
/// // for ( int i = 0; i < 20; i++ )
/// {
/// System.Diagnostics.Debug.WriteLine( "Iteration: " + i );
/// // ...
/// } );
/// </code>
/// </remarks>
///
public static void For( int start, int stop, ForLoopBody loopBody )
{
lock ( sync )
{
// get instance of parallel computation manager
Parallel instance = Instance;
instance.currentIndex = start - 1;
instance.stopIndex = stop;
instance.loopBody = loopBody;
// signal about available job for all threads and mark them busy
for ( int i = 0; i < threadsCount; i++ )
{
instance.threadIdle[i].Reset( );
instance.jobAvailable[i].Set( );
}
// wait until all threads become idle
for ( int i = 0; i < threadsCount; i++ )
{
instance.threadIdle[i].WaitOne( );
}
instance.loopBody = null;
}
}
// Private constructor to avoid class instantiation
private Parallel( ) { }
// Get instace of the Parallel class
private static Parallel Instance
{
get
{
if ( instance == null )
{
instance = new Parallel( );
instance.Initialize( );
}
else
{
if ( instance.threads.Length != threadsCount )
{
// terminate old threads
instance.Terminate( );
// reinitialize
instance.Initialize( );
// TODO: change reinitialization to reuse already created objects
}
}
return instance;
}
}
// Initialize Parallel class's instance creating required number of threads
// and synchronization objects
private void Initialize( )
{
// array of events, which signal about available job
jobAvailable = new AutoResetEvent[threadsCount];
// array of events, which signal about available thread
threadIdle = new ManualResetEvent[threadsCount];
// array of threads
threads = new Thread[threadsCount];
for ( int i = 0; i < threadsCount; i++ )
{
jobAvailable[i] = new AutoResetEvent( false );
threadIdle[i] = new ManualResetEvent( true );
threads[i] = new Thread( new ParameterizedThreadStart( WorkerThread ) );
threads[i].Name = "AForge.Parallel";
threads[i].IsBackground = true;
threads[i].Start( i );
}
}
// Terminate all worker threads used for parallel computations and close all
// synchronization objects
private void Terminate( )
{
// finish thread by setting null loop body and signaling about available work
loopBody = null;
for ( int i = 0, threadsCount = threads.Length ; i < threadsCount; i++ )
{
jobAvailable[i].Set( );
// wait for thread termination
threads[i].Join( );
// close events
jobAvailable[i].Close( );
threadIdle[i].Close( );
}
// clean all array references
jobAvailable = null;
threadIdle = null;
threads = null;
}
// Worker thread performing parallel computations in loop
private void WorkerThread( object index )
{
int threadIndex = (int) index;
int localIndex = 0;
while ( true )
{
// wait until there is job to do
jobAvailable[threadIndex].WaitOne( );
// exit on null body
if ( loopBody == null )
break;
while ( true )
{
// get local index incrementing global loop's current index
localIndex = Interlocked.Increment( ref currentIndex );
if ( localIndex >= stopIndex )
break;
// run loop's body
loopBody( localIndex );
}
// signal about thread availability
threadIdle[threadIndex].Set( );
}
}
}
}