Skip to content

Commit

Permalink
Add possibility to register monitor for native label scan store
Browse files Browse the repository at this point in the history
And underlying GBPTree through Monitors.
  • Loading branch information
burqen committed Apr 3, 2017
1 parent 30c5cf6 commit fa38be7
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 78 deletions.
Expand Up @@ -65,6 +65,7 @@
import org.neo4j.kernel.impl.transaction.state.storeview.NeoStoreIndexStoreView;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.DuplicatingLog;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -239,8 +240,8 @@ public Result runFullConsistencyCheck( final File storeDir, Config config, Progr
{
IndexStoreView indexStoreView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores );
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependencies( config, fileSystem,
new SimpleLogService( logProvider, logProvider ), indexStoreView, pageCache );
dependencies.satisfyDependencies( config, fileSystem, new SimpleLogService( logProvider, logProvider ),
indexStoreView, pageCache, new Monitors() );
KernelContext kernelContext = new SimpleKernelContext( storeDir, UNKNOWN, dependencies );
KernelExtensions extensions = life.add( new KernelExtensions(
kernelContext, (Iterable) load( KernelExtensionFactory.class ), dependencies, ignore() ) );
Expand Down
Expand Up @@ -32,58 +32,40 @@
public class LoggingMonitor implements Monitor
{
private final Log log;
private final Monitor delegate;

public LoggingMonitor( Log log )
{
this( log, Monitor.EMPTY );
}

public LoggingMonitor( Log log, Monitor delegate )
{
this.log = log;
this.delegate = delegate;
}

@Override
public void init()
{
delegate.init();
}

@Override
public void noIndex()
{
log.info( "No scan store found, this might just be first use. Preparing to rebuild." );
delegate.noIndex();
}

@Override
public void lockedIndex( Exception e )
{
log.error( "Scan store is locked by another process or database", e );
delegate.lockedIndex( e );
}

@Override
public void notValidIndex()
{
log.warn( "Scan store could not be read. Preparing to rebuild." );
delegate.notValidIndex();
}

@Override
public void rebuilding()
{
log.info( "Rebuilding scan store, this may take a while" );
delegate.rebuilding();
}

@Override
public void rebuilt( long roughNodeCount )
{
log.info( "Scan store rebuilt (roughly " + roughNodeCount + " nodes)" );
delegate.rebuilt( roughNodeCount );
}

@Override
Expand All @@ -92,6 +74,5 @@ public void recoveryCompleted( Map<String,Object> data )
StringBuilder builder = new StringBuilder( "Scan store recovery completed:" );
data.forEach( (key,value) -> builder.append( format( " %s: %s", key, value ) ) );
log.info( builder.toString() );
delegate.recoveryCompleted( data );
}
}
Expand Up @@ -25,22 +25,20 @@
import org.neo4j.graphdb.factory.GraphDatabaseSettings.LabelIndex;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.labelscan.LoggingMonitor;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanStore.Monitor;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;

public class NativeLabelScanStoreExtension extends
KernelExtensionFactory<NativeLabelScanStoreExtension.Dependencies>
{
private static final String NAME = LabelIndex.NATIVE.name();
private final LabelScanStore.Monitor monitor;

public interface Dependencies
{
Expand All @@ -51,30 +49,28 @@ public interface Dependencies
Supplier<IndexStoreView> indexStoreView();

LogService getLogService();
}

public NativeLabelScanStoreExtension()
{
this( LabelScanStore.Monitor.EMPTY );
Monitors monitors();
}

public NativeLabelScanStoreExtension( LabelScanStore.Monitor monitor )
public NativeLabelScanStoreExtension()
{
super( NAME );
this.monitor = monitor;
}

@Override
public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) throws Throwable
{
Log log = dependencies.getLogService().getInternalLog( NativeLabelScanStore.class );
Monitor monitor = new LoggingMonitor( log, this.monitor );
Monitors monitors = dependencies.monitors();
monitors.addMonitorListener( new LoggingMonitor( log ) );
NativeLabelScanStore labelScanStore = new NativeLabelScanStore(
dependencies.pageCache(),
context.storeDir(),
new FullLabelStream( dependencies.indexStoreView() ),
dependencies.getConfig().get( GraphDatabaseSettings.read_only ),
monitor );
monitors );

return new LabelScanStoreProvider( NAME, labelScanStore );
}
}
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.schema.LabelScanReader;

import static org.neo4j.helpers.Format.duration;
Expand Down Expand Up @@ -89,6 +90,11 @@ public class NativeLabelScanStore implements LabelScanStore
*/
private final Monitor monitor;

/**
* Monitors used to pass down monitor to underlying {@link GBPTree}
*/
private final Monitors monitors;

