Skip to content

Commit

Permalink
Logs timestamp information when terminating unsafe tx
Browse files Browse the repository at this point in the history
logged at info since this is typically very rarely logged and
makes sense to have enabled by default for visibility
  • Loading branch information
tinwelint committed Jul 26, 2016
1 parent beb1996 commit 056837c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
Expand Up @@ -21,6 +21,7 @@


import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.Kernel;


/** /**
* Represents a transaction of changes to the underlying graph. * Represents a transaction of changes to the underlying graph.
Expand Down Expand Up @@ -129,6 +130,17 @@ interface CloseListener
*/ */
long lastTransactionTimestampWhenStarted(); long lastTransactionTimestampWhenStarted();


/**
* @return The id of the last transaction that was committed to the store when this transaction started.
*/
long lastTransactionIdWhenStarted();

/**
* @return start time of this transaction, i.e. basically {@link System#currentTimeMillis()} when user called
* {@link Kernel#newTransaction()}.
*/
long localStartTime();

/** /**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks * Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
* are invoked. * are invoked.
Expand Down
Expand Up @@ -273,6 +273,18 @@ int getReuseCount()
return reuseCount; return reuseCount;
} }


@Override
public long localStartTime()
{
return startTimeMillis;
}

@Override
public long lastTransactionIdWhenStarted()
{
return lastTransactionIdWhenStarted;
}

@Override @Override
public void success() public void success()
{ {
Expand Down
Expand Up @@ -218,6 +218,18 @@ public long lastTransactionTimestampWhenStarted()
public void registerCloseListener( CloseListener listener ) public void registerCloseListener( CloseListener listener )
{ {
} }

@Override
public long lastTransactionIdWhenStarted()
{
return 0;
}

@Override
public long localStartTime()
{
return 0;
}
}; };
} }


Expand Down
Expand Up @@ -51,6 +51,8 @@
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static org.neo4j.helpers.Format.duration;
import static org.neo4j.helpers.Format.time;
import static org.neo4j.kernel.impl.api.TransactionApplicationMode.EXTERNAL; import static org.neo4j.kernel.impl.api.TransactionApplicationMode.EXTERNAL;


/** /**
Expand Down Expand Up @@ -399,8 +401,9 @@ private boolean batchSizeExceedsSafeZone()


private void markUnsafeTransactionsForTermination() private void markUnsafeTransactionsForTermination()
{ {
long lastAppliedTimestamp = transactionQueue.last().getCommitEntry().getTimeWritten(); long firstCommittedTimestamp = transactionQueue.first().getCommitEntry().getTimeWritten();
long earliestSafeTimestamp = lastAppliedTimestamp - idReuseSafeZoneTime; long lastCommittedTimestamp = transactionQueue.last().getCommitEntry().getTimeWritten();
long earliestSafeTimestamp = lastCommittedTimestamp - idReuseSafeZoneTime;


for ( KernelTransaction tx : kernelTransactions.activeTransactions() ) for ( KernelTransaction tx : kernelTransactions.activeTransactions() )
{ {
Expand All @@ -409,11 +412,36 @@ private void markUnsafeTransactionsForTermination()
if ( commitTimestamp != TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP && if ( commitTimestamp != TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP &&
commitTimestamp < earliestSafeTimestamp ) commitTimestamp < earliestSafeTimestamp )
{ {
log.info( "Marking transaction for termination, " +
"invalidated due to an upcoming batch of changes being applied:" +
"\n" +
" Batch: firstCommittedTxId:" + transactionQueue.first().getCommitEntry().getTxId() +
", firstCommittedTimestamp:" + informativeTimestamp( firstCommittedTimestamp ) +
", lastCommittedTxId:" + transactionQueue.last().getCommitEntry().getTxId() +
", lastCommittedTimestamp:" + informativeTimestamp( lastCommittedTimestamp ) +
", batchTimeRange:" + informativeDuration( lastCommittedTimestamp - firstCommittedTimestamp ) +
", earliestSafeTimstamp:" + informativeTimestamp( earliestSafeTimestamp ) +
", safeZoneDuration:" + informativeDuration( idReuseSafeZoneTime ) +
"\n" +
" Transaction: lastCommittedTimestamp:" +
informativeTimestamp( tx.lastTransactionTimestampWhenStarted() ) +
", lastCommittedTxId:" + tx.lastTransactionIdWhenStarted() +
", localStartTimestamp:" + informativeTimestamp( tx.localStartTime() ) );
tx.markForTermination( Status.Transaction.Outdated ); tx.markForTermination( Status.Transaction.Outdated );
} }
} }
} }


private static String informativeDuration( long duration )
{
return duration( duration ) + "/" + duration;
}

private static String informativeTimestamp( long timestamp )
{
return time( timestamp ) + "/" + timestamp;
}

private void applyQueuedTransactions() throws IOException private void applyQueuedTransactions() throws IOException
{ {
// Synchronize to guard for concurrent shutdown // Synchronize to guard for concurrent shutdown
Expand Down

0 comments on commit 056837c

Please sign in to comment.