-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
ForkedProcessorStep.java
297 lines (266 loc) · 11 KB
/
ForkedProcessorStep.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.staging;
import java.util.concurrent.atomic.AtomicReference;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;
import static java.lang.Integer.max;
import static java.lang.Integer.min;
import static java.lang.System.nanoTime;
import static org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil.getFieldOffset;
/**
 * Executes batches by multiple threads. Each thread only processes its own part, e.g. based on node id,
 * of a batch such that all threads together fully process the entire batch.
 * This is a very useful technique when the code processing a batch uses data structures that aren't,
 * or cannot trivially or efficiently be, synchronized and where access order, e.g. per node id, must be preserved.
 * This is different from {@link ProcessorStep} which has the ability to run multiple batches in parallel,
 * each batch processed by one thread.
 */
public abstract class ForkedProcessorStep<T> extends AbstractStep<T>
{
    // ID 0 is the id of a processor which is always present, no matter how many or few processors
    // are assigned to process a batch. Therefore some tasks can be put on this processor, tasks
    // which may affect the batches as a whole.
    protected static final int MAIN = 0;

    // Raw field offsets into Unit, resolved once so that UnsafeUtil can perform atomic
    // getAndAdd operations directly on plain volatile fields, avoiding per-Unit
    // AtomicInteger/AtomicLong allocations on this hot path.
    private final long COMPLETED_PROCESSORS_OFFSET = getFieldOffset( Unit.class, "completedProcessors" );
    private final long PROCESSING_TIME_OFFSET = getFieldOffset( Unit.class, "processingTime" );

    // Live ForkedProcessor threads, indexed by processor id; sized to the configured maximum.
    private final Object[] forkedProcessors;
    // How many slots of forkedProcessors are currently active. Written only inside the
    // synchronized block of applyProcessorCount(); volatile so receive() sees the current value.
    private volatile int numberOfForkedProcessors;

    // Sentinel node so head/tail always reference a real Unit. Its ticket of -1 and
    // 0 expected processors mean it can never look like a completed batch to send downstream.
    private final Unit noop = new Unit( -1, null, 0 );
    // head: most recently enqueued unit (appended to by receive()).
    // tail: last unit fully completed and sent downstream (advanced by CompletedBatchesSender).
    // Queue length is head.ticket - tail.ticket.
    private final AtomicReference<Unit> head = new AtomicReference<>( noop );
    private final AtomicReference<Unit> tail = new AtomicReference<>( noop );

    // Dedicated thread sending completed units downstream in ticket order; started in start().
    private final Thread downstreamSender;
    // Desired processor count as requested via processors(int); applied lazily by the
    // receiver thread in receive() via applyProcessorCount().
    private volatile int numberOfProcessors = 1;
    private final int maxProcessors;
    // Bound on head-tail distance before receive() parks — provides back pressure upstream.
    private final int maxQueueLength;
    // Most recent thread that parked in receive()/awaitEmpty(); unparked by CompletedBatchesSender.
    private volatile Thread receiverThread;

    /**
     * @param control stage-level control, used for panics and lifecycle.
     * @param name name of this step; also used to name the worker threads.
     * @param config supplies {@link Configuration#maxNumberOfProcessors()}.
     */
    protected ForkedProcessorStep( StageControl control, String name, Configuration config )
    {
        super( control, name, config );
        this.maxProcessors = config.maxNumberOfProcessors();
        this.forkedProcessors = new Object[this.maxProcessors];
        // NOTE(review): this starts ForkedProcessor thread(s) from within the constructor,
        // letting 'this' escape before construction completes (e.g. maxQueueLength below is
        // still 0 at this point) — looks intentional since processors only read the queue,
        // but worth confirming.
        applyProcessorCount();
        downstreamSender = new CompletedBatchesSender( name + " [CompletedBatchSender]" );
        maxQueueLength = 200 + maxProcessors;
    }

