Skip to content

Commit

Permalink
Specific hanling of upgrade transaction during update pooling
Browse files Browse the repository at this point in the history
During store upgrade in case if transaction logs are missing upgrade transaction checksum will be set to predefined constant value
(since we can't get find real value in the logs).
In HA setup upgraded store will be copied by slaves and update pullers will try to fetch updates starting from upgrade transaction id.
Master should be able to recognise that specific upgrade transaction id and use checksum that was assign to it during upgrade.
  • Loading branch information
MishaDemianenko authored and digitalstain committed Oct 26, 2016
1 parent 0d64db5 commit 006dce8
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 62 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import org.neo4j.cursor.IOCursor;
import org.neo4j.helpers.collection.Pair;
Expand All @@ -33,7 +34,6 @@
import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy;
import org.neo4j.kernel.impl.storemigration.FileOperation;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
Expand Down Expand Up @@ -118,10 +118,10 @@ public void migrateLogs( File storeDir, File migrationDir ) throws IOException
}
}

public TransactionId getTransactionInformation( File storeDir, long transactionId ) throws IOException
public Optional<TransactionId> getTransactionInformation( File storeDir, long transactionId ) throws IOException
{
List<File> logFiles = Arrays.asList( fs.listFiles( storeDir, versionedLegacyLogFilesFilter ) );
Collections.sort( logFiles, NEWEST_FIRST );
logFiles.sort( NEWEST_FIRST );
for ( File file : logFiles )
{
Pair<LogHeader, IOCursor<LogEntry>> pair = reader.openReadableChannel( file );
Expand All @@ -143,8 +143,8 @@ else if ( logEntry instanceof LogEntryCommit )
LogEntryCommit commitEntry = logEntry.as();
if ( commitEntry.getTxId() == transactionId )
{
return new TransactionId( transactionId, startEntry.checksum(),
commitEntry.getTimeWritten() );
return Optional.of( new TransactionId( transactionId, startEntry.checksum(),
commitEntry.getTimeWritten() ) );
}
}
}
Expand All @@ -155,7 +155,7 @@ else if ( logEntry instanceof LogEntryCommit )
break;
}
}
throw new NoSuchTransactionException( transactionId );
return Optional.empty();
}

public void operate( FileOperation op, File from, File to ) throws IOException
Expand Down
Expand Up @@ -33,7 +33,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.neo4j.helpers.collection.Iterables;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -78,7 +80,6 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
Expand Down Expand Up @@ -136,7 +137,6 @@ public class StoreMigrator extends AbstractStoreMigrationParticipant
private final FileSystemAbstraction fileSystem;
private final PageCache pageCache;
private final SchemaIndexProvider schemaIndexProvider;
private final Log log;

public StoreMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Config config,
LogService logService, SchemaIndexProvider schemaIndexProvider )
Expand All @@ -154,7 +154,6 @@ public StoreMigrator( FileSystemAbstraction fileSystem, PageCache pageCache, Con
this.logService = logService;
this.schemaIndexProvider = schemaIndexProvider;
this.legacyLogs = legacyLogs;
this.log = logService.getInternalLog( StoreMigrator.class );
}

@Override
Expand Down Expand Up @@ -278,33 +277,43 @@ private static File lastTxLogPositionFile( File migrationDir )
return new File( migrationDir, "lastxlogposition" );
}

// accessible for tests
TransactionId extractTransactionIdInformation( File neoStore, File storeDir, long txId )
TransactionId extractTransactionIdInformation( File neoStore, File storeDir, long lastTransactionId )
throws IOException
{
long checksum = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_CHECKSUM );
long commitTimestamp = MetaDataStore.getRecord( pageCache, neoStore,
Position.LAST_TRANSACTION_COMMIT_TIMESTAMP );
if ( checksum != FIELD_NOT_PRESENT && commitTimestamp != FIELD_NOT_PRESENT )
{
return new TransactionId( txId, checksum, commitTimestamp );
return new TransactionId( lastTransactionId, checksum, commitTimestamp );
}
// The legacy store we're migrating doesn't have this record in neostore so try to extract it from tx log
try
{
return legacyLogs.getTransactionInformation( storeDir, txId );
}
catch ( IOException ioe )
{
log.error( "Extraction of transaction " + txId + " from legacy logs failed.", ioe );
// OK, so we could not get the transaction information from the legacy store logs,
// so just generate a random new one. I don't think it matters since we know that in a
// multi-database scenario there can only be one of them upgrading, the other ones will have to
// copy that database.
return txId == TransactionIdStore.BASE_TX_ID
? new TransactionId( txId, BASE_TX_CHECKSUM, BASE_TX_COMMIT_TIMESTAMP )
: new TransactionId( txId, UNKNOWN_TX_CHECKSUM, UNKNOWN_TX_COMMIT_TIMESTAMP );
}

