Skip to content

Commit

Permalink
Merge pull request #9664 from martinfurmanski/3.1-improve-bookmark-wa…
Browse files Browse the repository at this point in the history
…iting

Improve bookmark waiting
  • Loading branch information
martinfurmanski committed Jul 17, 2017
2 parents ad82d67 + d94d4ef commit d2f8aa9
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
package org.neo4j.kernel.api.txtracking;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;

import static org.neo4j.function.Predicates.tryAwaitEx;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

/**
Expand All @@ -37,9 +36,6 @@
*/
public class TransactionIdTracker
{
private static final int POLL_INTERVAL = 25;
private static final TimeUnit POLL_UNIT = TimeUnit.MILLISECONDS;

private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final AvailabilityGuard availabilityGuard;

Expand Down Expand Up @@ -69,7 +65,7 @@ public TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStoreSupp
* @param oldestAcceptableTxId id of the Oldest Acceptable Transaction (OAT) that must have been applied before
* continuing work.
* @param timeout maximum duration to wait for OAT to be applied
* @throws TransactionFailureException
* @throws TransactionFailureException transaction failed
*/
public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws TransactionFailureException
{
Expand All @@ -78,23 +74,26 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws
return;
}

if ( !tryAwaitEx( () -> isReady( oldestAcceptableTxId ), timeout.toMillis(), TimeUnit.MILLISECONDS,
POLL_INTERVAL, POLL_UNIT ) )
if ( !availabilityGuard.isAvailable() )
{
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore().getLastClosedTransactionId() );
throw new TransactionFailureException( Status.General.DatabaseUnavailable, "Database unavailable" );
}
}

private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureException
{
if ( !availabilityGuard.isAvailable() )
try
{
transactionIdStore().awaitClosedTransactionId( oldestAcceptableTxId, timeout.toMillis() );
}
catch ( InterruptedException | TimeoutException e )
{
throw new TransactionFailureException( Status.General.DatabaseUnavailable,
"Database had become unavailable while waiting for requested version %d.", oldestAcceptableTxId );
if ( e instanceof InterruptedException )
{
Thread.currentThread().interrupt();
}

throw new TransactionFailureException( Status.Transaction.InstanceStateChanged, e,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore().getLastClosedTransactionId() );
}
return oldestAcceptableTxId <= transactionIdStore().getLastClosedTransactionId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.helpers.collection.Visitor;
Expand Down Expand Up @@ -787,6 +788,14 @@ public long getLastClosedTransactionId()
return lastClosedTx.getHighestGapFreeNumber();
}

@Override
public void awaitClosedTransactionId( long txId, long timeoutMillis ) throws TimeoutException, InterruptedException
{
assertNotClosed();
checkInitialized( lastCommittingTxField.get() );
lastClosedTx.await( txId, timeoutMillis );
}

@Override
public long[] getLastClosedTransaction()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.store.MetaDataStore;
Expand Down Expand Up @@ -91,6 +92,12 @@ public long getLastClosedTransactionId()
return transactionId;
}

@Override
public void awaitClosedTransactionId( long txId, long timeoutMillis ) throws InterruptedException, TimeoutException
{
throw new UnsupportedOperationException( "Not implemented" );
}

@Override
public long[] getLastClosedTransaction()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.util.concurrent.TimeoutException;

import org.neo4j.kernel.impl.store.TransactionId;

import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
Expand Down Expand Up @@ -108,6 +110,16 @@ public interface TransactionIdStore
*/
long getLastClosedTransactionId();

/**
* Awaits gap-free {@link #transactionClosed(long, long, long) closed transaction id}.
*
* @param txId the awaited transaction id.
* @param timeoutMillis the time to wait for it.
* @throws InterruptedException interrupted.
* @throws TimeoutException timed out.
*/
void awaitClosedTransactionId( long txId, long timeoutMillis ) throws InterruptedException, TimeoutException;

