Skip to content

Commit

Permalink
LocalDatabase is now the sole manager of DSM lifecycle in edges
Browse files Browse the repository at this point in the history
DataSourceManager lifecycle was controlled both directly and through
 LocalDatabase. This caused some confusion, which is now solved.

Effectively this is an implementation of 436737a
 for edges.
  • Loading branch information
digitalstain committed Aug 8, 2016
1 parent 5be4c53 commit d445115
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 46 deletions.
Expand Up @@ -19,29 +19,27 @@
*/
package org.neo4j.coreedge.edge;

import static org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule.extractBoltAddress;

import java.util.concurrent.locks.LockSupport;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.StoreFetcher;
import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionException;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.coreedge.core.state.machines.tx.RetryStrategy;
import org.neo4j.coreedge.discovery.EdgeTopologyService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionException;
import org.neo4j.coreedge.messaging.routing.CoreMemberSelectionStrategy;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static org.neo4j.coreedge.edge.EnterpriseEdgeEditionModule.extractBoltAddress;

public class EdgeStartupProcess implements Lifecycle
{
private final StoreFetcher storeFetcher;
private final LocalDatabase localDatabase;
private final Lifecycle txPulling;
private final DataSourceManager dataSourceManager;
private final CoreMemberSelectionStrategy connectionStrategy;
private final Log log;
private final EdgeTopologyService discoveryService;
Expand All @@ -52,15 +50,13 @@ public EdgeStartupProcess(
StoreFetcher storeFetcher,
LocalDatabase localDatabase,
Lifecycle txPulling,
DataSourceManager dataSourceManager,
CoreMemberSelectionStrategy connectionStrategy,
RetryStrategy retryStrategy,
LogProvider logProvider, EdgeTopologyService discoveryService, Config config )
{
this.storeFetcher = storeFetcher;
this.localDatabase = localDatabase;
this.txPulling = txPulling;
this.dataSourceManager = dataSourceManager;
this.connectionStrategy = connectionStrategy;
this.timeout = retryStrategy.newTimeout();
this.log = logProvider.getLog( getClass() );
Expand All @@ -71,14 +67,14 @@ public EdgeStartupProcess(
@Override
public void init() throws Throwable
{
dataSourceManager.init();
localDatabase.init();
txPulling.init();
}

@Override
public void start() throws Throwable
{
dataSourceManager.start();
localDatabase.start();

MemberId memberId = findCoreMemberToCopyFrom();
if ( localDatabase.isEmpty() )
Expand Down Expand Up @@ -120,13 +116,13 @@ private MemberId findCoreMemberToCopyFrom()
public void stop() throws Throwable
{
txPulling.stop();
dataSourceManager.stop();
localDatabase.stop();
}

@Override
public void shutdown() throws Throwable
{
txPulling.shutdown();
dataSourceManager.shutdown();
localDatabase.shutdown();
}
}
Expand Up @@ -218,7 +218,7 @@ edgeToCoreClient, new ConnectToRandomCoreMember( discoveryService ),
platformModule.dataSourceManager,
dependencies.provideDependency( TransactionIdStore.class ),
databaseHealthSupplier, logProvider ),
txPulling, platformModule.dataSourceManager, new ConnectToRandomCoreMember( discoveryService ),
txPulling, new ConnectToRandomCoreMember( discoveryService ),
new ExponentialBackoffStrategy( 1, TimeUnit.SECONDS ), logProvider, discoveryService, config ) );

dependencies.satisfyDependency( createSessionTracker() );
Expand Down
Expand Up @@ -85,7 +85,6 @@ public void edgeServersShouldRegisterThemselvesWithTheTopologyWhenTheyStart() th
final EdgeStartupProcess startupProcess = new EdgeStartupProcess( null,
localDatabase,
mock( Lifecycle.class ),
mock( DataSourceManager.class ),
connectionStrategy,
new ConstantTimeRetryStrategy( 1, TimeUnit.MILLISECONDS ),
NullLogProvider.getInstance(), topology, config );
Expand Down
Expand Up @@ -19,12 +19,18 @@
*/
package org.neo4j.coreedge.edge;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.Iterators.asSet;

import java.util.UUID;

import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.StoreFetcher;
import org.neo4j.coreedge.core.state.machines.tx.ConstantTimeRetryStrategy;
Expand All @@ -39,49 +45,41 @@
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.junit.Assert.fail;

import static org.neo4j.helpers.collection.Iterators.asSet;

public class EdgeStartupProcessTest
{
@Test
public void startShouldReplaceTheEmptyLocalStoreWithStoreFromCoreMemberAndStartPolling() throws Throwable
{
// given
StoreFetcher storeFetcher = Mockito.mock( StoreFetcher.class );
LocalDatabase localDatabase = Mockito.mock( LocalDatabase.class );
StoreFetcher storeFetcher = mock( StoreFetcher.class );
LocalDatabase localDatabase = mock( LocalDatabase.class );

MemberId memberId = new MemberId( UUID.randomUUID() );
TopologyService hazelcastTopology = Mockito.mock( TopologyService.class );
TopologyService hazelcastTopology = mock( TopologyService.class );

ClusterTopology clusterTopology = Mockito.mock( ClusterTopology.class );
Mockito.when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology );
ClusterTopology clusterTopology = mock( ClusterTopology.class );
when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology );

