Skip to content

Commit

Permalink
LogTailScanner updated
Browse files Browse the repository at this point in the history
LogTailScanner is now invoked once during startup to collect information about the tail of the transaction logs, and this result it reused by all component interested in such information.

If any log entries with unsupported version is found, a better error message is thrown to avoid confusion as to what went wrong.

We now also properly respect dbms.allow_upgrade when the transaction log format is changed.
  • Loading branch information
klaren committed Sep 5, 2017
1 parent 8ba01f9 commit 7697882
Show file tree
Hide file tree
Showing 29 changed files with 636 additions and 262 deletions.
Expand Up @@ -191,7 +191,7 @@ private void createGraphDbAndKillIt() throws Exception
tx.success();
}

fs.snapshot( () -> db.shutdown() );
fs.snapshot( db::shutdown );
}

private void runConsistencyCheckToolWith( FileSystemAbstraction fileSystem, String... args )
Expand Down
Expand Up @@ -104,6 +104,7 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogVersionUpgradeChecker;
import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
Expand Down Expand Up @@ -164,7 +165,6 @@
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.time.SystemNanoClock;

import static org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler.STRICT;
import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;

public class NeoStoreDataSource implements Lifecycle, IndexProviders
Expand Down Expand Up @@ -427,9 +427,15 @@ public void start() throws IOException

life.add( new Delegate( Lifecycles.multiple( indexProviders.values() ) ) );

// Check the tail of transaction logs and validate version
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME, fs );
final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader );
LogVersionUpgradeChecker.check( tailScanner, config );

// Upgrade the store before we begin
RecordFormats formats = selectStoreFormats( config, storeDir, fs, pageCache, logService );
upgradeStore( formats );
upgradeStore( formats, tailScanner );

// Build all modules and their services
StorageEngine storageEngine = null;
Expand All @@ -446,24 +452,21 @@ public void start() throws IOException
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, databaseSchemaState, legacyIndexTransactionOrdering, operationalMode );

LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT );

TransactionIdStore transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class );
LogVersionRepository logVersionRepository = dependencies.resolveDependency( LogVersionRepository.class );
NeoStoreTransactionLogModule transactionLogModule =
buildTransactionLogs( storeDir, config, logProvider, scheduler, fs,
buildTransactionLogs( logFiles, config, logProvider, scheduler, fs,
storageEngine, logEntryReader, legacyIndexTransactionOrdering,
transactionIdStore, logVersionRepository );
transactionLogModule.satisfyDependencies(dependencies);

buildRecovery( fs,
transactionIdStore,
logVersionRepository,
tailScanner,
monitors.newMonitor( Recovery.Monitor.class ),
monitors.newMonitor( PositionToRecoverFrom.Monitor.class ),
transactionLogModule.logFiles(), startupStatistics,
storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore()
logFiles, startupStatistics,
storageEngine, transactionLogModule.logicalTransactionStore()
);

// At the time of writing this comes from the storage engine (IndexStoreView)
Expand Down Expand Up @@ -558,7 +561,7 @@ private static RecordFormats selectStoreFormats( Config config, File storeDir, F
return formats;
}

private void upgradeStore( RecordFormats format )
private void upgradeStore( RecordFormats format, LogTailScanner tailScanner )
{
VisibleMigrationProgressMonitor progressMonitor =
new VisibleMigrationProgressMonitor( logService.getUserLog( StoreMigrator.class ) );
Expand All @@ -570,7 +573,7 @@ private void upgradeStore( RecordFormats format )
schemaIndexProviderMap,
indexProviders,
pageCache,
format ).migrate( storeDir );
format, tailScanner ).migrate( storeDir );
}

