Skip to content

Commit

Permalink
Handle LocalDatabase restart breaking bolt connections due to Transac…
Browse files Browse the repository at this point in the history
…tionIdStore

not being refreshed

* Introduce CANCELLED state so that polling process doesn't get restarted unless we explicitly want to restart it
* Pass around a supplier of TransactionIdStore and resolve it in TransactionIdTracker
* Bumped drivers to 1.2
  • Loading branch information
Mark Needham committed Mar 31, 2017
1 parent 819fcfc commit f8d8157
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.function.Supplier;

import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -49,8 +50,8 @@ public class BoltFactoryImpl extends LifecycleAdapter implements BoltFactory

private QueryExecutionEngine queryExecutionEngine;
private GraphDatabaseQueryService queryService;
private TransactionIdStore transactionIdStore;
private AvailabilityGuard availabilityGuard;
private DependencyResolver dependencyResolver;

public BoltFactoryImpl( GraphDatabaseAPI gds, UsageData usageData, LogService logging,
ThreadToStatementContextBridge txBridge, Authentication authentication,
Expand All @@ -68,10 +69,9 @@ public BoltFactoryImpl( GraphDatabaseAPI gds, UsageData usageData, LogService lo
@Override
public void start() throws Throwable
{
DependencyResolver dependencyResolver = gds.getDependencyResolver();
dependencyResolver = gds.getDependencyResolver();
queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class );
queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class );
transactionIdStore = dependencyResolver.resolveDependency( TransactionIdStore.class );
availabilityGuard = dependencyResolver.resolveDependency( AvailabilityGuard.class );
}

Expand All @@ -80,7 +80,6 @@ public void stop() throws Throwable
{
queryExecutionEngine = null;
queryService = null;
transactionIdStore = null;
availabilityGuard = null;
}

Expand All @@ -98,7 +97,7 @@ private TransactionStateMachine.SPI createTxSpi( Clock clock )
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout );
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );

return new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, transactionIdStore,
return new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine,
availabilityGuard, queryService, txAwaitDuration, clock );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

import org.neo4j.bolt.v1.runtime.TransactionStateMachine.BoltResultHandle;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
Expand Down Expand Up @@ -64,7 +65,6 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI
TransactionStateMachineSPI( GraphDatabaseAPI db,
ThreadToStatementContextBridge txBridge,
QueryExecutionEngine queryExecutionEngine,
TransactionIdStore transactionIdStoreSupplier,
AvailabilityGuard availabilityGuard,
GraphDatabaseQueryService queryService,
Duration txAwaitDuration,
Expand All @@ -73,7 +73,10 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI
this.db = db;
this.txBridge = txBridge;
this.queryExecutionEngine = queryExecutionEngine;

Supplier<TransactionIdStore> transactionIdStoreSupplier = db.getDependencyResolver().provideDependency( TransactionIdStore.class );
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, clock );

this.contextFactory = Neo4jTransactionalContextFactory.create( queryService, locker );
this.queryService = queryService;
this.txAwaitDuration = txAwaitDuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public void txIdStoreRefreshedAfterRestart() throws Throwable
when( txIdStoreBeforeRestart.getLastClosedTransactionId() ).thenReturn( 42L );
TransactionIdStore txIdStoreAfterRestart = mock( TransactionIdStore.class );
when( txIdStoreAfterRestart.getLastClosedTransactionId() ).thenReturn( 4242L );
when( dependencyResolver.resolveDependency( TransactionIdStore.class ) )
.thenReturn( txIdStoreBeforeRestart ).thenReturn( txIdStoreAfterRestart );
when( dependencyResolver.provideDependency( TransactionIdStore.class ) )
.thenReturn( () -> txIdStoreBeforeRestart ).thenReturn( () -> txIdStoreAfterRestart );

