Skip to content

Commit

Permalink
Use monitors in log tail scanner for logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent e51a186 commit 364b4f1
Show file tree
Hide file tree
Showing 28 changed files with 201 additions and 105 deletions.
Expand Up @@ -43,9 +43,9 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory;
import org.neo4j.kernel.impl.recovery.RecoveryRequiredChecker;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.FormattedLogProvider;

import static java.lang.String.format;
Expand Down Expand Up @@ -224,7 +224,8 @@ private void checkDbState( File storeDir, Config additionalConfiguration ) throw
PageCache pageCache = ConfigurableStandalonePageCacheFactory
.createPageCache( fileSystem, additionalConfiguration ) )
{
RecoveryRequiredChecker requiredChecker = new RecoveryRequiredChecker( fileSystem, pageCache, NullLogService.getInstance() );
RecoveryRequiredChecker requiredChecker =
new RecoveryRequiredChecker( fileSystem, pageCache, new Monitors() );
if ( requiredChecker.isRecoveryRequiredAt( storeDir ) )
{
throw new CommandFailed(
Expand Down
Expand Up @@ -35,9 +35,9 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory;
import org.neo4j.kernel.impl.recovery.RecoveryRequiredChecker;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -134,7 +134,7 @@ private void checkDbState( File storeDir, Config tuningConfiguration ) throws To
{
try ( PageCache pageCache = ConfigurableStandalonePageCacheFactory.createPageCache( fs, tuningConfiguration ) )
{
RecoveryRequiredChecker requiredChecker = new RecoveryRequiredChecker( fs, pageCache, NullLogService.getInstance() );
RecoveryRequiredChecker requiredChecker = new RecoveryRequiredChecker( fs, pageCache, new Monitors() );
if ( requiredChecker.isRecoveryRequiredAt( storeDir ) )
{
throw new ToolFailureException( Strings.joinAsLines(
Expand Down
Expand Up @@ -102,7 +102,6 @@
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogVersionUpgradeChecker;
import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor;
Expand Down Expand Up @@ -150,6 +149,8 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.monitoring.tracing.Tracers;
import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.recovery.LoggingLogTailScannerMonitor;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.TransactionLogPruner;
Expand Down Expand Up @@ -428,7 +429,8 @@ public void start() throws IOException
// Check the tail of transaction logs and validate version
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME, fs );
final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, logService );
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, monitors );
monitors.addMonitorListener( new LoggingLogTailScannerMonitor( logService.getInternalLog( LogTailScanner.class ) ) );
LogVersionUpgradeChecker.check( tailScanner, config );

// Upgrade the store before we begin
Expand Down
Expand Up @@ -24,14 +24,14 @@

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;

import static org.neo4j.kernel.recovery.PositionToRecoverFrom.NO_MONITOR;
Expand All @@ -43,13 +43,13 @@ public class RecoveryRequiredChecker
{
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final LogService logService;
private final Monitors monitors;

public RecoveryRequiredChecker( FileSystemAbstraction fs, PageCache pageCache, LogService logService )
public RecoveryRequiredChecker( FileSystemAbstraction fs, PageCache pageCache, Monitors monitors )
{
this.fs = fs;
this.pageCache = pageCache;
this.logService = logService;
this.monitors = monitors;
}

public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
Expand All @@ -63,7 +63,7 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException

PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );
LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, reader, logService );
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, reader, monitors );
return new PositionToRecoverFrom( tailScanner, NO_MONITOR ).get() != LogPosition.UNSPECIFIED;
}
}
Expand Up @@ -33,7 +33,7 @@
import org.neo4j.kernel.impl.storemigration.participant.ExplicitIndexMigrator;
import org.neo4j.kernel.impl.storemigration.participant.NativeLabelScanStoreMigrator;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.spi.explicitindex.IndexImplementation;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -81,8 +81,7 @@ public DatabaseMigrator(
public void migrate( File storeDir )
{
LogProvider logProvider = logService.getInternalLogProvider();
UpgradableDatabase upgradableDatabase =
new UpgradableDatabase( new StoreVersionCheck( pageCache ), format, tailScanner, logService );
UpgradableDatabase upgradableDatabase = new UpgradableDatabase( new StoreVersionCheck( pageCache ), format, tailScanner );
StoreUpgrader storeUpgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fs, pageCache,
logProvider );

Expand Down
Expand Up @@ -21,7 +21,6 @@

import java.io.File;

import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.FormatFamily;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
Expand All @@ -33,7 +32,7 @@
import org.neo4j.kernel.impl.storemigration.StoreUpgrader.UpgradingStoreVersionNotFoundException;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result.Outcome;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.recovery.LogTailScanner;

/**
* Logic to check whether a database version is upgradable to the current version. It looks at the
Expand All @@ -44,15 +43,13 @@ public class UpgradableDatabase
private final StoreVersionCheck storeVersionCheck;
private final RecordFormats format;
private final LogTailScanner tailScanner;
private final LogService logService;

public UpgradableDatabase( StoreVersionCheck storeVersionCheck, RecordFormats format,
LogTailScanner tailScanner, LogService logService )
LogTailScanner tailScanner )
{
this.storeVersionCheck = storeVersionCheck;
this.format = format;
this.tailScanner = tailScanner;
this.logService = logService;
}

/**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.storemigration.UpgradeNotAllowedByConfigurationException;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.recovery.LogTailScanner;

/**
* Here we check the latest entry in the transaction log and make sure it matches the current version, if this check
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
Expand Down
Expand Up @@ -17,22 +17,29 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;
package org.neo4j.kernel.recovery;

import java.io.IOException;

import org.neo4j.helpers.Exceptions;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.UnsupportedLogVersionException;
import org.neo4j.logging.Log;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static java.lang.String.format;
Expand All @@ -48,20 +55,20 @@
*/
public class LogTailScanner
{
public static long NO_TRANSACTION_ID = -1;
static long NO_TRANSACTION_ID = -1;
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fileSystem;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
private LogTailInformation logTailInformation;
private final Log log;
private final LogTailScannerMonitor monitor;

public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, LogService logService )
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, Monitors monitors )
{
this.logFiles = logFiles;
this.fileSystem = fileSystem;
this.logEntryReader = logEntryReader;
this.log = logService.getInternalLog( LogTailScanner.class );
this.monitor = monitors.newMonitor( LogTailScannerMonitor.class );
}

private LogTailInformation findLogTail() throws IOException
Expand Down Expand Up @@ -130,8 +137,7 @@ else if ( entry instanceof LogEntryStart )
{
if ( FeatureToggles.flag( LogTailScanner.class, "force", false ) )
{
log.warn( "Unsupported log version was found in transactional logs, but log processing was " +
"forced.", t );
monitor.forced( t );
}
else
{
Expand All @@ -142,7 +148,7 @@ else if ( entry instanceof LogEntryStart )
}
}
corruptedTransactionLogs = true;
log.warn( format( "Fail to read transaction log version %d.", version ), t );
monitor.corruptedLogFile( version, t );
}

