-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
ParallelInputEntityDeserializer.java
310 lines (285 loc) · 12.9 KB
/
ParallelInputEntityDeserializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.input.csv;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.neo4j.csv.reader.BufferedCharSeeker;
import org.neo4j.csv.reader.CharSeeker;
import org.neo4j.csv.reader.ProcessingSource;
import org.neo4j.csv.reader.SourceTraceability;
import org.neo4j.csv.reader.Source.Chunk;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.kernel.impl.util.Validator;
import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputException;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;
import static org.neo4j.csv.reader.Source.singleChunk;
import static org.neo4j.kernel.impl.util.Validators.emptyValidator;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator;
/**
* Deserializes CSV into {@link InputNode} and {@link InputRelationship} and does so by reading characters
* in a dedicated thread while letting one or more threads parse the data. This can only safely be used if
* {@link Configuration#multilineFields()} is {@code false}. Initially only one parsing thread is assigned,
* more can be assigned at any point in time using {@link #processors(int)}.
*
* This class accepts {@link DeserializerFactory}, which normally instantiates {@link InputEntityDeserializer}
* instances.
*
* @param <ENTITY> type of {@link InputEntity} to deserialize into
*/
public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends InputIterator.Adapter<ENTITY>
{
    private final ProcessingSource source;
    private final TicketedProcessing<CharSeeker,Header,ENTITY[]> processing;
    private final ContinuableArrayCursor<ENTITY> cursor;
    // Most recently returned entity, kept so the SourceTraceability methods below
    // (sourceDescription/lineNumber/position) can report where iteration currently is.
    private SourceTraceability last = SourceTraceability.EMPTY;
    private final Future<Void> processingCompletion;

    /**
     * Reads the header from the first chunk of {@code data}, then sets up a {@link TicketedProcessing}
     * pipeline where a dedicated reader thread feeds chunks to one or more parsing threads.
     *
     * @param data source data along with its {@link Decorator}.
     * @param headerFactory parses the header out of the first chunk.
     * @param config CSV configuration, e.g. buffer size and delimiters.
     * @param idType type of entity ids in the input data.
     * @param maxProcessors maximum number of parsing threads ever assignable via {@link #processors(int)}.
     * @param initialProcessors number of parsing threads to start out with.
     * @param factory creates the per-chunk {@link InputEntityDeserializer} instances.
     * @param validator validates each deserialized entity.
     * @param entityClass concrete entity class, needed to create correctly typed result arrays.
     * @throws InputException if the input has no header or the first chunk cannot be read.
     */
    @SuppressWarnings( "unchecked" )
    public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
            IdType idType, int maxProcessors, int initialProcessors, DeserializerFactory<ENTITY> factory,
            Validator<ENTITY> validator, Class<ENTITY> entityClass )
    {
        // Reader of chunks, characters aligning to nearest newline
        source = new ProcessingSource( data.stream(), config.bufferSize(), maxProcessors );
        try
        {
            // Read first chunk explicitly here since it contains the header
            Chunk firstChunk = source.nextChunk();
            if ( firstChunk.length() == 0 )
            {
                throw new InputException( "No header defined" );
            }
            CharSeeker firstSeeker = new BufferedCharSeeker( singleChunk( firstChunk ), config );
            Header dataHeader = headerFactory.create( firstSeeker, config, idType );

            // Initialize the processing logic for parsing the data in the first chunk,
            // as well as in all other chunks
            Decorator<ENTITY> decorator = data.decorator();

            // Check if each individual processor can decorate-and-validate themselves or if we have to
            // defer that to the batch supplier below. We have to defer if the decorator is mutable,
            // since a mutable decorator is order-sensitive and must see entities in input order.
            boolean deferredValidation = decorator.isMutable();
            Decorator<ENTITY> batchDecorator = deferredValidation ? noDecorator() : decorator;
            Validator<ENTITY> batchValidator = deferredValidation ? emptyValidator() : validator;
            processing = new TicketedProcessing<>( "Parallel input parser", maxProcessors, (seeker, header) ->
            {
                // Create a local deserializer for this chunk. If validation is deferred it does
                // NO decoration/validation itself; that then happens in an orderly fashion in the
                // batch supplier below. Done like this to cater for decorators which may be mutable
                // and sensitive to ordering, while still (in the common case) putting the work of
                // decorating and validating on the processing threads as to not affect performance.
                InputEntityDeserializer<ENTITY> chunkDeserializer =
                        factory.create( header, seeker, batchDecorator, batchValidator );
                chunkDeserializer.initialize();
                List<ENTITY> entities = new ArrayList<>();
                while ( chunkDeserializer.hasNext() )
                {
                    ENTITY next = chunkDeserializer.next();
                    entities.add( next );
                }
                return entities.toArray( (ENTITY[]) Array.newInstance( entityClass, entities.size() ) );
            },
            () -> dataHeader.clone() /*We need to clone the stateful header to each processing thread*/ );
            processing.processors( initialProcessors - processing.processors( 0 ) );

            // Utility cursor which takes care of moving over processed results from chunk to chunk
            Supplier<ENTITY[]> batchSupplier = rebaseBatches( processing );
            batchSupplier = deferredValidation ?
                    decorateAndValidate( batchSupplier, decorator, validator ) : batchSupplier;
            cursor = new ContinuableArrayCursor<>( batchSupplier );

            // Start an asynchronous slurp of the chunks fed directly into the processors
            processingCompletion = processing.slurp( seekers( firstSeeker, source, config ), true );
        }
        catch ( IOException e )
        {
            throw new InputException( "Couldn't read first chunk from input", e );
        }
    }

    /**
     * Wraps a batch supplier so that every entity in each produced batch is decorated and validated
     * on the consuming thread, in input order. Used when the decorator is mutable and therefore
     * could not safely run on the parallel processing threads.
     *
     * @param actual supplier of raw (undecorated) batches.
     * @param decorator decorator to apply to each entity.
     * @param validator validator to run on each decorated entity.
     * @return supplier producing decorated-and-validated batches, or {@code null} when exhausted.
     */
    private Supplier<ENTITY[]> decorateAndValidate( Supplier<ENTITY[]> actual,
            Decorator<ENTITY> decorator, Validator<ENTITY> validator )
    {
        return () ->
        {
            ENTITY[] entities = actual.get();
            if ( entities != null )
            {
                for ( int i = 0; i < entities.length; i++ )
                {
                    // Decorate first, then validate the decorated entity, and write it back
                    // since the decorator may return a different instance.
                    ENTITY entity = decorator.apply( entities[i] );
                    validator.validate( entity );
                    entities[i] = entity;
                }
            }
            return entities;
        };
    }

    @Override
    protected ENTITY fetchNextOrNull()
    {
        boolean hasNext;
        try
        {
            hasNext = cursor.next();
        }
        catch ( TaskExecutionPanicException e )
        {
            // Getting this exception here means that a processor got an exception and put
            // the executor in panic mode. The user would like to see the actual exception,
            // so we unwrap the cause of this wrapper exception and throw that instead.
            throw Exceptions.launderedException( e.getCause() );
        }
        if ( hasNext )
        {
            ENTITY next = cursor.get();
            // We keep a reference to the last fetched so that the methods from SourceTraceability can
            // be implemented and executed correctly.
            last = next;
            return next;
        }
        return null;
    }

    /**
     * Returns a supplier which pulls finished batches out of {@code processing} and converts each
     * entity's batch-local line number/position into global numbers, based on the batches seen so far.
     *
     * @param processing source of processed entity batches, delivered in input order.
     * @return supplier of rebased batches; {@code null} signals end of data.
     */
    private static <ENTITY extends InputEntity> Supplier<ENTITY[]> rebaseBatches(
            TicketedProcessing<CharSeeker,Header,ENTITY[]> processing )
    {
        return new Supplier<ENTITY[]>()
        {
            private String currentSourceDescription;
            private long baseLineNumber;
            private long basePosition;

            @Override
            public ENTITY[] get()
            {
                ENTITY[] batch = processing.next();
                if ( batch != null && batch.length > 0 )
                {
                    // OK so we got the next batch from an arbitrary processor (other thread).
                    // It creates the entities with batch-local line number and position because
                    // that's all it knows. We, however, know about all the batches and the order
                    // of them so we convert the local source traceability numbers to global.
                    // This will change some fields in the entities and for thread-visibility
                    // it's OK since this thread which executes right here is the one which gets
                    // the batches from this deserializer in the end.

                    // Reset the base numbers if we're venturing into a new source. We rely on the
                    // fact that the ProcessingSource spawning the chunks which have been processed
                    // into entities doesn't mix entities from different sources in the same batch.
                    // FIX(review): the original assigned currentSourceDescription twice in this
                    // branch; the redundant second assignment has been removed.
                    ENTITY lastEntity = batch[batch.length - 1];
                    if ( currentSourceDescription == null ||
                            !currentSourceDescription.equals( lastEntity.sourceDescription() ) )
                    {
                        currentSourceDescription = lastEntity.sourceDescription();
                        baseLineNumber = basePosition = 0;
                    }

                    // Now we rebase the entities on top of the previous batch we've seen
                    for ( ENTITY entity : batch )
                    {
                        entity.rebase( baseLineNumber, basePosition );
                    }

                    // Remember the new numbers to rebase forthcoming batches on. The original
                    // guarded these assignments on lastEntity.sourceDescription() equalling
                    // currentSourceDescription, but after the reset branch above that is always
                    // true, so the assignments run unconditionally here (behavior unchanged).
                    baseLineNumber = lastEntity.lineNumber();
                    basePosition = lastEntity.position();
                }
                return batch;
            }
        };
    }

    /**
     * Returns an iterator over all {@link CharSeeker} instances to feed to the processors:
     * first the already-created {@code firstSeeker} (whose chunk contained the header and
     * potentially data), then one seeker per remaining chunk read from {@code source}.
     *
     * @param firstSeeker seeker over the first chunk, already read to extract the header.
     * @param source provider of subsequent chunks.
     * @param config CSV configuration handed to each created seeker.
     * @return lazy iterator of seekers; ends when a zero-length chunk is encountered.
     */
    private static Iterator<CharSeeker> seekers( CharSeeker firstSeeker, ProcessingSource source, Configuration config )
    {
        return new PrefetchingIterator<CharSeeker>()
        {
            private boolean firstReturned;

            @Override
            protected CharSeeker fetchNextOrNull()
            {
                // We have the first here explicitly since we read it before starting the general
                // processing and extracted the header. We want to read the data in it as well
                // and that's why we return it here first.
                if ( !firstReturned )
                {
                    firstReturned = true;
                    return firstSeeker;
                }

                // Continue reading the next chunk from the source file(s)
                try
                {
                    Chunk chunk = source.nextChunk();
                    return chunk.length() > 0 ? new BufferedCharSeeker( singleChunk( chunk ), config ) : null;
                }
                catch ( IOException e )
                {
                    throw new InputException( "Couldn't get chunk from source", e );
                }
            }
        };
    }

    @Override
    public void close()
    {
        // If processing hasn't completed normally, signal panic so the worker threads abort
        // instead of being shut down as if everything were fine.
        if ( processingCompletion.isDone() )
        {
            processing.shutdown();
        }
        else
        {
            processing.panic( new IllegalStateException( "Processing not completed when closing, indicating panic" ) );
        }
        try
        {
            source.close();
        }
        catch ( IOException e )
        {
            throw new InputException( "Couldn't close source of data chunks", e );
        }
        finally
        {
            // Always let the superclass release its resources, even if closing the source failed.
            super.close();
        }
    }

    @Override
    public int processors( int delta )
    {
        return processing.processors( delta );
    }

    @Override
    public String sourceDescription()
    {
        return last.sourceDescription();
    }

    @Override
    public long lineNumber()
    {
        return last.lineNumber();
    }

    @Override
    public long position()
    {
        return last.position();
    }
}