BoltFactoryImpl boltFactory = newBoltFactory( db );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.AvailabilityGuard;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class TransactionStateMachineSPITest
public void throwsWhenTxAwaitDurationExpires() throws Exception
{
long lastClosedTransactionId = 100;
TransactionIdStore txIdStore = fixedTxIdStore( lastClosedTransactionId );
Supplier<TransactionIdStore> txIdStore = () -> fixedTxIdStore( lastClosedTransactionId );
Duration txAwaitDuration = Duration.ofSeconds( 42 );
FakeClock clock = new FakeClock();

Expand Down Expand Up @@ -93,7 +94,7 @@ public void throwsWhenTxAwaitDurationExpires() throws Exception
public void doesNotWaitWhenTxIdUpToDate() throws Exception
{
long lastClosedTransactionId = 100;
TransactionIdStore txIdStore = fixedTxIdStore( lastClosedTransactionId );
Supplier<TransactionIdStore> txIdStore = () -> fixedTxIdStore( lastClosedTransactionId );

TransactionStateMachineSPI txSpi = createTxSpi( txIdStore, Duration.ZERO, Clock.systemUTC() );

Expand All @@ -113,21 +114,27 @@ private static TransactionIdStore fixedTxIdStore( long lastClosedTransactionId )
return txIdStore;
}

private static TransactionStateMachineSPI createTxSpi( TransactionIdStore txIdStore, Duration txAwaitDuration,
private static TransactionStateMachineSPI createTxSpi( Supplier<TransactionIdStore> txIdStore, Duration txAwaitDuration,
Clock clock )
{
AvailabilityGuard availabilityGuard = new AvailabilityGuard( clock, NullLog.getInstance() );
return createTxSpi( txIdStore, txAwaitDuration, availabilityGuard, clock );
}

private static TransactionStateMachineSPI createTxSpi( TransactionIdStore txIdStore, Duration txAwaitDuration,
private static TransactionStateMachineSPI createTxSpi( Supplier<TransactionIdStore> txIdStore, Duration txAwaitDuration,
AvailabilityGuard availabilityGuard, Clock clock )
{
GraphDatabaseQueryService queryService = mock( GraphDatabaseQueryService.class );
when( queryService.getDependencyResolver() ).thenReturn( mock( DependencyResolver.class ) );
DependencyResolver dependencyResolver = mock( DependencyResolver.class );
GraphDatabaseAPI db = mock( GraphDatabaseAPI.class );

return new TransactionStateMachineSPI( mock( GraphDatabaseAPI.class ), new ThreadToStatementContextBridge(),
mock( QueryExecutionEngine.class ), txIdStore, availabilityGuard, queryService, txAwaitDuration,
when( queryService.getDependencyResolver() ).thenReturn( dependencyResolver );
when( db.getDependencyResolver() ).thenReturn( dependencyResolver );

when(dependencyResolver.provideDependency( TransactionIdStore.class )).thenReturn( txIdStore );

return new TransactionStateMachineSPI( db, new ThreadToStatementContextBridge(),
mock( QueryExecutionEngine.class ), availabilityGuard, queryService, txAwaitDuration,
clock );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
Expand All @@ -40,15 +41,15 @@ public class TransactionIdTracker
private static final int POLL_INTERVAL = 25;
private static final TimeUnit POLL_UNIT = TimeUnit.MILLISECONDS;

private final TransactionIdStore transactionIdStore;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final AvailabilityGuard availabilityGuard;
private final Clock clock;

public TransactionIdTracker( TransactionIdStore transactionIdStore, AvailabilityGuard availabilityGuard,
Clock clock )
public TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStoreSupplier, AvailabilityGuard availabilityGuard,
Clock clock )
{
this.availabilityGuard = availabilityGuard;
this.transactionIdStore = transactionIdStore;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.clock = clock;
}

Expand Down Expand Up @@ -86,7 +87,7 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws
{
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore.getLastClosedTransactionId() );
transactionIdStore().getLastClosedTransactionId() );
}
}

Expand All @@ -97,7 +98,15 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx
throw new TransactionFailureException( Status.General.DatabaseUnavailable,
"Database had become unavailable while waiting for requested version %d.", oldestAcceptableTxId );
}
return oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId();
return oldestAcceptableTxId <= transactionIdStore().getLastClosedTransactionId();
}

private TransactionIdStore transactionIdStore()
{
// We need to resolve this as late as possible in case the database has been restarted as part of store copy.
// This causes TransactionIdStore staleness and we could get a MetaDataStore closed exception.
// Ideally we'd fix this with some life cycle wizardry but not going to do that for now.
return transactionIdStoreSupplier.get();
}

/**
Expand All @@ -109,6 +118,6 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx
*/
public long newestEncounteredTxId()
{
return transactionIdStore.getLastClosedTransactionId();
return transactionIdStore().getLastClosedTransactionId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.Test;

import java.time.Clock;
import java.util.function.Supplier;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
Expand All @@ -38,13 +39,15 @@

public class TransactionIdTrackerTest
{
private final TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
private final Supplier<TransactionIdStore> transactionIdStoreSupplier = mock( Supplier.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );

@Test( timeout = 500 )
public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Exception
{
// given
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();
Expand All @@ -60,7 +63,10 @@ public void shouldReturnIfTheVersionIsUpToDate() throws Exception
{
// given
long version = 5L;
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
TransactionIdStore transactionIdStore = mock(TransactionIdStore.class);
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId()).thenReturn( version );

when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();

Expand All @@ -75,6 +81,8 @@ public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();
Expand All @@ -97,6 +105,8 @@ public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( false );
TransactionIdTracker transactionIdTracker = createTracker();
Expand All @@ -116,6 +126,6 @@ public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception

private TransactionIdTracker createTracker()
{
return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() );
return new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, Clock.systemUTC() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.CANCELLED;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING;
Expand All @@ -71,7 +73,8 @@ enum State
{
TX_PULLING,
STORE_COPYING,
PANIC
PANIC,
CANCELLED
}

private final LocalDatabase localDatabase;
Expand Down Expand Up @@ -115,6 +118,7 @@ public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDataba
@Override
public synchronized void start() throws Throwable
{
state = TX_PULLING;
timeout = timeoutService.create( TX_PULLER_TIMEOUT, txPullIntervalMillis, 0, timeout -> onTimeout() );
dbHealth = databaseHealthSupplier.get();
upToDateFuture = new CompletableFuture<>();
Expand All @@ -128,6 +132,7 @@ public Future<Boolean> upToDateFuture() throws InterruptedException
@Override
public void stop() throws Throwable
{
state = CANCELLED;
timeout.cancel();
}

Expand Down Expand Up @@ -162,7 +167,7 @@ private void onTimeout()
panic( e );
}

if ( state != PANIC )
if ( state != PANIC && state != CANCELLED )
{
timeout.renew();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor;
Expand Down Expand Up @@ -462,8 +463,8 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th

private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database )
{
TransactionIdStore transactionIdStore =
database.getDependencyResolver().resolveDependency( TransactionIdStore.class );
Supplier<TransactionIdStore> transactionIdStore =
database.getDependencyResolver().provideDependency( TransactionIdStore.class );
AvailabilityGuard availabilityGuard =
database.getDependencyResolver().resolveDependency( AvailabilityGuard.class );
return new TransactionIdTracker( transactionIdStore, availabilityGuard, Clock.systemUTC() );
Expand Down

0 comments on commit f8d8157

Please sign in to comment.