-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
ReversedSingleFileTransactionCursor.java
215 lines (193 loc) · 8.98 KB
/
ReversedSingleFileTransactionCursor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.reverse;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import static java.lang.String.format;
/**
* Returns transactions in reverse order in a log file. It tries to keep peak memory consumption to a minimum
* by first sketching out the offsets of all transactions in the log. Then it starts from the end and moves backwards,
* taking advantage of read-ahead feature of the {@link ReadAheadLogChannel} by moving in chunks backwards in roughly
* the size of the read-ahead window. Coming across large transactions means moving further back to at least read one transaction
* per chunk "move". This is all internal, so from the outside it simply reverses a transaction log.
* The memory overhead compared to reading a log in the natural order is almost negligible.
*
* This cursor currently only works for a single log file, such that the given {@link ReadAheadLogChannel} should not be
* instantiated with a {@link LogVersionBridge} moving it over to other versions when exhausted. For reversing a whole
 * log stream consisting of multiple log files, have a look at {@link ReversedMultiFileTransactionCursor}.
*
* <pre>
*
* ◄────────────────┤ {@link #chunkTransactions} for the current chunk, reading {@link #readNextChunk()}.
* [2 |3|4 |5 |6 |7 |8 |9 |10 ]
* ▲ ▲ ▲ ▲ ▲ ▲ ▲ ▲ ▲
* │ │ │ │ │ │ │ │ │
* └───┴─┴─────┼───┴───────────┴──┴────┴───────┴─────────── {@link #offsets}
* │
* └─────────────────────────────────────────── {@link #chunkStartOffsetIndex} moves forward in {@link #readNextChunk()}
*
* </pre>
*
* @see ReversedMultiFileTransactionCursor
*/
public class ReversedSingleFileTransactionCursor implements TransactionCursor
{
    // Chunk size mirrors the channel's read-ahead window so each backwards "move" stays within one read-ahead.
    // Should this be passed in or extracted from the read-ahead channel instead?
    private static final int CHUNK_SIZE = ReadAheadChannel.DEFAULT_READ_AHEAD_SIZE;

    private final ReadAheadLogChannel channel;
    private final ReversedTransactionCursorMonitor monitor;
    private final TransactionCursor transactionCursor;
    // Transactions of the current chunk, pushed in forward read order so they pop in reverse order.
    // Should be generally large enough to hold transactions in a chunk, where one chunk is the read-ahead size of ReadAheadLogChannel
    private final Deque<CommittedTransactionRepresentation> chunkTransactions = new ArrayDeque<>( 20 );
    private CommittedTransactionRepresentation currentChunkTransaction;
    // Start offsets of all transactions in this log file, in natural (forward) order.
    // May be longer than required, offsetsLength holds the actual length.
    private final long[] offsets;
    private int offsetsLength;
    // Index into offsets of the lowest transaction in the current chunk; reaching 0 with a drained chunk means exhausted.
    private int chunkStartOffsetIndex;
    private long totalSize;

    ReversedSingleFileTransactionCursor( ReadAheadLogChannel channel,
            LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
            ReversedTransactionCursorMonitor monitor ) throws IOException
    {
        this.channel = channel;
        this.monitor = monitor;
        // There's an assumption here: that the underlying channel can move in between calls and that the
        // transaction cursor will just happily read from the new position.
        this.transactionCursor = new PhysicalTransactionCursor<>( channel, logEntryReader );
        this.offsets = sketchOutTransactionStartOffsets();
    }

    /**
     * Reads the log file forwards once, recording the start offset of every transaction.
     * Also initializes {@link #offsetsLength}, {@link #chunkStartOffsetIndex} and {@link #totalSize}.
     *
     * @return array of transaction start offsets; may be longer than the actual number of offsets,
     * where {@link #offsetsLength} holds the actual length.
     * @throws IOException on channel I/O error.
     * @throws IllegalArgumentException if the channel bridged over to another log version while reading,
     * since this cursor only supports a single log file.
     */
    private long[] sketchOutTransactionStartOffsets() throws IOException
    {
        // Grows on demand. Initially sized to be able to hold all transaction start offsets for a single log file
        long[] offsets = new long[10_000];
        int offsetCursor = 0;
        long logVersion = channel.getVersion();
        long startOffset = channel.position();
        try
        {
            while ( transactionCursor.next() )
            {
                if ( offsetCursor == offsets.length )
                { // Grow
                    offsets = Arrays.copyOf( offsets, offsetCursor * 2 );
                }
                offsets[offsetCursor++] = startOffset;
                startOffset = channel.position();
            }
        }
        catch ( Throwable t )
        {
            // Deliberate best-effort: a read failure mid-log is reported to the monitor and the
            // transactions sketched out up to that point are still served.
            monitor.transactionalLogRecordReadFailure( t, offsets, offsetCursor, logVersion );
        }
        if ( channel.getVersion() != logVersion )
        {
            throw new IllegalArgumentException( "The channel which was passed in bridged multiple log versions, it started at version " +
                    logVersion + ", but continued through to version " + channel.getVersion() + ". This isn't supported" );
        }
        offsetsLength = offsetCursor;
        chunkStartOffsetIndex = offsetCursor;
        totalSize = channel.position();
        return offsets;
    }

    @Override
    public boolean next() throws IOException
    {
        if ( !exhausted() )
        {
            if ( currentChunkExhausted() )
            {
                readNextChunk();
            }
            currentChunkTransaction = chunkTransactions.pop();
            return true;
        }
        return false;
    }

    /**
     * Moves {@link #chunkStartOffsetIndex} backwards to cover roughly {@link #CHUNK_SIZE} bytes
     * (always at least one transaction), repositions the channel at the new chunk start and reads
     * the chunk's transactions forwards, pushing each onto {@link #chunkTransactions} so that they
     * pop in reverse order.
     */
    private void readNextChunk() throws IOException
    {
        assert chunkStartOffsetIndex > 0;

        // Start at lowOffsetIndex - 1 and count backwards until almost reaching the chunk size
        long highOffset = chunkStartOffsetIndex == offsetsLength ? totalSize : offsets[chunkStartOffsetIndex];
        int newLowOffsetIndex = chunkStartOffsetIndex;
        while ( newLowOffsetIndex > 0 )
        {
            long deltaOffset = highOffset - offsets[--newLowOffsetIndex];
            if ( deltaOffset > CHUNK_SIZE )
            { // We've now read more than the read-ahead size, let's call this the end of this chunk
                break;
            }
        }
        assert chunkStartOffsetIndex - newLowOffsetIndex > 0;

        // We've established the chunk boundaries. Initialize all offsets and read the transactions in this
        // chunk into actual transaction objects
        int chunkLength = chunkStartOffsetIndex - newLowOffsetIndex;
        chunkStartOffsetIndex = newLowOffsetIndex;
        channel.setCurrentPosition( offsets[chunkStartOffsetIndex] );
        assert chunkTransactions.isEmpty();
        for ( int i = 0; i < chunkLength; i++ )
        {
            boolean success = transactionCursor.next();
            assert success;

            chunkTransactions.push( transactionCursor.get() );
        }
    }

    private boolean currentChunkExhausted()
    {
        return chunkTransactions.isEmpty();
    }

    private boolean exhausted()
    {
        return chunkStartOffsetIndex == 0 && currentChunkExhausted();
    }

    @Override
    public void close() throws IOException
    {
        transactionCursor.close(); // closes the channel too
    }

    @Override
    public CommittedTransactionRepresentation get()
    {
        return currentChunkTransaction;
    }

    @Override
    public LogPosition position()
    {
        throw new UnsupportedOperationException( "Should not be called" );
    }
}