Permalink
Browse files

Added more logging around rotation of tm_tx_log. Also extended LogTes…

…tUtils a bit more to be able to read/filter in more ways
  • Loading branch information...
1 parent df1dfc0 commit aeb5f3426c6022911e5d7e313f758bbc6b96f43b @tinwelint committed Mar 13, 2012
@@ -38,6 +38,7 @@
import org.neo4j.kernel.impl.transaction.xaframework.DirectMappedLogBuffer;
import org.neo4j.kernel.impl.transaction.xaframework.ForceMode;
import org.neo4j.kernel.impl.transaction.xaframework.LogBuffer;
+import org.neo4j.kernel.impl.util.StringLogger;
// TODO: fixed sized logs (pre-initialize them)
// keep dangling records in memory for log switch
@@ -60,6 +61,7 @@
public static final byte MARK_COMMIT = 3;
public static final byte TX_DONE = 4;
private final FileSystemAbstraction fileSystem;
+ private final StringLogger msgLog;
/**
* Initializes a transaction log using <CODE>filename</CODE>. If the file
@@ -68,16 +70,18 @@
*
* @param fileName
* Filename of file to use
+ * @param msgLog
* @throws IOException
* If unable to open file
*/
- public TxLog( String fileName, FileSystemAbstraction fileSystem ) throws IOException
+ public TxLog( String fileName, FileSystemAbstraction fileSystem, StringLogger msgLog ) throws IOException
{
if ( fileName == null )
{
throw new IllegalArgumentException( "Null filename" );
}
this.fileSystem = fileSystem;
+ this.msgLog = msgLog;
FileChannel fileChannel = fileSystem.open( fileName, "rw" );
fileChannel.position( fileChannel.size() );
logBuffer = new DirectMappedLogBuffer( fileChannel );
@@ -259,7 +263,13 @@ public String toString()
{
XidImpl xid = new XidImpl( globalId, branchId == null ? new byte[0]
: branchId );
- return "TxLogRecord[" + typeName() + "," + xid + "," + seqNr + "]";
+ return "TxLogRecord[" + typeName() + "," + xid + "," + seqNr + "," + (1+sizeOf( globalId )+sizeOf( branchId )) + "]";
+ }
+
+ private int sizeOf( byte[] id )
+ {
+ if ( id == null ) return 0;
+ return 1+id.length;
}
String typeName()
@@ -473,6 +483,7 @@ public int compare( Record r1, Record r2 )
return r1.getSequenceNumber() - r2.getSequenceNumber();
}
} );
+ msgLog.logMessage( "About to rotate " + name + " to " + newFile + " with dangling records " + records, true );
Iterator<Record> recordItr = records.iterator();
FileChannel fileChannel = fileSystem.open( newFile, "rw" );
fileChannel.position( fileChannel.size() );
@@ -485,5 +496,6 @@ public int compare( Record r1, Record r2 )
writeRecord( record, ForceMode.forced );
}
force();
+ msgLog.logMessage( "Rotated " + name + " to, file channel now at " + fileChannel.position(), true );
}
}
@@ -145,7 +145,7 @@ public void init()
"Unable to start TM, " + "active tx log file[" +
currentTxLog + "] not found."));
}
- txLog = new TxLog( currentTxLog, fileSystem );
+ txLog = new TxLog( currentTxLog, fileSystem, msgLog );
msgLog.logMessage( "TM opening log: " + currentTxLog, true );
}
else
@@ -165,7 +165,7 @@ public void init()
.getBytes( "UTF-8" ) );
FileChannel fc = fileSystem.open( logSwitcherFileName, "rw" );
fc.write( buf );
- txLog = new TxLog( txLogDir + separator + txLog1FileName, fileSystem );
+ txLog = new TxLog( txLogDir + separator + txLog1FileName, fileSystem, msgLog );
msgLog.logMessage( "TM new log: " + txLog1FileName, true );
fc.force( true );
fc.close();
@@ -33,6 +33,7 @@
import org.neo4j.kernel.impl.AbstractNeo4jTestCase;
import org.neo4j.kernel.impl.transaction.TxLog.Record;
import org.neo4j.kernel.impl.transaction.xaframework.ForceMode;
+import org.neo4j.kernel.impl.util.StringLogger;
public class TestTxLog
{
@@ -72,7 +73,7 @@ public void testTxLog() throws IOException
}
try
{
- TxLog txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ TxLog txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
assertTrue( !txLog.getDanglingRecords().hasNext() );
byte globalId[] = new byte[64];
byte branchId[] = new byte[45];
@@ -97,7 +98,7 @@ public void testTxLog() throws IOException
txLog.markAsCommitting( globalId, ForceMode.unforced );
assertEquals( 3, txLog.getRecordCount() );
txLog.close();
- txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
assertEquals( 0, txLog.getRecordCount() );
lists = getRecordLists( txLog.getDanglingRecords() );
assertEquals( 1, lists.length );
@@ -122,7 +123,7 @@ public void testTxLog() throws IOException
assertEquals( 0,
getRecordLists( txLog.getDanglingRecords() ).length );
txLog.close();
- txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
assertEquals( 0,
getRecordLists( txLog.getDanglingRecords() ).length );
txLog.close();
@@ -157,7 +158,7 @@ public void testTruncateTxLog() throws IOException
}
try
{
- TxLog txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ TxLog txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
byte globalId[] = new byte[64];
byte branchId[] = new byte[45];
txLog.txStart( globalId );
@@ -167,12 +168,12 @@ public void testTruncateTxLog() throws IOException
assertEquals( 0,
getRecordLists( txLog.getDanglingRecords() ).length );
txLog.close();
- txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
txLog.txStart( globalId );
txLog.addBranch( globalId, branchId );
txLog.markAsCommitting( globalId, ForceMode.unforced );
txLog.close();
- txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction() );
+ txLog = new TxLog( txFile(), CommonFactories.defaultFileSystemAbstraction(), StringLogger.DEV_NULL );
assertEquals( 1,
getRecordLists( txLog.getDanglingRecords() ).length );
txLog.truncate();
@@ -28,39 +28,169 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
import org.neo4j.helpers.Pair;
import org.neo4j.helpers.Predicate;
import org.neo4j.kernel.impl.nioneo.xa.NeoStoreXaDataSource;
import org.neo4j.kernel.impl.transaction.TxLog;
+import org.neo4j.kernel.impl.transaction.XidImpl;
import org.neo4j.kernel.impl.transaction.xaframework.DirectMappedLogBuffer;
import org.neo4j.kernel.impl.transaction.xaframework.LogBuffer;
import org.neo4j.kernel.impl.transaction.xaframework.LogEntry;
import org.neo4j.kernel.impl.transaction.xaframework.LogIoUtils;
import org.neo4j.kernel.impl.util.DumpLogicalLog.CommandFactory;
+/**
+ * Utility for reading and filtering logical logs as well as tx logs.
+ *
+ * @author Mattias Persson
+ *
+ */
public class LogTestUtils
{
- public static final Predicate<Pair<Byte, List<byte[]>>> EVERYTHING_BUT_DONE_RECORDS = new Predicate<Pair<Byte,List<byte[]>>>()
+ public static interface LogHook<RECORD> extends Predicate<RECORD>
+ {
+ void file( File file );
+
+ void done( File file );
+ }
+
+ public static final LogHook<Pair<Byte, List<byte[]>>> EVERYTHING_BUT_DONE_RECORDS = new LogHook<Pair<Byte,List<byte[]>>>()
{
@Override
public boolean accept( Pair<Byte, List<byte[]>> item )
{
return item.first().byteValue() != TxLog.TX_DONE;
}
+
+ @Override
+ public void file( File file )
+ {
+ }
+
+ @Override
+ public void done( File file )
+ {
+ }
+ };
+
+ public static final LogHook<Pair<Byte, List<byte[]>>> NO_FILTER = new LogHook<Pair<Byte,List<byte[]>>>()
+ {
+ @Override
+ public boolean accept( Pair<Byte, List<byte[]>> item )
+ {
+ return true;
+ }
+
+ @Override
+ public void file( File file )
+ {
+ }
+
+ @Override
+ public void done( File file )
+ {
+ }
};
- public static void filterTxLog( String storeDir, Predicate<Pair<Byte, List<byte[]>>> filter ) throws Exception
+ public static final LogHook<Pair<Byte, List<byte[]>>> PRINT_DANGLING = new LogHook<Pair<Byte,List<byte[]>>>()
+ {
+ private final Map<ByteArray,List<Xid>> xids = new HashMap<ByteArray,List<Xid>>();
+
+ @Override
+ public boolean accept( Pair<Byte, List<byte[]>> item )
+ {
+ if ( item.first().byteValue() == TxLog.BRANCH_ADD )
+ {
+ ByteArray key = new ByteArray( item.other().get( 0 ) );
+ List<Xid> list = xids.get( key );
+ if ( list == null )
+ {
+ list = new ArrayList<Xid>();
+ xids.put( key, list );
+ }
+ Xid xid = new XidImpl( item.other().get( 0 ), item.other().get( 1 ) );
+ list.add( xid );
+ }
+ else if ( item.first().byteValue() == TxLog.TX_DONE )
+ {
+ List<Xid> removed = xids.remove( new ByteArray( item.other().get( 0 ) ) );
+ if ( removed == null )
+ throw new IllegalArgumentException( "Not found" );
+ }
+ return true;
+ }
+
+ @Override
+ public void file( File file )
+ {
+ xids.clear();
+ System.out.println( "=== " + file + " ===" );
+ }
+
+ @Override
+ public void done( File file )
+ {
+ for ( List<Xid> xid : xids.values() )
+ System.out.println( "dangling " + xid );
+ }
+ };
+
+ private static class ByteArray
+ {
+ private final byte[] bytes;
+
+ public ByteArray( byte[] bytes )
+ {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public boolean equals( Object obj )
+ {
+ return Arrays.equals( bytes, ((ByteArray)obj).bytes );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Arrays.hashCode( bytes );
+ }
+ }
+
+ public static void filterTxLog( String storeDir, LogHook<Pair<Byte, List<byte[]>>> filter ) throws IOException
+ {
+ filterTxLog( storeDir, filter, 0 );
+ }
+
+ public static void filterTxLog( String storeDir, LogHook<Pair<Byte, List<byte[]>>> filter, long startPosition ) throws IOException
+ {
+ for ( File file : oneOrTwo( new File( storeDir, "tm_tx_log" ) ) )
+ filterTxLog( file, filter, startPosition );
+ }
+
+ public static void filterTxLog( File file, LogHook<Pair<Byte, List<byte[]>>> filter ) throws IOException
+ {
+ filterTxLog( file, filter, 0 );
+ }
+
+ public static void filterTxLog( File file, LogHook<Pair<Byte, List<byte[]>>> filter, long startPosition ) throws IOException
{
- File file = oneOrTwo( new File( storeDir, "tm_tx_log" ) );
File tempFile = new File( file.getAbsolutePath() + ".tmp" );
FileChannel in = new RandomAccessFile( file, "r" ).getChannel();
+ in.position( startPosition );
FileChannel out = new RandomAccessFile( tempFile, "rw" ).getChannel();
LogBuffer outBuffer = new DirectMappedLogBuffer( out );
ByteBuffer buffer = ByteBuffer.allocate( 1024*1024 );
try
{
+ filter.file( file );
in.read( buffer );
buffer.flip();
while ( buffer.hasRemaining() )
@@ -71,7 +201,7 @@ public static void filterTxLog( String storeDir, Predicate<Pair<Byte, List<byte[
else if ( type == TxLog.BRANCH_ADD ) xids = readXids( buffer, 2 );
else if ( type == TxLog.MARK_COMMIT ) xids = readXids( buffer, 1 );
else if ( type == TxLog.TX_DONE ) xids = readXids( buffer, 1 );
- else throw new IllegalArgumentException( "Unknown type " + type );
+ else throw new IllegalArgumentException( "Unknown type:" + type + ", position:" + (in.position()-buffer.remaining()) );
if ( filter.accept( Pair.of( type, xids ) ) )
{
@@ -85,15 +215,27 @@ public static void filterTxLog( String storeDir, Predicate<Pair<Byte, List<byte[
safeClose( in );
outBuffer.force();
safeClose( out );
+ filter.done( file );
}
-
- file.delete();
- tempFile.renameTo( file );
+ replace( tempFile, file );
}
- public static void filterNeostoreLogicalLog( String storeDir, Predicate<LogEntry> filter ) throws IOException
+ private static void replace( File tempFile, File file )
+ {
+ file.renameTo( new File( file.getAbsolutePath() + "." + System.currentTimeMillis() ) );
+ tempFile.renameTo( file );
+ }
+
+ public static void filterNeostoreLogicalLog( String storeDir, LogHook<LogEntry> filter ) throws IOException
{
- File file = oneOrTwo( new File( storeDir, NeoStoreXaDataSource.LOGICAL_LOG_DEFAULT_NAME ) );
+ for ( File file : oneOrTwo( new File( storeDir, NeoStoreXaDataSource.LOGICAL_LOG_DEFAULT_NAME ) ) )
+ filterNeostoreLogicalLog( file, filter );
+ }
+
+ private static void filterNeostoreLogicalLog( File file, LogHook<LogEntry> filter )
+ throws IOException
+ {
+ filter.file( file );
File tempFile = new File( file.getAbsolutePath() + ".tmp" );
FileChannel in = new RandomAccessFile( file, "r" ).getChannel();
FileChannel out = new RandomAccessFile( tempFile, "rw" ).getChannel();
@@ -115,10 +257,10 @@ public static void filterNeostoreLogicalLog( String storeDir, Predicate<LogEntry
safeClose( in );
outBuffer.force();
safeClose( out );
+ filter.done( file );
}
-
- file.delete();
- tempFile.renameTo( file );
+
+ replace( tempFile, file );
}
private static void transferLogicalLogHeader( FileChannel in, LogBuffer outBuffer,
@@ -164,12 +306,15 @@ private static void writeXids( List<byte[]> xids, LogBuffer outBuffer ) throws I
return bytes;
}
- private static File oneOrTwo( File file )
+ private static File[] oneOrTwo( File file )
{
+ List<File> files = new ArrayList<File>();
File one = new File( file.getAbsolutePath() + ".1" );
- if ( one.exists() ) return one;
+ if ( one.exists() ) files.add( one );
File two = new File( file.getAbsolutePath() + ".2" );
- if ( two.exists() ) return two;
- throw new IllegalStateException( "Couldn't find any active tm log" );
+ if ( two.exists() ) files.add( two );
+ if ( files.isEmpty() )
+ throw new IllegalStateException( "Couldn't find any active tm log" );
+ return files.toArray( new File[files.size()] );
}
}
Oops, something went wrong.

0 comments on commit aeb5f34

Please sign in to comment.