Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.2' into 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jul 17, 2017
2 parents c7d48f1 + d18a9fd commit bb7cbab
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI
this.queryExecutionEngine = queryExecutionEngine;

Supplier<TransactionIdStore> transactionIdStoreSupplier = db.getDependencyResolver().provideDependency( TransactionIdStore.class );
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, clock );
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

this.contextFactory = Neo4jTransactionalContextFactory.create( queryService, locker );
this.queryService = queryService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@
*/
package org.neo4j.kernel.api.txtracking;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;

import static org.neo4j.function.Predicates.tryAwaitEx;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

/**
Expand All @@ -38,19 +36,13 @@
*/
public class TransactionIdTracker
{
private static final int POLL_INTERVAL = 25;
private static final TimeUnit POLL_UNIT = TimeUnit.MILLISECONDS;

private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final AvailabilityGuard availabilityGuard;
private final Clock clock;

public TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStoreSupplier, AvailabilityGuard availabilityGuard,
Clock clock )
public TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStoreSupplier, AvailabilityGuard availabilityGuard )
{
this.availabilityGuard = availabilityGuard;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.clock = clock;
}

/**
Expand Down Expand Up @@ -82,23 +74,26 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws
return;
}

if ( !tryAwaitEx( () -> isReady( oldestAcceptableTxId ), timeout.toMillis(), TimeUnit.MILLISECONDS,
POLL_INTERVAL, POLL_UNIT, clock ) )
if ( !availabilityGuard.isAvailable() )
{
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore().getLastClosedTransactionId() );
throw new TransactionFailureException( Status.General.DatabaseUnavailable, "Database unavailable" );
}
}

private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureException
{
if ( !availabilityGuard.isAvailable() )
try
{
transactionIdStore().awaitClosedTransactionId( oldestAcceptableTxId, timeout.toMillis() );
}
catch ( InterruptedException | TimeoutException e )
{
throw new TransactionFailureException( Status.General.DatabaseUnavailable,
"Database had become unavailable while waiting for requested version %d.", oldestAcceptableTxId );
if ( e instanceof InterruptedException )
{
Thread.currentThread().interrupt();
}

throw new TransactionFailureException( Status.Transaction.InstanceStateChanged, e,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore().getLastClosedTransactionId() );
}
return oldestAcceptableTxId <= transactionIdStore().getLastClosedTransactionId();
}

private TransactionIdStore transactionIdStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.helpers.collection.Visitor;
Expand Down Expand Up @@ -781,6 +782,14 @@ public long getLastClosedTransactionId()
return lastClosedTx.getHighestGapFreeNumber();
}

@Override
public void awaitClosedTransactionId( long txId, long timeoutMillis ) throws TimeoutException, InterruptedException
{
assertNotClosed();
checkInitialized( lastCommittingTxField.get() );
lastClosedTx.await( txId, timeoutMillis );
}

@Override
public long[] getLastClosedTransaction()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.store.MetaDataStore;
Expand Down Expand Up @@ -94,6 +95,12 @@ public long getLastClosedTransactionId()
return transactionId;
}

@Override
public void awaitClosedTransactionId( long txId, long timeoutMillis ) throws InterruptedException, TimeoutException
{
throw new UnsupportedOperationException( "Not implemented" );
}

@Override
public long[] getLastClosedTransaction()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.util.concurrent.TimeoutException;

import org.neo4j.kernel.impl.store.TransactionId;

import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
Expand Down Expand Up @@ -108,6 +110,16 @@ public interface TransactionIdStore
*/
long getLastClosedTransactionId();

/**
* Awaits gap-free {@link #transactionClosed(long, long, long) closed transaction id}.
*
* @param txId the awaited transaction id.
* @param timeoutMillis the time to wait for it.
* @throws InterruptedException interrupted.
* @throws TimeoutException timed out.
*/
void awaitClosedTransactionId( long txId, long timeoutMillis ) throws InterruptedException, TimeoutException;

