Skip to content

Commit

Permalink
Post merge review-cleanup for Add Tx Metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Sep 20, 2016
1 parent 036e074 commit 9c03bc4
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 72 deletions.
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.kernel.api;

import java.util.Map;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.api;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

Expand Down
Expand Up @@ -168,7 +168,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private long transactionId;
private long commitTime;
private volatile int reuseCount;
private Map<String,Object> userMetaData;
private volatile Map<String,Object> userMetaData;

/**
* Lock prevents transaction {@link #markForTermination(Status)} transaction termination} from interfering with
Expand Down
Expand Up @@ -21,20 +21,14 @@

import java.time.Clock;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.neo4j.collection.pool.LinkedQueuePool;
import org.neo4j.collection.pool.MarshlandPool;
import org.neo4j.function.Factory;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.api.ExecutingQuery;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
Expand Down Expand Up @@ -210,24 +204,6 @@ public Set<KernelTransactionHandle> activeTransactions()
.collect( toSet() );
}

/**
* Give an approximate set of all transactions currently running together with associated metadata as
* computed by the provided selector function.
* This is not guaranteed to be exact, as transactions may stop and start while this set is gathered.
*
* @return the (approximate) set of open transactions.
*/
public <T> Set<Pair<KernelTransactionHandle, T>> activeTransactions(
Function<KernelTransactionHandle,Stream<T>> selector
)
{
return allTransactions.stream()
.map( this::createHandle )
.filter( KernelTransactionHandle::isOpen )
.flatMap( tx -> selector.apply( tx ).map( data -> Pair.of( tx, data ) ) )
.collect( toSet() );
}

/**
* Create new handle for the given transaction.
* <p>
Expand Down Expand Up @@ -322,16 +298,4 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()
}
}

public class TxExecutingQuery
{
public final Map<String,String> txMetaData;
public final ExecutingQuery query;

public TxExecutingQuery( Map<String,String> txMetaData, ExecutingQuery query )
{
this.txMetaData = txMetaData;
this.query = query;
}
}

}
Expand Up @@ -216,16 +216,6 @@ public void init( SPI spi, Config config )
.getOrCreateRelationshipIndex( InternalAutoIndexing.RELATIONSHIP_AUTO_INDEX, null ) ),
spi.autoIndexing().relationships() );
this.indexManager = new IndexManagerImpl( spi::currentStatement, idxProvider, nodeAutoIndexer, relAutoIndexer );
}

/**
* This needs to be called after data source creation, when the execution engine dependency is satisfied by the
* cypher module
*
* @see GraphDatabaseFacadeFactory#initFacade(File, Map, GraphDatabaseFacadeFactory.Dependencies, GraphDatabaseFacade)
*/
public void initTransactionalContextFactoryFromSPI()
{
this.contextFactory = new Neo4jTransactionalContextFactory( spi, locker );
}

Expand Down Expand Up @@ -387,9 +377,7 @@ public Result execute( String query, Map<String,Object> parameters ) throws Quer
// ensure we have a tx and create a context (the tx is gonna get closed by the Cypher result)
InternalTransaction transaction = beginTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL );

TransactionalContext context =
contextFactory.newContext( QueryEngineProvider.describe(), transaction, query, parameters );
return spi.executeQuery( query, parameters, context );
return execute( transaction, query, parameters );
}

@Override
Expand All @@ -401,7 +389,6 @@ public Result execute( String query, Map<String,Object> parameters, long timeout
return execute( transaction, query, parameters );
}

// This version of execute is only needed for internal testing of LOAD CSV PERIODIC COMMIT. Can be refactored?
public Result execute( InternalTransaction transaction, String query, Map<String,Object> parameters )
throws QueryExecutionException
{
Expand Down
Expand Up @@ -160,7 +160,6 @@ public void registered( NeoStoreDataSource dataSource )
{
engine = QueryEngineProvider.initialize(
platform.dependencies, platform.graphDatabaseFacade, dependencies.executionEngines() );
graphDatabaseFacade.initTransactionalContextFactoryFromSPI();
}

queryEngine.set( engine );
Expand Down
Expand Up @@ -73,7 +73,6 @@ public GraphDatabaseService apply( Context context ) throws ProcedureException
GraphDatabaseFacade facade = new GraphDatabaseFacade();
facade.init( new ProcedureGDBFacadeSPI( owningThread, transaction, queryExecutor, storeDir, resolver,
AutoIndexing.UNSUPPORTED, storeId, availability, urlValidator ), config );
facade.initTransactionalContextFactoryFromSPI();
return facade;
}

Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.util.Map;
import java.util.function.Supplier;

import org.neo4j.function.Suppliers;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.ExecutingQuery;
import org.neo4j.kernel.api.Statement;
Expand All @@ -34,16 +36,16 @@