private StorageEngine buildStorageEngine(
Expand All @@ -596,7 +599,7 @@ private StorageEngine buildStorageEngine(
}

private NeoStoreTransactionLogModule buildTransactionLogs(
File storeDir,
PhysicalLogFiles logFiles,
Config config,
LogProvider logProvider,
JobScheduler scheduler,
Expand All @@ -607,8 +610,6 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
{
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 100_000 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 1000 );
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME,
fileSystemAbstraction );

final PhysicalLogFile logFile = life.add( new PhysicalLogFile( fileSystemAbstraction, logFiles,
config.get( GraphDatabaseSettings.logical_log_rotation_threshold ),
Expand Down Expand Up @@ -682,20 +683,17 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
private void buildRecovery(
final FileSystemAbstraction fileSystemAbstraction,
TransactionIdStore transactionIdStore,
LogVersionRepository logVersionRepository,
LogTailScanner tailScanner,
Recovery.Monitor recoveryMonitor,
PositionToRecoverFrom.Monitor positionMonitor,
final PhysicalLogFiles logFiles,
final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
LogicalTransactionStore logicalTransactionStore )
{
final LogTailScanner logTailScanner =
new LogTailScanner( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI(
storageEngine, logFiles, fileSystemAbstraction, logVersionRepository,
logTailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
Recovery.SPI spi =
new DefaultRecoverySPI( storageEngine, logFiles, fileSystemAbstraction, tailScanner, transactionIdStore,
logicalTransactionStore, positionMonitor );
Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor()
{
Expand Down
Expand Up @@ -61,11 +61,6 @@ public LogicalTransactionStore logicalTransactionStore()
return logicalTransactionStore;
}

public PhysicalLogFiles logFiles()
{
return logFiles;
}

CheckPointer checkPointing()
{
return checkPointer;
Expand Down
Expand Up @@ -63,11 +63,12 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
}

long logVersion = MetaDataStore.getRecord( pageCache, neoStore, MetaDataStore.Position.LOG_VERSION );
MetaDataStore.setRecord( pageCache, neoStore, MetaDataStore.Position.LOG_VERSION, logVersion );
PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );

LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();

LogTailScanner finder = new LogTailScanner( logFiles, fs, reader );
return new PositionToRecoverFrom( finder, NO_MONITOR ).apply( logVersion ) != LogPosition.UNSPECIFIED;
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, reader );
return new PositionToRecoverFrom( tailScanner, NO_MONITOR ).get() != LogPosition.UNSPECIFIED;
}
}
Expand Up @@ -110,7 +110,6 @@
import org.neo4j.kernel.spi.legacyindex.IndexImplementation;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.CommandReaderFactory;
import org.neo4j.storageengine.api.CommandsToApply;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine;
Expand Down Expand Up @@ -153,7 +152,6 @@ public class RecordStorageEngine implements StorageEngine, Lifecycle
private final IdOrderingQueue legacyIndexTransactionOrdering;
private final LockService lockService;
private final WorkSync<Supplier<LabelScanWriter>,LabelUpdateWork> labelScanStoreSync;
private final CommandReaderFactory commandReaderFactory;
private final WorkSync<IndexingUpdateService,IndexUpdatesWork> indexUpdatesSync;
private final IndexStoreView indexStoreView;
private final LegacyIndexProviderLookup legacyIndexProviderLookup;
Expand Down Expand Up @@ -241,7 +239,6 @@ public RecordStorageEngine(

labelScanStoreSync = new WorkSync<>( labelScanStore::newWriter );

commandReaderFactory = new RecordStorageCommandReaderFactory();
indexUpdatesSync = new WorkSync<>( indexingService );

// Immutable state for creating/applying commands
Expand Down Expand Up @@ -276,12 +273,6 @@ public StoreReadLayer storeReadLayer()
return storeLayer;
}

@Override
public CommandReaderFactory commandReaderFactory()
{
return commandReaderFactory;
}

@SuppressWarnings( "resource" )
@Override
public void createCommands(
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.kernel.impl.storemigration.participant.LegacyIndexMigrator;
import org.neo4j.kernel.impl.storemigration.participant.NativeLabelScanStoreMigrator;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.spi.legacyindex.IndexImplementation;
import org.neo4j.logging.LogProvider;

Expand All @@ -53,12 +54,13 @@ public class DatabaseMigrator
private final Map<String,IndexImplementation> indexProviders;
private final PageCache pageCache;
private final RecordFormats format;
private final LogTailScanner tailScanner;

public DatabaseMigrator(
MigrationProgressMonitor progressMonitor, FileSystemAbstraction fs,
Config config, LogService logService, SchemaIndexProviderMap schemaIndexProviderMap,
Map<String,IndexImplementation> indexProviders, PageCache pageCache,
RecordFormats format )
RecordFormats format, LogTailScanner tailScanner )
{
this.progressMonitor = progressMonitor;
this.fs = fs;
Expand All @@ -68,6 +70,7 @@ public DatabaseMigrator(
this.indexProviders = indexProviders;
this.pageCache = pageCache;
this.format = format;
this.tailScanner = tailScanner;
}

/**
Expand All @@ -79,8 +82,7 @@ public void migrate( File storeDir )
{
LogProvider logProvider = logService.getInternalLogProvider();
UpgradableDatabase upgradableDatabase =
new UpgradableDatabase( fs, new StoreVersionCheck( pageCache ),
format );
new UpgradableDatabase( new StoreVersionCheck( pageCache ), format, tailScanner );
StoreUpgrader storeUpgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fs, pageCache,
logProvider );

Expand Down
Expand Up @@ -20,9 +20,7 @@
package org.neo4j.kernel.impl.storemigration;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.FormatFamily;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
Expand All @@ -35,27 +33,23 @@
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result.Outcome;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;

/**
* Logic to check whether a database version is upgradable to the current version. It looks at the
* version information found in the store files themselves.
*/
public class UpgradableDatabase
{
private final FileSystemAbstraction fs;
private final StoreVersionCheck storeVersionCheck;
private final RecordFormats format;
private final LogTailScanner tailScanner;

public UpgradableDatabase( FileSystemAbstraction fs,
StoreVersionCheck storeVersionCheck, RecordFormats format )
public UpgradableDatabase( StoreVersionCheck storeVersionCheck, RecordFormats format,
LogTailScanner tailScanner )
{
this.fs = fs;
this.storeVersionCheck = storeVersionCheck;
this.format = format;
this.tailScanner = tailScanner;
}

/**
Expand Down Expand Up @@ -103,7 +97,7 @@ public RecordFormats checkUpgradeable( File storeDirectory )
}
else
{
result = checkCleanShutDownByCheckPoint( storeDirectory );
result = checkCleanShutDownByCheckPoint();
if ( result.outcome.isSuccessful() )
{
return fromFormat;
Expand Down Expand Up @@ -133,22 +127,17 @@ public RecordFormats checkUpgradeable( File storeDirectory )
}
}

private Result checkCleanShutDownByCheckPoint( File storeDirectory )
private Result checkCleanShutDownByCheckPoint()
{
// check version
PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fs );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LogTailScanner logTailScanner =
new LogTailScanner( logFiles, fs, logEntryReader );
try
{
LogTailScanner.LogTailInformation logTailInformation = logTailScanner.find( logFiles.getHighestLogVersion() );
if ( !logTailInformation.commitsAfterLastCheckPoint )
if ( !tailScanner.getTailInformation().commitsAfterLastCheckPoint )
{
return new Result( Result.Outcome.ok, null, null );
}
}
catch ( IOException e )
catch ( Throwable throwable )
{
// ignore exception and return db not cleanly shutdown
}
Expand Down

0 comments on commit 7697882

Please sign in to comment.