Optional<TransactionId> transactionInformation = legacyLogs.getTransactionInformation( storeDir, lastTransactionId );
return transactionInformation.orElseGet( specificTransactionInformationSupplier( lastTransactionId ) );
}

/**
* In case if we can't find information about transaction in legacy logs we will create new transaction
* information record.
* Those should be used <b>only</b> in case if we do not have any transaction logs available during
* migration.
*
* Logs can be absent in two possible scenarios:
* <ol>
* <li>We do not have any logs since there were not transaction.</li>
* <li>Logs are missing.</li>
* </ol>
* For both of those cases specific informational records will be produced.
*
* @param lastTransactionId last committed transaction id
* @return supplier of custom id records.
*/
private Supplier<TransactionId> specificTransactionInformationSupplier( long lastTransactionId )
{
return () -> lastTransactionId == TransactionIdStore.BASE_TX_ID
? new TransactionId( lastTransactionId, BASE_TX_CHECKSUM, BASE_TX_COMMIT_TIMESTAMP )
: new TransactionId( lastTransactionId, UNKNOWN_TX_CHECKSUM, UNKNOWN_TX_COMMIT_TIMESTAMP );
}

private LogPosition extractTransactionLogPosition( File neoStore, File storeDir, long lastTxId ) throws IOException
Expand Down
Expand Up @@ -43,19 +43,20 @@
import org.neo4j.kernel.impl.transaction.log.ArrayIOCursor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.MissingLogDataException;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.kernel.impl.store.record.Record.NO_LABELS_FIELD;
import static org.neo4j.kernel.impl.storemigration.legacylogs.LegacyLogFilenames.getLegacyLogFilename;
import static org.neo4j.kernel.impl.storemigration.legacylogs.LegacyLogFilenames.versionedLegacyLogFilesFilter;
import static org.neo4j.kernel.impl.transaction.log.PhysicalLogFile.DEFAULT_NAME;
Expand Down Expand Up @@ -223,9 +224,39 @@ public void transactionInformationRetrievedFromCommitEntries() throws IOExceptio
LegacyLogEntryWriter writer = new LegacyLogEntryWriter( fs );
LegacyLogs legacyLogs = new LegacyLogs( fs, reader, writer );

assertEquals( newTransactionId( 1 ), legacyLogs.getTransactionInformation( storeDir, 1 ) );
assertEquals( newTransactionId( 2 ), legacyLogs.getTransactionInformation( storeDir, 2 ) );
assertEquals( newTransactionId( 3 ), legacyLogs.getTransactionInformation( storeDir, 3 ) );
assertEquals( newTransactionId( 1 ), getTransactionInformation( legacyLogs, 1 ) );
assertEquals( newTransactionId( 2 ), getTransactionInformation( legacyLogs, 2 ) );
assertEquals( newTransactionId( 3 ), getTransactionInformation( legacyLogs, 3 ) );
}

@Test(expected = IOException.class)
@SuppressWarnings( "unchecked" )
public void ioExceptionsPropagatedWhenFailToReadLegacyLog() throws IOException
{
File logFile = new File( LegacyLogFilenames.getLegacyLogFilename( 1 ) );
when( fs.listFiles( any( File.class ), any( FilenameFilter.class ) ) )
.thenReturn( new File[]{logFile} );

when( reader.openReadableChannel( any( File.class ) ) ).thenThrow( IOException.class );

LegacyLogs legacyLogs = new LegacyLogs( fs, reader, writer );
getTransactionInformation( legacyLogs, 1 );
}

@Test
public void noTransactionalInformationWhenLogsNotPresent() throws IOException
{
when( fs.listFiles( any( File.class ), any( FilenameFilter.class ) ) )
.thenReturn( new File[]{} );

LegacyLogs legacyLogs = new LegacyLogs( fs, reader, writer );
assertFalse( "There are not logs. Nothing to return",
legacyLogs.getTransactionInformation( storeDir, 1 ).isPresent() );
}

private TransactionId getTransactionInformation( LegacyLogs legacyLogs, int transactionId ) throws IOException
{
return legacyLogs.getTransactionInformation( storeDir, transactionId ).orElseThrow( MissingLogDataException::new);
}

private String getLogFilenameForVersion( int version )
Expand Down
Expand Up @@ -25,12 +25,15 @@
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Function;

