Skip to content

Commit

Permalink
Changes for code review
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Jul 14, 2016
1 parent b1cb7af commit 8aaff03
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 61 deletions.
Expand Up @@ -82,7 +82,7 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
try
{
AuthenticationResult authResult = ctx.spi.authenticate( authToken );
ctx.transactionIdTracker = ctx.spi.versionTracking( currentHighestTransactionId );
ctx.versionTracker = ctx.spi.versionTracker( currentHighestTransactionId );
ctx.authSubject = authResult.getAuthSubject();
ctx.credentialsExpired = authResult.credentialsExpired();
ctx.result( authResult.credentialsExpired() );
Expand Down Expand Up @@ -169,7 +169,7 @@ private State doBeginTransaction( SessionStateMachine ctx, KernelTransaction.Typ
// way, we need a different way to kill statements running in implicit
// transactions, because we do that by calling #terminate() on this tx.
ctx.currentTransaction =
ctx.spi.beginTransaction( type, ctx.authSubject, ctx.transactionIdTracker );
ctx.spi.beginTransaction( type, ctx.authSubject, ctx.versionTracker );
return IN_TRANSACTION;
}
catch ( TransactionFailureException e )
Expand Down Expand Up @@ -691,7 +691,7 @@ public String[] fieldNames()
/** These are the "external" actions the state machine can take */
private final SPI spi;

private SPI.TransactionIdTracker transactionIdTracker;
private VersionTracker versionTracker;

/**
* This SPI encapsulates the "external" actions the state machine can take.
Expand All @@ -712,8 +712,7 @@ interface SPI
String connectionDescriptor();
void reportError( Neo4jError err );
void reportError( String message, Throwable cause );
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode,
TransactionIdTracker transactionIdTracker )
KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracker versionTracker )
throws TransactionFailureException;
void bindTransactionToCurrentThread( KernelTransaction tx );
void unbindTransactionFromCurrentThread();
Expand All @@ -724,14 +723,7 @@ RecordStream run( SessionStateMachine ctx, String statement, Map<String, Object>
Statement currentStatement();
void sessionActivated( Session session );
void sessionHalted( Session session );
TransactionIdTracker versionTracking( long startingVersion );

interface TransactionIdTracker
{
void assertUpToDate() throws TransactionFailureException;

void updateVersion( long version );
}
VersionTracker versionTracker( long startingVersion );
}

SessionStateMachine( String connectionDescriptor, UsageData usageData, GraphDatabaseAPI db,
Expand Down
Expand Up @@ -95,13 +95,13 @@ public void reportError( String message, Throwable cause )
}

@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, TransactionIdTracker transactionIdTracker )
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracker versionTracker )
throws TransactionFailureException
{
transactionIdTracker.assertUpToDate();
versionTracker.assertUpToDate();
db.beginTransaction( type, mode );
KernelTransaction kernelTransaction = txBridge.getKernelTransactionBoundToThisThread( false );
kernelTransaction.registerCloseListener( transactionIdTracker::updateVersion );
kernelTransaction.registerCloseListener( versionTracker::updateVersion );
return kernelTransaction;
}

Expand Down Expand Up @@ -155,8 +155,8 @@ public void sessionHalted( Session session )
sessionTracker.sessionHalted( session );
}

public TransactionIdTracker versionTracking( long startingVersion )
public VersionTracker versionTracker( long startingVersion )
{
return new TransactionIdTracking( transactionIdStore, startingVersion, 30, TimeUnit.SECONDS );
return new TransactionIdTracker( transactionIdStore, startingVersion, 30, TimeUnit.SECONDS );
}
}
Expand Up @@ -22,22 +22,23 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.function.Predicates;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;

import static org.neo4j.function.Predicates.tryAwait;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

class TransactionIdTracking implements SessionStateMachine.SPI.TransactionIdTracker
class TransactionIdTracker implements VersionTracker
{
private final Supplier<TransactionIdStore> transactionIdStore;
private final int timeout;
private final TimeUnit timeoutUnit;

private long txId;

TransactionIdTracking( Supplier<TransactionIdStore> transactionIdStore, long transactionId, int timeout,
TimeUnit unit )
TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStore, long transactionId, int timeout,
TimeUnit unit )
{
this.transactionIdStore = transactionIdStore;
this.txId = transactionId;
Expand All @@ -55,13 +56,18 @@ public void assertUpToDate() throws TransactionFailureException

try
{
Predicates.await( () -> txId <= transactionIdStore.get().getLastClosedTransactionId(), timeout, timeoutUnit,
25, TimeUnit.MILLISECONDS );
if ( !tryAwait( () -> txId <= transactionIdStore.get().getLastClosedTransactionId(), timeout, timeoutUnit,
25, TimeUnit.MILLISECONDS ) )
{
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", txId,
transactionIdStore.get().getLastClosedTransactionId() );
}
}
catch ( Throwable e )
catch ( InterruptedException e )
{
throw new TransactionFailureException( "Database not up to the requested version: " + txId + ". " +
"Latest database version is " + transactionIdStore.get().getLastClosedTransactionId(), e );
throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e,
"Thread interrupted when starting transaction" );
}
}

Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime.internal;

import org.neo4j.kernel.api.exceptions.TransactionFailureException;

interface VersionTracker
{
void assertUpToDate() throws TransactionFailureException;

void updateVersion( long version );
}
Expand Up @@ -177,7 +177,7 @@ public void reportError( String message, Throwable cause )
}

@Override
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, TransactionIdTracker transactionIdTracker )
public KernelTransaction beginTransaction( KernelTransaction.Type type, AccessMode mode, VersionTracker versionTracker )
{
liveTransactions.incrementAndGet();
return new CloseTrackingKernelTransaction();
Expand Down Expand Up @@ -232,12 +232,12 @@ public void sessionHalted( Session session )
}

@Override
public TransactionIdTracker versionTracking( long startingVersion )
public VersionTracker versionTracker( long startingVersion )
{
return new TransactionIdTracker()
return new VersionTracker()
{
@Override
public void assertUpToDate() throws TransactionFailureException
public void assertUpToDate()
{
}

Expand Down
Expand Up @@ -93,7 +93,7 @@ public void shouldStopRunningTxOnHalt() throws Throwable
// Then
assertThat( machine.state(), CoreMatchers.equalTo( SessionStateMachine.State.STOPPED ) );
verify( spi ).beginTransaction( any( KernelTransaction.Type.class ), any( AccessMode.class ),
any( SessionStateMachine.SPI.TransactionIdTracker.class ) );
any( VersionTracker.class ) );
verify( ktx ).close();
}

Expand Down
Expand Up @@ -40,11 +40,11 @@ public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Excep
{
// given
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L );
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, BASE_TX_ID, 5, SECONDS );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, BASE_TX_ID, 5, SECONDS );

// when
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();

// then all good!
}
Expand All @@ -55,11 +55,11 @@ public void shouldReturnIfTheVersionIsUpToDate() throws Exception
// given
long version = 5L;
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, version, 5, SECONDS );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, version, 5, SECONDS );

// when
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();

// then all good!

Expand All @@ -71,13 +71,13 @@ public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception
// given
long version = 5L;
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, version + 1, 100, MILLISECONDS );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, version + 1, 100, MILLISECONDS );

// when
try
{
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();
fail( "should have thrown" );
}
catch ( TransactionFailureException ex )
Expand All @@ -93,11 +93,11 @@ public void shouldBeKeepCheckingForNewVersionUntilTheTimeoutIsReached() throws E
long version = 5L;
when( transactionIdStore.getLastClosedTransactionId() )
.thenReturn( version, version, version, version, version + 1 );
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, version + 1, 5, SECONDS );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, version + 1, 5, SECONDS );

// when
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();

// then all good!
}
Expand All @@ -108,11 +108,11 @@ public void shouldBeAbleToUpdateTheVersion() throws Exception
// given
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( 42L );
long newVersion = 45L;
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, newVersion, 10, MILLISECONDS );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, newVersion, 10, MILLISECONDS );
try
{
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();
fail( "should have thrown" );
}
catch ( TransactionFailureException e )
Expand All @@ -123,11 +123,11 @@ public void shouldBeAbleToUpdateTheVersion() throws Exception
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( newVersion );

// when
transactionIdTracking.updateVersion( 46L );
transactionIdTracker.updateVersion( 46L );

try
{
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();
fail( "should have thrown" );
}
catch ( TransactionFailureException e )
Expand All @@ -142,14 +142,14 @@ public void shouldNotUpdateVersionIfNoPreviousTransactionsInTheDatabase() throws
// given
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( 42L );
final int txIdForEmptyDatabase = -1;
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, txIdForEmptyDatabase, 5, SECONDS );
transactionIdTracking.assertUpToDate();
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, txIdForEmptyDatabase, 5, SECONDS );
transactionIdTracker.assertUpToDate();

// when
transactionIdTracking.updateVersion( 46L );
transactionIdTracker.updateVersion( 46L );

transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();

// then all good!
}
Expand All @@ -160,16 +160,16 @@ public void shouldUpdateVersionUsingTheTransactionIdStoreWhenTheGivenVersionIsBa
// given
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn(
42L, 44L, 43L /* this doesn't make any sense in real life but it helps asserting in this scenario */ );
TransactionIdTracking transactionIdTracking =
new TransactionIdTracking( () -> transactionIdStore, 42L, 10, MILLISECONDS );
transactionIdTracking.assertUpToDate();
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( () -> transactionIdStore, 42L, 10, MILLISECONDS );
transactionIdTracker.assertUpToDate();

