Skip to content

Commit

Permalink
RecordStorageEngine pools StoreStatement instances
Browse files Browse the repository at this point in the history
to get the benefit of how the store cursors are designed, which is for reuse.
A StoreStatement has a number of cursors, each instantiating their own data structures
for efficiently being able to move around the stores. Previously all those cursors
were created for every statement, which is a big overhead compared to reusing them.
  • Loading branch information
tinwelint committed Feb 17, 2016
1 parent 3321f97 commit a81a055
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 26 deletions.
Expand Up @@ -63,6 +63,7 @@ public IndexReader newReader( IndexDescriptor descriptor ) throws IndexNotFoundK
return reader;
}

@Override
public IndexReader newUnCachedReader( IndexDescriptor descriptor ) throws IndexNotFoundKernelException
{
IndexProxy index = indexingService.getIndexProxy( descriptor );
Expand All @@ -78,6 +79,7 @@ public void close()
{
indexReader.close();
}
indexReaders.clear();
}
}
}
Expand Down
Expand Up @@ -243,7 +243,7 @@ public KernelStatement acquireStatement()
if ( currentStatement == null )
{
currentStatement = new KernelStatement( this, this, locks, operations,
storageEngine.storeReadLayer().acquireStatement(), procedures );
storeLayer.acquireStatement(), procedures );
}
currentStatement.acquire();
return currentStatement;
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.function.Supplier;

import org.neo4j.collection.pool.Pool;
import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.cursor.Cursor;
Expand Down Expand Up @@ -63,14 +64,16 @@ public class StoreStatement implements StorageStatement
private final Supplier<LabelScanReader> labelScanStore;
private LabelScanReader labelScanReader;
private boolean closed;
private final Pool<StoreStatement> pool;

public StoreStatement( final NeoStores neoStores, final LockService lockService,
Supplier<IndexReaderFactory> indexReaderFactory,
Supplier<LabelScanReader> labelScanReaderSupplier )
Supplier<LabelScanReader> labelScanReaderSupplier, Pool<StoreStatement> pool )
{
this.neoStores = neoStores;
this.indexReaderFactorySupplier = indexReaderFactory;
this.labelScanStore = labelScanReaderSupplier;
this.pool = pool;
this.nodeStore = neoStores.getNodeStore();
this.relationshipStore = neoStores.getRelationshipStore();

Expand Down Expand Up @@ -112,6 +115,12 @@ protected StoreIteratorRelationshipCursor create()
};
}

public StoreStatement initialize()
{
this.closed = false;
return this;
}

@Override
public Cursor<NodeItem> acquireSingleNodeCursor( long nodeId )
{
Expand Down Expand Up @@ -163,8 +172,10 @@ public void close()
if ( labelScanReader != null )
{
labelScanReader.close();
labelScanReader = null;
}
closed = true;
pool.release( this );
}

private class AllStoreIdIterator extends PrimitiveLongCollections.PrimitiveLongBaseIterator
Expand Down
Expand Up @@ -57,7 +57,6 @@
import org.neo4j.kernel.impl.api.store.CacheLayer;
import org.neo4j.kernel.impl.api.store.DiskLayer;
import org.neo4j.kernel.impl.api.store.SchemaCache;
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.cache.BridgingCacheAccess;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
Expand Down Expand Up @@ -103,7 +102,6 @@
import org.neo4j.storageengine.api.CommandsToApply;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.storageengine.api.lock.ResourceLocker;
Expand Down Expand Up @@ -151,6 +149,7 @@ public class RecordStorageEngine implements StorageEngine, Lifecycle
private final NeoStoreIndexStoreView indexStoreView;
private final LegacyIndexProviderLookup legacyIndexProviderLookup;
private final PropertyPhysicalToLogicalConverter indexUpdatesConverter;
private final StoreStatements storeStatementSupplier;

// Immutable state for creating/applying commands
private final Loaders loaders;
Expand Down Expand Up @@ -216,10 +215,11 @@ public RecordStorageEngine(
propertyKeyTokenHolder, relationshipTypeTokens, labelTokens );

labelScanStore = labelScanStoreProvider.getLabelScanStore();
storeStatementSupplier = storeStatementSupplier( neoStores, config, lockService );
DiskLayer diskLayer = new DiskLayer(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens,
schemaStorage, neoStores, indexingService,
storeStatementSupplier( neoStores, config, lockService ) );
storeStatementSupplier );
storeLayer = new CacheLayer( diskLayer, schemaCache );

legacyIndexApplierLookup = new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup );
Expand Down Expand Up @@ -247,18 +247,14 @@ public RecordStorageEngine(
}
}