import org.neo4j.graphdb.mockfs.DelegatingFileSystemAbstraction;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.configuration.Config;
Expand Down
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Optional;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void shouldExtractTransactionInformationFromLegacyLogsWhenCantFindInStore
Config config = mock( Config.class );
LogService logService = mock( LogService.class );
LegacyLogs legacyLogs = mock( LegacyLogs.class );
when( legacyLogs.getTransactionInformation( storeDir, txId ) ).thenReturn( expected );
when( legacyLogs.getTransactionInformation( storeDir, txId ) ).thenReturn( Optional.of( expected ) );

// when
// ... neoStore is empty and with migrator
Expand All @@ -140,23 +141,16 @@ public void shouldExtractTransactionInformationFromLegacyLogsWhenCantFindInStore
}

@Test
public void shouldGenerateTransactionInformationAsLastOption() throws Exception
public void shouldGenerateTransactionInformationWhenLogsNotPresent() throws Exception
{
// given
// ... variables
long txId = 42;
TransactionId expected = new TransactionId( txId, FIELD_NOT_PRESENT, UNKNOWN_TX_COMMIT_TIMESTAMP );

// ... and files
PageCache pageCache = pageCacheRule.getPageCache( fs );
File storeDir = directory.graphDbDir();
File neoStore = new File( storeDir, DEFAULT_NAME );
neoStore.createNewFile();

// ... and mocks
Config config = mock( Config.class );
AssertableLogProvider logProvider = new AssertableLogProvider();
LogService logService = new SimpleLogService( NullLogProvider.getInstance(), logProvider );
LogService logService = new SimpleLogService( NullLogProvider.getInstance(), NullLogProvider.getInstance() );
LegacyLogs legacyLogs = mock( LegacyLogs.class );

// when
Expand All @@ -165,17 +159,45 @@ public void shouldGenerateTransactionInformationAsLastOption() throws Exception
assertEquals( FIELD_NOT_PRESENT, getRecord( pageCache, neoStore, LAST_TRANSACTION_CHECKSUM ) );
assertEquals( FIELD_NOT_PRESENT, getRecord( pageCache, neoStore, LAST_TRANSACTION_COMMIT_TIMESTAMP ) );
// ... and transaction not in log
when( legacyLogs.getTransactionInformation( storeDir, txId ) ).thenThrow( NoSuchTransactionException.class );
when( legacyLogs.getTransactionInformation( storeDir, txId ) ).thenReturn( Optional.empty() );
// ... and with migrator
StoreMigrator migrator = new StoreMigrator( fs, pageCache, config, logService, schemaIndexProvider );
TransactionId actual = migrator.extractTransactionIdInformation( neoStore, storeDir, txId );

// then
logProvider.assertContainsMessageContaining( "Extraction of transaction " + txId + " from legacy logs failed.");
assertEquals( expected.transactionId(), actual.transactionId() );
assertEquals( txId, actual.transactionId() );
assertEquals( TransactionIdStore.UNKNOWN_TX_CHECKSUM, actual.checksum() );
assertEquals( expected.commitTimestamp(), actual.commitTimestamp() );
// We do not expect checksum to be equal as it is randomly generated
assertEquals( TransactionIdStore.UNKNOWN_TX_COMMIT_TIMESTAMP, actual.commitTimestamp() );
}

@Test
public void shouldGenerateTransactionInformationWhenLogsAreEmpty() throws Exception
{
// given
long txId = 1;
PageCache pageCache = pageCacheRule.getPageCache( fs );
File storeDir = directory.graphDbDir();
File neoStore = new File( storeDir, DEFAULT_NAME );
neoStore.createNewFile();
Config config = mock( Config.class );
LogService logService = new SimpleLogService( NullLogProvider.getInstance(), NullLogProvider.getInstance() );
LegacyLogs legacyLogs = mock( LegacyLogs.class );

// when
// ... transaction info not in neo store
assertEquals( FIELD_NOT_PRESENT, getRecord( pageCache, neoStore, LAST_TRANSACTION_ID ) );
assertEquals( FIELD_NOT_PRESENT, getRecord( pageCache, neoStore, LAST_TRANSACTION_CHECKSUM ) );
assertEquals( FIELD_NOT_PRESENT, getRecord( pageCache, neoStore, LAST_TRANSACTION_COMMIT_TIMESTAMP ) );
// ... and transaction not in log
when( legacyLogs.getTransactionInformation( storeDir, txId ) ).thenReturn( Optional.empty() );
// ... and with migrator
StoreMigrator migrator = new StoreMigrator( fs, pageCache, config, logService, schemaIndexProvider );
TransactionId actual = migrator.extractTransactionIdInformation( neoStore, storeDir, txId );

// then
assertEquals( txId, actual.transactionId() );
assertEquals( TransactionIdStore.BASE_TX_CHECKSUM, actual.checksum() );
assertEquals( TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP, actual.commitTimestamp() );
}

@Test
Expand Down

0 comments on commit 006dce8

Please sign in to comment.