public class Neo4jTransactionalContextFactory implements TransactionalContextFactory
{
private final GraphDatabaseQueryService queryService;
private final Supplier<Statement> statementSupplier;
private final Supplier<GraphDatabaseQueryService> queryServiceSupplier;
private final ThreadToStatementContextBridge txBridge;
private final PropertyContainerLocker locker;
private final DbmsOperations.Factory dbmsOpsFactory;
private final Guard guard;

public Neo4jTransactionalContextFactory( GraphDatabaseFacade.SPI spi, PropertyContainerLocker locker )
{
this( spi.queryService(), locker, spi::currentStatement );
this( locker, spi::currentStatement, spi.resolver() );
}

@Deprecated
Expand All @@ -57,24 +59,36 @@ public Neo4jTransactionalContextFactory( GraphDatabaseQueryService queryService,
Supplier<Statement> statementSupplier )
{
this(
queryService,
statementSupplier,
queryService.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ),
locker,
queryService.getDependencyResolver().resolveDependency( DbmsOperations.Factory.class ),
queryService.getDependencyResolver().resolveDependency( Guard.class )
Suppliers.singleton( queryService ),
statementSupplier,
queryService.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ),
locker,
queryService.getDependencyResolver().resolveDependency( DbmsOperations.Factory.class ),
queryService.getDependencyResolver().resolveDependency( Guard.class )
);
}

public Neo4jTransactionalContextFactory( PropertyContainerLocker locker, Supplier<Statement> statementSupplier,
DependencyResolver resolver )
{
this( () -> resolver.resolveDependency( GraphDatabaseQueryService.class ),
statementSupplier,
resolver.resolveDependency( ThreadToStatementContextBridge.class ),
locker,
resolver.resolveDependency( DbmsOperations.Factory.class ),
resolver.resolveDependency( Guard.class )
);
}

public Neo4jTransactionalContextFactory(
GraphDatabaseQueryService queryService,
Supplier<GraphDatabaseQueryService> queryServiceSupplier,
Supplier<Statement> statementSupplier,
ThreadToStatementContextBridge txBridge,
PropertyContainerLocker locker,
DbmsOperations.Factory dbmsOpsFactory,
Guard guard
) {
this.queryService = queryService;
this.queryServiceSupplier = queryServiceSupplier;
this.statementSupplier = statementSupplier;
this.txBridge = txBridge;
this.locker = locker;
Expand All @@ -91,6 +105,7 @@ public Neo4jTransactionalContext newContext(
)
{
Statement statement = statementSupplier.get();
GraphDatabaseQueryService queryService = queryServiceSupplier.get();
QuerySource querySourceWithUserName = querySource.append( tx.mode().name() );
ExecutingQuery executingQuery = statement.queryRegistration().startQueryExecution(
querySourceWithUserName, queryText, queryParameters
Expand All @@ -114,6 +129,7 @@ public Neo4jTransactionalContext newContext(
public Neo4jTransactionalContext newContext( ExecutingQuery query, InternalTransaction transaction )
{
Statement statement = statementSupplier.get();
GraphDatabaseQueryService queryService = queryServiceSupplier.get();
statement.queryRegistration().registerExecutingQuery( query );

return new Neo4jTransactionalContext(
Expand Down
Expand Up @@ -61,14 +61,13 @@ public void setUp()
ThreadToStatementContextBridge contextBridge = mock( ThreadToStatementContextBridge.class );

when( spi.queryService() ).thenReturn( queryService );
when( queryService.getDependencyResolver() ).thenReturn( dependencyResolver );
when( spi.resolver() ).thenReturn( dependencyResolver );
when( dependencyResolver.resolveDependency( ThreadToStatementContextBridge.class ) )
.thenReturn( contextBridge );
when( contextBridge.get() ).thenReturn( statement );
defaultConfig = Config.defaults();

graphDatabaseFacade.init( spi, defaultConfig );
graphDatabaseFacade.initTransactionalContextFactoryFromSPI();
}

@Test
Expand Down
Expand Up @@ -71,7 +71,8 @@
@SuppressWarnings( "unused" )
public class BuiltInProcedures
{
public static Clock clock = Clocks.systemClock();
private static Clock clock = Clocks.systemClock();
private static final int HARD_CHAR_LIMIT = 2048;

@Context
public DependencyResolver resolver;
Expand All @@ -81,17 +82,16 @@ public class BuiltInProcedures

@Context
public KernelTransaction tx;

@Context
public AuthSubject authSubject;

@Procedure( name = "dbms.setTXMetaData", mode = DBMS )
public void setTXMetaData( @Name( value = "data" ) Map<String,Object> data )
{
int totalCharSize = data.entrySet().stream()
.mapToInt( e -> e.getKey().length() + e.getValue().toString().length() ).sum();
.mapToInt( e -> e.getKey().length() + e.getValue().toString().length() )
.sum();

final int HARD_CHAR_LIMIT = 2048;
if ( totalCharSize >= HARD_CHAR_LIMIT )
{
throw new IllegalArgumentException(
Expand Down

0 comments on commit 9c03bc4

Please sign in to comment.