-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
InputEntityReader.java
217 lines (198 loc) · 7.61 KB
/
InputEntityReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.input;
import java.io.IOException;
import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntObjectMap;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_HEADER;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.GROUP_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HIGH_TOKEN_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.PROPERTY_KEY_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
/**
 * Abstract class for reading cached entities previously stored using {@link InputEntityCacher} or derivative.
 * <p>
 * Two channels are involved: a header channel declaring the names of locally numbered tokens
 * (property keys, labels, relationship types and groups), and a data channel holding the entities.
 * This class reads the properties of each entity and resolves tokens and groups; subclasses read the
 * remaining entity-specific data in {@link #readNextOrNull(Object)}.
 */
abstract class InputEntityReader<ENTITY extends InputEntity> extends PrefetchingIterator<ENTITY>
        implements InputIterator<ENTITY>
{
    /** Data channel that entities are read from; subclasses read their entity-specific fields from it. */
    protected final ReadableLogChannel channel;
    private final LogPositionMarker positionMarker = new LogPositionMarker();
    /**
     * Number of entity reads attempted so far, reported as this input's "line number".
     * A long, since a batch import may contain more entities than fit in an int.
     */
    private long lineNumber;
    /** Last group read per slot, used to resolve {@code SAME_GROUP} markers in {@link #readGroup(int)}. */
    private final Group[] previousGroups;
    /** Per token type: local token id (assigned in header order) to token name. */
    private final PrimitiveIntObjectMap<String>[] tokens;
    /** Run when this reader is closed, after the data channel has been closed. */
    private final Runnable closeAction;

    /**
     * Creates a reader over cached entity data and eagerly reads the token header.
     *
     * @param channel data channel containing the cached entities; owned and closed by this reader.
     * @param header header channel declaring local token names; fully read and closed during construction.
     * @param bufferSize read-ahead buffer size for the data channel.
     * @param groupSlots number of independent "previous group" slots, each initialized to {@link Group#GLOBAL}.
     * @param closeAction action run by {@link #close()} after the data channel has been closed.
     * @throws IOException if the header cannot be read. The data channel is closed before the exception
     * propagates.
     */
    @SuppressWarnings( "unchecked" )
    InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots,
            Runnable closeAction ) throws IOException
    {
        // Generic arrays cannot be created directly, hence the unchecked raw array.
        tokens = new PrimitiveIntObjectMap[HIGH_TOKEN_TYPE];
        tokens[PROPERTY_KEY_TOKEN] = Primitive.intObjectMap();
        tokens[LABEL_TOKEN] = Primitive.intObjectMap();
        tokens[RELATIONSHIP_TYPE_TOKEN] = Primitive.intObjectMap();
        tokens[GROUP_TOKEN] = Primitive.intObjectMap();
        this.previousGroups = new Group[groupSlots];
        for ( int i = 0; i < groupSlots; i++ )
        {
            previousGroups[i] = Group.GLOBAL;
        }
        this.channel = reader( channel, bufferSize );
        this.closeAction = closeAction;
        try
        {
            readHeader( header );
        }
        catch ( IOException | RuntimeException e )
        {
            // The caller never gets a reader instance to close, so don't leak the data channel.
            try
            {
                this.channel.close();
            }
            catch ( IOException closeException )
            {
                e.addSuppressed( closeException );
            }
            throw e;
        }
    }

    /**
     * Wraps a store channel in a read-ahead channel that does not continue into any further channels.
     */
    private ReadAheadLogChannel reader( StoreChannel channel, int bufferSize ) throws IOException
    {
        return new ReadAheadLogChannel(
                new PhysicalLogVersionedStoreChannel( channel, 0, (byte) 0 ), NO_MORE_CHANNELS, bufferSize );
    }

    /**
     * Reads token declarations from the header until {@code END_OF_HEADER}.
     * <p>
     * Each entry is a token type byte followed by a string name. Local ids are assigned sequentially
     * per token type, starting at 0, in the order the entries appear. The header channel is closed afterwards.
     */
    private void readHeader( StoreChannel header ) throws IOException
    {
        try ( ReadableClosableChannel reader = reader( header, (int) ByteUnit.kibiBytes( 8 ) ) )
        {
            short[] tokenIds = new short[HIGH_TOKEN_TYPE];
            byte type;
            while ( (type = reader.get()) != END_OF_HEADER )
            {
                short tokenId = tokenIds[type]++;
                String name = (String) ValueType.stringType().read( reader );
                tokens[type].put( tokenId, name );
            }
        }
    }

    /**
     * Reads the next entity, or returns {@code null} when the end-of-entities marker is reached.
     *
     * @throws InputException wrapping any {@link IOException} from the data channel.
     */
    @Override
    protected final ENTITY fetchNextOrNull()
    {
        try
        {
            lineNumber++;
            Object properties = readProperties();
            if ( properties == null )
            {
                return null;
            }
            return readNextOrNull( properties );
        }
        catch ( IOException e )
        {
            throw new InputException( "Couldn't read cached node data", e );
        }
    }

    /**
     * Reads the entity-specific remainder of an entity whose properties have already been read.
     *
     * @param properties the value produced by {@link #readProperties()}: a {@code Long} first property id,
     * {@link InputEntity#NO_PROPERTIES}, or an array of alternating keys and values.
     * @return the entity, or {@code null} if there are no more entities.
     */
    protected abstract ENTITY readNextOrNull( Object properties ) throws IOException;

    /**
     * Reads the property section that starts every cached entity.
     *
     * @return {@code null} at end of stream; a {@code Long} first property id when {@code HAS_FIRST_PROPERTY_ID};
     * {@link InputEntity#NO_PROPERTIES} for zero properties; otherwise an array of alternating
     * key (token) and value entries.
     */
    private Object readProperties() throws IOException
    {
        short count = channel.getShort();
        switch ( count )
        {
        // This is a special value denoting the end of the stream. This is done like this since
        // properties are the first thing read for every entity.
        case END_OF_ENTITIES: return null;
        case HAS_FIRST_PROPERTY_ID: return channel.getLong();
        case 0: return InputEntity.NO_PROPERTIES;
        default:
            Object[] properties = new Object[count * 2];
            for ( int i = 0; i < properties.length; i++ )
            {
                properties[i++] = readToken( PROPERTY_KEY_TOKEN );
                properties[i] = readValue();
            }
            return properties;
        }
    }

    /**
     * Reads a token reference of the given type.
     *
     * @return the token name ({@code String}) for a local id declared in the header, or the real token id
     * ({@code Integer}) when preceded by the {@code -1} marker.
     * @throws IllegalArgumentException if a local id was not declared in the header.
     */
    protected Object readToken( byte type ) throws IOException
    {
        short id = channel.getShort();
        if ( id == -1 )
        {
            // This is a real token id, not one of the local ids declared in the header.
            // The cacher writes it as a full int (NOTE(review): confirm against InputEntityCacher#writeToken).
            // Reading fewer bytes would truncate the id and misalign every subsequent read.
            return channel.getInt(); // as Integer
        }

        String name = tokens[type].get( id );
        if ( name == null )
        {
            throw new IllegalArgumentException( "Unknown token " + id );
        }
        return name;
    }

    /**
     * Reads a value prefixed by its {@link ValueType} byte.
     */
    protected Object readValue() throws IOException
    {
        return ValueType.typeOf( channel.get() ).read( channel );
    }

    /**
     * Reads a group reference for the given slot.
     * <p>
     * {@code SAME_GROUP} reuses the slot's previous group. {@code NEW_GROUP} is followed by an int id and
     * a group name token, and the resulting group becomes the slot's previous group.
     *
     * @throws IllegalArgumentException for an unknown group mode.
     */
    protected Group readGroup( int slot ) throws IOException
    {
        byte groupMode = channel.get();
        switch ( groupMode )
        {
        case SAME_GROUP: return previousGroups[slot];
        case NEW_GROUP: return previousGroups[slot] = new Group.Adapter( channel.getInt(),
                (String) readToken( GROUP_TOKEN ) );
        default: throw new IllegalArgumentException( "Unknown group mode " + groupMode );
        }
    }

    @Override
    public String sourceDescription()
    {
        return "cache"; // it's OK we shouldn't need these things the second time around
    }

    /**
     * @return number of entity reads attempted so far (including the final end-of-stream read).
     */
    @Override
    public long lineNumber()
    {
        return lineNumber;
    }

    /**
     * @return current byte offset in the data channel.
     * @throws InputException if the position cannot be determined.
     */
    @Override
    public long position()
    {
        try
        {
            return channel.getCurrentPosition( positionMarker ).getByteOffset();
        }
        catch ( IOException e )
        {
            throw new InputException( "Couldn't get position from cached input data", e );
        }
    }

    /**
     * Closes the data channel, then runs the close action. The close action runs even if
     * closing the channel fails.
     *
     * @throws InputException if closing the data channel fails.
     */
    @Override
    public void close()
    {
        try
        {
            channel.close();
        }
        catch ( IOException e )
        {
            throw new InputException( "Couldn't close channel for cached input data", e );
        }
        finally
        {
            closeAction.run();
        }
    }
}