Skip to content

Commit

Permalink
Merge pull request #9014 from martinfurmanski/3.1-backport-hz-robustness
Browse files Browse the repository at this point in the history
Backport hz robustness
  • Loading branch information
mneedham committed Mar 15, 2017
2 parents 5d3489c + cac693d commit 6c27da8
Show file tree
Hide file tree
Showing 35 changed files with 854 additions and 421 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
*/
package org.neo4j.test;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
Expand All @@ -32,7 +34,19 @@

public class OnDemandJobScheduler extends LifecycleAdapter implements JobScheduler
{
private Runnable job;
private List<Runnable> jobs = new CopyOnWriteArrayList<>();

private final boolean removeJobsAfterExecution;

public OnDemandJobScheduler()
{
this( true );
}

public OnDemandJobScheduler( boolean removeJobsAfterExecution)
{
this.removeJobsAfterExecution = removeJobsAfterExecution;
}

@Override
public Executor executor( Group group )
Expand All @@ -42,7 +56,7 @@ public Executor executor( Group group )
@Override
public void execute( Runnable command )
{
job = command;
jobs.add( command );
}
};
}
Expand All @@ -62,42 +76,46 @@ public JobHandle schedule( Group group, Runnable job )
@Override
public JobHandle schedule( Group group, Runnable job, Map<String,String> metadata )
{
this.job = job;
jobs.add( job );
return new OnDemandJobHandle();
}

@Override
public JobHandle schedule( Group group, Runnable job, long initialDelay, TimeUnit timeUnit )
{
this.job = job;
jobs.add( job );
return new OnDemandJobHandle();
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable runnable, long period, TimeUnit timeUnit )
{
this.job = runnable;
jobs.add( runnable );
return new OnDemandJobHandle();
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay,
long period, TimeUnit timeUnit )
{
this.job = runnable;
jobs.add( runnable );
return new OnDemandJobHandle();
}

public Runnable getJob()
{
return job;
return jobs.size() > 0 ? jobs.get( 0 ) : null;
}

public void runJob()
{
if ( job != null )
for ( Runnable job : jobs )
{
job.run();
if ( removeJobsAfterExecution )
{
jobs.remove( job );
}
}
}

