Skip to content

Commit

Permalink
Congregate more dependencies in the StorageEngine and introduce a Com…
Browse files Browse the repository at this point in the history
…mandStream abstraction to decouple store application from transaction representation
  • Loading branch information
chrisvest committed Nov 16, 2015
1 parent e5ca5d1 commit c99c04f
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 209 deletions.
173 changes: 63 additions & 110 deletions community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java

Large diffs are not rendered by default.

Expand Up @@ -23,12 +23,10 @@

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.RecoveryLabelScanWriterProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.util.IdOrderingQueue;

/**
Expand All @@ -40,35 +38,35 @@ public class BatchingTransactionRepresentationStoreApplier extends TransactionRe
{
private final RecoveryLabelScanWriterProvider labelScanWriterProvider;
private final RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup;
private KernelHealth health;
private final KernelHealth health;
private final IndexingService indexingService;

public BatchingTransactionRepresentationStoreApplier( IndexingService indexingService,
LabelScanStore labelScanStore, NeoStores neoStore, CacheAccessBackDoor cacheAccess,
LockService lockService, LegacyIndexApplierLookup legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, KernelHealth kernelHealth, IdOrderingQueue legacyIndexTransactionOrdering )
public BatchingTransactionRepresentationStoreApplier(
LockService lockService,
IndexConfigStore indexConfigStore,
IdOrderingQueue legacyIndexTransactionOrdering,
StorageEngine storageEngine )
{
this( indexingService, new RecoveryLabelScanWriterProvider( labelScanStore, 1000 ),
neoStore, cacheAccess, lockService,
new RecoveryLegacyIndexApplierLookup( legacyIndexProviderLookup, 1000 ),
indexConfigStore, kernelHealth, legacyIndexTransactionOrdering );
this.health = kernelHealth;
this( new RecoveryLabelScanWriterProvider( storageEngine.labelScanStore(), 1000 ),
lockService,
new RecoveryLegacyIndexApplierLookup( storageEngine.legacyIndexApplierLookup(), 1000 ),
indexConfigStore, legacyIndexTransactionOrdering, storageEngine );
}

private BatchingTransactionRepresentationStoreApplier(
IndexingService indexingService,
RecoveryLabelScanWriterProvider labelScanWriterProvider,
NeoStores neoStore,
CacheAccessBackDoor cacheAccess,
LockService lockService,
RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup,
IndexConfigStore indexConfigStore,
KernelHealth kernelHealth,
IdOrderingQueue legacyIndexTransactionOrdering )
IdOrderingQueue legacyIndexTransactionOrdering,
StorageEngine storageEngine )
{
super( indexingService, labelScanWriterProvider, neoStore, cacheAccess, lockService, legacyIndexApplierLookup,
indexConfigStore, kernelHealth, legacyIndexTransactionOrdering );
super( labelScanWriterProvider, lockService,
indexConfigStore, legacyIndexTransactionOrdering, storageEngine );
this.labelScanWriterProvider = labelScanWriterProvider;
this.legacyIndexApplierLookup = legacyIndexApplierLookup;
this.health = storageEngine.kernelHealth();
this.indexingService = storageEngine.indexingService();
}

public void closeBatch() throws IOException
Expand Down
Expand Up @@ -23,14 +23,12 @@
import java.util.function.Supplier;

import org.neo4j.concurrent.WorkSync;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.transaction.CommandStream;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.CacheInvalidationTransactionApplier;
import org.neo4j.kernel.impl.transaction.command.CommandHandler;
Expand All @@ -48,53 +46,48 @@
*/
public class TransactionRepresentationStoreApplier
{
private final NeoStores neoStores;
protected final IndexingService indexingService;
private final CacheAccessBackDoor cacheAccess;
private final LockService lockService;
private final Supplier<LabelScanWriter> labelScanWriters;
private final IndexConfigStore indexConfigStore;
private final LegacyIndexApplierLookup legacyIndexProviderLookup;
private final KernelHealth health;
private final IdOrderingQueue legacyIndexTransactionOrdering;

private final WorkSync<Supplier<LabelScanWriter>,IndexTransactionApplier.LabelUpdateWork> labelScanStoreSync;
private final StorageEngine storageEngine;

public TransactionRepresentationStoreApplier(
IndexingService indexingService, Supplier<LabelScanWriter> labelScanWriters, NeoStores neoStores,
CacheAccessBackDoor cacheAccess, LockService lockService, LegacyIndexApplierLookup
legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, KernelHealth health, IdOrderingQueue legacyIndexTransactionOrdering )
Supplier<LabelScanWriter> labelScanWriters,
LockService lockService,
IndexConfigStore indexConfigStore,
IdOrderingQueue legacyIndexTransactionOrdering,
StorageEngine storageEngine )
{
this.indexingService = indexingService;
this.storageEngine = storageEngine;
this.labelScanWriters = labelScanWriters;
this.neoStores = neoStores;
this.cacheAccess = cacheAccess;
this.lockService = lockService;
this.legacyIndexProviderLookup = legacyIndexProviderLookup;
this.legacyIndexProviderLookup = storageEngine.legacyIndexApplierLookup();
this.indexConfigStore = indexConfigStore;
this.health = health;
this.legacyIndexTransactionOrdering = legacyIndexTransactionOrdering;
labelScanStoreSync = new WorkSync<>( labelScanWriters );
}

