Skip to content

Commit

Permalink
core-edge adaptations for the new commit process
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Nov 30, 2015
1 parent 6a6d020 commit 32294b4
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 177 deletions.
Expand Up @@ -19,9 +19,8 @@
*/
package org.neo4j.coreedge.catchup.tx.edge;

import java.io.IOException;

import org.neo4j.function.Supplier;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand All @@ -43,7 +42,7 @@ public void onTxReceived( TxPullResponse tx )
{
transactionApplierSupplier.get().appendToLogAndApplyToStore( tx.tx() );
}
catch ( IOException e )
catch ( TransactionFailureException e )
{
log.error( "Failed to apply transaction.", e );
}
Expand Down
Expand Up @@ -19,96 +19,47 @@
*/
package org.neo4j.coreedge.catchup.tx.edge;

import java.io.IOException;

import org.neo4j.com.Response;
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TransactionStreamResponse;
import org.neo4j.com.storecopy.TransactionObligationFulfiller;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.Commitment;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;

import static org.neo4j.kernel.impl.api.TransactionApplicationMode.EXTERNAL;
import static org.neo4j.kernel.impl.transaction.tracing.CommitEvent.NULL;

/**
* Receives and unpacks {@link Response responses}.
* Transaction obligations are handled by {@link TransactionObligationFulfiller} and
* {@link TransactionStream transaction streams} are {@link TransactionRepresentationStoreApplier applied to the
* store},
* {@link TransactionStream transaction streams} are {@link TransactionCommitProcess committed to the store},
* in batches.
* <p/>
* It is assumed that any {@link TransactionStreamResponse response carrying transaction data} comes from the one
* and same thread.
*/
public class TransactionApplier
{
private final TransactionAppender appender;
private final TransactionRepresentationStoreApplier storeApplier;
private final IndexUpdatesValidator indexUpdatesValidator;
private final LogFile logFile;
private final LogRotation logRotation;
private final KernelHealth kernelHealth;
private final TransactionRepresentationCommitProcess commitProcess;

public TransactionApplier( DependencyResolver resolver )
{
this.appender = resolver.resolveDependency( TransactionAppender.class );
this.storeApplier = resolver.resolveDependency( TransactionRepresentationStoreApplier.class );
this.indexUpdatesValidator = resolver.resolveDependency( IndexUpdatesValidator.class );
this.logFile = resolver.resolveDependency( LogFile.class );
this.logRotation = resolver.resolveDependency( LogRotation.class );
this.kernelHealth = resolver.resolveDependency( KernelHealth.class );
commitProcess = new TransactionRepresentationCommitProcess(
resolver.resolveDependency( TransactionAppender.class ),
resolver.resolveDependency( StorageEngine.class ),
resolver.resolveDependency( IndexUpdatesValidator.class ) );
}

public void appendToLogAndApplyToStore( CommittedTransactionRepresentation tx ) throws IOException
public void appendToLogAndApplyToStore( CommittedTransactionRepresentation tx ) throws TransactionFailureException
{
// Synchronize to guard for concurrent shutdown
synchronized ( logFile )
{
// Check rotation explicitly, since the version of append that we're calling isn't doing that.
logRotation.rotateLogIfNeeded( LogAppendEvent.NULL );

try
{
LogEntryCommit commitEntry = tx.getCommitEntry();

Commitment commitment = appender.append( tx.getTransactionRepresentation(), commitEntry.getTxId() );

appender.force();

long transactionId = commitEntry.getTxId();
TransactionRepresentation representation = tx.getTransactionRepresentation();
try
{
commitment.publishAsCommitted();
try ( LockGroup locks = new LockGroup();
ValidatedIndexUpdates indexUpdates = indexUpdatesValidator.validate( representation) )
{
storeApplier.apply( representation, indexUpdates, locks, transactionId, EXTERNAL );
}
}
finally
{
commitment.publishAsApplied();
}
}
catch ( IOException e )
{
// Kernel panic is done on this level, i.e. append and apply doesn't do that themselves.
kernelHealth.panic( e );
throw e;
}
}
commitProcess.commit( new TransactionToApply( tx.getTransactionRepresentation(),
tx.getCommitEntry().getTxId() ), NULL, EXTERNAL );
}
}
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.core.InMemoryTokenCache;
import org.neo4j.kernel.impl.core.NonUniqueTokenException;
import org.neo4j.kernel.impl.core.Token;
Expand Down Expand Up @@ -250,7 +251,8 @@ private int applyToStore( Collection<Command> commands ) throws NoSuchEntryExcep