private Supplier<StorageStatement> storeStatementSupplier(
private StoreStatements storeStatementSupplier(
NeoStores neoStores, Config config, LockService lockService )
{
final LockService currentLockService =
config.get( use_read_locks_on_property_reads ) ? lockService : NO_LOCK_SERVICE;
final Supplier<IndexReaderFactory> indexReaderFactory = () -> {
return new IndexReaderFactory.Caching( indexingService );
};
final Supplier<IndexReaderFactory> indexReaderFactory = () -> new IndexReaderFactory.Caching( indexingService );

return () -> {
return new StoreStatement( neoStores, currentLockService, indexReaderFactory, labelScanStore::newReader );
};
return new StoreStatements( neoStores, currentLockService, indexReaderFactory, labelScanStore::newReader );
}

@Override
Expand Down Expand Up @@ -432,6 +428,7 @@ public void stop() throws Throwable
@Override
public void shutdown() throws Throwable
{
storeStatementSupplier.close();
labelScanStore.shutdown();
indexingService.shutdown();
neoStores.close();
Expand Down
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.storageengine.impl.recordstorage;

import java.util.function.Supplier;

import org.neo4j.collection.pool.MarshlandPool;
import org.neo4j.function.Factory;
import org.neo4j.kernel.impl.api.IndexReaderFactory;
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.schema.LabelScanReader;

/**
* {@link Supplier} of {@link StoreStatement} instances for {@link RecordStorageEngine}.
* Internally uses pooling to reduce need for actual instantiation.
*/
public class StoreStatements implements Supplier<StorageStatement>, AutoCloseable
{
private final NeoStores neoStores;
private final LockService lockService;
private final Supplier<IndexReaderFactory> indexReaderFactory;
private final Supplier<LabelScanReader> labelScanReader;
private final Factory<StoreStatement> factory = new Factory<StoreStatement>()
{
@Override
public StoreStatement newInstance()
{
return new StoreStatement( neoStores, lockService, indexReaderFactory, labelScanReader, pool );
}
};
private final MarshlandPool<StoreStatement> pool = new MarshlandPool<>( factory );

public StoreStatements( NeoStores neoStores, LockService lockService,
Supplier<IndexReaderFactory> indexReaderFactory, Supplier<LabelScanReader> labelScanReader )
{
this.neoStores = neoStores;
this.lockService = lockService;
this.indexReaderFactory = indexReaderFactory;
this.labelScanReader = labelScanReader;
}

@Override
public StorageStatement get()
{
return pool.acquire().initialize();
}

@Override
public void close()
{
pool.close();
}
}
Expand Up @@ -24,19 +24,14 @@
import org.mockito.stubbing.Answer;

import java.util.Collection;
import java.util.function.Supplier;

import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -191,17 +186,13 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces
Locks locks = mock( Locks.class );
when( locks.newClient() ).thenReturn( mock( Locks.Client.class ) );

MetaDataStore metaDataStore = mock( MetaDataStore.class );
NeoStores neoStores = mock( NeoStores.class );

StoreReadLayer readLayer = mock( StoreReadLayer.class );
when( readLayer.acquireStatement() ).thenAnswer( new Answer<StorageStatement>()
{
@Override
public StorageStatement answer( InvocationOnMock invocation ) throws Throwable
{
return new StoreStatement( neoStores, new ReentrantLockService(),
mock( Supplier.class ), null );
return mock( StorageStatement.class );
}
} );

Expand All @@ -227,7 +218,7 @@ public Void answer( InvocationOnMock invocation ) throws Throwable
null, null, null, TransactionHeaderInformationFactory.DEFAULT,
commitProcess, null,
null, new TransactionHooks(), mock( TransactionMonitor.class ), life,
tracers, storageEngine, new Procedures(), metaDataStore );
tracers, storageEngine, new Procedures(), mock( TransactionIdStore.class ) );
}

private static TransactionCommitProcess newRememberingCommitProcess( final TransactionRepresentation[] slot )
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Set;

import org.neo4j.collection.pool.Pool;
import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.cursor.Cursor;
Expand Down Expand Up @@ -450,7 +451,7 @@ private static class StoreStatementWithSingleFreshIndexReader extends StoreState
StoreStatementWithSingleFreshIndexReader( IndexReader reader )
{
super( mock( NeoStores.class ), new ReentrantLockService(), () -> mock( IndexReaderFactory.class ),
() -> mock( LabelScanReader.class ) );
() -> mock( LabelScanReader.class ), mock( Pool.class ) );
this.reader = reader;
}

Expand Down
Expand Up @@ -23,6 +23,7 @@

import java.util.function.Supplier;

import org.neo4j.collection.pool.Pool;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.storageengine.api.schema.LabelScanReader;
Expand All @@ -45,7 +46,7 @@ public void shouldCloseOpenedLabelScanReader() throws Exception
when( scanStore.get() ).thenReturn( scanReader );
StoreStatement statement = new StoreStatement(
mock( NeoStores.class ), LockService.NO_LOCK_SERVICE,
mock( Supplier.class ), scanStore );
mock( Supplier.class ), scanStore, mock( Pool.class ) );

// when
LabelScanReader actualReader = statement.getLabelScanReader();
Expand Down

0 comments on commit a81a055

Please sign in to comment.