if ( latestCheckPoint != null )
Expand Down Expand Up @@ -218,9 +224,9 @@ protected ExtractedTransactionRecord extractFirstTxIdAfterPosition( LogPosition
}
}
}
catch ( Exception e )
catch ( Throwable t )
{
// TODO: propagate error whats was wrong
monitor.corruptedLogFile( currentPosition.getLogVersion(), t );
transactionRecord.setFailure( true );
return transactionRecord;
}
Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

public interface LogTailScannerMonitor
{
void forced( Throwable t );

void corruptedLogFile( long version, Throwable t );
}
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

import org.neo4j.logging.Log;

public class LoggingLogTailScannerMonitor implements LogTailScannerMonitor
{
private final Log log;

public LoggingLogTailScannerMonitor( Log log )
{
this.log = log;
}

@Override
public void forced( Throwable t )
{
log.warn( "Unsupported log version was found in transactional logs, but log processing was forced.", t );
}

@Override
public void corruptedLogFile( long version, Throwable t )
{
log.warn( String.format( "Fail to read transaction log version %d.", version ), t );
}
}
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;

import static org.neo4j.kernel.impl.transaction.log.LogVersionRepository.INITIAL_LOG_VERSION;

Expand Down
Expand Up @@ -49,7 +49,6 @@
import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
Expand All @@ -68,6 +67,7 @@
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageCommand;
Expand Down

0 comments on commit 364b4f1

Please sign in to comment.