// when
transactionIdTracking.updateVersion( BASE_TX_ID );
transactionIdTracker.updateVersion( BASE_TX_ID );

try
{
transactionIdTracking.assertUpToDate();
transactionIdTracker.assertUpToDate();
fail( "should have thrown" );
}
catch ( TransactionFailureException e )
Expand Down
14 changes: 12 additions & 2 deletions community/common/src/main/java/org/neo4j/function/Predicates.java
Expand Up @@ -173,6 +173,16 @@ public static void awaitEx( ThrowingSupplier<Boolean, Exception> condition, long

public static void await( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit )
throws TimeoutException, InterruptedException
{
if ( !tryAwait( condition, timeout, timeoutUnit, pollInterval, pollUnit ) )
{
throw new TimeoutException(
"Waited for " + timeout + " " + timeoutUnit + ", but " + condition + " was not accepted." );
}
}

public static boolean tryAwait( Supplier<Boolean> condition, long timeout, TimeUnit timeoutUnit, long pollInterval, TimeUnit pollUnit )
throws InterruptedException
{
long deadlineMillis = System.currentTimeMillis() + timeoutUnit.toMillis( timeout );
long pollIntervalMillis = pollUnit.toMillis( pollInterval );
Expand All @@ -181,12 +191,12 @@ public static void await( Supplier<Boolean> condition, long timeout, TimeUnit ti
{
if ( condition.get() )
{
return;
return true;
}
Thread.sleep( pollIntervalMillis );
}
while ( System.currentTimeMillis() < deadlineMillis );
throw new TimeoutException( "Waited for " + timeout + " " + timeoutUnit + ", but " + condition + " was not accepted." );
return false;
}

public static void awaitForever( BooleanSupplier condition, long checkInterval, TimeUnit unit ) throws InterruptedException
Expand Down

0 comments on commit 8aaff03

Please sign in to comment.