Skip to content

Commit

Permalink
core-edge: unite all the RPC timeouts and default to 5 seconds
Browse files Browse the repository at this point in the history
This is an intermediate quick-fix that will work well in most
situations, pending a more crafty solution.
  • Loading branch information
martinfurmanski committed Sep 20, 2016
1 parent 6e69798 commit 9c08322
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 14 deletions.
Expand Up @@ -46,6 +46,8 @@


public class CatchUpClient extends LifecycleAdapter public class CatchUpClient extends LifecycleAdapter
{ {
private static final int DEFAULT_INACTIVITY_TIMEOUT = 5; // seconds

private final LogProvider logProvider; private final LogProvider logProvider;
private final TopologyService discoveryService; private final TopologyService discoveryService;
private final Log log; private final Log log;
Expand All @@ -64,14 +66,20 @@ public CatchUpClient( TopologyService discoveryService, LogProvider logProvider,
this.monitors = monitors; this.monitors = monitors;
} }


public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request, public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
CatchUpResponseCallback<T> responseHandler ) throws CatchUpClientException, NoKnownAddressesException
{
return makeBlockingRequest( target, request, DEFAULT_INACTIVITY_TIMEOUT, TimeUnit.SECONDS, responseHandler );
}

public <T> T makeBlockingRequest( MemberId target, CatchUpRequest request,
long inactivityTimeout, TimeUnit timeUnit, long inactivityTimeout, TimeUnit timeUnit,
CatchUpResponseCallback<T> responseHandler ) CatchUpResponseCallback<T> responseHandler )
throws CatchUpClientException, NoKnownAddressesException throws CatchUpClientException, NoKnownAddressesException
{ {
CompletableFuture<T> future = new CompletableFuture<>(); CompletableFuture<T> future = new CompletableFuture<>();
AdvertisedSocketAddress catchUpAddress = AdvertisedSocketAddress catchUpAddress =
discoveryService.coreServers().find( memberId ).getCatchupServer(); discoveryService.coreServers().find( target ).getCatchupServer();
CatchUpChannel channel = pool.acquire( catchUpAddress ); CatchUpChannel channel = pool.acquire( catchUpAddress );


future.whenComplete( ( result, e ) -> { future.whenComplete( ( result, e ) -> {
Expand All @@ -88,7 +96,7 @@ public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request,
channel.setResponseHandler( responseHandler, future ); channel.setResponseHandler( responseHandler, future );
channel.send( request ); channel.send( request );


String operation = String.format( "Timed out executing operation %s on %s", request, memberId ); String operation = String.format( "Timed out executing operation %s on %s", request, target );


return TimeoutLoop.waitForCompletion( future, operation, return TimeoutLoop.waitForCompletion( future, operation,
channel::millisSinceLastResponse, inactivityTimeout, timeUnit ); channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
Expand Down
Expand Up @@ -45,7 +45,7 @@ long copyStoreFiles( MemberId from, StoreId expectedStoreId, StoreFileStreams st
{ {
try try
{ {
return catchUpClient.makeBlockingRequest( from, new GetStoreRequest( expectedStoreId ), 30, SECONDS, return catchUpClient.makeBlockingRequest( from, new GetStoreRequest( expectedStoreId ),
new CatchUpResponseAdaptor<Long>() new CatchUpResponseAdaptor<Long>()
{ {
private long expectedBytes = 0; private long expectedBytes = 0;
Expand Down Expand Up @@ -89,7 +89,7 @@ StoreId fetchStoreId( MemberId from ) throws StoreIdDownloadFailedException
{ {
try try
{ {
return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(), 30, SECONDS, return catchUpClient.makeBlockingRequest( from, new GetStoreIdRequest(),
new CatchUpResponseAdaptor<StoreId>() new CatchUpResponseAdaptor<StoreId>()
{ {
@Override @Override
Expand Down
Expand Up @@ -106,7 +106,7 @@ private synchronized void onTimeout()
long lastAppliedTxId = applier.lastAppliedTxId(); long lastAppliedTxId = applier.lastAppliedTxId();
pullRequestMonitor.txPullRequest( lastAppliedTxId ); pullRequestMonitor.txPullRequest( lastAppliedTxId );
TxPullRequest txPullRequest = new TxPullRequest( lastAppliedTxId, localDatabase.get() ); TxPullRequest txPullRequest = new TxPullRequest( lastAppliedTxId, localDatabase.get() );
catchUpClient.makeBlockingRequest( transactionServer, txPullRequest, 30, TimeUnit.SECONDS, catchUpClient.makeBlockingRequest( transactionServer, txPullRequest,
new CatchUpResponseAdaptor<CatchupResult>() new CatchUpResponseAdaptor<CatchupResult>()
{ {
@Override @Override
Expand Down
Expand Up @@ -48,7 +48,7 @@ public CatchupResult pullTransactions( MemberId from, StoreId storeId, long star
throws CatchUpClientException, NoKnownAddressesException throws CatchUpClientException, NoKnownAddressesException
{ {
pullRequestMonitor.txPullRequest( startTxId ); pullRequestMonitor.txPullRequest( startTxId );
return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ), 30, SECONDS, return catchUpClient.makeBlockingRequest( from, new TxPullRequest( startTxId, storeId ),
new CatchUpResponseAdaptor<CatchupResult>() new CatchUpResponseAdaptor<CatchupResult>()
{ {
@Override @Override
Expand Down
Expand Up @@ -88,7 +88,7 @@ public synchronized void downloadSnapshot( MemberId source, CoreState coreState
* in the copied store. */ * in the copied store. */


CoreSnapshot coreSnapshot = catchUpClient.makeBlockingRequest( source, new CoreSnapshotRequest(), CoreSnapshot coreSnapshot = catchUpClient.makeBlockingRequest( source, new CoreSnapshotRequest(),
1, MINUTES, new CatchUpResponseAdaptor<CoreSnapshot>() new CatchUpResponseAdaptor<CoreSnapshot>()
{ {
@Override @Override
public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot response ) public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot response )
Expand Down
Expand Up @@ -58,12 +58,11 @@ public class TxPollingClientTest
private final BatchingTxApplier txApplier = mock( BatchingTxApplier.class ); private final BatchingTxApplier txApplier = mock( BatchingTxApplier.class );
private final ControlledRenewableTimeoutService timeoutService = new ControlledRenewableTimeoutService(); private final ControlledRenewableTimeoutService timeoutService = new ControlledRenewableTimeoutService();


private final long txPullTimeoutMillis = 100; private final long txPullIntervalMillis = 100;
private final StoreId storeId = new StoreId( 1, 2, 3, 4 ); private final StoreId storeId = new StoreId( 1, 2, 3, 4 );


private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), () -> storeId, private final TxPollingClient txPuller = new TxPollingClient( NullLogProvider.getInstance(), () -> storeId,
catchUpClient, serverSelection, catchUpClient, serverSelection, timeoutService, txPullIntervalMillis, txApplier, new Monitors() );
timeoutService, txPullTimeoutMillis, txApplier, new Monitors() );


@Before @Before
public void before() throws Throwable public void before() throws Throwable
Expand All @@ -85,7 +84,7 @@ public void shouldSendPullRequestOnTick() throws Throwable


// then // then
verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
anyLong(), any( TimeUnit.class ), any( CatchUpResponseCallback.class ) ); any( CatchUpResponseCallback.class ) );
} }


@Test @Test
Expand All @@ -99,7 +98,7 @@ public void shouldNotScheduleNewPullIfThereIsWorkPending() throws Exception


// then // then
verify( catchUpClient, never() ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), verify( catchUpClient, never() ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
anyLong(), any( TimeUnit.class ), any( CatchUpResponseCallback.class ) ); any( CatchUpResponseCallback.class ) );
} }


@Test @Test
Expand All @@ -111,7 +110,7 @@ public void shouldResetTxReceivedTimeoutOnTxReceived() throws Throwable
ArgumentCaptor<CatchUpResponseCallback> captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class ); ArgumentCaptor<CatchUpResponseCallback> captor = ArgumentCaptor.forClass( CatchUpResponseCallback.class );


verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ), verify( catchUpClient ).makeBlockingRequest( any( MemberId.class ), any( TxPullRequest.class ),
anyLong(), any( TimeUnit.class ), captor.capture() ); captor.capture() );


captor.getValue().onTxPullResponse( null, new TxPullResponse( storeId, captor.getValue().onTxPullResponse( null, new TxPullResponse( storeId,
mock( CommittedTransactionRepresentation.class ) ) ); mock( CommittedTransactionRepresentation.class ) ) );
Expand Down

0 comments on commit 9c08322

Please sign in to comment.