try ( LockGroup lockGroup = new LockGroup() )
{
commitProcess.commit( representation, lockGroup, CommitEvent.NULL, TransactionApplicationMode.EXTERNAL );
commitProcess.commit( new TransactionToApply( representation ),
CommitEvent.NULL, TransactionApplicationMode.EXTERNAL );
}
catch ( TransactionFailureException e )
{
Expand Down
Expand Up @@ -24,8 +24,7 @@
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;

/**
Expand All @@ -45,14 +44,14 @@ public ReplayableCommitProcess( TransactionCommitProcess localCommitProcess, Tra
}

@Override
public long commit( TransactionRepresentation representation, LockGroup locks,
public long commit( TransactionToApply batch,
CommitEvent commitEvent,
TransactionApplicationMode mode ) throws TransactionFailureException
{
long txId = lastLocalTxId.incrementAndGet();
if ( txId > transactionCounter.lastCommittedTransactionId() )
{
return localCommitProcess.commit( representation, locks, commitEvent, mode );
return localCommitProcess.commit( batch, commitEvent, mode );
}
return txId;
}
Expand Down
Expand Up @@ -25,15 +25,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.raft.replication.session.OperationContext;
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException;
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.raft.replication.session.OperationContext;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

Expand All @@ -42,7 +41,7 @@
public class ReplicatedTransactionCommitProcess extends LifecycleAdapter implements TransactionCommitProcess
{
private final Replicator replicator;
private ReplicatedTransactionStateMachine replicatedTxListener;
private final ReplicatedTransactionStateMachine replicatedTxListener;
private final LocalSessionPool sessionPool;

public ReplicatedTransactionCommitProcess( Replicator replicator,
Expand All @@ -56,7 +55,7 @@ public ReplicatedTransactionCommitProcess( Replicator replicator,
}

@Override
public long commit( final TransactionRepresentation tx, final LockGroup locks,
public long commit( final TransactionToApply tx,
final CommitEvent commitEvent,
TransactionApplicationMode mode ) throws TransactionFailureException
{
Expand All @@ -65,7 +64,8 @@ public long commit( final TransactionRepresentation tx, final LockGroup locks,
ReplicatedTransaction transaction;
try
{
transaction = createImmutableReplicatedTransaction( tx, operationContext.globalSession(), operationContext.localOperationId() );
transaction = createImmutableReplicatedTransaction( tx.transactionRepresentation(),
operationContext.globalSession(), operationContext.localOperationId() );
}
catch ( IOException e )
{
Expand Down
Expand Up @@ -19,17 +19,17 @@
*/
package org.neo4j.coreedge.raft.replication.tx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.raft.net.NetworkReadableLogChannelNetty4;
import org.neo4j.coreedge.raft.net.NetworkWritableLogChannelNetty4;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.CommandWriter;
Expand All @@ -42,7 +42,8 @@

public class ReplicatedTransactionFactory
{
public static ReplicatedTransaction createImmutableReplicatedTransaction( TransactionRepresentation tx, GlobalSession globalSession, LocalOperationId localOperationId ) throws IOException
public static ReplicatedTransaction createImmutableReplicatedTransaction(
TransactionRepresentation tx, GlobalSession globalSession, LocalOperationId localOperationId ) throws IOException
{
ByteBuf transactionBuffer = Unpooled.buffer();

Expand Down
Expand Up @@ -26,16 +26,16 @@

import org.neo4j.concurrent.CompletableFuture;
import org.neo4j.coreedge.raft.locks.CoreServiceAssignment;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTracker;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;

Expand Down Expand Up @@ -100,12 +100,9 @@ private void handleTransaction( ReplicatedTransaction replicatedTransaction )
long txId = -1;
if ( !shouldReject )
{
try ( LockGroup lockGroup = new LockGroup() )
{
txId = localCommitProcess.commit( tx, lockGroup, CommitEvent.NULL,
TransactionApplicationMode.EXTERNAL );
lastCommittedTxId = txId;
}
txId = localCommitProcess.commit( new TransactionToApply( tx ), CommitEvent.NULL,
TransactionApplicationMode.EXTERNAL );
lastCommittedTxId = txId;
}

if ( replicatedTransaction.globalSession().equals( myGlobalSession ) )
Expand Down
Expand Up @@ -21,21 +21,21 @@

import java.io.File;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.StoreFiles;
import org.neo4j.coreedge.catchup.storecopy.edge.CopiedStoreRecovery;
import org.neo4j.coreedge.catchup.storecopy.edge.EdgeToCoreClient;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreCopyClient;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFetcher;
import org.neo4j.coreedge.catchup.tx.edge.ApplyPulledTransactions;
import org.neo4j.coreedge.catchup.tx.edge.TransactionApplier;
import org.neo4j.coreedge.catchup.tx.edge.TransactionLogCatchUpFactory;
import org.neo4j.coreedge.catchup.tx.edge.TxPollingClient;
import org.neo4j.coreedge.catchup.tx.edge.TxPullClient;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.catchup.storecopy.edge.EdgeToCoreClient;
import org.neo4j.coreedge.discovery.EdgeDiscoveryService;
import org.neo4j.coreedge.server.Expiration;
import org.neo4j.coreedge.catchup.tx.edge.ApplyPulledTransactions;
import org.neo4j.coreedge.catchup.tx.edge.TxPollingClient;
import org.neo4j.coreedge.server.ExpiryScheduler;
import org.neo4j.coreedge.catchup.storecopy.edge.CopiedStoreRecovery;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreCopyClient;
import org.neo4j.coreedge.catchup.storecopy.StoreFiles;
import org.neo4j.coreedge.catchup.tx.edge.TransactionApplier;
import org.neo4j.function.Supplier;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
Expand All @@ -55,7 +55,6 @@
import org.neo4j.kernel.impl.api.ReadOnlyTransactionCommitProcess;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics;
import org.neo4j.kernel.impl.core.DelegatingLabelTokenHolder;
Expand All @@ -67,6 +66,7 @@
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.storemigration.ConfigMapUpgradeConfiguration;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
Expand Down Expand Up @@ -202,11 +202,11 @@ private CommitProcessFactory readOnly()
return new CommitProcessFactory()
{
@Override
public TransactionCommitProcess create( TransactionAppender appender, TransactionRepresentationStoreApplier applier, IndexUpdatesValidator indexUpdatesValidator, Config config )
public TransactionCommitProcess create( TransactionAppender appender, StorageEngine storageEngine,
IndexUpdatesValidator indexUpdatesValidator, Config config )
{
return new ReadOnlyTransactionCommitProcess();
}

};
}

Expand Down
Expand Up @@ -19,15 +19,11 @@
*/
package org.neo4j.coreedge.catchup.tx.edge;

import java.io.IOException;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.junit.Test;

import org.neo4j.coreedge.catchup.tx.edge.ApplyPulledTransactions;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponse;
import org.neo4j.coreedge.catchup.tx.edge.TransactionApplier;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -71,16 +67,15 @@ public void shouldLogIfTransactionCannotBeApplied() throws Exception
StoreId storeId = new StoreId( 1, 1, 1, 1 );

TransactionApplier transactionApplier = mock( TransactionApplier.class );
doThrow( IOException.class ).when( transactionApplier ).appendToLogAndApplyToStore( any(
doThrow( TransactionFailureException.class ).when( transactionApplier ).appendToLogAndApplyToStore( any(
CommittedTransactionRepresentation.class ) );


LogProvider logProvider = mock( LogProvider.class );
Log log = mock( Log.class );
when( logProvider.getLog( ApplyPulledTransactions.class ) ).thenReturn( log );

ApplyPulledTransactions handler = new ApplyPulledTransactions( logProvider, singleton( transactionApplier )
);
ApplyPulledTransactions handler = new ApplyPulledTransactions( logProvider, singleton( transactionApplier ) );

// when
handler.onTxReceived( new TxPullResponse( storeId, mock( CommittedTransactionRepresentation.class ) ) );
Expand Down

0 comments on commit 32294b4

Please sign in to comment.