Skip to content

Commit

Permalink
Port of 3.0 execution guard
Browse files Browse the repository at this point in the history
Enhance execution guard to be system wide.
Allow transactions and statement to have timeouts that can be global or instance specific.
Expose interface for external validation check of statements and transactions.
Remove custom server-based filters and guard manipulations.
  • Loading branch information
MishaDemianenko committed Sep 7, 2016
1 parent 1c809b1 commit 3165dab
Show file tree
Hide file tree
Showing 58 changed files with 1,569 additions and 799 deletions.
6 changes: 6 additions & 0 deletions community/common/src/main/java/org/neo4j/time/FakeClock.java
Expand Up @@ -65,6 +65,12 @@ public long nanos()
return nanoTime;
}

@Override
public long millis()
{
return TimeUnit.NANOSECONDS.toMillis( nanoTime );
}

public FakeClock forward( long delta, TimeUnit unit )
{
nanoTime += unit.toNanos( delta );
Expand Down
Expand Up @@ -28,8 +28,8 @@
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.security.URLAccessValidationError;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;

Expand Down Expand Up @@ -78,6 +78,13 @@ public InternalTransaction beginTransaction( KernelTransaction.Type type, Access
return graph.beginTransaction( type, accessMode );
}

@Override
public InternalTransaction beginTransaction( KernelTransaction.Type type, AccessMode accessMode,
long timeout )
{
return graph.beginTransaction( type, accessMode, timeout );
}

@Override
public URL validateURLAccess( URL url ) throws URLAccessValidationError
{
Expand Down
Expand Up @@ -72,7 +72,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
// GIVEN
when(outerTx.failure()).thenThrow(new AssertionError("Shouldn't be called"))
val tc = new Neo4jTransactionalContext(graph, outerTx, KernelTransaction.Type.`implicit`, AccessMode.Static.FULL,
statement, null, locker, null, null)
statement, null, locker, null, null, null)
val transactionalContext = new TransactionalContextWrapperv3_1(tc)
val context = new TransactionBoundQueryContext(transactionalContext)(indexSearchMonitor)
// WHEN
Expand Down
Expand Up @@ -208,6 +208,9 @@ public abstract class GraphDatabaseSettings
public static final Setting<Long> transaction_start_timeout =
setting( "unsupported.dbms.transaction_start_timeout", DURATION, "1s" );

@Description("The maximum time interval of a transaction within which it should be completed.")
public static final Setting<Long> transaction_timeout = setting( "dbms.transaction.timeout", DURATION, "60s" );

@Description( "The maximum amount of time to wait for running transactions to complete before allowing "
+ "initiated database shutdown to continue" )
@Internal
Expand Down
Expand Up @@ -37,10 +37,33 @@
public interface GraphDatabaseQueryService
{
DependencyResolver getDependencyResolver();

Node createNode();

Node createNode( Label... labels );
Node getNodeById(long id);
Relationship getRelationshipById(long id);

Node getNodeById( long id );

Relationship getRelationshipById( long id );

/**
* Begin new internal transaction with with default timeout.
*
* @param type transaction type
* @param accessMode transaction access mode
* @return internal transaction
*/
InternalTransaction beginTransaction( KernelTransaction.Type type, AccessMode accessMode );

/**
* Begin new internal transaction with specified timeout in milliseconds.
*
* @param type transaction type
* @param accessMode transaction access mode
* @param timeout transaction timeout in milliseconds
* @return internal transaction
*/
InternalTransaction beginTransaction( KernelTransaction.Type type, AccessMode accessMode, long timeout );

URL validateURLAccess( URL url ) throws URLAccessValidationError;
}
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -47,6 +48,7 @@
import org.neo4j.kernel.api.legacyindex.AutoIndexing;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.dependency.HighestSelectionStrategy;
import org.neo4j.kernel.guard.EmptyGuard;
import org.neo4j.kernel.guard.Guard;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.ConstraintEnforcingEntityOperations;
Expand Down Expand Up @@ -294,6 +296,7 @@ boolean applicable( DiagnosticsPhase phase )
private final ConstraintSemantics constraintSemantics;
private final Procedures procedures;
private final IOLimiter ioLimiter;
private final Clock clock;

private Dependencies dependencies;
private LifeSupport life;
Expand Down Expand Up @@ -342,7 +345,8 @@ public NeoStoreDataSource(
Monitors monitors,
Tracers tracers,
Procedures procedures,
IOLimiter ioLimiter )
IOLimiter ioLimiter,
Clock clock )
{
this.storeDir = storeDir;
this.config = config;
Expand Down Expand Up @@ -374,6 +378,7 @@ public NeoStoreDataSource(
this.tracers = tracers;
this.procedures = procedures;
this.ioLimiter = ioLimiter;
this.clock = clock;

readOnly = config.get( Configuration.read_only );
msgLog = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -474,7 +479,7 @@ public void start() throws IOException
dependencies.resolveDependency( IndexingService.class ),
storageEngine.storeReadLayer(),
updateableSchemaState, dependencies.resolveDependency( LabelScanStore.class ),
storageEngine, indexConfigStore, transactionIdStore );
storageEngine, indexConfigStore, transactionIdStore, clock );

