Skip to content

Commit

Permalink
Begin the process of extracting the StoreLayerModule into a StorageEn…
Browse files Browse the repository at this point in the history
…gine

Currently only the parts necessary to build a StoreReadLayer have been pulled into the StorageEngine abstraction.
Everything around creating and applying commands is still missing, as well as a lot of work in encapsulating the internals of it.
  • Loading branch information
chrisvest committed Nov 16, 2015
1 parent c0a8d0d commit 9d519ca
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 147 deletions.
191 changes: 45 additions & 146 deletions community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
Expand Up @@ -23,17 +23,14 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

import org.neo4j.function.Factory;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.config.Setting;
Expand Down Expand Up @@ -71,22 +68,14 @@
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.UpdateableSchemaState;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.OnlineIndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.RecoveryIndexingUpdatesValidator;
import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.store.CacheLayer;
import org.neo4j.kernel.impl.api.store.DiskLayer;
import org.neo4j.kernel.impl.api.store.ProcedureCache;
import org.neo4j.kernel.impl.api.store.SchemaCache;
import org.neo4j.kernel.impl.api.store.StoreReadLayer;
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.cache.BridgingCacheAccess;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
import org.neo4j.kernel.impl.core.LabelTokenHolder;
Expand All @@ -100,13 +89,12 @@
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.SchemaStorage;
import org.neo4j.kernel.impl.store.StoreFactory;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.SchemaRule;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck;
import org.neo4j.kernel.impl.storemigration.UpgradableDatabase;
Expand Down Expand Up @@ -148,13 +136,10 @@
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.state.DefaultSchemaIndexProviderMap;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.kernel.impl.transaction.state.NeoStoreIndexStoreView;
import org.neo4j.kernel.impl.transaction.state.NeoStoreTransactionContextFactory;
import org.neo4j.kernel.impl.transaction.state.NeoStoresSupplier;
import org.neo4j.kernel.impl.transaction.state.PropertyLoader;
import org.neo4j.kernel.impl.transaction.state.RecoveryVisitor;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
Expand All @@ -177,11 +162,6 @@
import org.neo4j.logging.Logger;
import org.neo4j.unsafe.batchinsert.LabelScanWriter;

import static org.neo4j.helpers.Settings.BOOLEAN;
import static org.neo4j.helpers.Settings.TRUE;
import static org.neo4j.helpers.Settings.setting;
import static org.neo4j.helpers.collection.Iterables.toList;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE;
import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;

public class NeoStoreDataSource implements NeoStoresSupplier, Lifecycle, IndexProviders
Expand All @@ -191,9 +171,8 @@ private interface CacheModule
UpdateableSchemaState updateableSchemaState();
}

private interface StoreLayerModule
private interface StoreLayerModule extends StorageEngine
{
StoreReadLayer storeLayer();

NeoStores neoStores();
MetaDataStore metaDataStore();
Expand Down Expand Up @@ -327,12 +306,6 @@ boolean applicable( DiagnosticsPhase phase )

public static final String DEFAULT_DATA_SOURCE_NAME = "nioneodb";

/**
* This setting is hidden to the user and is here merely for making it easier to back out of
* a change where reading property chains incurs read locks on {@link LockService}.
*/
private static final Setting<Boolean> use_read_locks_on_property_reads =
setting( "experimental.use_read_locks_on_property_reads", BOOLEAN, TRUE );

private final Monitors monitors;
private final Tracers tracers;
Expand Down Expand Up @@ -375,7 +348,6 @@ boolean applicable( DiagnosticsPhase phase )
private File storeDir;
private boolean readOnly;

private CacheModule cacheModule;
private StoreLayerModule storeLayerModule;
private TransactionLogModule transactionLogModule;
private KernelModule kernelModule;
Expand Down Expand Up @@ -510,12 +482,12 @@ public void start() throws IOException
LegacyIndexApplierLookup legacyIndexApplierLookup =
dependencies.satisfyDependency( new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ) );

CacheModule cacheModule = buildCaches();
UpdateableSchemaState updateableSchemaState = new KernelSchemaStateStore( logProvider );

// TODO Introduce a StorageEngine abstraction at the StoreLayerModule boundary
storeLayerModule = buildStoreLayer(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens,
cacheModule.updateableSchemaState()::clear );
updateableSchemaState::clear );