/**
* Returns transaction information about the last committed transaction, i.e.
* transaction id as well as the log position following the commit entry in the transaction log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeoutException;

import static java.lang.String.format;

/**
Expand All @@ -29,7 +31,7 @@ public class ArrayQueueOutOfOrderSequence implements OutOfOrderSequence
// odd means updating, even means no one is updating
private volatile int version;
// These don't need to be volatile, reading them is "guarded" by version access
private long highestGapFreeNumber;
private volatile long highestGapFreeNumber;
private long[] highestGapFreeMeta;
private final SequenceArray outOfOrderQueue;
private long[] metaArray;
Expand All @@ -53,6 +55,7 @@ public synchronized boolean offer( long number, long[] meta )
highestGapFreeNumber = outOfOrderQueue.pollHighestGapFree( number, metaArray );
highestGapFreeMeta = highestGapFreeNumber == number ? meta : metaArray;
version++;
notifyAll();
return true;
}

Expand Down Expand Up @@ -96,6 +99,24 @@ public long[] get()
return createResult( number, meta );
}

@Override
public synchronized void await( long awaitedNumber, long timeoutMillis ) throws TimeoutException, InterruptedException
{
long endTime = System.currentTimeMillis() + timeoutMillis;
while ( awaitedNumber > highestGapFreeNumber )
{
long timeLeft = endTime - System.currentTimeMillis();
if ( timeLeft > 0 )
{
wait( timeLeft );
}
else
{
throw new TimeoutException( "Awaited number was not reached" );
}
}
}

private long[] createResult( long number, long[] meta )
{
long[] result = new long[meta.length + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeoutException;

/**
* The thinking behind an out-of-order sequence is that, to the outside, there's one "last number"
* which will never be decremented between times of looking at it. It can move in bigger strides
Expand Down Expand Up @@ -49,6 +51,14 @@ public interface OutOfOrderSequence
*/
long[] get();

/**
* Waits for the specified number (gap-free).
*
* @param awaitedNumber the awaited number.
* @param timeoutMillis the maximum time to wait in milliseconds.
*/
void await( long awaitedNumber, long timeoutMillis ) throws TimeoutException, InterruptedException;

/**
* @return the highest gap-free number, without its meta data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
*/
package org.neo4j.kernel.api.txtracking;

import org.junit.Before;
import org.junit.Test;

import java.time.Clock;
import java.util.function.Supplier;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
Expand All @@ -33,99 +34,93 @@
import static java.time.Duration.ofSeconds;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

public class TransactionIdTrackerTest
{
private final Supplier<TransactionIdStore> transactionIdStoreSupplier = mock( Supplier.class );
private static final Duration DEFAULT_DURATION = ofSeconds( 10 );

private final TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );

@Test( timeout = 500 )
public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Exception
private TransactionIdTracker transactionIdTracker;

@Before
public void setup()
{
// given
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();
transactionIdTracker = new TransactionIdTracker( () -> transactionIdStore, availabilityGuard );
}

@Test
public void shouldReturnImmediatelyForBaseTxIdOrLess() throws Exception
{
// when
transactionIdTracker.awaitUpToDate( BASE_TX_ID, ofSeconds( 5 ) );

// then all good!
// then
verify( transactionIdStore, never() ).awaitClosedTransactionId( anyLong(), anyLong() );
}

@Test( timeout = 500 )
public void shouldReturnIfTheVersionIsUpToDate() throws Exception
@Test
public void shouldWaitForRequestedVersion() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock(TransactionIdStore.class);
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId()).thenReturn( version );

when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();

// when
transactionIdTracker.awaitUpToDate( version, ofSeconds( 5 ) );
transactionIdTracker.awaitUpToDate( version, DEFAULT_DURATION );

// then all good!
// then
verify( transactionIdStore ).awaitClosedTransactionId( version, DEFAULT_DURATION.toMillis() );
}

@Test( timeout = 500 )
public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception
@Test
public void shouldPropagateTimeoutException() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker = createTracker();
TimeoutException timeoutException = new TimeoutException();
doThrow( timeoutException ).when( transactionIdStore ).awaitClosedTransactionId( anyLong(), anyLong() );

// when
try
{
// when
transactionIdTracker.awaitUpToDate( version + 1, ofMillis( 50 ) );
fail( "should have thrown" );
}
catch ( TransactionFailureException ex )
{
// then all good!
// then
assertEquals( Status.Transaction.InstanceStateChanged, ex.status() );
assertEquals( timeoutException, ex.getCause() );
}
}

@Test( timeout = 500 )
public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception
@Test
public void shouldNotWaitIfTheDatabaseIsUnavailable() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when(transactionIdStoreSupplier.get()).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( false );
TransactionIdTracker transactionIdTracker = createTracker();

// when
try
{
transactionIdTracker.awaitUpToDate( version + 1, ofMillis( 60_000 ) );
// when
transactionIdTracker.awaitUpToDate( 1000, ofMillis( 60_000 ) );
fail( "should have thrown" );
}
catch ( TransactionFailureException ex )
{
// then all good!
// then
assertEquals( Status.General.DatabaseUnavailable, ex.status() );
}
}

private TransactionIdTracker createTracker()
{
return new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard, Clock.systemUTC() );
verify( transactionIdStore, never() ).awaitClosedTransactionId( anyLong(), anyLong() );
}
}

0 comments on commit bb7cbab

Please sign in to comment.