-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
ReversedSingleFileTransactionCursor.java
182 lines (163 loc) · 7.22 KB
/
ReversedSingleFileTransactionCursor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;
import java.io.IOException;
import java.util.Arrays;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.util.collection.ArrayCollection;
/**
* Returns transactions in reverse order in a log file. It tries to keep peak memory consumption to a minimum
* by first sketching out the offsets of all transactions in the log. Then it starts from the end and moves backwards,
* taking advantage of read-ahead feature of the {@link ReadAheadLogChannel} by moving in chunks backwards in roughly
* the size of the read-ahead window. Coming across large transactions means moving further back to at least read one transaction
* per chunk "move". This is all internal, so from the outside it simply reverses a transaction log.
* The memory overhead compared to reading a log in the natural order is almost negligible.
*
* This cursor currently only works for a single log file, such that the given {@link ReadAheadLogChannel} should not be
* instantiated with a {@link LogVersionBridge} moving it over to other versions when exhausted. For reversing a whole
* log stream consisting of multiple log files have a look at {@link ReversedMultiFileTransactionCursor}.
*
* @see ReversedMultiFileTransactionCursor
*/
class ReversedSingleFileTransactionCursor implements TransactionCursor
{
    // Should this be passed in or extracted from the read-ahead channel instead?
    private static final int CHUNK_SIZE = ReadAheadChannel.DEFAULT_READ_AHEAD_SIZE;

    private final ReadAheadLogChannel channel;
    private final TransactionCursor transactionCursor;
    // Should be generally large enough to hold transactions in a chunk, where one chunk is the read-ahead size of ReadAheadLogChannel
    private final ArrayCollection<CommittedTransactionRepresentation> chunkTransactions = new ArrayCollection<>( 20 );
    // May be longer than required, offsetLength holds the actual length.
    private final long[] offsets;
    // Number of valid entries at the start of the offsets array, i.e. number of transactions in the log file.
    private int offsetsLength;
    // Index into offsets of the lowest (earliest) transaction in the currently loaded chunk. Starts out at
    // offsetsLength, i.e. one past the last transaction, meaning "no chunk loaded yet, start from the end".
    private int chunkStartOffsetIndex;
    // Number of transactions in the currently loaded chunk.
    private int chunkLength;
    // How many transactions of the current chunk have been returned so far; incremented by next(),
    // runs from 0 (nothing returned) up to chunkLength (chunk exhausted).
    private int chunkCursor;
    // Channel position after the last transaction, i.e. the end offset of the highest transaction.
    private long totalSize;

