Skip to content

Commit

Permalink
Add KernelStatement interface
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed May 8, 2017
1 parent 61d45b3 commit eef8df3
Show file tree
Hide file tree
Showing 27 changed files with 436 additions and 342 deletions.
Expand Up @@ -35,7 +35,7 @@ import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier
import org.neo4j.kernel.api._
import org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED
import org.neo4j.kernel.api.security.{AnonymousContext, SecurityContext}
import org.neo4j.kernel.impl.api.{KernelStatement, KernelTransactionImplementation, StatementOperationParts}
import org.neo4j.kernel.impl.api.{KernelStatementImplementation, KernelTransactionImplementation, StatementOperationParts}
import org.neo4j.kernel.impl.coreapi.{InternalTransaction, PropertyContainerLocker}
import org.neo4j.kernel.impl.factory.CanWrite
import org.neo4j.kernel.impl.locking.LockTracer
Expand All @@ -51,7 +51,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {

var graph: GraphDatabaseCypherService = null
var outerTx: InternalTransaction = null
var statement: KernelStatement = null
var statement: KernelStatementImplementation = null
val indexSearchMonitor = mock[IndexSearchMonitor]
val locker = mock[PropertyContainerLocker]

Expand All @@ -63,7 +63,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
when(kernelTransaction.securityContext()).thenReturn(AUTH_DISABLED)
val storeStatement = mock[StorageStatement]
val operations = mock[StatementOperationParts](RETURNS_DEEP_STUBS)
statement = new KernelStatement(kernelTransaction, null, storeStatement, new Procedures(), new CanWrite(), LockTracer.NONE)
statement = new KernelStatementImplementation(kernelTransaction, null, storeStatement, new Procedures(), new CanWrite(), LockTracer.NONE)
statement.initialize(null, operations, PageCursorTracerSupplier.NULL.get())
statement.acquire()
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api;

import java.util.Map;
import java.util.Optional;

import org.neo4j.function.Disposable;
Expand Down Expand Up @@ -183,6 +184,10 @@ default void close() throws TransactionFailureException
*/
long timeout();

void setUserMetaData( Map<String, Object> data );

Map<String, Object> getUserMetaData();

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
* are invoked.
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.guard;

import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;

Expand All @@ -28,6 +29,6 @@
*/
public interface Guard
{
void check( KernelTransactionImplementation transaction );
void check( KernelTransaction transaction );
void check( KernelStatement statement );
}
Expand Up @@ -21,9 +21,9 @@

import java.time.Clock;

import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.logging.Log;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.UNSPECIFIED_TIMEOUT;
Expand All @@ -46,33 +46,33 @@ public TimeoutGuard( Clock clock, final Log log )
@Override
public void check( KernelStatement statement )
{
check( statement.getTransaction() );
check( statement.transaction() );
}

@Override
public void check( KernelTransactionImplementation transaction )
public void check( KernelTransaction transaction )
{
if ( transaction.timeout() > UNSPECIFIED_TIMEOUT )
{
check( transaction, "Transaction timeout." );
doCheck( transaction );
}
}

private void check( KernelTransactionImplementation transaction, String timeoutDescription )
private void doCheck( KernelTransaction transaction )
{
long now = clock.millis();
long maxCompletionTime = getMaxTransactionCompletionTime( transaction );
if ( maxCompletionTime < now )
{
long overtime = now - maxCompletionTime;
String message = timeoutDescription + " (Overtime: " + overtime + " ms).";
String message = "Transaction timeout. (Overtime: " + overtime + " ms).";
log.warn( message );
transaction.markForTermination( Status.Transaction.TransactionTimedOut );
throw new GuardTimeoutException( message, overtime );
}
}

private static long getMaxTransactionCompletionTime( KernelTransactionImplementation transaction )
private static long getMaxTransactionCompletionTime( KernelTransaction transaction )
{
return transaction.startTime() + transaction.timeout();
}
Expand Down
Expand Up @@ -19,273 +19,36 @@
*/
package org.neo4j.kernel.impl.api;

import java.util.Optional;
import java.util.function.Function;

import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.kernel.api.DataWriteOperations;
import org.neo4j.kernel.api.ExecutionStatisticsOperations;
import org.neo4j.kernel.api.ProcedureCallOperations;
import org.neo4j.kernel.api.QueryRegistryOperations;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.query.ExecutingQuery;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.factory.AccessCapability;
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.storageengine.api.txstate.WritableTransactionState;

/**
* A resource efficient implementation of {@link Statement}. Designed to be reused within a
* {@link KernelTransactionImplementation} instance, even across transactions since this instances itself
* doesn't hold essential state. Usage:
*
* <ol>
* <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li>
* <li>For every transaction...</li>
* <li>Call {@link #initialize(StatementLocks, StatementOperationParts, PageCursorTracer)} which makes this instance
* full available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li>
* <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction...
* Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in
* the end an equal number of calls must have been issued.</li>
* <li>To be safe call {@link #forceClose()} at the end of a transaction to force a close of the statement,
* even if there are more than one current call to {@link #acquire()}. This instance is now again ready
* to be {@link #initialize(StatementLocks, StatementOperationParts, PageCursorTracer)} initialized} and used for the transaction
* instance again, when it's initialized.</li>
* </ol>
*/
public class KernelStatement implements TxStateHolder, Statement
public interface KernelStatement extends TxStateHolder, Statement
{
private final TxStateHolder txStateHolder;
private final StorageStatement storeStatement;
private final AccessCapability accessCapability;
private final KernelTransactionImplementation transaction;
private final OperationsFacade facade;
private StatementLocks statementLocks;
private PageCursorTracer pageCursorTracer = PageCursorTracer.NULL;
private int referenceCount;
private volatile ExecutingQueryList executingQueryList;
private final LockTracer systemLockTracer;

public KernelStatement( KernelTransactionImplementation transaction,
TxStateHolder txStateHolder,
StorageStatement storeStatement,
Procedures procedures,
AccessCapability accessCapability,
LockTracer systemLockTracer )
{
this.transaction = transaction;
this.txStateHolder = txStateHolder;
this.storeStatement = storeStatement;
this.accessCapability = accessCapability;
this.facade = new OperationsFacade( transaction, this, procedures );
this.executingQueryList = ExecutingQueryList.EMPTY;
this.systemLockTracer = systemLockTracer;
}

@Override
public ReadOperations readOperations()
{
assertAllows( AccessMode::allowsReads, "Read" );
return facade;
}

@Override
public ProcedureCallOperations procedureCallOperations()
{
return facade;
}

@Override
public ExecutionStatisticsOperations executionStatisticsOperations()
{
return facade;
}

@Override
public TokenWriteOperations tokenWriteOperations()
{
accessCapability.assertCanWrite();

return facade;
}

@Override
public DataWriteOperations dataWriteOperations()
throws InvalidTransactionTypeKernelException
{
accessCapability.assertCanWrite();

assertAllows( AccessMode::allowsWrites, "Write" );
transaction.upgradeToDataWrites();
return facade;
}

@Override
public SchemaWriteOperations schemaWriteOperations()
throws InvalidTransactionTypeKernelException
{
accessCapability.assertCanWrite();

assertAllows( AccessMode::allowsSchemaWrites, "Schema" );
transaction.upgradeToSchemaWrites();
return facade;
}

@Override
public QueryRegistryOperations queryRegistration()
{
return facade;
}

@Override
public ReadableTransactionState readableTxState()
{
return txStateHolder.readableTxState();
}

@Override
public WritableTransactionState writableTxState()
{
return txStateHolder.writableTxState();
}

@Override
public LegacyIndexTransactionState legacyIndexTxState()
{
return txStateHolder.legacyIndexTxState();
}

@Override
public void close()
{
// Check referenceCount > 0 since we allow multiple close calls,
// i.e. ignore closing already closed statements
if ( referenceCount > 0 && (--referenceCount == 0) )
{
cleanupResources();
}
}

void assertOpen()
{
if ( referenceCount == 0 )
{
throw new NotInTransactionException( "The statement has been closed." );
}

Optional<Status> terminationReason = transaction.getReasonIfTerminated();
if ( terminationReason.isPresent() )
{
throw new TransactionTerminatedException( terminationReason.get() );
}
}

public void initialize( StatementLocks statementLocks, StatementOperationParts operationParts,
PageCursorTracer pageCursorCounters )
{
this.statementLocks = statementLocks;
this.pageCursorTracer = pageCursorCounters;
facade.initialize( operationParts );
}

public StatementLocks locks()
{
return statementLocks;
}

public LockTracer lockTracer()
{
LockTracer tracer = executingQueryList.top( ExecutingQuery::lockTracer );
return tracer == null ? systemLockTracer : systemLockTracer.combine( tracer );
}

public PageCursorTracer getPageCursorTracer()
{
return pageCursorTracer;
}

public final void acquire()
{
if ( referenceCount++ == 0 )
{
storeStatement.acquire();
}
}
void assertOpen();

final boolean isAcquired()
{
return referenceCount > 0;
}
StatementLocks locks();

final void forceClose()
{
if ( referenceCount > 0 )
{
referenceCount = 0;
cleanupResources();
}
pageCursorTracer.reportEvents();
}
LockTracer lockTracer();

final String username()
{
return transaction.securityContext().subject().username();
}
PageCursorTracer pageCursorTracer();

final ExecutingQueryList executingQueryList()
{
return executingQueryList;
}
StorageStatement storageStatement();

final void startQueryExecution( ExecutingQuery query )
{
this.executingQueryList = executingQueryList.push( query );
}
KernelTransaction transaction();

final void stopQueryExecution( ExecutingQuery executingQuery )
{
this.executingQueryList = executingQueryList.remove( executingQuery );
}
String username();

public StorageStatement getStoreStatement()
{
return storeStatement;
}
ExecutingQueryList executingQueryList();

private void cleanupResources()
{
// closing is done by KTI
storeStatement.release();
executingQueryList = ExecutingQueryList.EMPTY;
}
void startQueryExecution( ExecutingQuery executingQuery );

public KernelTransactionImplementation getTransaction()
{
return transaction;
}
void stopQueryExecution( ExecutingQuery executingQuery );

void assertAllows( Function<AccessMode,Boolean> allows, String mode )
{
AccessMode accessMode = transaction.securityContext().mode();
if ( !allows.apply( accessMode ) )
{
throw accessMode.onViolation(
String.format( "%s operations are not allowed for %s.", mode,
transaction.securityContext().description() ) );
}
}
void assertAllowsTokenCreates();
}

0 comments on commit eef8df3

Please sign in to comment.