/**
* {@link PageCache} to {@link PageCache#map(File, int, java.nio.file.OpenOption...)}
* store file backing this label scan store. Passed to {@link GBPTree}.
Expand Down Expand Up @@ -138,25 +144,26 @@ public class NativeLabelScanStore implements LabelScanStore

private final NativeLabelScanWriter singleWriter;

public NativeLabelScanStore( PageCache pageCache, File storeDir,
FullStoreChangeStream fullStoreChangeStream, boolean readOnly, Monitor monitor )
public NativeLabelScanStore( PageCache pageCache, File storeDir, FullStoreChangeStream fullStoreChangeStream,
boolean readOnly, Monitors monitors )
{
this( pageCache, storeDir, fullStoreChangeStream, readOnly, monitor, 0/*means no opinion about page size*/ );
this( pageCache, storeDir, fullStoreChangeStream, readOnly, monitors, /*means no opinion about page size*/ 0 );
}

/*
* Test access to be able to control page size.
*/
NativeLabelScanStore( PageCache pageCache, File storeDir,
FullStoreChangeStream fullStoreChangeStream, boolean readOnly, Monitor monitor, int pageSize )
NativeLabelScanStore( PageCache pageCache, File storeDir, FullStoreChangeStream fullStoreChangeStream,
boolean readOnly, Monitors monitors, int pageSize )
{
this.pageCache = pageCache;
this.pageSize = pageSize;
this.fullStoreChangeStream = fullStoreChangeStream;
this.storeFile = new File( storeDir, FILE_NAME );
this.singleWriter = new NativeLabelScanWriter( 1_000 );
this.readOnly = readOnly;
this.monitor = monitor;
this.monitors = monitors;
this.monitor = monitors.newMonitor( Monitor.class );
}