TransactionLogModule transactionLogModule =
buildTransactionLogs( storeDir, config, logProvider, scheduler, storeLayerModule.labelScanStore(),
Expand All @@ -532,20 +504,29 @@ public void start() throws IOException
transactionLogModule.transactionAppender(), storeLayerModule.neoStores(),
transactionLogModule.storeApplier(), storeLayerModule.indexingService(),
storeLayerModule.indexUpdatesValidator(),
storeLayerModule.storeLayer(),
cacheModule.updateableSchemaState(), storeLayerModule.labelScanStore(),
storeLayerModule.storeReadLayer(),
updateableSchemaState, storeLayerModule.labelScanStore(),
storeLayerModule.schemaIndexProviderMap(), storeLayerModule.procedureCache() );


// Do these assignments last so that we can ensure no cyclical dependencies exist
this.cacheModule = cacheModule;
this.storeLayerModule = storeLayerModule;
this.transactionLogModule = transactionLogModule;
this.kernelModule = kernelModule;

dependencies.satisfyDependency( this );
satisfyDependencies( cacheModule, storeLayerModule, transactionLogModule,
kernelModule );
dependencies.satisfyDependency( updateableSchemaState );
dependencies.satisfyDependencies( storeLayerModule.cacheAccess() );
dependencies.satisfyDependencies( storeLayerModule.indexingService() );
dependencies.satisfyDependencies( storeLayerModule.indexUpdatesValidator() );
dependencies.satisfyDependencies( storeLayerModule.integrityValidator() );
dependencies.satisfyDependencies( storeLayerModule.labelScanStore() );
dependencies.satisfyDependencies( storeLayerModule.metaDataStore() );
dependencies.satisfyDependencies( storeLayerModule.neoStores() );
dependencies.satisfyDependencies( storeLayerModule.procedureCache() );
dependencies.satisfyDependencies( storeLayerModule.schemaIndexProviderMap() );
dependencies.satisfyDependencies( storeLayerModule.storeReadLayer() );
satisfyDependencies( transactionLogModule, kernelModule );
}
catch ( Throwable e )
{
Expand Down Expand Up @@ -609,101 +590,30 @@ private void upgradeStore( File storeDir, StoreUpgrader storeMigrationProcess, S
storeMigrationProcess.migrateIfNeeded( storeDir, upgradableDatabase, indexProvider );
}

private CacheModule buildCaches()
{
final UpdateableSchemaState updateableSchemaState = new KernelSchemaStateStore( logProvider );

return new CacheModule()
{
@Override
public UpdateableSchemaState updateableSchemaState()
{
return updateableSchemaState;
}
};
}

private StoreLayerModule buildStoreLayer(
PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens,
Runnable schemaStateChangeCallback )
{
life.add( new LifecycleAdapter()
{
@Override
public void start() throws IOException
{
storeLayerModule.neoStores().makeStoreOk();

propertyKeyTokenHolder.setInitialTokens(
storeLayerModule.neoStores().getPropertyKeyTokenStore().getTokens( Integer.MAX_VALUE ) );
relationshipTypeTokens.setInitialTokens(
storeLayerModule.neoStores().getRelationshipTypeTokenStore().getTokens( Integer.MAX_VALUE ) );
labelTokens.setInitialTokens(
storeLayerModule.neoStores().getLabelTokenStore().getTokens( Integer.MAX_VALUE ) );

storeLayerModule.neoStores().rebuildCountStoreIfNeeded(); // TODO: move this to counts store lifecycle
storeLayerModule.loadSchemaCache();
}
} );

final StoreFactory storeFactory = new StoreFactory( storeDir, config, idGeneratorFactory, pageCache, fs, logProvider );
final NeoStores neoStores = storeFactory.openAllNeoStores( true );

final DefaultSchemaIndexProviderMap providerMap;
final IndexingService indexingService;
final IntegrityValidator integrityValidator;
final IndexUpdatesValidator indexUpdatesValidator;
final LabelScanStore labelScanStore;
final SchemaCache schemaCache;
final CacheAccessBackDoor cacheAccess;
final StoreReadLayer storeLayer;
final ProcedureCache procedureCache;

try
{
providerMap = new DefaultSchemaIndexProviderMap( indexProvider );

indexingService = IndexingService.create(
new IndexSamplingConfig( config ), scheduler, providerMap,
new NeoStoreIndexStoreView( lockService, neoStores ), tokenNameLookup,
toList( new SchemaStorage( neoStores.getSchemaStore() ).allIndexRules() ), logProvider,
indexingServiceMonitor, schemaStateChangeCallback );
integrityValidator = new IntegrityValidator( neoStores, indexingService );

indexUpdatesValidator = dependencies.satisfyDependency(
new OnlineIndexUpdatesValidator( neoStores, kernelHealth, new PropertyLoader( neoStores ),
indexingService, IndexUpdateMode.ONLINE ) );

// TODO Move to constructor
labelScanStore = dependencyResolver.resolveDependency( LabelScanStoreProvider.class,
LabelScanStoreProvider.HIGHEST_PRIORITIZED ).getLabelScanStore();

life.add( indexingService );
life.add( labelScanStore );


schemaCache = new SchemaCache( constraintSemantics, Collections.<SchemaRule>emptyList() );
cacheAccess = new BridgingCacheAccess( schemaCache, schemaStateChangeCallback,
propertyKeyTokenHolder, relationshipTypeTokens, labelTokens );
procedureCache = new ProcedureCache();
SchemaStorage schemaStorage = new SchemaStorage( neoStores.getSchemaStore() );
DiskLayer diskLayer = new DiskLayer( propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, schemaStorage,
neoStores, indexingService, storeStatementFactory( neoStores ) );
storeLayer = new CacheLayer( diskLayer, schemaCache, procedureCache );
}
catch ( Throwable failure )
class Module extends RecordStorageEngine implements StoreLayerModule
{
neoStores.close();
throw failure;
}

return new StoreLayerModule()
{
@Override
public StoreReadLayer storeLayer()
public Module( File storeDir, Config config, IdGeneratorFactory idGeneratorFactory,
PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider,
PropertyKeyTokenHolder propertyKeyTokenHolder,
LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens,
Runnable schemaStateChangeCallback,
ConstraintSemantics constraintSemantics, JobScheduler scheduler,
TokenNameLookup tokenNameLookup, LockService lockService,
SchemaIndexProvider indexProvider,
IndexingService.Monitor indexingServiceMonitor, KernelHealth kernelHealth,
LabelScanStoreProvider labelScanStoreProvider )
{
return storeLayer;
super( storeDir, config, idGeneratorFactory, pageCache, fs, logProvider, propertyKeyTokenHolder,
labelTokens,
relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, scheduler,
tokenNameLookup,
lockService, indexProvider, indexingServiceMonitor, kernelHealth, labelScanStoreProvider );
}

@Override
Expand Down Expand Up @@ -759,21 +669,15 @@ public ProcedureCache procedureCache()
{
return procedureCache;
}
}

@Override
public void loadSchemaCache()
{
List<SchemaRule> schemaRules = toList( neoStores.getSchemaStore().loadAllSchemaRules() );
schemaCache.load( schemaRules );
}
};
}

