Skip to content

Commit

Permalink
Introduce start time and timeout into kernel statement and transaction.
Browse files Browse the repository at this point in the history
Update guard to be able to check new statement and transaction timeout.
  • Loading branch information
MishaDemianenko committed Aug 16, 2016
1 parent 52de8cb commit 7544371
Show file tree
Hide file tree
Showing 27 changed files with 278 additions and 449 deletions.
Expand Up @@ -320,7 +320,13 @@ public long lastTransactionIdWhenStarted()
}

@Override
public long localStartTime()
public long startTime()
{
return 0;
}

@Override
public long timeout()
{
return 0;
}
Expand Down
Expand Up @@ -31,11 +31,12 @@ import org.neo4j.cypher.javacompat.internal.GraphDatabaseCypherService
import org.neo4j.graphdb._
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.helpers.Clock
import org.neo4j.kernel.api._
import org.neo4j.kernel.api.security.AccessMode
import org.neo4j.kernel.impl.api.{KernelStatement, KernelTransactionImplementation}
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.kernel.impl.coreapi.{PropertyContainerLocker, InternalTransaction}
import org.neo4j.kernel.impl.coreapi.{InternalTransaction, PropertyContainerLocker}
import org.neo4j.kernel.impl.proc.Procedures
import org.neo4j.kernel.impl.query.Neo4jTransactionalContext
import org.neo4j.test.TestGraphDatabaseFactory
Expand All @@ -56,7 +57,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
outerTx = mock[InternalTransaction]
val kernelTransaction = mock[KernelTransactionImplementation]
when(kernelTransaction.mode()).thenReturn(AccessMode.Static.FULL)
statement = new KernelStatement(kernelTransaction, null, null, null, new Procedures())
statement = new KernelStatement(kernelTransaction, null, null, null, new Procedures(), Clock.SYSTEM_CLOCK)
}

override def afterEach() {
Expand Down
Expand Up @@ -204,10 +204,10 @@ public abstract class GraphDatabaseSettings
setting( "unsupported.dbms.transaction_start_timeout", DURATION, "1s" );

@Description("The maximum time interval of a transaction within which it should be completed.")
public static final Setting<Long> transaction_timeout = setting( "dbms.transaction.timeout", DURATION, NO_DEFAULT );
public static final Setting<Long> transaction_timeout = setting( "dbms.transaction.timeout", DURATION, "0" );

@Description("The maximum time interval of a query within which it should be completed.")
public static final Setting<Long> query_timeout = setting( "dbms.query.timeout", DURATION, NO_DEFAULT );
public static final Setting<Long> query_timeout = setting( "dbms.query.timeout", DURATION, "0" );

@Description( "The maximum amount of time to wait for running transactions to complete before allowing "
+ "initiated database shutdown to continue" )
Expand Down
Expand Up @@ -798,7 +798,8 @@ private KernelModule buildKernel( TransactionAppender appender,
transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor,
life, tracers, storageEngine, procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );
final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config );

kernel.registerTransactionHook( transactionEventHandlers );

Expand Down
Expand Up @@ -150,7 +150,13 @@ interface CloseListener
* @return start time of this transaction, i.e. basically {@link System#currentTimeMillis()} when user called
* {@link Kernel#newTransaction(Type, AccessMode)}.
*/
long localStartTime();
long startTime();

/**
* Timeout for transaction.
* @return transaction timeout
*/
long timeout();

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
Expand Down
Expand Up @@ -48,7 +48,7 @@ public interface KernelTransactionHandle
*
* @return the transaction start time.
*/
long localStartTime();
long startTime();

/**
* Check if the underlying transaction is open.
Expand Down
110 changes: 30 additions & 80 deletions community/kernel/src/main/java/org/neo4j/kernel/guard/Guard.java
Expand Up @@ -19,114 +19,64 @@
*/
package org.neo4j.kernel.guard;

import org.neo4j.logging.Log;
import java.util.function.Supplier;

import static java.lang.System.currentTimeMillis;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.logging.Log;

public class Guard
{
private final ThreadLocal<GuardInternal> threadLocal = new ThreadLocal<>();

private final Log log;
private Clock clock;

public Guard( final Log log )
public Guard( final Log log, Clock clock )
{
this.log = log;
this.clock = clock;
}

public void check()
{
GuardInternal guardInternal = currentGuard();
if ( guardInternal != null )
{
guardInternal.check();
}
}

public <T extends GuardInternal> T currentGuard()
public void check( KernelStatement statement)
{
return (T) threadLocal.get();
check( maxStatementCompletionTimeSupplier( statement ), "Statement timeout." );
check( statement.getTransaction() );
}

public void startOperationsCount( final long maxOps )
public void check (KernelTransactionImplementation transaction)
{
start( new OperationsCount( maxOps ) );
check( maxTransactionCompletionTimeSupplier( transaction ), "Transaction timeout." );
}

public void startTimeout( final long validForInMilliSeconds )
private void check( Supplier<Long> completionTimeSupplier, String timeoutDescription )
{
final Timeout timeout = new Timeout( validForInMilliSeconds + currentTimeMillis() );
start( timeout );
}

public void start( final GuardInternal guard )
{
threadLocal.set( guard );
}

public <T extends GuardInternal> T stop()
{
T guardInternal = Guard.this.<T>currentGuard();
if ( guardInternal != null )
long now = clock.currentTimeMillis();
long transactionCompletionTime = completionTimeSupplier.get();
if ( transactionCompletionTime < now )
{
threadLocal.remove();
final long overtime = now - transactionCompletionTime;
log.warn( timeoutDescription + " ( Overtime: " + overtime + " ms)." );
throw new GuardTimeoutException( overtime );
}
return guardInternal;
}

public interface GuardInternal
private static Supplier<Long> maxTransactionCompletionTimeSupplier( KernelTransactionImplementation transaction )
{
void check();
return () -> getMaxTransactionCompletionTime( transaction );
}

public class OperationsCount implements GuardInternal
private static Supplier<Long> maxStatementCompletionTimeSupplier( KernelStatement statement )
{
private final long max;
private long opsCount = 0;

private OperationsCount( final long max )
{
this.max = max;
}

@Override
public void check()
{
opsCount++;

if ( max < opsCount )
{
log.warn( "guard-timeout: node-ops: more than " + max );
throw new GuardOperationsCountException( opsCount );
}
}

public long getOpsCount()
{
return opsCount;
}
return () -> getMaxStatementCompletionTime( statement );
}

public class Timeout implements GuardInternal
private static long getMaxStatementCompletionTime( KernelStatement statement )
{
private final long valid;
private final long start;

private Timeout( final long valid )
{
this.valid = valid;
this.start = currentTimeMillis();
}
return statement.startTime() + statement.timeout();
}

@Override
public void check()
{
if ( valid < currentTimeMillis() )
{
final long overtime = currentTimeMillis() - valid;
log.warn( "guard-timeout:" + (valid - start) + "(+" + overtime + ")ms" );
throw new GuardTimeoutException( overtime );
}
}
private static long getMaxTransactionCompletionTime( KernelTransactionImplementation transaction )
{
return transaction.startTime() + transaction.timeout();
}
}

0 comments on commit 7544371

Please sign in to comment.