/**
* Returns transaction information about the last committed transaction, i.e.
* transaction id as well as the log position following the commit entry in the transaction log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeoutException;

import static java.lang.String.format;

/**
Expand All @@ -29,7 +31,7 @@ public class ArrayQueueOutOfOrderSequence implements OutOfOrderSequence
// odd means updating, even means no one is updating
private volatile int version;
// These don't need to be volatile, reading them is "guarded" by version access
private long highestGapFreeNumber;
private volatile long highestGapFreeNumber;
private long[] highestGapFreeMeta;
private final SequenceArray outOfOrderQueue;
private long[] metaArray;
Expand All @@ -53,6 +55,7 @@ public synchronized boolean offer( long number, long[] meta )
highestGapFreeNumber = outOfOrderQueue.pollHighestGapFree( number, metaArray );
highestGapFreeMeta = highestGapFreeNumber == number ? meta : metaArray;
version++;
notifyAll();
return true;
}

Expand Down Expand Up @@ -96,6 +99,24 @@ public long[] get()
return createResult( number, meta );
}

@Override
public synchronized void await( long awaitedNumber, long timeoutMillis ) throws TimeoutException, InterruptedException
{
long endTime = System.currentTimeMillis() + timeoutMillis;
while ( awaitedNumber > highestGapFreeNumber )
{
long timeLeft = endTime - System.currentTimeMillis();
if ( timeLeft > 0 )
{
wait( timeLeft );
}
else
{
throw new TimeoutException( "Awaited number was not reached" );
}
}
}

private long[] createResult( long number, long[] meta )
{
long[] result = new long[meta.length + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeoutException;

/**
* The thinking behind an out-of-order sequence is that, to the outside, there's one "last number"
* which will never be decremented between times of looking at it. It can move in bigger strides
Expand Down Expand Up @@ -49,6 +51,14 @@ public interface OutOfOrderSequence
*/
long[] get();

/**
* Waits for the specified number (gap-free).
*
* @param awaitedNumber the awaited number.
* @param timeoutMillis the maximum time to wait in milliseconds.
*/
void await( long awaitedNumber, long timeoutMillis ) throws TimeoutException, InterruptedException;

/**
* @return the highest gap-free number, without its meta data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
*/
package org.neo4j.kernel.api.txtracking;

import java.util.function.Supplier;

import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
Expand All @@ -32,97 +34,93 @@
import static java.time.Duration.ofSeconds;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

public class TransactionIdTrackerTest
{
private final Supplier<TransactionIdStore> transactionIdStoreSupplier = mock( Supplier.class );
private static final Duration DEFAULT_DURATION = ofSeconds( 10 );

private final TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );

@Test( timeout = 500 )
public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Exception
private TransactionIdTracker transactionIdTracker;

@Before
public void setup()
{
// given
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );
transactionIdTracker = new TransactionIdTracker( () -> transactionIdStore, availabilityGuard );
}

@Test
public void shouldReturnImmediatelyForBaseTxIdOrLess() throws Exception
{
// when
transactionIdTracker.awaitUpToDate( BASE_TX_ID, ofSeconds( 5 ) );

// then all good!
// then
verify( transactionIdStore, never() ).awaitClosedTransactionId( anyLong(), anyLong() );
}

@Test( timeout = 500 )
public void shouldReturnIfTheVersionIsUpToDate() throws Exception
@Test
public void shouldWaitForRequestedVersion() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock(TransactionIdStore.class);
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId()).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
transactionIdTracker.awaitUpToDate( version, ofSeconds( 5 ) );
transactionIdTracker.awaitUpToDate( version, DEFAULT_DURATION );

// then all good!
// then
verify( transactionIdStore ).awaitClosedTransactionId( version, DEFAULT_DURATION.toMillis() );
}

@Test( timeout = 500 )
public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception
@Test
public void shouldPropagateTimeoutException() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );
TimeoutException timeoutException = new TimeoutException();
doThrow( timeoutException ).when( transactionIdStore ).awaitClosedTransactionId( anyLong(), anyLong() );

// when
try
{
// when
transactionIdTracker.awaitUpToDate( version + 1, ofMillis( 50 ) );
fail( "should have thrown" );
}
catch ( TransactionFailureException ex )
{
// then all good!
// then
assertEquals( Status.Transaction.InstanceStateChanged, ex.status() );
assertEquals( timeoutException, ex.getCause() );
}
}

@Test( timeout = 500 )
public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception
@Test
public void shouldNotWaitIfTheDatabaseIsUnavailable() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( false );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
try
{
transactionIdTracker.awaitUpToDate( version + 1, ofMillis( 60_000 ) );
// when
transactionIdTracker.awaitUpToDate( 1000, ofMillis( 60_000 ) );
fail( "should have thrown" );
}
catch ( TransactionFailureException ex )
{
// then all good!
// then
assertEquals( Status.General.DatabaseUnavailable, ex.status() );
}

verify( transactionIdStore, never() ).awaitClosedTransactionId( anyLong(), anyLong() );
}
}

0 comments on commit d2f8aa9

Please sign in to comment.