Skip to content

Commit

Permalink
Allow transaction catchuper to append logs ino correct directory
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Nov 16, 2017
1 parent bb2e2ab commit 4ce4297
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 35 deletions.
Expand Up @@ -51,6 +51,7 @@ public class TransactionLogFiles extends LifecycleAdapter implements LogFiles
{ {
public static final String DEFAULT_NAME = "neostore.transaction.db"; public static final String DEFAULT_NAME = "neostore.transaction.db";
public static final FilenameFilter DEFAULT_FILENAME_FILTER = TransactionLogFilesHelper.DEFAULT_FILENAME_FILTER; public static final FilenameFilter DEFAULT_FILENAME_FILTER = TransactionLogFilesHelper.DEFAULT_FILENAME_FILTER;
private static final File[] EMPTY_FILES_ARRAY = {};


private final TransactionLogFilesContext logFilesContext; private final TransactionLogFilesContext logFilesContext;
private final TransactionLogFileInformation logFileInformation; private final TransactionLogFileInformation logFileInformation;
Expand Down Expand Up @@ -107,7 +108,12 @@ public long getLogVersion( String historyLogFilename )
@Override @Override
public File[] logFiles() public File[] logFiles()
{ {
return fileSystem.listFiles( fileHelper.getParentDirectory(), fileHelper.getLogFilenameFilter() ); File[] files = fileSystem.listFiles( fileHelper.getParentDirectory(), fileHelper.getLogFilenameFilter() );
if ( files == null )
{
return EMPTY_FILES_ARRAY;
}
return files;
} }


@Override @Override
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.consistency.checking.full.ConsistencyCheckIncompleteException; import org.neo4j.consistency.checking.full.ConsistencyCheckIncompleteException;
import org.neo4j.consistency.checking.full.ConsistencyFlags; import org.neo4j.consistency.checking.full.ConsistencyFlags;
import org.neo4j.helpers.progress.ProgressMonitorFactory; import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -53,26 +54,28 @@ public class BackupFlowTest
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();


// dependencies // dependencies
final ConsistencyCheckService consistencyCheckService = mock( ConsistencyCheckService.class ); private final ConsistencyCheckService consistencyCheckService = mock( ConsistencyCheckService.class );
final OutsideWorld outsideWorld = mock( OutsideWorld.class ); private final OutsideWorld outsideWorld = mock( OutsideWorld.class );
final LogProvider logProvider = mock( LogProvider.class ); private final FileSystemAbstraction fileSystem = mock( FileSystemAbstraction.class );
final BackupStrategyWrapper firstStrategy = mock( BackupStrategyWrapper.class ); private final LogProvider logProvider = mock( LogProvider.class );
final BackupStrategyWrapper secondStrategy = mock( BackupStrategyWrapper.class ); private final BackupStrategyWrapper firstStrategy = mock( BackupStrategyWrapper.class );
private final BackupStrategyWrapper secondStrategy = mock( BackupStrategyWrapper.class );


BackupFlow subject; BackupFlow subject;


// test method parameter mocks // test method parameter mocks
final OnlineBackupContext onlineBackupContext = mock( OnlineBackupContext.class ); private final OnlineBackupContext onlineBackupContext = mock( OnlineBackupContext.class );
final OnlineBackupRequiredArguments requiredArguments = mock( OnlineBackupRequiredArguments.class ); private final OnlineBackupRequiredArguments requiredArguments = mock( OnlineBackupRequiredArguments.class );


// mock returns // mock returns
private ProgressMonitorFactory progressMonitorFactory = mock( ProgressMonitorFactory.class ); private final ProgressMonitorFactory progressMonitorFactory = mock( ProgressMonitorFactory.class );
private Path reportDir = mock( Path.class ); private final Path reportDir = mock( Path.class );
private ConsistencyCheckService.Result consistencyCheckResult = mock( ConsistencyCheckService.Result.class ); private final ConsistencyCheckService.Result consistencyCheckResult = mock( ConsistencyCheckService.Result.class );


@Before @Before
public void setup() public void setup()
{ {
when( outsideWorld.fileSystem() ).thenReturn( fileSystem );
when( onlineBackupContext.getRequiredArguments() ).thenReturn( requiredArguments ); when( onlineBackupContext.getRequiredArguments() ).thenReturn( requiredArguments );
when( requiredArguments.getReportDir() ).thenReturn( reportDir ); when( requiredArguments.getReportDir() ).thenReturn( reportDir );
subject = new BackupFlow( consistencyCheckService, outsideWorld, logProvider, progressMonitorFactory, subject = new BackupFlow( consistencyCheckService, outsideWorld, logProvider, progressMonitorFactory,
Expand Down
Expand Up @@ -162,8 +162,8 @@ public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File de
private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy ) private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy )
throws IOException, StoreCopyFailedException throws IOException, StoreCopyFailedException
{ {
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, logProvider, try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, config,
fromTxId, asPartOfStoreCopy ) ) logProvider, fromTxId, asPartOfStoreCopy ) )
{ {
log.info( "Pulling transactions from: %d", fromTxId ); log.info( "Pulling transactions from: %d", fromTxId );


Expand Down
Expand Up @@ -24,13 +24,15 @@


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


public class TransactionLogCatchUpFactory public class TransactionLogCatchUpFactory
{ {
public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache, public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache,
LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException Config config, LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException
{ {
return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, logProvider, fromTxId, asPartOfStoreCopy ); return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, logProvider, fromTxId,
asPartOfStoreCopy );
} }
} }
Expand Up @@ -24,6 +24,7 @@


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
Expand Down Expand Up @@ -52,14 +53,19 @@ public class TransactionLogCatchUpWriter implements TxPullResponseListener, Auto
private long lastTxId = -1; private long lastTxId = -1;
private long expectedTxId; private long expectedTxId;


TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Config config,
LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException
{ {
this.pageCache = pageCache; this.pageCache = pageCache;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.asPartOfStoreCopy = asPartOfStoreCopy; this.asPartOfStoreCopy = asPartOfStoreCopy;
this.logFiles = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache ) LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache )
.withLastCommittedTransactionIdSupplier( () -> fromTxId - 1 ).build(); .withLastCommittedTransactionIdSupplier( () -> fromTxId - 1 );
if ( !asPartOfStoreCopy )
{
logFilesBuilder.withConfig( config );
}
this.logFiles = logFilesBuilder.build();
this.lifespan.add( logFiles ); this.lifespan.add( logFiles );
this.writer = new TransactionLogWriter( new LogEntryWriter( logFiles.getLogFile().getWriter() ) ); this.writer = new TransactionLogWriter( new LogEntryWriter( logFiles.getLogFile().getWriter() ) );
this.storeDir = storeDir; this.storeDir = storeDir;
Expand Down
Expand Up @@ -135,9 +135,8 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception
private TransactionLogCatchUpFactory factory( TransactionLogCatchUpWriter writer ) throws IOException private TransactionLogCatchUpFactory factory( TransactionLogCatchUpWriter writer ) throws IOException
{ {
TransactionLogCatchUpFactory factory = mock( TransactionLogCatchUpFactory.class ); TransactionLogCatchUpFactory factory = mock( TransactionLogCatchUpFactory.class );
when( factory.create( isNull(), any( FileSystemAbstraction.class ), when( factory.create( isNull(), any( FileSystemAbstraction.class ), isNull(), any( Config.class ),
isNull(), any( LogProvider.class ), anyLong(), anyBoolean() ) ) any( LogProvider.class ), anyLong(), anyBoolean() ) ).thenReturn( writer );
.thenReturn( writer );
return factory; return factory;
} }
} }
Expand Up @@ -22,9 +22,13 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;


