Skip to content

Commit

Permalink
Make transaction id accessible after KTI commit
Browse files Browse the repository at this point in the history
Transaction id is only assigned during commit of KernelTransactionImplementation.
It is not assigned if transaction was rolled back or read-only. Currently it is
not possible to obtain committed transaction id because KTI just implements
AutoCloseable and commit happens in #close().

However, it might be sometimes valuable to know committed transaction id and
track it.

This commit makes KTI expose it's id (if committed) via #closeTransaction()
method while still keeping KTI an AutoCloseable.
  • Loading branch information
lutovich committed Jul 6, 2016
1 parent 5e849ec commit 1c2c9b2
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 14 deletions.
Expand Up @@ -30,7 +30,6 @@

import org.neo4j.bolt.security.auth.AuthenticationException;
import org.neo4j.bolt.security.auth.AuthenticationResult;
import org.neo4j.bolt.security.auth.BasicAuthenticationResult;
import org.neo4j.bolt.v1.messaging.MessageHandler;
import org.neo4j.bolt.v1.messaging.message.DiscardAllMessage;
import org.neo4j.bolt.v1.messaging.message.Message;
Expand All @@ -43,11 +42,11 @@
import org.neo4j.bolt.v1.runtime.internal.concurrent.ThreadedSessions;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -247,9 +246,10 @@ public void failure()
}

@Override
public void close() throws TransactionFailureException
public long closeTransaction() throws TransactionFailureException
{
liveTransactions.decrementAndGet();
return NOT_COMMITTED;
}

@Override
Expand Down
Expand Up @@ -84,6 +84,8 @@ interface CloseListener
void notify( boolean success );
}

long NOT_COMMITTED = -1;

/**
* Acquires a new {@link Statement} for this transaction which allows for reading and writing data from and
* to the underlying database. After the group of reads and writes have been performed the statement
Expand All @@ -105,12 +107,23 @@ interface CloseListener
*/
void failure();

/**
* Closes this transaction, committing its changes iff {@link #success()} has been called and
* {@link #failure()} has NOT been called. Otherwise its changes will be rolled back.
*
* @return id of the committed transaction or {@link #NOT_COMMITTED} if transaction was rolled back or read-only.
*/
long closeTransaction() throws TransactionFailureException;

/**
* Closes this transaction, committing its changes iff {@link #success()} has been called and
* {@link #failure()} has NOT been called. Otherwise its changes will be rolled back.
*/
@Override
void close() throws TransactionFailureException;
default void close() throws TransactionFailureException
{
closeTransaction();
}

/**
* @return {@code true} if the transaction is still open, i.e. if {@link #close()} hasn't been called yet.
Expand Down
Expand Up @@ -353,7 +353,7 @@ public boolean hasTxStateWithChanges()
return txState != null && txState.hasChanges();
}

private void closeTransaction()
private void markAsClosed()
{
assertTransactionOpen();
closed = true;
Expand Down Expand Up @@ -401,7 +401,7 @@ private boolean hasDataChanges()
}

@Override
public void close() throws TransactionFailureException
public long closeTransaction() throws TransactionFailureException
{
assertTransactionOpen();
assertTransactionNotClosing();
Expand All @@ -413,10 +413,11 @@ public void close() throws TransactionFailureException
{
rollback();
failOnNonExplicitRollbackIfNeeded();
return NOT_COMMITTED;
}
else
{
commit();
return commit();
}
}
finally
Expand Down Expand Up @@ -469,7 +470,7 @@ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureExcept
}
}

private void commit() throws TransactionFailureException
private long commit() throws TransactionFailureException
{
boolean success = false;

Expand Down Expand Up @@ -531,10 +532,13 @@ private void commit() throws TransactionFailureException
locks.getLockSessionId() );

// Commit the transaction
commitProcess.commit( new TransactionToApply( transactionRepresentation ), commitEvent, INTERNAL );
success = true;
TransactionToApply batch = new TransactionToApply( transactionRepresentation );
return commitProcess.commit( batch, commitEvent, INTERNAL );
}
}
success = true;
return NOT_COMMITTED;
}
catch ( ConstraintValidationKernelException | CreateConstraintFailureException e )
{
Expand Down Expand Up @@ -605,7 +609,7 @@ private void afterCommit()
{
try
{
closeTransaction();
markAsClosed();
if ( beforeHookInvoked )
{
hooks.afterCommit( txState, this, hooksState );
Expand All @@ -621,7 +625,7 @@ private void afterRollback()
{
try
{
closeTransaction();
markAsClosed();
if ( beforeHookInvoked )
{
hooks.afterRollback( txState, this, hooksState );
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.List;

import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
Expand All @@ -35,6 +34,7 @@
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.PreexistingIndexEntryConflictException;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.StatementOperationParts;
Expand Down Expand Up @@ -175,8 +175,9 @@ public void failure()
}

@Override
public void close() throws TransactionFailureException
public long closeTransaction() throws TransactionFailureException
{
return NOT_COMMITTED;
}

@Override
Expand Down
Expand Up @@ -30,19 +30,24 @@
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.cursor.Cursor;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.api.exceptions.schema.SchemaKernelException;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.api.Kernel;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.storageengine.api.NodeItem;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -518,6 +523,76 @@ public void shouldKillTransactionsOnShutdown() throws Throwable
}
}

@Test
public void txReturnsCorrectIdWhenCommitted() throws Exception
{
executeDummyTxs( db, 42 );

KernelTransaction tx = kernel.newTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL );
try ( Statement statement = tx.acquireStatement() )
{
statement.dataWriteOperations().nodeCreate();
}
tx.success();

long previousCommittedTxId = lastCommittedTxId( db );

assertEquals( previousCommittedTxId + 1, tx.closeTransaction() );
assertFalse( tx.isOpen() );
}

@Test
public void txReturnsCorrectIdWhenRolledBack() throws Exception
{
executeDummyTxs( db, 42 );

KernelTransaction tx = kernel.newTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL );
try ( Statement statement = tx.acquireStatement() )
{
statement.dataWriteOperations().nodeCreate();
}
tx.failure();

assertEquals( KernelTransaction.NOT_COMMITTED, tx.closeTransaction() );
assertFalse( tx.isOpen() );
}

@Test
public void txReturnsCorrectIdWhenReadOnly() throws Exception
{
executeDummyTxs( db, 42 );

KernelTransaction tx = kernel.newTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL );
try ( Statement statement = tx.acquireStatement();
Cursor<NodeItem> cursor = statement.readOperations().nodeCursor( 1 ) )
{
assertTrue( cursor.next() );
cursor.close();
}
tx.success();

assertEquals( KernelTransaction.NOT_COMMITTED, tx.closeTransaction() );
assertFalse( tx.isOpen() );
}

private static void executeDummyTxs( GraphDatabaseService db, int count )
{
for ( int i = 0; i < count; i++ )
{
try ( Transaction tx = db.beginTx() )
{
db.createNode();
tx.success();
}
}
}

private static long lastCommittedTxId( GraphDatabaseAPI db )
{
TransactionIdStore txIdStore = db.getDependencyResolver().resolveDependency( TransactionIdStore.class );
return txIdStore.getLastCommittedTransactionId();
}

private IndexDescriptor createIndex( SchemaWriteOperations schemaWriteOperations ) throws SchemaKernelException
{
return schemaWriteOperations.indexCreate( schemaWriteOperations.labelGetOrCreateForName( "hello" ),
Expand Down

0 comments on commit 1c2c9b2

Please sign in to comment.