Expand All @@ -106,7 +124,7 @@ private class OnDemandJobHandle implements JobHandle
@Override
public void cancel( boolean mayInterruptIfRunning )
{
job = null;
jobs.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
Optional<AdvertisedSocketAddress> catchUpAddress =
discoveryService.coreServers().find( target ).map( CoreAddresses::getCatchupServer );

CatchUpChannel channel = pool.acquire( catchUpAddress.orElseThrow(
() -> new CatchUpClientException( "Cannot find the target member socket address" ) ) );
if ( !catchUpAddress.isPresent() )
{
throw new CatchUpClientException( "Cannot find the target member socket address" );
}

CatchUpChannel channel = pool.acquire( catchUpAddress.get() );

future.whenComplete( ( result, e ) -> {
if ( e == null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
Expand All @@ -47,6 +46,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.PANIC;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.STORE_COPYING;
import static org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.State.TX_PULLING;
Expand Down Expand Up @@ -87,9 +87,10 @@ enum State
private final PullRequestMonitor pullRequestMonitor;

private RenewableTimeout timeout;
private State state = TX_PULLING;
private volatile State state = TX_PULLING;
private DatabaseHealth dbHealth;
private CompletableFuture<Boolean> upToDateFuture; // we are up-to-date when we are successfully pulling
private volatile long latestTxIdOfUpStream;

public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDatabase,
Lifecycle startStopOnStoreCopy, CatchUpClient catchUpClient,
Expand Down Expand Up @@ -239,23 +240,23 @@ private boolean pullAndApplyBatchOfTransactions( MemberId core, StoreId localSto
TxPullRequest txPullRequest = new TxPullRequest( lastQueuedTxId, localStoreId );
log.debug( "Pull transactions where tx id > %d [batch #%d]", lastQueuedTxId, batchCount );

CatchupResult catchupResult;
TxStreamFinishedResponse response;
try
{
catchupResult = catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor<CatchupResult>()
response = catchUpClient.makeBlockingRequest( core, txPullRequest, new CatchUpResponseAdaptor<TxStreamFinishedResponse>()
{
@Override
public void onTxPullResponse( CompletableFuture<CatchupResult> signal, TxPullResponse response )
public void onTxPullResponse( CompletableFuture<TxStreamFinishedResponse> signal, TxPullResponse response )
{
handleTransaction( response.tx() );
}

@Override
public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
public void onTxStreamFinishedResponse( CompletableFuture<TxStreamFinishedResponse> signal,
TxStreamFinishedResponse response )
{
streamComplete();
signal.complete( response.status() );
signal.complete( response );
}
} );
}
Expand All @@ -265,7 +266,9 @@ public void onTxStreamFinishedResponse( CompletableFuture<CatchupResult> signal,
return false;
}

switch ( catchupResult )
latestTxIdOfUpStream = response.latestTxId();

switch ( response.status() )
{
case SUCCESS_END_OF_BATCH:
return true;
Expand Down Expand Up @@ -333,7 +336,20 @@ private void downloadDatabase( MemberId core, StoreId localStoreId )
throw new RuntimeException( throwable );
}

latestTxIdOfUpStream = 0; // we will find out on the next pull request response
state = TX_PULLING;
applier.refreshFromNewStore();
}

public String describeState()
{
if ( state == TX_PULLING && applier.lastQueuedTxId() > 0 && latestTxIdOfUpStream > 0 )
{
return format( "%s (%d of %d)", TX_PULLING.name(), applier.lastQueuedTxId(), latestTxIdOfUpStream );
}
else
{
return state.name();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class TxPullRequestHandler extends SimpleChannelInboundHandler<TxPullRequ

public TxPullRequestHandler( CatchupServerProtocol protocol, Supplier<StoreId> storeIdSupplier,
BooleanSupplier databaseAvailable, Supplier<TransactionIdStore> transactionIdStoreSupplier,

Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, int batchSize, Monitors monitors, LogProvider logProvider )
{
this.protocol = protocol;
Expand All @@ -80,6 +79,8 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg
CatchupResult status = SUCCESS_END_OF_STREAM;
StoreId localStoreId = storeIdSupplier.get();

long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();

if ( localStoreId == null || !localStoreId.equals( msg.expectedStoreId() ) )
{
status = E_STORE_ID_MISMATCH;
Expand All @@ -93,7 +94,7 @@ else if ( !databaseAvailable.getAsBoolean() )
status = E_STORE_UNAVAILABLE;
log.info( "Failed to serve TxPullRequest for tx %d because the local database is unavailable.", lastTxId );
}
else if ( transactionIdStore.getLastCommittedTransactionId() >= firstTxId )
else if ( lastCommittedTransactionId >= firstTxId )
{
try ( IOCursor<CommittedTransactionRepresentation> cursor =
logicalTransactionStore.getTransactions( firstTxId ) )
Expand Down Expand Up @@ -124,7 +125,7 @@ else if ( transactionIdStore.getLastCommittedTransactionId() >= firstTxId )
}

ctx.write( ResponseMessageType.TX_STREAM_FINISHED );
TxStreamFinishedResponse response = new TxStreamFinishedResponse( status );
TxStreamFinishedResponse response = new TxStreamFinishedResponse( status, lastCommittedTransactionId );
ctx.write( response );
ctx.flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,22 @@
public class TxStreamFinishedResponse
{
private final CatchupResult status;
private final long latestTxId;

public CatchupResult status()
{
return status;
}

TxStreamFinishedResponse( CatchupResult status )
TxStreamFinishedResponse( CatchupResult status, long latestTxId )
{
this.status = status;
this.latestTxId = latestTxId;
}

public long latestTxId()
{
return latestTxId;
}

@Override
Expand All @@ -47,18 +54,22 @@ public boolean equals( Object o )
if ( o == null || getClass() != o.getClass() )
{ return false; }
TxStreamFinishedResponse that = (TxStreamFinishedResponse) o;
return status == that.status;
return latestTxId == that.latestTxId &&
status == that.status;
}

@Override
public int hashCode()
{
return Objects.hash( status );
return Objects.hash( status, latestTxId );
}

@Override
public String toString()
{
return format( "TxStreamFinishedResponse{status=%s}", status );
return "TxStreamFinishedResponse{" +
"status=" + status +
", latestTxId=" + latestTxId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class TxStreamFinishedResponseDecoder extends ByteToMessageDecoder
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
{
int ordinal = msg.readInt();
long latestTxid = msg.readLong();
CatchupResult status = CatchupResult.values()[ordinal];
out.add( new TxStreamFinishedResponse( status ) );
out.add( new TxStreamFinishedResponse( status, latestTxid ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ protected void encode( ChannelHandlerContext ctx, TxStreamFinishedResponse respo
Exception
{
out.writeInt( response.status().ordinal() );
out.writeLong( response.latestTxId() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ public class CausalClusteringSettings
public static final Setting<Long> read_replica_time_to_live =
setting( "causal_clustering.read_replica_time_to_live", DURATION, "1m", min(60_000L) );

@Description( "Read replica 'call home' frequency" )
public static final Setting<Long> read_replica_refresh_rate =
setting( "causal_clustering.read_replica_refresh_rate", DURATION, "5s", min(5_000L) );

@Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." )
@Description( "How long drivers should cache the data from the `dbms.cluster.routing.getServers()` procedure." )
public static final Setting<Long> cluster_routing_ttl =
setting( "causal_clustering.cluster_routing_ttl", DURATION, "5m", min(1_000L) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public interface CoreTopologyService extends TopologyService
*/
boolean setClusterId( ClusterId clusterId );

void refreshCoreTopology();

interface Listener
{
void onCoreTopologyChange( CoreTopology coreTopology );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.causalclustering.discovery;

import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand All @@ -31,5 +30,5 @@ CoreTopologyService coreTopologyService( Config config, MemberId myself, JobSche
LogProvider logProvider, LogProvider userLogProvider );

TopologyService readReplicaDiscoveryService( Config config, LogProvider logProvider,
DelayedRenewableTimeoutService timeoutService, long readReplicaTimeToLiveTimeout, long readReplicaRefreshRate );
JobScheduler jobScheduler );
}

0 comments on commit 6c27da8

Please sign in to comment.