import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
Expand Down Expand Up @@ -66,24 +70,31 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.neo4j.kernel.impl.transaction.command.Commands.createNode; import static org.neo4j.kernel.impl.transaction.command.Commands.createNode;


@RunWith( Parameterized.class )
public class TransactionLogCatchUpWriterTest public class TransactionLogCatchUpWriterTest
{ {
@Rule @Rule
public final TestDirectory dir = TestDirectory.testDirectory(); public final TestDirectory dir = TestDirectory.testDirectory();

@Rule @Rule
public final DefaultFileSystemRule fsRule = new DefaultFileSystemRule(); public final DefaultFileSystemRule fsRule = new DefaultFileSystemRule();

@Rule @Rule
public final PageCacheRule pageCacheRule = new PageCacheRule(); public final PageCacheRule pageCacheRule = new PageCacheRule();

@Rule @Rule
public NeoStoreDataSourceRule dsRule = new NeoStoreDataSourceRule(); public NeoStoreDataSourceRule dsRule = new NeoStoreDataSourceRule();


@Parameterized.Parameter
public boolean partOfStoreCopy;

private PageCache pageCache; private PageCache pageCache;
private FileSystemAbstraction fs; private FileSystemAbstraction fs;
private File storeDir; private File storeDir;


@Parameterized.Parameters
public static List<Boolean> partOfStoreCopy()
{
return Arrays.asList( Boolean.TRUE, Boolean.FALSE );
}

@Before @Before
public void setup() throws IOException public void setup() throws IOException
{ {
Expand All @@ -101,7 +112,8 @@ public void shouldCreateTransactionLogWithCheckpoint() throws Exception
@Test @Test
public void createTransactionLogWithCheckpointInCustomLocation() throws IOException public void createTransactionLogWithCheckpointInCustomLocation() throws IOException
{ {
createTransactionLogWithCheckpoint( Config.defaults( GraphDatabaseSettings.logical_logs_location, "custom-tx-logs") ); createTransactionLogWithCheckpoint( Config.defaults( GraphDatabaseSettings.logical_logs_location,
"custom-tx-logs") );
} }


private void createTransactionLogWithCheckpoint( Config config ) throws IOException private void createTransactionLogWithCheckpoint( Config config ) throws IOException
Expand All @@ -111,8 +123,8 @@ private void createTransactionLogWithCheckpoint( Config config ) throws IOExcept
int fromTxId = 37; int fromTxId = 37;
int endTxId = fromTxId + 5; int endTxId = fromTxId + 5;


TransactionLogCatchUpWriter catchUpWriter = new TransactionLogCatchUpWriter( storeDir, fs, pageCache, TransactionLogCatchUpWriter catchUpWriter = new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config,
NullLogProvider.getInstance(), fromTxId, true ); NullLogProvider.getInstance(), fromTxId, partOfStoreCopy );


// when // when
for ( int i = fromTxId; i <= endTxId; i++ ) for ( int i = fromTxId; i <= endTxId; i++ )
Expand All @@ -123,28 +135,35 @@ private void createTransactionLogWithCheckpoint( Config config ) throws IOExcept
catchUpWriter.close(); catchUpWriter.close();


// then // then
verifyTransactionsInLog( fromTxId, endTxId, config ); LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache );
verifyCheckpointInLog( config ); // necessary for recovery if ( !partOfStoreCopy )
{
logFilesBuilder.withConfig( config );
}
LogFiles logFiles = logFilesBuilder.build();

verifyTransactionsInLog( logFiles, fromTxId, endTxId );
if ( partOfStoreCopy )
{
verifyCheckpointInLog( logFiles );
}
} }


