Skip to content

Commit

Permalink
Cleaned up SafeZone implementation on Slave side
Browse files Browse the repository at this point in the history
Moved safeZone handling logic from BatchingResponseHandler to new TransactionQueue.Applier to have a cleaner separation of responsibility.

 Now TransactionQueue drive batching and transaction application instead of BatchingResponseHandler.
  • Loading branch information
burqen committed Jul 5, 2016
1 parent ad0ee65 commit 56620a0
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 207 deletions.
Expand Up @@ -29,7 +29,7 @@ public class TransactionQueue
@FunctionalInterface @FunctionalInterface
public interface Applier public interface Applier
{ {
void apply( TransactionToApply batch ) throws Exception; void apply( TransactionToApply first, TransactionToApply last ) throws Exception;
} }


private final int maxSize; private final int maxSize;
Expand All @@ -43,15 +43,7 @@ public TransactionQueue( int maxSize, Applier applier )
this.applier = applier; this.applier = applier;
} }


public void queueAndDrainIfBatchSizeReached( TransactionToApply transaction ) throws Exception public void queue( TransactionToApply transaction ) throws Exception
{
if ( queue( transaction ) )
{
empty();
}
}

public boolean queue( TransactionToApply transaction ) throws Exception
{ {
if ( isEmpty() ) if ( isEmpty() )
{ {
Expand All @@ -62,14 +54,17 @@ public boolean queue( TransactionToApply transaction ) throws Exception
last.next( transaction ); last.next( transaction );
last = transaction; last = transaction;
} }
return ++size == maxSize; if ( ++size == maxSize )
{
empty();
}
} }


public void empty() throws Exception public void empty() throws Exception
{ {
if ( size > 0 ) if ( size > 0 )
{ {
applier.apply( first ); applier.apply( first, last );
first = last = null; first = last = null;
size = 0; size = 0;
} }
Expand Down
Expand Up @@ -79,10 +79,10 @@ private void queue( TransactionRepresentation txRepresentation, long txId ) thro
TransactionToApply tx = new TransactionToApply( txRepresentation, txId ); TransactionToApply tx = new TransactionToApply( txRepresentation, txId );
tx.commitment( NO_COMMITMENT, txId ); tx.commitment( NO_COMMITMENT, txId );


queue.queueAndDrainIfBatchSizeReached( tx ); queue.queue( tx );
} }


private void applyQueue( TransactionToApply batch ) throws Exception private void applyQueue( TransactionToApply batch, TransactionToApply last ) throws Exception
{ {
storageEngine.apply( batch, RECOVERY ); storageEngine.apply( batch, RECOVERY );
} }
Expand Down
Expand Up @@ -45,24 +45,24 @@ public void shouldEmptyIfTooMany() throws Exception
// WHEN // WHEN
for ( int i = 0; i < 9; i++ ) for ( int i = 0; i < 9; i++ )
{ {
queue.queueAndDrainIfBatchSizeReached( mock( TransactionToApply.class ) ); queue.queue( mock( TransactionToApply.class ) );
verifyNoMoreInteractions( applier ); verifyNoMoreInteractions( applier );
} }
queue.queueAndDrainIfBatchSizeReached( mock( TransactionToApply.class ) ); queue.queue( mock( TransactionToApply.class ) );
verify( applier, times( 1 ) ).apply( any() ); verify( applier, times( 1 ) ).apply( any(), any() );
reset( applier ); reset( applier );


// THEN // THEN
queue.queueAndDrainIfBatchSizeReached( mock( TransactionToApply.class ) ); queue.queue( mock( TransactionToApply.class ) );


// and WHEN emptying in the end // and WHEN emptying in the end
for ( int i = 0; i < 2; i++ ) for ( int i = 0; i < 2; i++ )
{ {
queue.queueAndDrainIfBatchSizeReached( mock( TransactionToApply.class ) ); queue.queue( mock( TransactionToApply.class ) );
verifyNoMoreInteractions( applier ); verifyNoMoreInteractions( applier );
} }
queue.empty(); queue.empty();
verify( applier, times( 1 ) ).apply( any() ); verify( applier, times( 1 ) ).apply( any(), any() );
} }


@Test @Test
Expand All @@ -77,11 +77,11 @@ public void shouldLinkTogetherTransactions() throws Exception
TransactionToApply[] txs = new TransactionToApply[batchSize]; TransactionToApply[] txs = new TransactionToApply[batchSize];
for ( int i = 0; i < batchSize; i++ ) for ( int i = 0; i < batchSize; i++ )
{ {
queue.queueAndDrainIfBatchSizeReached( txs[i] = new TransactionToApply( mock( TransactionRepresentation.class ) ) ); queue.queue( txs[i] = new TransactionToApply( mock( TransactionRepresentation.class ) ) );
} }


