Skip to content

Commit

Permalink
simplify aspects of catchup tx writer
Browse files Browse the repository at this point in the history
Removed some unnecessary usage of AtomicLong since everything
is under synchronized blocks.

Removed an unnecessary post-streaming update of the header since
it could just as well be written correctly to begin with.
  • Loading branch information
martinfurmanski committed Nov 24, 2016
1 parent d1e563c commit f17a044
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 28 deletions.
Expand Up @@ -94,7 +94,7 @@ public void copyStore( MemberId from, StoreId expectedStoreId, File destDir )

private CatchupResult pullTransactions( MemberId from, StoreId expectedStoreId, File storeDir, long fromTxId ) throws IOException, StoreCopyFailedException
{
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider ) )
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider, fromTxId ) )
{
log.info( "Pulling transactions from: %d", fromTxId );

Expand Down
Expand Up @@ -28,8 +28,8 @@

public class TransactionLogCatchUpFactory
{
public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache, LogProvider logProvider ) throws IOException
public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache, LogProvider logProvider, long lastCommittedTxId ) throws IOException
{
return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, logProvider );
return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, logProvider, lastCommittedTxId );
}
}
Expand Up @@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
Expand All @@ -33,19 +32,15 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadOnlyLogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.ReadOnlyTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.Math.max;
import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderWriter.writeLogHeader;

public class TransactionLogCatchUpWriter implements TxPullResponseListener, AutoCloseable
{
Expand All @@ -54,25 +49,21 @@ public class TransactionLogCatchUpWriter implements TxPullResponseListener, Auto
private final PageCache pageCache;
private final Log log;
private final TransactionLogWriter writer;

private AtomicLong firstTxId = new AtomicLong( BASE_TX_ID );
private AtomicLong lastTxId = new AtomicLong( BASE_TX_ID );
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fs;
private final File storeDir;

TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, LogProvider logProvider )
private long lastTxId = -1;

TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, LogProvider logProvider, long lastCommittedTxId )
throws IOException
{
this.neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME );
this.pageCache = pageCache;
this.log = logProvider.getLog( getClass() );
this.fs = fs;
this.logFiles = new PhysicalLogFiles( storeDir, fs );
ReadOnlyLogVersionRepository logVersionRepository = new ReadOnlyLogVersionRepository( pageCache, storeDir );
ReadOnlyTransactionIdStore readOnlyTransactionIdStore = new ReadOnlyTransactionIdStore( pageCache, storeDir );
LogFile logFile = lifespan.add( new PhysicalLogFile( fs, logFiles, Long.MAX_VALUE /*don't rotate*/,
() -> readOnlyTransactionIdStore.getLastCommittedTransactionId() - 1, logVersionRepository,
() -> lastCommittedTxId - 1, logVersionRepository,
new Monitors().newMonitor( PhysicalLogFile.Monitor.class ), new LogHeaderCache( 10 ) ) );
this.writer = new TransactionLogWriter( new LogEntryWriter( logFile.getWriter() ) );
this.storeDir = storeDir;
Expand All @@ -84,12 +75,8 @@ public synchronized void onTxReceived( TxPullResponse txPullResponse )
CommittedTransactionRepresentation tx = txPullResponse.tx();
try
{
long receivedTxId = tx.getCommitEntry().getTxId();

lastTxId.set( receivedTxId );
firstTxId.compareAndSet( BASE_TX_ID, receivedTxId );

writer.append( tx.getTransactionRepresentation(), this.lastTxId.get() );
lastTxId = tx.getCommitEntry().getTxId();
writer.append( tx.getTransactionRepresentation(), lastTxId );
}
catch ( IOException e )
{
Expand All @@ -100,23 +87,22 @@ public synchronized void onTxReceived( TxPullResponse txPullResponse )
@Override
public synchronized void close() throws IOException
{
/* A checkpoint which points to the beginning of the log file, meaning that
all the streamed transactions will be applied as part of recovery. */
long logVersion = logFiles.getHighestLogVersion();
writer.checkPoint( new LogPosition( logVersion, LOG_HEADER_SIZE ) );
lifespan.close();

File currentLogFile = logFiles.getLogFileForVersion( logVersion );
writeLogHeader( fs, currentLogFile, logVersion, max( BASE_TX_ID, firstTxId.get() - 1 ) );

File neoStore = new File( storeDir, MetaDataStore.DEFAULT_NAME );
MetaDataStore.setRecord(
pageCache,
neoStore,
MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET,
LOG_HEADER_SIZE );

if ( lastTxId.get() != -1 )
if ( lastTxId != -1 )
{
MetaDataStore.setRecord( pageCache, neoStoreFile, LAST_TRANSACTION_ID, lastTxId.get() );
MetaDataStore.setRecord( pageCache, neoStoreFile, LAST_TRANSACTION_ID, lastTxId );
}
}
}
Expand Up @@ -133,7 +133,7 @@ private TransactionLogCatchUpFactory factory( TransactionLogCatchUpWriter writer
{
TransactionLogCatchUpFactory factory = mock( TransactionLogCatchUpFactory.class );
when( factory.create( any( File.class ), any( FileSystemAbstraction.class ),
any( PageCache.class ), any( LogProvider.class ) ) ).thenReturn( writer );
any( PageCache.class ), any( LogProvider.class ), anyLong() ) ).thenReturn( writer );
return factory;
}
}

0 comments on commit f17a044

Please sign in to comment.