// Do these assignments last so that we can ensure no cyclical dependencies exist
this.storageEngine = storageEngine;
Expand Down Expand Up @@ -656,8 +661,7 @@ private TransactionLogModule buildTransactionLogs(
new CountCommittedTransactionThreshold( txThreshold );

long timeMillisThreshold = config.get( GraphDatabaseSettings.check_point_interval_time );
TimeCheckPointThreshold timeCheckPointThreshold =
new TimeCheckPointThreshold( timeMillisThreshold, Clocks.systemClock() );
TimeCheckPointThreshold timeCheckPointThreshold = new TimeCheckPointThreshold( timeMillisThreshold, clock );

CheckPointThreshold threshold =
CheckPointThresholds.or( countCommittedTransactionThreshold, timeCheckPointThreshold );
Expand Down Expand Up @@ -767,7 +771,8 @@ private KernelModule buildKernel( TransactionAppender appender,
UpdateableSchemaState updateableSchemaState, LabelScanStore labelScanStore,
StorageEngine storageEngine,
IndexConfigStore indexConfigStore,
TransactionIdStore transactionIdStore ) throws KernelException, IOException
TransactionIdStore transactionIdStore,
Clock clock ) throws KernelException, IOException
{
TransactionCommitProcess transactionCommitProcess = commitProcessFactory.create( appender, storageEngine,
config );
Expand All @@ -792,9 +797,10 @@ private KernelModule buildKernel( TransactionAppender appender,
KernelTransactions kernelTransactions = life.add( new KernelTransactions( statementLocksFactory,
constraintIndexCreator, statementOperations, schemaWriteGuard, transactionHeaderInformationFactory,
transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life,
tracers, storageEngine, procedures, transactionIdStore, Clocks.systemClock() ) );
tracers, storageEngine, procedures, transactionIdStore, clock ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );
final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config );

kernel.registerTransactionHook( transactionEventHandlers );