    /**
     * Creates the cursor and immediately scans the whole log file forwards once, recording the start offset
     * of every transaction, so that {@link #next()} can later serve transactions in reverse order by walking
     * those offsets backwards in chunks.
     *
     * @param channel channel positioned at the transaction data to reverse. Must not bridge over into other
     * log versions — see class-level javadoc; that condition is detected and rejected during the initial scan.
     * @param logEntryReader reader used for parsing log entries from the channel.
     * @throws IOException on I/O error while sketching out the transaction start offsets.
     */
    ReversedSingleFileTransactionCursor( ReadAheadLogChannel channel, LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
            throws IOException
    {
        this.channel = channel;
        // There's an assumption here: that the underlying channel can move in between calls and that the
        // transaction cursor will just happily read from the new position.
        this.transactionCursor = new PhysicalTransactionCursor<>( channel, logEntryReader );
        this.offsets = sketchOutTransactionStartOffsets();
    }

    // Also initializes offset indexes
    // This method could use some way of reading log entries w/o creating objects. Would be great
    private long[] sketchOutTransactionStartOffsets() throws IOException
    {
        // Grows on demand. Initially sized to be able to hold all transaction start offsets for a single log file
        long[] offsets = new long[10_000];
        int offsetCursor = 0;

        long logVersion = channel.getVersion();
        // The channel position BEFORE reading a transaction is that transaction's start offset
        long startOffset = channel.position();
        while ( transactionCursor.next() )
        {
            if ( offsetCursor == offsets.length )
            { // Grow
                offsets = Arrays.copyOf( offsets, offsetCursor * 2 );
            }
            offsets[offsetCursor++] = startOffset;
            startOffset = channel.position();
        }

        // The initial scan must have stayed within a single log file; a version change means the channel
        // was (incorrectly) created with a bridge that rolled it over into the next file.
        if ( channel.getVersion() != logVersion )
        {
            throw new IllegalArgumentException( "The channel which was passed in bridged multiple log versions, it started at version " +
                    logVersion + ", but continued through to version " + channel.getVersion() + ". This isn't supported" );
        }

        offsetsLength = offsetCursor;
        // "One past the end" sentinel: the first readNextChunk() will build the chunk ending at the last transaction
        chunkStartOffsetIndex = offsetCursor;
        totalSize = channel.position();

        return offsets;
    }

    /**
     * Advances to the next transaction in reverse order, loading the next (earlier) chunk of
     * transactions from the log when the current chunk has been fully consumed.
     *
     * @return {@code true} if a transaction is available via {@link #get()}, otherwise {@code false}
     * when the beginning of the log has been passed.
     * @throws IOException on I/O error while reading the next chunk of transactions.
     */
    @Override
    public boolean next() throws IOException
    {
        if ( !exhausted() )
        {
            if ( currentChunkExhausted() )
            {
                readNextChunk();
            }
            chunkCursor++;
            assert chunkCursor <= chunkLength;
            return true;
        }
        return false;
    }

    // Moves the chunk window backwards over the sketched offsets, picking as many whole transactions as
    // roughly fit in CHUNK_SIZE (at least one, even if that transaction alone is larger), then repositions
    // the channel at the chunk start and reads those transactions forwards into chunkTransactions.
    private void readNextChunk() throws IOException
    {
        assert chunkStartOffsetIndex > 0;

        // Start at lowOffsetIndex - 1 and count backwards until almost reaching the chunk size
        // highOffset is the end of the previous (later) chunk; for the very first chunk it's the end of the log
        long highOffset = chunkStartOffsetIndex == offsetsLength ? totalSize : offsets[chunkStartOffsetIndex];
        int newLowOffsetIndex = chunkStartOffsetIndex;
        while ( newLowOffsetIndex > 0 )
        {
            long deltaOffset = highOffset - offsets[--newLowOffsetIndex];
            if ( deltaOffset > CHUNK_SIZE )
            { // We've come too far; don't include this last transaction, unless it was the only one in the chunk
                if ( chunkStartOffsetIndex - newLowOffsetIndex > 1 )
                {
                    newLowOffsetIndex++;
                }
                break;
            }
        }
        assert chunkStartOffsetIndex - newLowOffsetIndex > 0;

        // We've established the chunk boundaries. Initialize all offsets and read the transactions in this
        // chunk into actual transaction objects
        chunkLength = chunkStartOffsetIndex - newLowOffsetIndex;
        chunkStartOffsetIndex = newLowOffsetIndex;
        chunkCursor = 0;
        channel.setCurrentPosition( offsets[chunkStartOffsetIndex] );
        chunkTransactions.clear();
        for ( int i = 0; i < chunkLength; i++ )
        {
            // Each of these transactions was successfully read in the initial sketching scan, so they must read OK here too
            boolean success = transactionCursor.next();
            assert success;

            chunkTransactions.add( transactionCursor.get() );
        }
    }

    // True when all transactions of the currently loaded chunk have been handed out (also true before any chunk is loaded)
    private boolean currentChunkExhausted()
    {
        return chunkCursor == chunkLength;
    }

    // True when the current chunk is the first (lowest) one in the log AND it has been fully consumed
    private boolean exhausted()
    {
        return chunkStartOffsetIndex == 0 && currentChunkExhausted();
    }

    @Override
    public void close() throws IOException
    {
        transactionCursor.close(); // closes the channel too
    }

    /**
     * Returns the transaction the cursor currently sits on. Transactions within a chunk were read
     * forwards, so indexing by {@code chunkLength - chunkCursor} serves them back in reverse order.
     */
    @Override
    public CommittedTransactionRepresentation get()
    {
        return chunkTransactions.item( chunkLength - chunkCursor );
    }

    /**
     * Not supported by this cursor.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public LogPosition position()
    {
        throw new UnsupportedOperationException( "Perhaps not required" );
    }
}