    /**
     * Brings the number of running {@link ForkedProcessor} threads in line with the requested
     * {@link #numberOfProcessors}. Called from {@link #receive(long, Object)} so the change
     * happens between batches. Waits for the queue to drain before returning, because units
     * already enqueued were created expecting the old processor count.
     */
    private void applyProcessorCount()
    {
        if ( numberOfForkedProcessors != numberOfProcessors )
        {
            synchronized ( this )
            {
                int processors = numberOfProcessors;
                while ( numberOfForkedProcessors < processors )
                {
                    // A new processor starts walking the queue from the current head,
                    // i.e. it only sees units enqueued after this point.
                    forkedProcessors[numberOfForkedProcessors] =
                            new ForkedProcessor( numberOfForkedProcessors, head.get() );
                    numberOfForkedProcessors++;
                }
                while ( numberOfForkedProcessors > processors )
                {
                    numberOfForkedProcessors--;
                    // It will notice itself later, the most important thing here is that further Units
                    // will have a lower number of processor as expected max
                }
                awaitEmpty();
            }
        }
    }

    /**
     * Parks the calling (receiver) thread until every queued unit has been completed and sent
     * downstream, i.e. until head and tail tickets meet. Unparked by {@link CompletedBatchesSender}.
     */
    private void awaitEmpty()
    {
        while ( head.get().ticket - tail.get().ticket > 0 )
        {
            PARK.park( receiverThread = Thread.currentThread() );
        }
    }

    @Override
    public synchronized int processors( int delta )
    {
        // Only records the desired count, clamped to [1,maxProcessors]. The actual thread
        // creation/retirement is applied lazily by the receiver thread in receive().
        numberOfProcessors = max( 1, min( numberOfProcessors + delta, maxProcessors ) );
        return numberOfProcessors;
    }

    @Override
    public void start( int orderingGuarantees )
    {
        super.start( orderingGuarantees );
        downstreamSender.start();
    }

    /**
     * Enqueues a batch at the head of the queue, parking if the queue is at capacity.
     * Expected to be called by a single upstream thread at a time.
     *
     * @return time spent inside this call in nanoseconds, i.e. upstream-visible queueing time.
     */
    @Override
    public long receive( long ticket, T batch )
    {
        long time = nanoTime();
        applyProcessorCount();
        // Back pressure: park until there's room. CompletedBatchesSender unparks receiverThread.
        while ( head.get().ticket - tail.get().ticket >= maxQueueLength )
        {
            PARK.park( receiverThread = Thread.currentThread() );
        }
        queuedBatches.incrementAndGet();
        // Snapshot of numberOfForkedProcessors fixes how many processors must complete this unit.
        Unit unit = new Unit( ticket, batch, numberOfForkedProcessors );

        // [old head] [unit]
        //                ^
        //               head
        Unit myHead = head.getAndSet( unit );

        // [old head] -next-> [unit]
        // Assigning the volatile 'next' is what publishes the unit to the ForkedProcessor
        // threads, which walk the queue via these next pointers.
        myHead.next = unit;
        return nanoTime() - time;
    }

    /**
     * Processes the part of {@code batch} that belongs to processor {@code id},
     * where ids range over [0,processors) and {@link #MAIN} (0) is always present.
     *
     * @param id id of the calling processor.
     * @param processors total number of processors expected to process this batch.
     * @param batch the whole batch; the implementation processes only its own part of it.
     * @throws Throwable on any failure; results in a panic for the whole step.
     */
    protected abstract void forkedProcess( int id, int processors, T batch ) throws Throwable;

    // Hands a fully processed unit to the downstream step, accounting any time the
    // downstream receive() blocked as downstream idle time.
    @SuppressWarnings( "unchecked" )
    void sendDownstream( Unit unit )
    {
        downstreamIdleTime.add( downstream.receive( unit.ticket, unit.batch ) );
    }

    // One unit of work. Contains the batch along with ticket and meta state during processing such
    // as how many processors are done with this batch and link to next batch in the queue.
    class Unit
    {
        private final long ticket;
        private final T batch;
        // Number of processors which is expected to process this batch, this is the number of processors
        // assigned at the time of enqueueing this unit.
        private final int processors;
        // Updated when a ForkedProcessor have processed this unit.
        // Atomic since changed by UnsafeUtil#getAndAddInt/Long.
        // Volatile since read by CompletedBatchesSender.
        @SuppressWarnings( "unused" )
        private volatile int completedProcessors;
        // Sum of per-processor processing time for this unit, in nanoseconds.
        @SuppressWarnings( "unused" )
        private volatile long processingTime;
        // Volatile since assigned by thread enqueueing this unit after changing head of the queue.
        private volatile Unit next;

