-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
RecordStore.java
576 lines (497 loc) · 22.2 KB
/
RecordStore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store;
import java.io.File;
import java.util.Collection;
import java.util.function.Predicate;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.PropertyKeyTokenRecord;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.store.record.RelationshipTypeTokenRecord;
/**
* A store for {@link #updateRecord(AbstractBaseRecord) updating} and
* {@link #getRecord(long, AbstractBaseRecord, RecordLoad) getting} records.
*
* There are two ways of getting records, either one-by-one using
* {@link #getRecord(long, AbstractBaseRecord, RecordLoad)}, passing in record retrieved from {@link #newRecord()}.
* This to make a conscious decision about who will create the record instance and in that process figure out
* ways to reduce number of record instances created. The other way is to use a {@link RecordCursor}, created
* by {@link #newRecordCursor(AbstractBaseRecord)} and placed at a certain record using
* {@link RecordCursor#placeAt(long, RecordLoad)}. A {@link RecordCursor} will keep underlying
* {@link PageCursor} open until until the {@link RecordCursor} is closed and so will be efficient if multiple
* records are retrieved from it. A {@link RecordCursor} will follow {@link #getNextRecordReference(AbstractBaseRecord)}
* references to get to {@link RecordCursor#next()} record.
*
* @param <RECORD> type of {@link AbstractBaseRecord}.
*/
public interface RecordStore<RECORD extends AbstractBaseRecord> extends IdSequence
{
/**
* @return the {@link File} that backs this store.
*/
File getStorageFileName();
/**
* @return high id of this store, i.e an id higher than any in use record.
*/
long getHighId();
/**
* @return highest id in use in this store.
*/
long getHighestPossibleIdInUse();
/**
* Sets highest id in use for this store. This is for when records are applied to this store where
* the ids have been generated through some other means. Having an up to date highest possible id
* makes sure that closing this store truncates at the right place and that "all record scans" can
* see all records.
*
* @param highestIdInUse highest id that is now in use in this store.
*/
void setHighestPossibleIdInUse( long highestIdInUse );
/**
* @return a new record instance for receiving data by {@link #getRecord(long, AbstractBaseRecord, RecordLoad)}
* and {@link #newRecordCursor(AbstractBaseRecord)}.
*/
RECORD newRecord();
/**
* Reads a record from the store into {@code target}. Depending on {@link RecordLoad} given there will
* be different behavior, although the {@code target} record will be marked with the specified
* {@code id} after participating in this method call.
* <ul>
* <li>{@link RecordLoad#CHECK}: As little data as possible is read to determine whether or not the record
* is in use. If not in use then no more data will be loaded into the target record and
* the the data of the record will be {@link AbstractBaseRecord#clear() cleared}.</li>
* <li>{@link RecordLoad#NORMAL}: Just like {@link RecordLoad#CHECK}, but with the difference that
* an {@link InvalidRecordException} will be thrown if the record isn't in use.</li>
* <li>{@link RecordLoad#FORCE}: The entire contents of the record will be loaded into the target record
* regardless if the record is in use or not. This leaves no guarantees about the data in the record
* after this method call, except that the id will be the specified {@code id}.
*
* @param id the id of the record to load.
* @param target record where data will be loaded into. This record will have its id set to the specified
* {@code id} as part of this method call.
* @param mode loading behaviour, read more in method description.
* @return the record that was passed in, for convenience.
* @throws InvalidRecordException if record not in use and the {@code mode} allows for throwing.
*/
RECORD getRecord( long id, RECORD target, RecordLoad mode ) throws InvalidRecordException;
/**
* Opens a {@link PageCursor} on this store, capable of reading records using
* {@link #getRecordByCursor(long, AbstractBaseRecord, RecordLoad, PageCursor)}.
* The caller is responsible for closing it when done with it.
*
* @param id cursor will initially be placed at the page containing this record id.
* @return PageCursor for reading records.
*/
PageCursor openPageCursorForReading( long id );
/**
* Reads a record from the store into {@code target}, see
* {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}.
* <p>
* The provided page cursor will be used to get the record, and in doing this it will be redirected to the
* correct page if needed.
*
* @param id the record id, understood to be the absolute reference to the store.
* @param target the record to fill.
* @param mode loading behaviour, read more in {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}.
* @param cursor the PageCursor to use for record loading.
* @throws InvalidRecordException if record not in use and the {@code mode} allows for throwing.
*/
void getRecordByCursor( long id, RECORD target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException;
/**
* Reads a record from the store into {@code target}, see
* {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}.
* <p>
* This method requires that the cursor page and offset point to the first byte of the record in target on calling.
* The provided page cursor will be used to get the record, and in doing this it will be redirected to the
* next page if the input record was the last on it's page.
*
* @param target the record to fill.
* @param mode loading behaviour, read more in {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}.
* @param cursor the PageCursor to use for record loading.
* @throws InvalidRecordException if record not in use and the {@code mode} allows for throwing.
*/
void nextRecordByCursor( RECORD target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException;
/**
* For stores that have other stores coupled underneath, the "top level" record will have a flag
* saying whether or not it's light. Light means that no records from the coupled store have been loaded yet.
* This method can load those records and enrich the target record with those, marking it as heavy.
*
* @param record record to make heavy, if not already.
*/
void ensureHeavy( RECORD record );
/**
* Reads records that belong together, a chain of records that as a whole forms the entirety of a data item.
*
* @param firstId record id of the first record to start loading from.
* @param mode {@link RecordLoad} mode.
* @return {@link Collection} of records in the loaded chain.
* @throws InvalidRecordException if some record not in use and the {@code mode} is allows for throwing.
*/
Collection<RECORD> getRecords( long firstId, RecordLoad mode ) throws InvalidRecordException;
/**
* Instantiates a new record cursor capable of iterating over records in this store. A {@link RecordCursor}
* gets created with one record and will use every time it reads records.
*
* @param record instance to use when reading record data.
* @return a new {@link RecordCursor} instance capable of reading records in this store.
*/
RecordCursor<RECORD> newRecordCursor( RECORD record );
/**
* Returns another record id which the given {@code record} references and which a {@link RecordCursor}
* would follow and read next.
*
* @param record to read the "next" reference from.
* @return record id of "next" record that the given {@code record} references, or {@link Record#NULL_REFERENCE}
* if the record doesn't reference a next record.
*/
long getNextRecordReference( RECORD record );
/**
* Updates this store with the contents of {@code record} at the record id
* {@link AbstractBaseRecord#getId() specified} by the record. The whole record will be written if
* the given record is {@link AbstractBaseRecord#inUse() in use}, not necessarily so if it's not in use.
*
* @param record containing data to write to this store at the {@link AbstractBaseRecord#getId() id}
* specified by the record.
*/
void updateRecord( RECORD record );
/**
* Lets {@code record} be processed by {@link Processor}.
*
* @param processor {@link Processor} of records.
* @param record to process.
* @throws FAILURE if the processor fails.
*/
<FAILURE extends Exception> void accept( Processor<FAILURE> processor, RECORD record ) throws FAILURE;
/**
* @return number of bytes each record in this store occupies. All records in a store is of the same size.
*/
int getRecordSize();
/**
* @deprecated since it's exposed through the generic {@link RecordStore} interface although only
* applicable to one particular type of of implementation of it.
* @return record "data" size, only applicable to dynamic record stores where record size may be specified
* at creation time and later used every time the store is opened. Data size refers to number of bytes
* of a record without header information, such as "inUse" and "next".
*/
@Deprecated
int getRecordDataSize();
/**
* @return underlying storage is assumed to work with pages. This method returns number of records that
* will fit into each page.
*/
int getRecordsPerPage();
/**
* Closes this store and releases any resource attached to it.
*/
void close();
/**
* Flushes all pending {@link #updateRecord(AbstractBaseRecord) updates} to underlying storage.
* This call is blocking and will ensure all updates since last call to this method are durable
* once the call returns.
*/
void flush();
/**
* Some stores may have meta data stored in the header of the store file. Since all records in a store
* are of the same size the means of storing that meta data is to occupy one or more records at the
* beginning of the store (0...).
*
* @return the number of records in the beginning of the file that are reserved for header meta data.
*/
int getNumberOfReservedLowIds();
/**
* Returns store header (see {@link #getNumberOfReservedLowIds()}) as {@code int}. Exposed like this
* for convenience since all known store headers are ints.
*
* @return store header as an int value, e.g the first 4 bytes of the first (reserved) record in this store.
*/
int getStoreHeaderInt();
/**
* Called once all changes to a record is ready to be converted into a command.
*
* @param record record to prepare, potentially updating it with more information before converting into a command.
*/
void prepareForCommit( RECORD record );
/**
* Called once all changes to a record is ready to be converted into a command.
* WARNING this is for advanced use, please consider using {@link #prepareForCommit(AbstractBaseRecord)} instead.
*
* @param record record to prepare, potentially updating it with more information before converting into a command.
* @param idSequence {@link IdSequence} to use for potentially generating additional ids required by this record.
*/
void prepareForCommit( RECORD record, IdSequence idSequence );
/**
* Scan the given range of records both inclusive, and pass all the in-use ones to the given processor, one by one.
*
* The record passed to the NodeRecordScanner is reused instead of reallocated for every record, so it must be
* cloned if you want to save it for later.
* @param visitor {@link Visitor} notified about all records.
* @throws Exception on error reading from store.
*/
<EXCEPTION extends Exception> void scanAllRecords( Visitor<RECORD,EXCEPTION> visitor ) throws EXCEPTION;
void freeId( long id );
Predicate<AbstractBaseRecord> IN_USE = AbstractBaseRecord::inUse;
class Delegator<R extends AbstractBaseRecord> implements RecordStore<R>
{
private final RecordStore<R> actual;
@Override
public void setHighestPossibleIdInUse( long highestIdInUse )
{
actual.setHighestPossibleIdInUse( highestIdInUse );
}
@Override
public R newRecord()
{
return actual.newRecord();
}
@Override
public R getRecord( long id, R target, RecordLoad mode ) throws InvalidRecordException
{
return actual.getRecord( id, target, mode );
}
@Override
public PageCursor openPageCursorForReading( long id )
{
return actual.openPageCursorForReading( id );
}
@Override
public void getRecordByCursor( long id, R target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException
{
actual.getRecordByCursor( id, target, mode, cursor );
}
@Override
public void nextRecordByCursor( R target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException
{
actual.nextRecordByCursor( target, mode, cursor );
}
@Override
public Collection<R> getRecords( long firstId, RecordLoad mode ) throws InvalidRecordException
{
return actual.getRecords( firstId, mode );
}
@Override
public RecordCursor<R> newRecordCursor( R record )
{
return actual.newRecordCursor( record );
}
@Override
public long getNextRecordReference( R record )
{
return actual.getNextRecordReference( record );
}
public Delegator( RecordStore<R> actual )
{
this.actual = actual;
}
@Override
public long nextId()
{
return actual.nextId();
}
@Override
public IdRange nextIdBatch( int size )
{
return actual.nextIdBatch( size );
}
@Override
public File getStorageFileName()
{
return actual.getStorageFileName();
}
@Override
public long getHighId()
{
return actual.getHighId();
}
@Override
public long getHighestPossibleIdInUse()
{
return actual.getHighestPossibleIdInUse();
}
@Override
public void updateRecord( R record )
{
actual.updateRecord( record );
}
@Override
public <FAILURE extends Exception> void accept( Processor<FAILURE> processor, R record ) throws FAILURE
{
actual.accept( processor, record );
}
@Override
public int getRecordSize()
{
return actual.getRecordSize();
}
@Override
public int getRecordDataSize()
{
return actual.getRecordDataSize();
}
@Override
public int getRecordsPerPage()
{
return actual.getRecordsPerPage();
}
@Override
public int getStoreHeaderInt()
{
return actual.getStoreHeaderInt();
}
@Override
public void close()
{
actual.close();
}
@Override
public int getNumberOfReservedLowIds()
{
return actual.getNumberOfReservedLowIds();
}
@Override
public void flush()
{
actual.flush();
}
@Override
public void ensureHeavy( R record )
{
actual.ensureHeavy( record );
}
@Override
public void prepareForCommit( R record )
{
actual.prepareForCommit( record );
}
@Override
public void prepareForCommit( R record, IdSequence idSequence )
{
actual.prepareForCommit( record, idSequence );
}
@Override
public <EXCEPTION extends Exception> void scanAllRecords( Visitor<R,EXCEPTION> visitor ) throws EXCEPTION
{
actual.scanAllRecords( visitor );
}
@Override
public void freeId( long id )
{
actual.freeId( id );
}
}
@SuppressWarnings( "unchecked" )
abstract class Processor<FAILURE extends Exception>
{
// Have it volatile so that it can be stopped from a different thread.
private volatile boolean shouldStop;
public void stop()
{
shouldStop = true;
}
public abstract void processSchema( RecordStore<DynamicRecord> store, DynamicRecord schema ) throws FAILURE;
public abstract void processNode( RecordStore<NodeRecord> store, NodeRecord node ) throws FAILURE;
public abstract void processRelationship( RecordStore<RelationshipRecord> store, RelationshipRecord rel )
throws FAILURE;
public abstract void processProperty( RecordStore<PropertyRecord> store, PropertyRecord property ) throws
FAILURE;
public abstract void processString( RecordStore<DynamicRecord> store, DynamicRecord string, IdType idType )
throws FAILURE;
public abstract void processArray( RecordStore<DynamicRecord> store, DynamicRecord array ) throws FAILURE;
public abstract void processLabelArrayWithOwner( RecordStore<DynamicRecord> store, DynamicRecord labelArray )
throws FAILURE;
public abstract void processRelationshipTypeToken( RecordStore<RelationshipTypeTokenRecord> store,
RelationshipTypeTokenRecord record ) throws FAILURE;
public abstract void processPropertyKeyToken( RecordStore<PropertyKeyTokenRecord> store, PropertyKeyTokenRecord
record ) throws FAILURE;
public abstract void processLabelToken( RecordStore<LabelTokenRecord> store, LabelTokenRecord record ) throws
FAILURE;
public abstract void processRelationshipGroup( RecordStore<RelationshipGroupRecord> store,
RelationshipGroupRecord record ) throws FAILURE;
protected <R extends AbstractBaseRecord> R getRecord( RecordStore<R> store, long id, R into )
{
store.getRecord( id, into, RecordLoad.FORCE );
return into;
}
public <R extends AbstractBaseRecord> void applyFiltered( RecordStore<R> store,
Predicate<? super R>... filters ) throws FAILURE
{
apply( store, ProgressListener.NONE, filters );
}
public <R extends AbstractBaseRecord> void applyFiltered( RecordStore<R> store,
ProgressListener progressListener,
Predicate<? super R>... filters ) throws FAILURE
{
apply( store, progressListener, filters );
}
private <R extends AbstractBaseRecord> void apply( RecordStore<R> store, ProgressListener progressListener,
Predicate<? super R>... filters ) throws FAILURE
{
ResourceIterable<R> iterable = Scanner.scan( store, true, filters );
try ( ResourceIterator<R> scan = iterable.iterator() )
{
while ( scan.hasNext() )
{
R record = scan.next();
if ( shouldStop )
{
break;
}
store.accept( this, record );
progressListener.set( record.getId() );
}
progressListener.done();
}
}
}
/**
* Utility methods for reading records. These are not on the interface itself since it should be
* an explicit choice when to create the record instances passed into it.
* Also for mocking purposes it's less confusing and error prone having only a single method.
*/
static <R extends AbstractBaseRecord> R getRecord( RecordStore<R> store, long id, RecordLoad mode )
{
R record = store.newRecord();
store.getRecord( id, record, mode );
return record;
}
/**
* Utility methods for reading records. These are not on the interface itself since it should be
* an explicit choice when to create the record instances passed into it.
* Also for mocking purposes it's less confusing and error prone having only a single method.
*/
static <R extends AbstractBaseRecord> R getRecord( RecordStore<R> store, long id )
{
return getRecord( store, id, RecordLoad.NORMAL );
}
}