-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
BaseHighLimitRecordFormat.java
324 lines (296 loc) · 15.3 KB
/
BaseHighLimitRecordFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store.format.highlimit;
import java.io.IOException;
import java.util.function.Function;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.impl.CompositePageCursor;
import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.format.BaseOneByteHeaderRecordFormat;
import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.offsetForId;
import static org.neo4j.kernel.impl.store.RecordPageLocationCalculator.pageIdForRecord;
/**
* Base class for record format which utilizes dynamically sized references to other record IDs and with ability
* to use record units, meaning that a record may span two physical records in the store. This is to keep store
* size low and only have records with big references occupy double the amount of space. This format supports up to
* 58-bit IDs, which is roughly 280 quadrillion. With that size the ID limits can be considered highlimit,
* hence the format name. The IDs take up between 3-8B depending on the size of the ID where relative ID
* references are used as often as possible. See {@link Reference}.
*
* When a record is small enough to fit into a single record unit and its references are not too big,
* the record can be stored in a fixed reference format. Representing a record in this format saves
* some time on reference encoding/decoding, since references are stored in a fixed format instead of
* a variable length encoding.
* Fixed reference encoding can be applied only to single record unit records.
* Since such a record always consists of exactly one unit, the 0x4 header bit (otherwise used to mark the
* first record unit) can be reused as a marker for fixed references.
* To be able to read previously stored records and distinguish them from fixed reference records, the marker
* bit is inverted: 0 - fixed reference format used, 1 - variable length encoding used.
*
* For consistency, all formats have a one-byte header specifying:
*
* <ol>
* <li>0x1: inUse [0=unused, 1=used]</li>
* <li>0x2: record unit [0=single record, 1=multiple records]</li>
* <li>0x4: record unit type [1=first, 0=consecutive]; fixed reference mark [0=fixed reference; 1=variable length
* encoding]</li>
* <li>0x8 - 0x80 other flags for this record specific to each type</li>
* </ol>
*
* NOTE to the rest of the flags is that a good use of them is to denote whether or not an ID reference is
* null (-1) as to save 3B (smallest compressed size) by not writing a reference at all.
*
* For records that are the first out of multiple record units, then immediately following the header byte is
* the reference (3-8B) to the secondary ID. After that the "statically sized" data and in the end the
* dynamically sized data. The general thinking is that the break-off into the secondary record will happen in
* the sequence of dynamically sized references and this will allow for crossing the record boundary
* in between, but even in the middle of, references quite easily since the {@link CompositePageCursor}
* handles the transition seamlessly.
*
* Assigning secondary record unit IDs is done outside of this format implementation; it is just assumed
* that records that get {@link RecordFormat#write(AbstractBaseRecord, PageCursor, int) written} have already
* been assigned all such required data.
*
* Usually each record is written and read atomically, so this format requires additional logic to be able to
* write and read multiple records together atomically. For writing, this is currently guarded by
* higher level entity write locks and so the {@link PageCursor} can simply move from the first on to the second
* record and continue writing. For reading, which is optimistic and may require retry, one additional
* {@link PageCursor} needs to be acquired over the second record, checking {@link PageCursor#shouldRetry()}
* on both and potentially re-reading the second or both until a consistent read was had.
*
* @param <RECORD> type of {@link AbstractBaseRecord}
*/
abstract class BaseHighLimitRecordFormat<RECORD extends AbstractBaseRecord>
extends BaseOneByteHeaderRecordFormat<RECORD>
{
/** Number of bytes occupied by the header byte that starts every record unit. */
static final int HEADER_BYTE = Byte.BYTES;
/** Value used for a null (absent) reference, taken from {@link Record#NULL_REFERENCE}. */
static final long NULL = Record.NULL_REFERENCE.intValue();
/** Header bit 0x2: set when the record is made up of more than one record unit. */
static final int HEADER_BIT_RECORD_UNIT = 0b0000_0010;
/** Header bit 0x4, meaningful when {@link #HEADER_BIT_RECORD_UNIT} is set: set on the first unit of the record. */
static final int HEADER_BIT_FIRST_RECORD_UNIT = 0b0000_0100;
/**
 * Header bit 0x4, meaningful when {@link #HEADER_BIT_RECORD_UNIT} is NOT set: cleared when the record uses
 * fixed references, set when it uses variable length encoding. Deliberately the same bit as
 * {@link #HEADER_BIT_FIRST_RECORD_UNIT}.
 */
static final int HEADER_BIT_FIXED_REFERENCE = 0b0000_0100;
/**
 * Creates the format, forwarding all arguments to the one-byte-header base format together with
 * {@code IN_USE_BIT} as the in-use bit of the header.
 *
 * @param recordSize function providing the record size given a {@link StoreHeader}.
 * @param recordHeaderSize passed on unchanged to the super constructor.
 * @param maxIdBits passed on unchanged to the super constructor; presumably the number of bits an ID
 * may occupy in this format (see the class documentation) -- confirm against the super class.
 */
protected BaseHighLimitRecordFormat( Function<StoreHeader,Integer> recordSize, int recordHeaderSize, int maxIdBits )
{
super( recordSize, recordHeaderSize, IN_USE_BIT, maxIdBits );
}
/**
 * Reads the record found at the current offset of {@code primaryCursor} into {@code record}.
 *
 * The header byte is read first. If {@link #HEADER_BIT_RECORD_UNIT} is not set the record consists of a
 * single unit: the fixed reference flag is derived from the header and the record is read straight from
 * {@code primaryCursor}. Otherwise the record spans two units: the ID of the secondary unit is decoded
 * right after the header byte, a linked cursor is opened on the page holding that unit, and the record is
 * read through a {@link CompositePageCursor} covering the rest of the primary unit followed by the
 * secondary unit (excluding its header byte).
 *
 * Inconsistent data is not reported by throwing. Instead the record is cleared and a cursor exception is
 * set on {@code primaryCursor} when either the header says this is a record unit that is not the first
 * one, or the secondary unit cannot be reached (the linked cursor cannot move to its page, or the
 * computed offset is negative).
 *
 * @param record record to read data into.
 * @param primaryCursor cursor positioned at the start of the (primary) record unit.
 * @param mode not consulted by this method.
 * @param recordSize size in bytes of one record unit.
 * @throws IOException declared for the cursor operations used while reading.
 */
@Override
public void read( RECORD record, PageCursor primaryCursor, RecordLoad mode, int recordSize )
        throws IOException
{
    int primaryStartOffset = primaryCursor.getOffset();
    byte headerByte = primaryCursor.getByte();
    boolean inUse = isInUse( headerByte );
    boolean doubleRecordUnit = has( headerByte, HEADER_BIT_RECORD_UNIT );
    record.setUseFixedReferences( false );
    if ( doubleRecordUnit )
    {
        boolean firstRecordUnit = has( headerByte, HEADER_BIT_FIRST_RECORD_UNIT );
        if ( !firstRecordUnit )
        {
            // This is a record unit and not even the first one, so you cannot go here directly and read it,
            // it may only be read as part of reading the primary unit.
            record.clear();
            // Return and try again
            primaryCursor.setCursorException(
                    "Expected record to be the first unit in the chain, but record header says it's not" );
            return;
        }

        // This is a record that is split into multiple record units. We need a bit more clever
        // data structures here. For the time being this means instantiating one object,
        // but the trade-off is a great reduction in complexity.
        long secondaryId = Reference.decode( primaryCursor );
        long pageId = pageIdForRecord( secondaryId, primaryCursor.getCurrentPageSize(), recordSize );
        int offset = offsetForId( secondaryId, primaryCursor.getCurrentPageSize(), recordSize );
        PageCursor secondaryCursor = primaryCursor.openLinkedCursor( pageId );
        if ( !secondaryCursor.next() || offset < 0 )
        {
            // We must have made an inconsistent read of the secondary record unit reference.
            // No point in trying to read this. Report the reference itself (not the page it maps to),
            // since that is what the message describes.
            record.clear();
            primaryCursor.setCursorException( illegalSecondaryReferenceMessage( secondaryId ) );
            return;
        }
        // Skip the header byte of the secondary unit, it carries no record data.
        secondaryCursor.setOffset( offset + HEADER_BYTE );
        // What is left of the primary unit after the header byte and the secondary unit reference.
        int primarySize = recordSize - (primaryCursor.getOffset() - primaryStartOffset);
        // We *could* sanity check the secondary record header byte here, but we won't. If it is wrong, then we most
        // likely did an inconsistent read, in which case we'll just retry. Otherwise, if the header byte is wrong,
        // then there is little we can do about it here, since we are not allowed to throw exceptions.
        int secondarySize = recordSize - HEADER_BYTE;
        PageCursor composite = CompositePageCursor.compose(
                primaryCursor, primarySize, secondaryCursor, secondarySize );
        doReadInternal( record, composite, recordSize, headerByte, inUse );
        record.setSecondaryUnitId( secondaryId );
    }
    else
    {
        // Single unit record: the 0x4 bit tells whether fixed references were used when writing.
        record.setUseFixedReferences( isUseFixedReferences( headerByte ) );
        doReadInternal( record, primaryCursor, recordSize, headerByte, inUse );
    }
}
/**
 * Tells whether the given header byte marks a record stored with fixed references. The marker bit is
 * inverted: a cleared {@link #HEADER_BIT_FIXED_REFERENCE} means fixed references.
 *
 * @param headerByte header byte of a single-unit record.
 * @return {@code true} if {@link #HEADER_BIT_FIXED_REFERENCE} is not set in {@code headerByte}.
 */
private boolean isUseFixedReferences( byte headerByte )
{
    boolean variableLengthEncoded = has( headerByte, HEADER_BIT_FIXED_REFERENCE );
    return !variableLengthEncoded;
}
/**
 * Builds the cursor exception message used when a secondary record unit reference cannot be followed.
 *
 * @param secondaryId value to include in the message.
 * @return the message text followed by {@code secondaryId}.
 */
private String illegalSecondaryReferenceMessage( long secondaryId )
{
    String prefix = "Illegal secondary record reference: ";
    return prefix + secondaryId;
}
/**
 * Reads the format-specific part of a record. Called by
 * {@link #read(AbstractBaseRecord, PageCursor, RecordLoad, int)} after the header byte (and, for records
 * spanning two units, the secondary unit reference) has been consumed from the cursor.
 *
 * @param record record to read data into.
 * @param cursor cursor to read from; either the primary cursor or, for records spanning two units,
 * a {@link CompositePageCursor} over both units.
 * @param recordSize size in bytes of one record unit.
 * @param inUseByte the header byte of the (primary) record unit, as read from the store.
 * @param inUse whether the header byte marks the record as in use.
 */
protected abstract void doReadInternal(
RECORD record, PageCursor cursor, int recordSize, long inUseByte, boolean inUse );
/**
 * Writes {@code record} at the current offset of {@code primaryCursor}.
 *
 * A record that is not in use is only marked as unused, see
 * {@link #markAsUnused(PageCursor, AbstractBaseRecord, int)}. For a record in use the header byte is
 * assembled from the format-specific {@link #headerBits(AbstractBaseRecord) header bits} plus the generic
 * in-use, record-unit and first-unit/fixed-reference bits, and written first. A record requiring a
 * secondary unit then gets a header byte written to that unit too, after which the secondary unit
 * reference and the record data are written through a {@link CompositePageCursor} spanning both units.
 * If the linked cursor cannot move to the page of the secondary unit the record is cleared and the method
 * returns without writing the record data.
 *
 * @param record record to write.
 * @param primaryCursor cursor positioned at the start of the (primary) record unit.
 * @param recordSize size in bytes of one record unit.
 * @throws IOException declared for the cursor operations used while writing.
 */
@Override
public void write( RECORD record, PageCursor primaryCursor, int recordSize )
        throws IOException
{
    if ( !record.inUse() )
    {
        markAsUnused( primaryCursor, record, recordSize );
        return;
    }

    // The specific implementation provides its own header bits, the core format bits are added on top.
    byte header = headerBits( record );
    assert (header & 0x7) == 0 : "Format-specific header bits (" + header +
            ") collides with format-generic header bits";
    header = set( header, IN_USE_BIT, record.inUse() );
    boolean spansTwoUnits = record.requiresSecondaryUnit();
    header = set( header, HEADER_BIT_RECORD_UNIT, spansTwoUnits );
    // The 0x4 bit has two meanings: "first unit" for split records, inverted "fixed references" otherwise.
    header = spansTwoUnits
            ? set( header, HEADER_BIT_FIRST_RECORD_UNIT, true )
            : set( header, HEADER_BIT_FIXED_REFERENCE, !record.isUseFixedReferences() );
    primaryCursor.putByte( header );

    if ( !spansTwoUnits )
    {
        doWriteInternal( record, primaryCursor );
        return;
    }

    // Locate the secondary unit and open a cursor over it.
    long secondaryId = record.getSecondaryUnitId();
    long secondaryPageId = pageIdForRecord( secondaryId, primaryCursor.getCurrentPageSize(), recordSize );
    int secondaryOffset = offsetForId( secondaryId, primaryCursor.getCurrentPageSize(), recordSize );
    PageCursor secondaryCursor = primaryCursor.openLinkedCursor( secondaryPageId );
    if ( !secondaryCursor.next() )
    {
        // We are not allowed to write this much data to the file, apparently.
        record.clear();
        return;
    }
    secondaryCursor.setOffset( secondaryOffset );
    secondaryCursor.putByte( (byte) (IN_USE_BIT | HEADER_BIT_RECORD_UNIT) );

    // Both units have had their header byte written, so each contributes the rest of its space.
    int payloadSize = recordSize - HEADER_BYTE;
    PageCursor composite = CompositePageCursor.compose(
            primaryCursor, payloadSize, secondaryCursor, payloadSize );
    Reference.encode( secondaryId, composite );
    doWriteInternal( record, composite );
}
/**
 * Marks the record unit at the cursor as unused and, if the record has a reference to a secondary unit,
 * moves the cursor to that unit and marks it as unused as well. Use this instead of
 * {@link #markAsUnused(PageCursor)} to cover both record units.
 *
 * @param cursor cursor positioned at the primary record unit; left at the secondary unit's offset if
 * the record has one.
 * @param record record being marked as unused.
 * @param recordSize size in bytes of one record unit.
 * @throws IOException declared for the cursor operations used.
 * @throws UnderlyingStorageException if the cursor cannot move to the page of the secondary unit.
 */
protected void markAsUnused( PageCursor cursor, RECORD record, int recordSize )
        throws IOException
{
    markAsUnused( cursor );
    if ( !record.hasSecondaryUnitId() )
    {
        return;
    }

    long secondaryId = record.getSecondaryUnitId();
    long secondaryPageId = pageIdForRecord( secondaryId, cursor.getCurrentPageSize(), recordSize );
    int secondaryOffset = offsetForId( secondaryId, cursor.getCurrentPageSize(), recordSize );
    if ( cursor.next( secondaryPageId ) )
    {
        cursor.setOffset( secondaryOffset );
        markAsUnused( cursor );
        return;
    }
    throw new UnderlyingStorageException( "Couldn't move to secondary page " + secondaryPageId );
}
/**
 * Writes the format-specific part of a record. Called by
 * {@link #write(AbstractBaseRecord, PageCursor, int)} after the header byte (and, for records requiring a
 * secondary unit, the secondary unit reference) has been written.
 *
 * @param record record to write.
 * @param cursor cursor to write to; either the primary cursor or, for records requiring a secondary unit,
 * a {@link CompositePageCursor} over both units.
 * @throws IOException declared for the cursor operations used while writing.
 */
protected abstract void doWriteInternal( RECORD record, PageCursor cursor ) throws IOException;
/**
 * Provides the format-specific header bits for the given record. The three lowest bits (mask 0x7) are
 * reserved for the generic format bits and must be zero in the returned value, which
 * {@link #write(AbstractBaseRecord, PageCursor, int)} asserts.
 *
 * @param record record to produce header bits for.
 * @return header byte with only format-specific bits set.
 */
protected abstract byte headerBits( RECORD record );
/**
 * Prepares an in-use record for being written: decides whether it can use fixed references and, if it
 * cannot, whether it requires a secondary record unit, allocating an ID for that unit when the record
 * does not have one yet. Records not in use are left untouched.
 *
 * @param record record to prepare.
 * @param recordSize size in bytes of one record unit.
 * @param idSequence source of the ID for a newly required secondary unit.
 */
@Override
public final void prepare( RECORD record, int recordSize, IdSequence idSequence )
{
    if ( !record.inUse() )
    {
        return;
    }

    boolean fixedReferences = canUseFixedReferences( record, recordSize );
    record.setUseFixedReferences( fixedReferences );
    if ( record.isUseFixedReferences() )
    {
        return;
    }

    int totalLength = HEADER_BYTE + requiredDataLength( record );
    record.setRequiresSecondaryUnit( totalLength > recordSize );
    if ( record.requiresSecondaryUnit() && !record.hasSecondaryUnitId() )
    {
        // Allocate a new id at this point, but this is not the time to free this ID in the case where
        // this record doesn't need this secondary unit anymore... that needs to be done when applying to store.
        record.setSecondaryUnitId( idSequence.nextId() );
    }
}
/**
 * Decides whether the given record can be stored using fixed references. Consulted by
 * {@link #prepare(AbstractBaseRecord, int, IdSequence)} for every record in use; when this returns
 * {@code true} no secondary unit calculation is made for the record.
 *
 * @param record record to check.
 * @param recordSize size in bytes of one record unit.
 * @return {@code true} if the record can use fixed references.
 */
protected abstract boolean canUseFixedReferences( RECORD record, int recordSize );
/**
 * Required length of the data in the given record (without the header byte).
 * {@link #prepare(AbstractBaseRecord, int, IdSequence)} adds {@link #HEADER_BYTE} to this value and
 * compares the sum with the record size to decide whether a secondary record unit is required.
 *
 * @param record data to check how much space it would require.
 * @return length required to store the data in the given record.
 */
protected abstract int requiredDataLength( RECORD record );
/**
 * Length of the given reference as reported by {@link Reference#length(long)}.
 *
 * @param reference reference to get the length of.
 * @return the value returned by {@link Reference#length(long)} for {@code reference}.
 */
protected static int length( long reference )
{
    int encodedLength = Reference.length( reference );
    return encodedLength;
}
/**
 * Length of the given reference, where a reference equal to {@code nullValue} takes up no space.
 *
 * @param reference reference to get the length of.
 * @param nullValue value denoting an absent reference.
 * @return {@code 0} if {@code reference} equals {@code nullValue}, otherwise {@link #length(long)}.
 */
protected static int length( long reference, long nullValue )
{
    if ( reference == nullValue )
    {
        return 0;
    }
    return length( reference );
}
/**
 * Decodes a reference from the given cursor using {@link Reference#decode(PageCursor)}.
 *
 * @param cursor cursor to decode the reference from.
 * @return the decoded reference.
 */
protected static long decodeCompressedReference( PageCursor cursor )
{
    long decoded = Reference.decode( cursor );
    return decoded;
}
/**
 * Decodes a reference from the given cursor, but only if the header says one was written.
 *
 * @param cursor cursor to decode the reference from.
 * @param headerByte header byte of the record.
 * @param headerBitMask mask of the header bit telling whether the reference is present.
 * @param nullValue value to return when the reference is absent.
 * @return the decoded reference if {@code headerBitMask} is set in {@code headerByte},
 * otherwise {@code nullValue} without touching the cursor.
 */
protected static long decodeCompressedReference( PageCursor cursor, long headerByte, int headerBitMask, long nullValue )
{
    if ( !has( headerByte, headerBitMask ) )
    {
        return nullValue;
    }
    return decodeCompressedReference( cursor );
}
/**
 * Encodes the given reference into the cursor using {@link Reference#encode(long, PageCursor)}.
 *
 * @param cursor cursor to write the encoded reference to.
 * @param reference reference to encode.
 * @throws IOException declared by {@link Reference#encode(long, PageCursor)}.
 */
protected static void encode( PageCursor cursor, long reference ) throws IOException
{
    final long value = reference;
    Reference.encode( value, cursor );
}
/**
 * Encodes the given reference into the cursor, unless it equals {@code nullValue} in which case nothing
 * is written.
 *
 * @param cursor cursor to write the encoded reference to.
 * @param reference reference to encode.
 * @param nullValue value denoting an absent reference.
 * @throws IOException declared by {@link Reference#encode(long, PageCursor)}.
 */
protected static void encode( PageCursor cursor, long reference, long nullValue ) throws IOException
{
    if ( reference == nullValue )
    {
        return;
    }
    Reference.encode( reference, cursor );
}
/**
 * Sets or clears a header bit depending on whether a reference is present.
 *
 * @param header header byte to modify.
 * @param bitMask mask of the bit to set or clear.
 * @param reference reference to check.
 * @param nullValue value denoting an absent reference.
 * @return the result of the boolean {@code set} overload called with {@code reference != nullValue}.
 */
protected static byte set( byte header, int bitMask, long reference, long nullValue )
{
    boolean referencePresent = reference != nullValue;
    return set( header, bitMask, referencePresent );
}
}