        Unit( long ticket, T batch, int processors )
        {
            this.ticket = ticket;
            this.batch = batch;
            this.processors = processors;
        }

        // True when all expected processors have reported done. The 'processors > 0' guard
        // keeps the noop sentinel (0 expected processors) from ever looking completed.
        boolean isCompleted()
        {
            return processors > 0 && processors == completedProcessors;
        }

        // Called by each ForkedProcessor when done with this unit; accumulates its processing
        // time and bumps the completion count atomically via the precomputed field offsets.
        void processorDone( long time )
        {
            UnsafeUtil.getAndAddLong( this, PROCESSING_TIME_OFFSET, time );
            int prevCompletedProcessors = UnsafeUtil.getAndAddInt( this, COMPLETED_PROCESSORS_OFFSET, 1 );
            assert prevCompletedProcessors < processors;
        }
    }

    /**
     * Checks tail of queue and sends fully completed units downstream. Since
     * {@link ForkedProcessorStep#receive(long, Object)} may park on queue bound, this thread will
     * unpark the most recent thread calling receive to close that wait gap.
     * {@link ForkedProcessor}, the last one processing a unit, will unpark this thread.
     */
    private final class CompletedBatchesSender extends Thread
    {
        CompletedBatchesSender( String name )
        {
            super( name );
        }

        @Override
        public void run()
        {
            Unit current = tail.get();
            while ( !isCompleted() )
            {
                Unit candidate = current.next;
                if ( candidate != null && candidate.isCompleted() )
                {
                    if ( downstream != null )
                    {
                        sendDownstream( candidate );
                    }
                    // Unlink the consumed node so completed units can be garbage collected,
                    // then advance the tail past it.
                    current.next = null;
                    current = candidate;
                    tail.set( current );
                    queuedBatches.decrementAndGet();
                    doneBatches.incrementAndGet();
                    totalProcessingTime.add( candidate.processingTime );
                    checkNotifyEndDownstream();
                }
                else
                {
                    // Nothing completed at the tail right now. Wake a possibly-parked receiver
                    // (queue may have drained below its bound) before parking ourselves; a
                    // ForkedProcessor finishing a unit will unpark us.
                    Thread receiver = ForkedProcessorStep.this.receiverThread;
                    if ( receiver != null )
                    {
                        PARK.unpark( receiver );
                    }
                    PARK.park( this );
                }
            }
        }
    }

    // Processes units, forever walking the queue looking for more units to process.
    // If there's no work to do it will park a while, otherwise it will exhaust the queue and process
    // as far as it can without park. No external thread unparks these forked processors.
    // So in scenarios where a processor isn't fully saturated there may be short periods of parking,
    // but should saturate without any park as long as there are units to process.
    class ForkedProcessor extends Thread
    {
        private final int id;
        // The last unit this processor handled (or the starting point); its 'next' is read
        // to find more work.
        private Unit current;

        ForkedProcessor( int id, Unit startingUnit )
        {
            super( name() + "-" + id );
            this.id = id;
            this.current = startingUnit;
            // NOTE(review): the thread starts itself from its own constructor — any subclass
            // state would not be initialized yet; fine as long as this class stays final in use.
            start();
        }

        @Override
        public void run()
        {
            try
            {
                while ( !isCompleted() )
                {
                    Unit candidate = current.next;
                    if ( candidate != null )
                    {
                        if ( id < candidate.processors )
                        {
                            // There's work to do
                            long time = nanoTime();
                            forkedProcess( id, candidate.processors, candidate.batch );
                            candidate.processorDone( nanoTime() - time );
                        }
                        else
                        {
                            // The id of this processor is greater than that of the next unit's expected max.
                            // This means that the number of assigned processors to this step has decreased
                            // and that this processor has reached the end of its life.
                            break;
                        }
                        current = candidate;
                    }
                    else
                    {
                        // There's no work to be done right now, park a while. When we wake up and work has
                        // accumulated we'll plow through it w/o park in between anyway.
                        PARK.park( this );
                    }
                }
            }
            catch ( Throwable e )
            {
                // Any failure in user-supplied forkedProcess tears down the whole stage.
                issuePanic( e, false );
            }
        }
    }
}