Skip to content

Commit

Permalink
Merge pull request #10087 from burqen/3.3-native-index-log-crashpoint…
Browse files Browse the repository at this point in the history
…ercleanup

Label and schema index log internal recovery
  • Loading branch information
burqen committed Sep 27, 2017
2 parents b625815 + 91e2868 commit 27be54a
Show file tree
Hide file tree
Showing 29 changed files with 326 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,14 @@ public Result runFullConsistencyCheck( final File storeDir, Config config, Progr
} ) );

// Bootstrap kernel extensions
Monitors monitors = new Monitors();
LifeSupport life = new LifeSupport();
KernelExtensions extensions = life.add( instantiateKernelExtensions( storeDir,
fileSystem, config, new SimpleLogService( logProvider, logProvider ), pageCache,
RECOVERY_PREVENTING_COLLECTOR,
// May be enterprise edition, but in consistency checker we only care about the operational mode
COMMUNITY ) );
COMMUNITY,
monitors ) );

try ( NeoStores neoStores = factory.openAllNeoStores() )
{
Expand All @@ -240,7 +242,7 @@ fileSystem, config, new SimpleLogService( logProvider, logProvider ), pageCache,
SchemaIndexProviderMap indexes = loadSchemaIndexProviders( extensions );

LabelScanStore labelScanStore =
new NativeLabelScanStore( pageCache, storeDir, FullStoreChangeStream.EMPTY, true, new Monitors(),
new NativeLabelScanStore( pageCache, storeDir, FullStoreChangeStream.EMPTY, true, monitors,
RecoveryCleanupWorkCollector.IMMEDIATE );
life.add( labelScanStore );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.kernel.impl.transaction.state.DefaultSchemaIndexProviderMap;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;

/**
* Utility for loading {@link SchemaIndexProvider} instances from {@link KernelExtensions}.
Expand All @@ -63,10 +64,10 @@ public static SchemaIndexProviderMap loadSchemaIndexProviders( KernelExtensions
@SuppressWarnings( "unchecked" )
public static KernelExtensions instantiateKernelExtensions( File storeDir, FileSystemAbstraction fileSystem,
Config config, LogService logService, PageCache pageCache,
RecoveryCleanupWorkCollector recoveryCollector, DatabaseInfo databaseInfo )
RecoveryCleanupWorkCollector recoveryCollector, DatabaseInfo databaseInfo, Monitors monitors )
{
Dependencies deps = new Dependencies();
deps.satisfyDependencies( fileSystem, config, logService, pageCache, recoveryCollector );
deps.satisfyDependencies( fileSystem, config, logService, pageCache, recoveryCollector, monitors );
@SuppressWarnings( "rawtypes" )
Iterable kernelExtensions = Service.load( KernelExtensionFactory.class );
KernelContext kernelContext = new SimpleKernelContext( storeDir, databaseInfo, deps );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,18 @@ public DirectStoreAccess directStoreAccess()
IndexStoreView indexStoreView =
new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, nativeStores.getRawNeoStores() );

LabelScanStore labelScanStore = startLabelScanStore( pageCache, indexStoreView );
SchemaIndexProviderMap indexes = createIndexes( pageCache, fileSystem, directory, config, logProvider );
Monitors monitors = new Monitors();
LabelScanStore labelScanStore = startLabelScanStore( pageCache, indexStoreView, monitors );
SchemaIndexProviderMap indexes = createIndexes( pageCache, fileSystem, directory, config, logProvider, monitors);
directStoreAccess = new DirectStoreAccess( nativeStores, labelScanStore, indexes );
}
return directStoreAccess;
}

private LabelScanStore startLabelScanStore( PageCache pageCache, IndexStoreView indexStoreView )
private LabelScanStore startLabelScanStore( PageCache pageCache, IndexStoreView indexStoreView, Monitors monitors )
{
NativeLabelScanStore labelScanStore =
new NativeLabelScanStore( pageCache, directory, new FullLabelStream( indexStoreView ), false, new Monitors(),
new NativeLabelScanStore( pageCache, directory, new FullLabelStream( indexStoreView ), false, monitors,
RecoveryCleanupWorkCollector.IMMEDIATE );
try
{
Expand All @@ -208,11 +209,11 @@ private LabelScanStore startLabelScanStore( PageCache pageCache, IndexStoreView
}

private SchemaIndexProviderMap createIndexes( PageCache pageCache, FileSystemAbstraction fileSystem, File storeDir,
Config config, LogProvider logProvider )
Config config, LogProvider logProvider, Monitors monitors )
{
LogService logService = new SimpleLogService( logProvider, logProvider );
KernelExtensions extensions = life.add( instantiateKernelExtensions( storeDir, fileSystem, config, logService,
pageCache, RECOVERY_PREVENTING_COLLECTOR, DatabaseInfo.COMMUNITY ) );
pageCache, RECOVERY_PREVENTING_COLLECTOR, DatabaseInfo.COMMUNITY, monitors ) );
return loadSchemaIndexProviders( extensions );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.index;

import java.util.Map;

import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.logging.Log;

import static java.lang.String.format;

public class LoggingMonitor implements SchemaIndexProvider.Monitor
{
private final Log log;

public LoggingMonitor( Log log )
{
this.log = log;
}

@Override
public void failedToOpenIndex( long indexId, IndexDescriptor indexDescriptor, String action, Exception cause )
{
log.error( "Failed to open index:" + indexId + ". " + action, cause );
}

@Override
public void recoveryCompleted( long indexId, IndexDescriptor indexDescriptor, Map<String,Object> data )
{
StringBuilder builder =
new StringBuilder(
"Schema index recovery completed: indexId: " + indexId + " descriptor: " + indexDescriptor.toString() );
data.forEach( ( key, value ) -> builder.append( format( " %s: %s", key, value ) ) );
log.info( builder.toString() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterators;
Expand Down Expand Up @@ -92,6 +93,28 @@
*/
public abstract class SchemaIndexProvider extends LifecycleAdapter implements Comparable<SchemaIndexProvider>
{
public interface Monitor
{
Monitor EMPTY = new Monitor.Adaptor();

class Adaptor implements Monitor
{
@Override
public void failedToOpenIndex( long indexId, IndexDescriptor indexDescriptor, String action, Exception cause )
{ // no-op
}

@Override
public void recoveryCompleted( long indexId, IndexDescriptor indexDescriptor, Map<String,Object> data )
{ // no-op
}
}

void failedToOpenIndex( long indexId, IndexDescriptor indexDescriptor, String action, Exception cause );

void recoveryCompleted( long indexId, IndexDescriptor indexDescriptor, Map<String,Object> data );
}

public static final SchemaIndexProvider NO_INDEX_PROVIDER =
new SchemaIndexProvider( new Descriptor( "no-index-provider", "1.0" ), -1, IndexDirectoryStructure.NONE )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.DefaultNonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.NonUniqueIndexSampler;
Expand All @@ -43,9 +45,9 @@ class NativeNonUniqueSchemaNumberIndexPopulator<KEY extends SchemaNumberKey, VAL
private NonUniqueIndexSampler sampler;

NativeNonUniqueSchemaNumberIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
IndexSamplingConfig samplingConfig )
IndexSamplingConfig samplingConfig, SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId )
{
super( pageCache, fs, storeFile, layout );
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.samplingConfig = samplingConfig;
this.sampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,59 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.index.GBPTreeFileUtil;

import static org.neo4j.helpers.Format.duration;
import static org.neo4j.helpers.collection.MapUtil.map;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;

class NativeSchemaNumberIndex<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue>
{
final PageCache pageCache;
final File storeFile;
final Layout<KEY,VALUE> layout;
final GBPTreeFileUtil gbpTreeFileUtil;
private final IndexDescriptor descriptor;
private final long indexId;
private final SchemaIndexProvider.Monitor monitor;

GBPTree<KEY,VALUE> tree;

NativeSchemaNumberIndex( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout )
NativeSchemaNumberIndex( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId )
{
this.pageCache = pageCache;
this.storeFile = storeFile;
this.layout = layout;
this.gbpTreeFileUtil = new GBPTreeFileSystemFileUtil( fs );
this.descriptor = descriptor;
this.indexId = indexId;
this.monitor = monitor;
}

void instantiateTree( RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, Consumer<PageCursor> headerWriter )
throws IOException
{
ensureDirectoryExist();
tree = new GBPTree<>( pageCache, storeFile, layout, 0, NO_MONITOR, NO_HEADER_READER, headerWriter,
recoveryCleanupWorkCollector );
GBPTree.Monitor monitor = treeMonitor();
tree = new GBPTree<>( pageCache, storeFile, layout, 0, monitor, NO_HEADER_READER, headerWriter, recoveryCleanupWorkCollector );
}

private GBPTree.Monitor treeMonitor( )
{
return new GBPTree.Monitor.Adaptor()
{
@Override
public void cleanupFinished( long numberOfPagesVisited, long numberOfCleanedCrashPointers, long durationMillis )
{
monitor.recoveryCompleted( indexId, descriptor, map(
"Number of pages visited", numberOfPagesVisited,
"Number of cleaned crashed pointers", numberOfCleanedCrashPointers,
"Time spent", duration( durationMillis ) ) );
}
};
}

private void ensureDirectoryExist() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.storageengine.api.schema.IndexReader;

Expand All @@ -47,9 +49,10 @@ public class NativeSchemaNumberIndexAccessor<KEY extends SchemaNumberKey, VALUE
private final NativeSchemaNumberIndexUpdater<KEY,VALUE> singleUpdater;

NativeSchemaNumberIndexAccessor( PageCache pageCache, FileSystemAbstraction fs, File storeFile,
Layout<KEY,VALUE> layout, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) throws IOException
Layout<KEY,VALUE> layout, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, SchemaIndexProvider.Monitor monitor,
IndexDescriptor descriptor, long indexId ) throws IOException
{
super( pageCache, fs, storeFile, layout );
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
singleUpdater = new NativeSchemaNumberIndexUpdater<>( layout.newKey(), layout.newValue() );
instantiateTree( recoveryCleanupWorkCollector, NO_HEADER_WRITER );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;

Expand All @@ -66,9 +68,10 @@ public abstract class NativeSchemaNumberIndexPopulator<KEY extends SchemaNumberK
private byte[] failureBytes;
private boolean dropped;

NativeSchemaNumberIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout )
NativeSchemaNumberIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId )
{
super( pageCache, fs, storeFile, layout );
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.treeKey = layout.newKey();
this.treeValue = layout.newValue();
this.conflictDetectingValueMerger = new ConflictDetectingValueMerger<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.index.schema.NativeSchemaNumberIndexPopulator.BYTE_FAILED;
import static org.neo4j.kernel.impl.index.schema.NativeSchemaNumberIndexPopulator.BYTE_ONLINE;
Expand All @@ -52,17 +50,18 @@ public class NativeSchemaNumberIndexProvider extends SchemaIndexProvider

private final PageCache pageCache;
private final FileSystemAbstraction fs;
private final Log log;
private final Monitor monitor;
private final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector;
private final boolean readOnly;

public NativeSchemaNumberIndexProvider( PageCache pageCache, FileSystemAbstraction fs, IndexDirectoryStructure.Factory directoryStructure,
LogProvider logging, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, boolean readOnly )
public NativeSchemaNumberIndexProvider( PageCache pageCache, FileSystemAbstraction fs,
IndexDirectoryStructure.Factory directoryStructure, Monitor monitor, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector,
boolean readOnly )
{
super( NATIVE_PROVIDER_DESCRIPTOR, 0, directoryStructure );
this.pageCache = pageCache;
this.fs = fs;
this.log = logging.getLog( getClass() );
this.monitor = monitor;
this.recoveryCleanupWorkCollector = recoveryCleanupWorkCollector;
this.readOnly = readOnly;
}
Expand All @@ -79,9 +78,11 @@ public IndexPopulator getPopulator( long indexId, IndexDescriptor descriptor, In
switch ( descriptor.type() )
{
case GENERAL:
return new NativeNonUniqueSchemaNumberIndexPopulator<>( pageCache, fs, storeFile, new NonUniqueNumberLayout(), samplingConfig );
return new NativeNonUniqueSchemaNumberIndexPopulator<>( pageCache, fs, storeFile, new NonUniqueNumberLayout(), samplingConfig,
monitor, descriptor, indexId );
case UNIQUE:
return new NativeUniqueSchemaNumberIndexPopulator<>( pageCache, fs, storeFile, new UniqueNumberLayout() );
return new NativeUniqueSchemaNumberIndexPopulator<>( pageCache, fs, storeFile, new UniqueNumberLayout(), monitor, descriptor,
indexId );
default:
throw new UnsupportedOperationException( "Can not create index populator of type " + descriptor.type() );
}
Expand All @@ -104,7 +105,8 @@ public IndexAccessor getOnlineAccessor( long indexId, IndexDescriptor descriptor
default:
throw new UnsupportedOperationException( "Can not create index accessor of type " + descriptor.type() );
}
return new NativeSchemaNumberIndexAccessor<>( pageCache, fs, storeFile, layout, recoveryCleanupWorkCollector );
return new NativeSchemaNumberIndexAccessor<>( pageCache, fs, storeFile, layout, recoveryCleanupWorkCollector, monitor, descriptor,
indexId );
}

@Override
Expand Down Expand Up @@ -155,7 +157,7 @@ public InternalIndexState getInitialState( long indexId, IndexDescriptor descrip
}
catch ( IOException e )
{
log.error( "Failed to open index:" + indexId + ", requesting re-population.", e );
monitor.failedToOpenIndex( indexId, descriptor, "Requesting re-population.", e );
return InternalIndexState.POPULATING;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler;
import org.neo4j.storageengine.api.schema.IndexSample;

Expand All @@ -36,9 +38,10 @@ class NativeUniqueSchemaNumberIndexPopulator<KEY extends SchemaNumberKey, VALUE
{
private final UniqueIndexSampler sampler;

NativeUniqueSchemaNumberIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout )
NativeUniqueSchemaNumberIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId )
{
super( pageCache, fs, storeFile, layout );
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.sampler = new UniqueIndexSampler();
}

Expand Down

0 comments on commit 27be54a

Please sign in to comment.