Mockito.when( clusterTopology.coreMembers() ).thenReturn( asSet( memberId ) );
Mockito.when( localDatabase.isEmpty() ).thenReturn( true );
when( clusterTopology.coreMembers() ).thenReturn( asSet( memberId ) );
when( localDatabase.isEmpty() ).thenReturn( true );

DataSourceManager dataSourceManager = Mockito.mock( DataSourceManager.class );
Lifecycle txPulling = Mockito.mock( Lifecycle.class );
Lifecycle txPulling = mock( Lifecycle.class );

EdgeStartupProcess edgeStartupProcess = new EdgeStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
Mockito.mock( EdgeTopologyService.class ), Config.empty() );
mock( EdgeTopologyService.class ), Config.empty() );

// when
edgeStartupProcess.start();

// then
Mockito.verify( dataSourceManager ).start();
Mockito.verify( localDatabase ).isEmpty();
Mockito.verify( localDatabase ).stop();
Mockito.verify( localDatabase ).copyStoreFrom( memberId, storeFetcher );
Mockito.verify( localDatabase ).start();
Mockito.verify( localDatabase, times( 2 ) ).start(); // once for initial start, once for after store copy
Mockito.verify( txPulling ).start();
Mockito.verifyNoMoreInteractions( localDatabase, dataSourceManager, txPulling );
Mockito.verifyNoMoreInteractions( localDatabase, txPulling );
}

@Test
Expand All @@ -105,7 +103,7 @@ public void startShouldNotReplaceTheNonEmptyLocalStoreWithStoreFromCoreMemberAnd
Lifecycle txPulling = Mockito.mock( Lifecycle.class );

EdgeStartupProcess edgeStartupProcess = new EdgeStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
Mockito.mock( EdgeTopologyService.class ), Config.empty() );

Expand All @@ -121,7 +119,7 @@ txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
}

// then
Mockito.verify( dataSourceManager ).start();
Mockito.verify( localDatabase ).start();
Mockito.verify( localDatabase ).isEmpty();
Mockito.verify( localDatabase ).ensureSameStoreId( memberId, storeFetcher );
Mockito.verifyNoMoreInteractions( localDatabase, dataSourceManager );
Expand All @@ -147,11 +145,10 @@ public void startShouldSimplyStartPollingOnNonEmptyDatabaseAndMatchingStoreId()
Mockito.when( clusterTopology.coreMembers() ).thenReturn( asSet( memberId ) );
Mockito.when( localDatabase.isEmpty() ).thenReturn( false );

DataSourceManager dataSourceManager = Mockito.mock( DataSourceManager.class );
Lifecycle txPulling = Mockito.mock( Lifecycle.class );

EdgeStartupProcess edgeStartupProcess = new EdgeStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
Mockito.mock( EdgeTopologyService.class ), Config.empty() );

Expand All @@ -161,9 +158,9 @@ txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
// then
Mockito.verify( localDatabase ).isEmpty();
Mockito.verify( localDatabase ).ensureSameStoreId( memberId, storeFetcher );
Mockito.verify( dataSourceManager ).start();
Mockito.verify( localDatabase ).start();
Mockito.verify( txPulling ).start();
Mockito.verifyNoMoreInteractions( localDatabase, dataSourceManager, txPulling );
Mockito.verifyNoMoreInteractions( localDatabase, txPulling );
}

@Test
Expand All @@ -181,10 +178,9 @@ public void stopShouldStopTheDatabaseAndStopPolling() throws Throwable
Mockito.when( hazelcastTopology.currentTopology() ).thenReturn( clusterTopology );
Mockito.when( localDatabase.isEmpty() ).thenReturn( true );

DataSourceManager dataSourceManager = Mockito.mock( DataSourceManager.class );
Lifecycle txPulling = Mockito.mock( Lifecycle.class );
EdgeStartupProcess edgeStartupProcess = new EdgeStartupProcess( storeFetcher, localDatabase,
txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),
txPulling, new AlwaysChooseFirstMember( hazelcastTopology ),
new ConstantTimeRetryStrategy( 1, MILLISECONDS ), NullLogProvider.getInstance(),
Mockito.mock( EdgeTopologyService.class ), null );

Expand All @@ -193,7 +189,7 @@ txPulling, dataSourceManager, new AlwaysChooseFirstMember( hazelcastTopology ),

// then
Mockito.verify( txPulling ).stop();
Mockito.verify( dataSourceManager ).stop();
Mockito.verifyNoMoreInteractions( txPulling, dataSourceManager );
Mockito.verify( localDatabase ).stop();
Mockito.verifyNoMoreInteractions( txPulling );
}
}

0 comments on commit d445115

Please sign in to comment.