Expand Down Expand Up @@ -1003,7 +1009,7 @@ private StatementOperationParts buildStatementOperations(
parts = parts.override( null, null, null, lockingContext, lockingContext, lockingContext, lockingContext,
lockingContext, null, null, null, null );
// + Guard
if ( guard != null )
if ( !EmptyGuard.EMPTY_GUARD.equals( guard ) )
{
GuardingStatementOperations guardingOperations = new GuardingStatementOperations(
parts.entityWriteOperations(), parts.entityReadOperations(), guard );
Expand Down
Expand Up @@ -40,10 +40,21 @@ public interface KernelAPI
* underlying graph.
*
* @param type the type of the new transaction: implicit (internally created) or explicit (created by the user)
* @param accessMode
* @param accessMode transaction access mode
*/
KernelTransaction newTransaction( KernelTransaction.Type type, AccessMode accessMode ) throws TransactionFailureException;

/**
* Creates and returns a new {@link KernelTransaction} capable of modifying the
* underlying graph with custom timeout in milliseconds.
*
* @param type the type of the new transaction: implicit (internally created) or explicit (created by the user)
* @param accessMode transaction access mode
* @param timeout transaction timeout in millisiseconds
*/
KernelTransaction newTransaction( KernelTransaction.Type type, AccessMode accessMode, long timeout )
throws TransactionFailureException;

/**
* Registers a {@link TransactionHook} that will receive notifications about committing transactions
* and the changes they commit.
Expand Down
Expand Up @@ -171,7 +171,13 @@ default void close() throws TransactionFailureException
* @return start time of this transaction, i.e. basically {@link System#currentTimeMillis()} when user called
* {@link Kernel#newTransaction(Type, AccessMode)}.
*/
long localStartTime();
long startTime();

/**
* Timeout for transaction.
* @return transaction timeout
*/
long timeout();

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
Expand Down
Expand Up @@ -51,7 +51,7 @@ public interface KernelTransactionHandle
*
* @return the transaction start time.
*/
long localStartTime();
long startTime();

/**
* Check if the underlying transaction is open.
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.guard;

import org.neo4j.kernel.impl.api.KernelStatement;

public class EmptyGuard implements Guard
{
public static final EmptyGuard EMPTY_GUARD = new EmptyGuard();

private EmptyGuard()
{
}

@Override
public void check( KernelStatement statement )
{
// empty
}
}
115 changes: 7 additions & 108 deletions community/kernel/src/main/java/org/neo4j/kernel/guard/Guard.java
Expand Up @@ -19,114 +19,13 @@
*/
package org.neo4j.kernel.guard;

import org.neo4j.logging.Log;
import org.neo4j.kernel.impl.api.KernelStatement;

import static java.lang.System.currentTimeMillis;

public class Guard
/**
* Guard that check entities for compatibility with some kind of guard criteria.
* As soon as entity do not satisfy that criteria {@link GuardException } will be thrown.
*/
public interface Guard
{
private final ThreadLocal<GuardInternal> threadLocal = new ThreadLocal<>();

private final Log log;

public Guard( final Log log )
{
this.log = log;
}

public void check()
{
GuardInternal guardInternal = currentGuard();
if ( guardInternal != null )
{
guardInternal.check();
}
}

public <T extends GuardInternal> T currentGuard()
{
return (T) threadLocal.get();
}

public void startOperationsCount( final long maxOps )
{
start( new OperationsCount( maxOps ) );
}

public void startTimeout( final long validForInMilliSeconds )
{
final Timeout timeout = new Timeout( validForInMilliSeconds + currentTimeMillis() );
start( timeout );
}

public void start( final GuardInternal guard )
{
threadLocal.set( guard );
}

public <T extends GuardInternal> T stop()
{
T guardInternal = Guard.this.<T>currentGuard();
if ( guardInternal != null )
{
threadLocal.remove();
}
return guardInternal;
}

public interface GuardInternal
{
void check();
}

public class OperationsCount implements GuardInternal
{
private final long max;
private long opsCount = 0;

private OperationsCount( final long max )
{
this.max = max;
}

@Override
public void check()
{
opsCount++;

if ( max < opsCount )
{
log.warn( "guard-timeout: node-ops: more than " + max );
throw new GuardOperationsCountException( opsCount );
}
}

public long getOpsCount()
{
return opsCount;
}
}

public class Timeout implements GuardInternal
{
private final long valid;
private final long start;

private Timeout( final long valid )
{
this.valid = valid;
this.start = currentTimeMillis();
}

@Override
public void check()
{
if ( valid < currentTimeMillis() )
{
final long overtime = currentTimeMillis() - valid;
log.warn( "guard-timeout:" + (valid - start) + "(+" + overtime + ")ms" );
throw new GuardTimeoutException( overtime );
}
}
}
void check( KernelStatement statement );
}

0 comments on commit 3165dab

Please sign in to comment.