private Factory<StoreStatement> storeStatementFactory( final NeoStores neoStores )
{
final LockService lockService =
config.get( use_read_locks_on_property_reads ) ? this.lockService : NO_LOCK_SERVICE;
return () -> new StoreStatement( neoStores, lockService );
LabelScanStoreProvider labelScanStore = dependencyResolver.resolveDependency( LabelScanStoreProvider.class,
LabelScanStoreProvider.HIGHEST_PRIORITIZED );
return life.add(
new Module( storeDir, config, idGeneratorFactory, pageCache, fs, logProvider, propertyKeyTokenHolder,
labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, scheduler,
tokenNameLookup, lockService, indexProvider, indexingServiceMonitor, kernelHealth,
labelScanStore ) );
}

private TransactionLogModule buildTransactionLogs( File storeDir, Config config, LogProvider logProvider,
Expand Down Expand Up @@ -1094,11 +998,6 @@ public IndexingService getIndexService()
return storeLayerModule.indexingService();
}

public SchemaIndexProvider getIndexProvider()
{
return indexProvider;
}

public LabelScanStore getLabelScanStore()
{
return storeLayerModule.labelScanStore();
Expand Down Expand Up @@ -1239,7 +1138,7 @@ public NeoStores get()

public StoreReadLayer getStoreLayer()
{
return storeLayerModule.storeLayer();
return storeLayerModule.storeReadLayer();
}

public DependencyResolver getDependencyResolver()
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.storageengine;

import org.neo4j.kernel.impl.api.store.StoreReadLayer;

/**
* A StorageEngine provides the functionality to durably store data, and read it back.
*/
public interface StorageEngine
{
StoreReadLayer storeReadLayer();
}

0 comments on commit 9d519ca

Please sign in to comment.