private void verifyCheckpointInLog( Config config ) throws IOException private void verifyCheckpointInLog( LogFiles logFiles )
{ {
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>( LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>(
new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT ); new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT );
LogFiles logFiles = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache )
.withConfig( config )
.withLogEntryReader( logEntryReader ).build();
final LogTailScanner logTailScanner = new LogTailScanner( logFiles, logEntryReader, new Monitors() ); final LogTailScanner logTailScanner = new LogTailScanner( logFiles, logEntryReader, new Monitors() );


LogTailInformation tailInformation = logTailScanner.getTailInformation(); LogTailInformation tailInformation = logTailScanner.getTailInformation();
assertNotNull( tailInformation.lastCheckPoint ); assertNotNull( tailInformation.lastCheckPoint );
assertTrue( tailInformation.commitsAfterLastCheckpoint() ); assertTrue( tailInformation.commitsAfterLastCheckpoint() );
} }


private void verifyTransactionsInLog( long fromTxId, long endTxId, Config config ) throws IOException private void verifyTransactionsInLog( LogFiles logFiles, long fromTxId, long endTxId ) throws
IOException
{ {
long expectedTxId = fromTxId; long expectedTxId = fromTxId;
LogFiles logFiles = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache ).withConfig( config ).build();
LogVersionedStoreChannel versionedStoreChannel = logFiles.openForVersion( 0 ); LogVersionedStoreChannel versionedStoreChannel = logFiles.openForVersion( 0 );
try ( ReadableLogChannel channel = try ( ReadableLogChannel channel =
new ReadAheadLogChannel( versionedStoreChannel, LogVersionBridge.NO_MORE_CHANNELS, 1024 ) ) new ReadAheadLogChannel( versionedStoreChannel, LogVersionBridge.NO_MORE_CHANNELS, 1024 ) )
Expand Down

0 comments on commit 4ce4297

Please sign in to comment.