/**
Expand Down Expand Up @@ -323,7 +330,9 @@ private FileHandle storeFileHandle() throws IOException

private void instantiateTree() throws IOException
{
index = new GBPTree<>( pageCache, storeFile, new LabelScanLayout(), pageSize, treeMonitor(), NO_HEADER );
monitors.addMonitorListener( treeMonitor() );
GBPTree.Monitor monitor = monitors.newMonitor( GBPTree.Monitor.class );
index = new GBPTree<>( pageCache, storeFile, new LabelScanLayout(), pageSize, monitor, NO_HEADER );
}

private GBPTree.Monitor treeMonitor()
Expand Down
Expand Up @@ -152,6 +152,7 @@
import org.neo4j.kernel.internal.EmbeddedGraphDatabase;
import org.neo4j.kernel.internal.StoreLocker;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
Expand Down Expand Up @@ -279,7 +280,8 @@ public BatchInserterImpl( final File storeDir, final FileSystemAbstraction fileS
indexStoreView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores );

Dependencies deps = new Dependencies();
deps.satisfyDependencies( fileSystem, config, logService, indexStoreView, pageCache );
Monitors monitors = new Monitors();
deps.satisfyDependencies( fileSystem, config, logService, indexStoreView, pageCache, monitors );

KernelExtensions extensions = life.add( new KernelExtensions(
new SimpleKernelContext( storeDir, DatabaseInfo.UNKNOWN, deps ),
Expand Down
Expand Up @@ -55,6 +55,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds;
import org.neo4j.unsafe.impl.batchimport.Configuration;
Expand All @@ -65,7 +66,6 @@

import static java.lang.String.valueOf;
import static java.nio.file.StandardOpenOption.DELETE_ON_CLOSE;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand Down Expand Up @@ -161,6 +161,7 @@ private BatchingNeoStores( FileSystemAbstraction fileSystem, PageCache pageCache
dependencies.satisfyDependency( logService );
dependencies.satisfyDependency( IndexStoreView.EMPTY );
dependencies.satisfyDependency( pageCache );
dependencies.satisfyDependency( new Monitors() );
KernelContext kernelContext = new SimpleKernelContext( storeDir, DatabaseInfo.UNKNOWN, dependencies );
@SuppressWarnings( { "unchecked", "rawtypes" } )
KernelExtensions extensions = life.add( new KernelExtensions(
Expand Down
Expand Up @@ -30,11 +30,11 @@

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.schema.LabelScanReader;
import org.neo4j.test.rule.PageCacheRule;
import org.neo4j.test.rule.RandomRule;
Expand Down Expand Up @@ -62,9 +62,8 @@ public class NativeLabelScanStoreIT
public void before()
{
PageCache pageCache = pageCacheRule.getPageCache( new DefaultFileSystemAbstraction() );
store = life.add( new NativeLabelScanStore( pageCache, directory.absolutePath(),
FullStoreChangeStream.EMPTY,
false, LabelScanStore.Monitor.EMPTY,
store = life.add( new NativeLabelScanStore( pageCache, directory.absolutePath(), FullStoreChangeStream.EMPTY,
false, new Monitors(),
// a bit of random pageSize
Math.min( pageCache.pageSize(), 256 << random.nextInt( 5 ) ) ) );
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.kernel.api.impl.labelscan.LabelScanStoreTest;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.rule.PageCacheRule;

public class NativeLabelScanStoreTest extends LabelScanStoreTest
Expand All @@ -44,10 +45,11 @@ protected LabelScanStore createLabelScanStore( FileSystemAbstraction fileSystemA
List<NodeLabelUpdate> existingData, boolean usePersistentStore, boolean readOnly,
LabelScanStore.Monitor monitor )
{
Monitors monitors = new Monitors();
monitors.addMonitorListener( monitor );
PageCache pageCache = pageCacheRule.getPageCache( fileSystemAbstraction );
NativeLabelScanStore nativeLabelScanStore = new NativeLabelScanStore( pageCache, rootFolder,
asStream( existingData ), readOnly, monitor );
return nativeLabelScanStore;
return new NativeLabelScanStore( pageCache, rootFolder,
asStream( existingData ), readOnly, monitors );
}

@Override
Expand Down
Expand Up @@ -125,7 +125,7 @@ public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs
JobScheduler jobScheduler = mock( JobScheduler.class, RETURNS_MOCKS );
Monitors monitors = new Monitors();
LabelScanStoreProvider labelScanStoreProvider =
nativeLabelScanStoreProvider( storeDir, fs, pageCache, config, logService );
nativeLabelScanStoreProvider( storeDir, fs, pageCache, config, logService, monitors );
SystemNanoClock clock = Clocks.nanoClock();
dataSource = new NeoStoreDataSource( storeDir, config, idGeneratorFactory, IdReuseEligibility.ALWAYS,
idConfigurationProvider,
Expand All @@ -148,18 +148,19 @@ fs, mock( TransactionMonitor.class ), databaseHealth,
}

public static LabelScanStoreProvider nativeLabelScanStoreProvider( File storeDir, FileSystemAbstraction fs,
PageCache pageCache )
PageCache pageCache, Monitors monitors )
{
return nativeLabelScanStoreProvider( storeDir, fs, pageCache, Config.defaults(), NullLogService.getInstance() );
return nativeLabelScanStoreProvider( storeDir, fs, pageCache, Config.defaults(), NullLogService.getInstance(),
monitors );
}

public static LabelScanStoreProvider nativeLabelScanStoreProvider( File storeDir, FileSystemAbstraction fs,
PageCache pageCache, Config config, LogService logService )
PageCache pageCache, Config config, LogService logService, Monitors monitors )
{
try
{
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependencies( pageCache, config, IndexStoreView.EMPTY, logService );
dependencies.satisfyDependencies( pageCache, config, IndexStoreView.EMPTY, logService, monitors );
KernelContext kernelContext =
new SimpleKernelContext( storeDir, DatabaseInfo.COMMUNITY, dependencies );
return (LabelScanStoreProvider) new NativeLabelScanStoreExtension()
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.KernelEventHandlers;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;
Expand Down Expand Up @@ -93,14 +94,16 @@ public Builder getWith( FileSystemAbstraction fs, PageCache pageCache )

private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache,
SchemaIndexProvider schemaIndexProvider, DatabaseHealth databaseHealth, File storeDirectory,
Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer )
Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer,
Monitors monitors )
{
if ( !fs.fileExists( storeDirectory ) && !fs.mkdir( storeDirectory ) )
{
throw new IllegalStateException();
}
IdGeneratorFactory idGeneratorFactory = new EphemeralIdGenerator.Factory();
LabelScanStoreProvider labelScanStoreProvider = nativeLabelScanStoreProvider( storeDirectory, fs, pageCache );
LabelScanStoreProvider labelScanStoreProvider =
nativeLabelScanStoreProvider( storeDirectory, fs, pageCache, monitors );
LegacyIndexProviderLookup legacyIndexProviderLookup = mock( LegacyIndexProviderLookup.class );
when( legacyIndexProviderLookup.all() ).thenReturn( Iterables.empty() );
IndexConfigStore indexConfigStore = new IndexConfigStore( storeDirectory, fs );
Expand Down Expand Up @@ -137,6 +140,7 @@ public class Builder
private Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer =
applierFacade -> applierFacade;
private SchemaIndexProvider schemaIndexProvider = SchemaIndexProvider.NO_INDEX_PROVIDER;
private Monitors monitors = new Monitors();

public Builder( FileSystemAbstraction fs, PageCache pageCache )
{
Expand Down Expand Up @@ -169,12 +173,18 @@ public Builder storeDirectory( File storeDirectory )
return this;
}

public Builder monitors( Monitors monitors )
{
this.monitors = monitors;
return this;
}

// Add more here

public RecordStorageEngine build()
{
return get( fs, pageCache, schemaIndexProvider, databaseHealth, storeDirectory,
transactionApplierTransformer );
transactionApplierTransformer, monitors );
}
}

Expand Down

0 comments on commit fa38be7

Please sign in to comment.