public void apply( TransactionRepresentation representation, ValidatedIndexUpdates indexUpdates, LockGroup locks,
public void apply( CommandStream representation, ValidatedIndexUpdates indexUpdates, LockGroup locks,
long transactionId, TransactionApplicationMode mode ) throws IOException
{
// Graph store application. The order of the decorated store appliers is irrelevant
CommandHandler storeApplier = new NeoStoreTransactionApplier(
neoStores, cacheAccess, lockService, locks, transactionId );
storageEngine.neoStores(), storageEngine.cacheAccess(), lockService, locks, transactionId );
if ( mode.needsIdTracking() )
{
storeApplier = new HighIdTransactionApplier( storeApplier, neoStores );
storeApplier = new HighIdTransactionApplier( storeApplier, storageEngine.neoStores() );
}
if ( mode.needsCacheInvalidationOnUpdates() )
{
storeApplier = new CacheInvalidationTransactionApplier( storeApplier, neoStores, cacheAccess );
storeApplier = new CacheInvalidationTransactionApplier( storeApplier, storageEngine.neoStores(), storageEngine.cacheAccess() );
}

// Schema index application
IndexTransactionApplier indexApplier = new IndexTransactionApplier( indexingService, indexUpdates,
IndexTransactionApplier indexApplier = new IndexTransactionApplier( storageEngine.indexingService(), indexUpdates,
labelScanStoreSync );

// Legacy index application
Expand All @@ -112,15 +105,15 @@ public void apply( TransactionRepresentation representation, ValidatedIndexUpdat
}
catch ( Throwable cause )
{
health.panic( cause );
storageEngine.kernelHealth().panic( cause );
throw cause;
}
}

private CommandHandler getCountsStoreApplier( long transactionId, TransactionApplicationMode mode )
{
Optional<CommandHandler> handlerOption =
neoStores.getCounts().apply( transactionId ).map( CountsStoreApplier.FACTORY );
storageEngine.neoStores().getCounts().apply( transactionId ).map( CountsStoreApplier.FACTORY );
if ( mode == TransactionApplicationMode.RECOVERY )
{
handlerOption = handlerOption.or( CommandHandler.EMPTY );
Expand All @@ -131,7 +124,8 @@ private CommandHandler getCountsStoreApplier( long transactionId, TransactionApp
public TransactionRepresentationStoreApplier withLegacyIndexTransactionOrdering(
IdOrderingQueue legacyIndexTransactionOrdering )
{
return new TransactionRepresentationStoreApplier( indexingService, labelScanWriters, neoStores, cacheAccess,
lockService, legacyIndexProviderLookup, indexConfigStore, health, legacyIndexTransactionOrdering );
return new TransactionRepresentationStoreApplier( labelScanWriters,
lockService, indexConfigStore, legacyIndexTransactionOrdering,
storageEngine );
}
}
Expand Up @@ -19,7 +19,9 @@
*/
package org.neo4j.kernel.impl.storageengine;

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.LegacyIndexApplierLookup;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap;
Expand Down Expand Up @@ -63,4 +65,12 @@ public interface StorageEngine

@Deprecated
CacheAccessBackDoor cacheAccess();

@Deprecated
LegacyIndexApplierLookup legacyIndexApplierLookup();

@Deprecated
KernelHealth kernelHealth();

void loadSchemaCache();
}
Expand Up @@ -33,6 +33,8 @@
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.LegacyIndexApplierLookup;
import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
Expand Down Expand Up @@ -83,22 +85,28 @@ public class RecordStorageEngine implements StorageEngine, Lifecycle
setting( "experimental.use_read_locks_on_property_reads", BOOLEAN, TRUE );

private final StoreReadLayer storeLayer;
protected final IndexingService indexingService;
protected final NeoStores neoStores;
private final IndexingService indexingService;
private final NeoStores neoStores;
private final PropertyKeyTokenHolder propertyKeyTokenHolder;
private final RelationshipTypeTokenHolder relationshipTypeTokenHolder;
private final LabelTokenHolder labelTokenHolder;
private final KernelHealth kernelHealth;
private final SchemaCache schemaCache;
protected final IntegrityValidator integrityValidator;
protected final IndexUpdatesValidator indexUpdatesValidator;
protected final CacheAccessBackDoor cacheAccess;
protected final LabelScanStore labelScanStore;
protected final DefaultSchemaIndexProviderMap providerMap;
protected final ProcedureCache procedureCache;
private final IntegrityValidator integrityValidator;
private final IndexUpdatesValidator indexUpdatesValidator;
private final CacheAccessBackDoor cacheAccess;
private final LabelScanStore labelScanStore;
private final DefaultSchemaIndexProviderMap providerMap;
private final ProcedureCache procedureCache;
private final LegacyIndexApplierLookup legacyIndexApplierLookup;

public RecordStorageEngine(
File storeDir, Config config, IdGeneratorFactory idGeneratorFactory, PageCache pageCache,
FileSystemAbstraction fs, LogProvider logProvider,
File storeDir,
Config config,
IdGeneratorFactory idGeneratorFactory,
PageCache pageCache,
FileSystemAbstraction fs,
LogProvider logProvider,
PropertyKeyTokenHolder propertyKeyTokenHolder,
LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens,
Expand All @@ -110,11 +118,13 @@ public RecordStorageEngine(
SchemaIndexProvider indexProvider,
IndexingService.Monitor indexingServiceMonitor,
KernelHealth kernelHealth,
LabelScanStoreProvider labelScanStoreProvider )
LabelScanStoreProvider labelScanStoreProvider,
LegacyIndexProviderLookup legacyIndexProviderLookup )
{
this.propertyKeyTokenHolder = propertyKeyTokenHolder;
this.relationshipTypeTokenHolder = relationshipTypeTokens;
this.labelTokenHolder = labelTokens;
this.kernelHealth = kernelHealth;
final StoreFactory storeFactory = new StoreFactory( storeDir, config, idGeneratorFactory, pageCache, fs, logProvider );
neoStores = storeFactory.openAllNeoStores( true );

Expand Down Expand Up @@ -143,6 +153,7 @@ neoStores, kernelHealth, new PropertyLoader( neoStores ),
procedureCache = new ProcedureCache();
storeLayer = new CacheLayer( diskLayer, schemaCache, procedureCache );
this.labelScanStore = labelScanStoreProvider.getLabelScanStore();
legacyIndexApplierLookup = new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup );
}
catch ( Throwable failure )
{
Expand Down Expand Up @@ -219,6 +230,18 @@ public CacheAccessBackDoor cacheAccess()
return cacheAccess;
}

@Override
public LegacyIndexApplierLookup legacyIndexApplierLookup()
{
return legacyIndexApplierLookup;
}

@Override
public KernelHealth kernelHealth()
{
return kernelHealth;
}

@Override
public void init() throws Throwable
{
Expand All @@ -244,6 +267,7 @@ public void start() throws Throwable
labelScanStore.start();
}

@Override
public void loadSchemaCache()
{
List<SchemaRule> schemaRules = toList( neoStores.getSchemaStore().loadAllSchemaRules() );
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction;

import java.io.IOException;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.transaction.command.Command;

/**
* A stream of commands from one or more transactions, that can be serialised to a transaction log or applied to a
* store.
*/
public interface CommandStream
{
/**
* Accepts a visitor into the commands making up this transaction.
* @param visitor {@link Visitor} which will see the commands.
* @throws IOException if there were any problem reading the commands.
*/
void accept( Visitor<Command,IOException> visitor ) throws IOException;
}
Expand Up @@ -28,15 +28,8 @@
/**
* Representation of a transaction that can be written to a {@link TransactionAppender} and read back later.
*/
public interface TransactionRepresentation
public interface TransactionRepresentation extends CommandStream
{
/**
* Accepts a visitor into the commands making up this transaction.
* @param visitor {@link Visitor} which will see the commands.
* @throws IOException if there were any problem reading the commands.
*/
void accept( Visitor<Command, IOException> visitor ) throws IOException;

/**
* @return an additional header of this transaction. Just arbitrary bytes that means nothing
* to this transaction representation.
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.neo4j.graphdb.index.IndexImplementation;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;

public class StoreFlusher
Expand All @@ -31,13 +32,13 @@ public class StoreFlusher
private final LabelScanStore labelScanStore;
private final Iterable<IndexImplementation> indexProviders;

public StoreFlusher( NeoStores neoStores, IndexingService indexingService,
LabelScanStore labelScanStore,
public StoreFlusher(
StorageEngine storageEngine,
Iterable<IndexImplementation> indexProviders )
{
this.neoStores = neoStores;
this.indexingService = indexingService;
this.labelScanStore = labelScanStore;
this.neoStores = storageEngine.neoStores();
this.indexingService = storageEngine.indexingService();
this.labelScanStore = storageEngine.labelScanStore();
this.indexProviders = indexProviders;
}

Expand Down

0 comments on commit c99c04f

Please sign in to comment.