-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
DumpLogicalLog.java
397 lines (363 loc) · 14.8 KB
/
DumpLogicalLog.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.tools.dump;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.TimeZone;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.neo4j.cursor.IOCursor;
import org.neo4j.helpers.Args;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;
import org.neo4j.kernel.impl.transaction.command.Command.RelationshipCommand;
import org.neo4j.kernel.impl.transaction.command.Command.RelationshipGroupCommand;
import org.neo4j.kernel.impl.transaction.command.Command.SchemaRuleCommand;
import org.neo4j.kernel.impl.transaction.log.FilteringIOCursor;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.tools.dump.inconsistency.ReportInconsistencies;
import org.neo4j.tools.dump.log.TransactionLogEntryCursor;
import static java.util.TimeZone.getTimeZone;
import static org.neo4j.helpers.Format.DEFAULT_TIME_ZONE;
import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.ReadAheadChannel.DEFAULT_READ_AHEAD_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;
/**
* Tool to represent logical logs in readable format for further analysis.
*/
public class DumpLogicalLog
{
private static final String TO_FILE = "tofile";
private static final String TX_FILTER = "txfilter";
private static final String CC_FILTER = "ccfilter";
private static final String LENIENT = "lenient";
private final FileSystemAbstraction fileSystem;
public DumpLogicalLog( FileSystemAbstraction fileSystem )
{
this.fileSystem = fileSystem;
}
public void dump( String filenameOrDirectory, PrintStream out,
Predicate<LogEntry[]> filter, Function<LogEntry,String> serializer,
InvalidLogEntryHandler invalidLogEntryHandler ) throws IOException
{
File file = new File( filenameOrDirectory );
printFile( file, out );
File firstFile;
LogVersionBridge bridge;
if ( file.isDirectory() )
{
// Use natural log version bridging if a directory is supplied
final PhysicalLogFiles logFiles = new PhysicalLogFiles( file, fileSystem );
bridge = new ReaderLogVersionBridge( fileSystem, logFiles )
{
@Override
public LogVersionedStoreChannel next( LogVersionedStoreChannel channel ) throws IOException
{
LogVersionedStoreChannel next = super.next( channel );
if ( next != channel )
{
printFile( logFiles.getLogFileForVersion( next.getVersion() ), out );
}
return next;
}
};
firstFile = logFiles.getLogFileForVersion( logFiles.getLowestLogVersion() );
}
else
{
// Use no bridging, simple reading this single log file if a file is supplied
firstFile = file;
bridge = NO_MORE_CHANNELS;
}
StoreChannel fileChannel = fileSystem.open( firstFile, "r" );
ByteBuffer buffer = ByteBuffer.allocateDirect( LOG_HEADER_SIZE );
LogHeader logHeader;
try
{
logHeader = readLogHeader( buffer, fileChannel, false, firstFile );
}
catch ( IOException ex )
{
out.println( "Unable to read timestamp information, no records in logical log." );
out.println( ex.getMessage() );
fileChannel.close();
throw ex;
}
out.println( "Logical log format: " + logHeader.logFormatVersion + " version: " + logHeader.logVersion +
" with prev committed tx[" + logHeader.lastCommittedTxId + "]" );
PhysicalLogVersionedStoreChannel channel = new PhysicalLogVersionedStoreChannel(
fileChannel, logHeader.logVersion, logHeader.logFormatVersion );
ReadableClosablePositionAwareChannel logChannel = new ReadAheadLogChannel( channel, bridge,
DEFAULT_READ_AHEAD_SIZE );
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader = new VersionAwareLogEntryReader<>(
new RecordStorageCommandReaderFactory(), invalidLogEntryHandler );
IOCursor<LogEntry> entryCursor = new LogEntryCursor( entryReader, logChannel );
TransactionLogEntryCursor transactionCursor = new TransactionLogEntryCursor( entryCursor );
try ( IOCursor<LogEntry[]> cursor = filter == null ? transactionCursor
: new FilteringIOCursor<>( transactionCursor, filter ) )
{
while ( cursor.next() )
{
for ( LogEntry entry : cursor.get() )
{
out.println( serializer.apply( entry ) );
}
}
}
}
private static void printFile( File file, PrintStream out )
{
out.println( "=== " + file.getAbsolutePath() + " ===" );
}
private static class TransactionRegexCriteria implements Predicate<LogEntry[]>
{
private final Pattern pattern;
private final TimeZone timeZone;
TransactionRegexCriteria( String regex, TimeZone timeZone )
{
this.pattern = Pattern.compile( regex );
this.timeZone = timeZone;
}
@Override
public boolean test( LogEntry[] transaction )
{
for ( LogEntry entry : transaction )
{
if ( pattern.matcher( entry.toString( timeZone ) ).find() )
{
return true;
}
}
return false;
}
}
public static class ConsistencyCheckOutputCriteria implements Predicate<LogEntry[]>, Function<LogEntry,String>
{
private final TimeZone timeZone;
private ReportInconsistencies inconsistencies;
public ConsistencyCheckOutputCriteria( String ccFile, TimeZone timeZone ) throws IOException
{
this.timeZone = timeZone;
inconsistencies = new ReportInconsistencies();
new InconsistencyReportReader( inconsistencies ).read( new File( ccFile ) );
}
@Override
public boolean test( LogEntry[] transaction )
{
for ( LogEntry logEntry : transaction )
{
if ( matches( logEntry ) )
{
return true;
}
}
return false;
}
private boolean matches( LogEntry logEntry )
{
if ( logEntry instanceof LogEntryCommand )
{
if ( matches( ((LogEntryCommand)logEntry).getXaCommand() ) )
{
return true;
}
}
return false;
}
private boolean matches( StorageCommand command )
{
if ( command instanceof NodeCommand )
{
return inconsistencies.containsNodeId( ((NodeCommand) command).getKey() );
}
if ( command instanceof RelationshipCommand )
{
return inconsistencies.containsRelationshipId( ((RelationshipCommand) command).getKey() );
}
if ( command instanceof PropertyCommand )
{
return inconsistencies.containsPropertyId( ((PropertyCommand) command).getKey() );
}
if ( command instanceof RelationshipGroupCommand )
{
return inconsistencies.containsRelationshipGroupId( ((RelationshipGroupCommand) command).getKey() );
}
if ( command instanceof SchemaRuleCommand )
{
return inconsistencies.containsSchemaIndexId( ((SchemaRuleCommand) command).getKey() );
}
return false;
}
@Override
public String apply( LogEntry logEntry )
{
String result = logEntry.toString( timeZone );
if ( matches( logEntry ) )
{
result += " <----";
}
return result;
}
}
/**
* Usage: [--txfilter "regex"] [--ccfilter cc-report-file] [--tofile] [--lenient] storeDirOrFile1 storeDirOrFile2 ...
*
* --txfilter
* Will match regex against each {@link LogEntry} and if there is a match,
* include transaction containing the LogEntry in the dump.
* regex matching is done with {@link Pattern}
*
* --ccfilter
* Will look at an inconsistency report file from consistency checker and
* include transactions that are relevant to them
*
* --tofile
* Redirects output to dump-logical-log.txt in the store directory
*
* --lenient
* Will attempt to read log entries even if some look broken along the way
*/
public static void main( String[] args ) throws IOException
{
Args arguments = Args.withFlags( TO_FILE, LENIENT ).parse( args );
TimeZone timeZone = parseTimeZoneConfig( arguments );
Predicate<LogEntry[]> filter = parseFilter( arguments, timeZone );
Function<LogEntry,String> serializer = parseSerializer( filter, timeZone );
Function<PrintStream,InvalidLogEntryHandler> invalidLogEntryHandler = parseInvalidLogEntryHandler( arguments );
try ( Printer printer = getPrinter( arguments ) )
{
for ( String fileAsString : arguments.orphans() )
{
PrintStream out = printer.getFor( fileAsString );
new DumpLogicalLog( new DefaultFileSystemAbstraction() ).dump( fileAsString, out, filter, serializer,
invalidLogEntryHandler.apply( out ) );
}
}
}
private static Function<PrintStream,InvalidLogEntryHandler> parseInvalidLogEntryHandler( Args arguments )
{
if ( arguments.getBoolean( LENIENT ) )
{
return out -> new LenientInvalidLogEntryHandler( out );
}
return out -> InvalidLogEntryHandler.STRICT;
}
@SuppressWarnings( "unchecked" )
private static Function<LogEntry,String> parseSerializer( Predicate<LogEntry[]> filter, TimeZone timeZone )
{
if ( filter instanceof Function )
{
return (Function<LogEntry,String>) filter;
}
return logEntry -> logEntry.toString( timeZone );
}
private static Predicate<LogEntry[]> parseFilter( Args arguments, TimeZone timeZone ) throws IOException
{
String regex = arguments.get( TX_FILTER );
if ( regex != null )
{
return new TransactionRegexCriteria( regex, timeZone );
}
String cc = arguments.get( CC_FILTER );
if ( cc != null )
{
return new ConsistencyCheckOutputCriteria( cc, timeZone );
}
return null;
}
public static Printer getPrinter( Args args )
{
boolean toFile = args.getBoolean( TO_FILE, false, true );
return toFile ? new FilePrinter() : SYSTEM_OUT_PRINTER;
}
public interface Printer extends AutoCloseable
{
PrintStream getFor( String file ) throws FileNotFoundException;
@Override
void close();
}
private static final Printer SYSTEM_OUT_PRINTER = new Printer()
{
@Override
public PrintStream getFor( String file )
{
return System.out;
}
@Override
public void close()
{ // Don't close System.out
}
};
private static class FilePrinter implements Printer
{
private File directory;
private PrintStream out;
@Override
public PrintStream getFor( String file ) throws FileNotFoundException
{
File absoluteFile = new File( file ).getAbsoluteFile();
File dir = absoluteFile.isDirectory() ? absoluteFile : absoluteFile.getParentFile();
if ( !dir.equals( directory ) )
{
close();
File dumpFile = new File( dir, "dump-logical-log.txt" );
System.out.println( "Redirecting the output to " + dumpFile.getPath() );
out = new PrintStream( dumpFile );
directory = dir;
}
return out;
}
@Override
public void close()
{
if ( out != null )
{
out.close();
}
}
}
public static TimeZone parseTimeZoneConfig( Args arguments )
{
return getTimeZone( arguments.get( "timezone", DEFAULT_TIME_ZONE.getID() ) );
}
}