// THEN // THEN
verify( applier, times( 1 ) ).apply( any() ); verify( applier, times( 1 ) ).apply( any(), any() );
for ( int i = 0; i < txs.length-1; i++ ) for ( int i = 0; i < txs.length-1; i++ )
{ {
assertEquals( txs[i+1], txs[i].next() ); assertEquals( txs[i+1], txs[i].next() );
Expand Down
Expand Up @@ -57,10 +57,8 @@
import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers;


import static org.junit.Assert.assertEquals;

import static java.util.Arrays.asList; import static java.util.Arrays.asList;

import static org.junit.Assert.assertEquals;
import static org.neo4j.helpers.TimeUtil.parseTimeMillis; import static org.neo4j.helpers.TimeUtil.parseTimeMillis;
import static org.neo4j.kernel.api.properties.Property.noNodeProperty; import static org.neo4j.kernel.api.properties.Property.noNodeProperty;
import static org.neo4j.kernel.api.properties.Property.property; import static org.neo4j.kernel.api.properties.Property.property;
Expand Down Expand Up @@ -166,7 +164,7 @@ public void run()
{ {
try try
{ {
TransactionQueue queue = new TransactionQueue( batchSize, (tx) -> { TransactionQueue queue = new TransactionQueue( batchSize, (tx, last) -> {
// Apply // Apply
storageEngine.apply( tx, TransactionApplicationMode.EXTERNAL ); storageEngine.apply( tx, TransactionApplicationMode.EXTERNAL );


Expand All @@ -176,7 +174,7 @@ public void run()
} ); } );
for ( ; !end.get(); i++ ) for ( ; !end.get(); i++ )
{ {
queue.queueAndDrainIfBatchSizeReached( createNodeAndProperty( i ) ); queue.queue( createNodeAndProperty( i ) );
} }
queue.empty(); queue.empty();
} }
Expand Down
Expand Up @@ -26,9 +26,6 @@
import org.neo4j.com.Response.Handler; import org.neo4j.com.Response.Handler;
import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.TransactionQueue; import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
Expand All @@ -48,19 +45,13 @@ class BatchingResponseHandler implements Response.Handler,
private final TransactionObligationFulfiller obligationFulfiller; private final TransactionObligationFulfiller obligationFulfiller;
private final Log log; private final Log log;


private final KernelTransactions kernelTransactions;
private final long idReuseSafeZoneTime;

public BatchingResponseHandler( int maxBatchSize, TransactionQueue.Applier applier, public BatchingResponseHandler( int maxBatchSize, TransactionQueue.Applier applier,
TransactionObligationFulfiller obligationFulfiller, TxHandler txHandler, Log log, TransactionObligationFulfiller obligationFulfiller, TxHandler txHandler, Log log )
KernelTransactions kernelTransactions, long idReuseSafeZoneTime )
{ {
this.obligationFulfiller = obligationFulfiller; this.obligationFulfiller = obligationFulfiller;
this.txHandler = txHandler; this.txHandler = txHandler;
this.queue = new TransactionQueue( maxBatchSize, applier ); this.queue = new TransactionQueue( maxBatchSize, applier );
this.log = log; this.log = log;
this.kernelTransactions = kernelTransactions;
this.idReuseSafeZoneTime = idReuseSafeZoneTime;
} }


@Override @Override
Expand Down Expand Up @@ -95,7 +86,7 @@ public Visitor<CommittedTransactionRepresentation,Exception> transactions()
@Override @Override
public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception
{ {
boolean batchSizeReached = this.queue.queue( new TransactionToApply( queue.queue( new TransactionToApply(
transaction.getTransactionRepresentation(), transaction.getTransactionRepresentation(),
transaction.getCommitEntry().getTxId() ) transaction.getCommitEntry().getTxId() )
{ {
Expand All @@ -107,94 +98,11 @@ public void commitment( Commitment commitment, long transactionId )
txHandler.accept( transactionId ); txHandler.accept( transactionId );
} }
} ); } );

if ( batchSizeReached )
{
applyQueuedTransactionsIfNeeded();
}

return false; return false;
} }


public void applyQueuedTransactionsIfNeeded() throws Exception void applyQueuedTransactions() throws Exception
{
if ( queue.isEmpty() )
{
return;
}

/*
Case 1 (Not really a problem):
- chunk of batch is smaller than safe zone
- tx started after activeTransactions() is called
is safe because those transactions will see the latest state of store before chunk is applied and
because chunk is smaller than safe zone we are guarantied to not see two different states of any record
when applying the chunk.
activeTransactions() is called
| start committing chunk
---|----+---|--|------> TIME
| |
| Start applying chunk
New tx starts here. Does not get terminated because not among active transactions, this is safe.
Case 2:
- chunk of batch is larger than safe zone
- tx started after activeTransactions() but before apply
activeTransactions() is called
| start committing chunk
---|--------|+-|------> TIME
| |
| Start applying chunk
New tx starts here. Does not get terminated because not among active transactions, but will
read outdated data and can be affected by reuse contamination.
*/

if ( batchSizeExceedsSafeZone() )
{
// We stop new transactions from starting to avoid problem described in (2)
kernelTransactions.blockNewTransactions();
try
{
markUnsafeTransactionsForTermination();
queue.empty();
}
finally
{
kernelTransactions.unblockNewTransactions();
}
}
else
{
markUnsafeTransactionsForTermination();
queue.empty();
}
}

private boolean batchSizeExceedsSafeZone()
{ {
long lastAppliedTimestamp = queue.last().transactionRepresentation().getTimeCommitted(); queue.empty();
long firstAppliedTimestamp = queue.first().transactionRepresentation().getTimeCommitted();
long chunkLength = lastAppliedTimestamp - firstAppliedTimestamp;

return chunkLength > idReuseSafeZoneTime;
}

private void markUnsafeTransactionsForTermination()
{
long lastAppliedTimestamp = queue.last().transactionRepresentation().getTimeCommitted();
long earliestSafeTimestamp = lastAppliedTimestamp - idReuseSafeZoneTime;

for ( KernelTransaction tx : kernelTransactions.activeTransactions() )
{
long commitTimestamp = tx.lastTransactionTimestampWhenStarted();

if ( commitTimestamp != TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP &&
commitTimestamp < earliestSafeTimestamp )
{
tx.markForTermination( Status.Transaction.Outdated );
}
}
} }
} }

0 comments on commit 56620a0

Please sign in to comment.