Skip to content

Commit

Permalink
Make Server enableable disableable
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW committed Apr 19, 2018
1 parent b6c5a36 commit 22355d4
Show file tree
Hide file tree
Showing 12 changed files with 523 additions and 36 deletions.
Expand Up @@ -37,14 +37,14 @@
import org.neo4j.causalclustering.core.consensus.schedule.TimerService.TimerName; import org.neo4j.causalclustering.core.consensus.schedule.TimerService.TimerName;
import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException; import org.neo4j.causalclustering.core.state.snapshot.TopologyLookupException;
import org.neo4j.causalclustering.discovery.TopologyService; import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.helper.Enableable;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException; import org.neo4j.causalclustering.upstream.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector; import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand Down Expand Up @@ -86,7 +86,7 @@ enum State


private final LocalDatabase localDatabase; private final LocalDatabase localDatabase;
private final Log log; private final Log log;
private final Lifecycle startStopOnStoreCopy; private final Enableable enableDisableOnStoreCopy;
private final StoreCopyProcess storeCopyProcess; private final StoreCopyProcess storeCopyProcess;
private final Supplier<DatabaseHealth> databaseHealthSupplier; private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final CatchUpClient catchUpClient; private final CatchUpClient catchUpClient;
Expand All @@ -103,14 +103,15 @@ enum State
private CompletableFuture<Boolean> upToDateFuture; // we are up-to-date when we are successfully pulling private CompletableFuture<Boolean> upToDateFuture; // we are up-to-date when we are successfully pulling
private volatile long latestTxIdOfUpStream; private volatile long latestTxIdOfUpStream;


public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy, CatchUpClient catchUpClient, public CatchupPollingProcess( LogProvider logProvider, LocalDatabase localDatabase, Enableable enableDisableOnSoreCopy, CatchUpClient catchUpClient,
UpstreamDatabaseStrategySelector selectionStrategy, TimerService timerService, long txPullIntervalMillis, BatchingTxApplier applier, UpstreamDatabaseStrategySelector selectionStrategy, TimerService timerService, long txPullIntervalMillis,
Monitors monitors, StoreCopyProcess storeCopyProcess, Supplier<DatabaseHealth> databaseHealthSupplier, TopologyService topologyService ) BatchingTxApplier applier, Monitors monitors, StoreCopyProcess storeCopyProcess,
Supplier<DatabaseHealth> databaseHealthSupplier, TopologyService topologyService )


{ {
this.localDatabase = localDatabase; this.localDatabase = localDatabase;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.startStopOnStoreCopy = startStopOnStoreCopy; this.enableDisableOnStoreCopy = enableDisableOnSoreCopy;
this.catchUpClient = catchUpClient; this.catchUpClient = catchUpClient;
this.selectionStrategyPipeline = selectionStrategy; this.selectionStrategyPipeline = selectionStrategy;
this.timerService = timerService; this.timerService = timerService;
Expand Down Expand Up @@ -312,7 +313,7 @@ private void downloadDatabase( StoreId localStoreId )
try try
{ {
localDatabase.stopForStoreCopy(); localDatabase.stopForStoreCopy();
startStopOnStoreCopy.stop(); enableDisableOnStoreCopy.disable();
} }
catch ( Throwable throwable ) catch ( Throwable throwable )
{ {
Expand All @@ -339,7 +340,7 @@ private void downloadDatabase( StoreId localStoreId )
try try
{ {
localDatabase.start(); localDatabase.start();
startStopOnStoreCopy.start(); enableDisableOnStoreCopy.enable();
} }
catch ( Throwable throwable ) catch ( Throwable throwable )
{ {
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.causalclustering.ReplicationModule; import org.neo4j.causalclustering.ReplicationModule;
Expand All @@ -33,7 +32,6 @@
import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller; import org.neo4j.causalclustering.catchup.CatchupProtocolServerInstaller;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder; import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerHandler; import org.neo4j.causalclustering.catchup.CatchupServerHandler;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.CheckpointerSupplier; import org.neo4j.causalclustering.catchup.CheckpointerSupplier;
import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler; import org.neo4j.causalclustering.catchup.RegularCatchupServerHandler;
import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper; import org.neo4j.causalclustering.catchup.storecopy.CommitStateHelper;
Expand Down Expand Up @@ -65,8 +63,10 @@
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService; import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage; import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.core.state.storage.StateStorage; import org.neo4j.causalclustering.core.state.storage.StateStorage;
import org.neo4j.causalclustering.helper.CompositeEnableable;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.helper.Enableable;
import org.neo4j.causalclustering.net.InstalledProtocolHandler; import org.neo4j.causalclustering.net.InstalledProtocolHandler;
import org.neo4j.causalclustering.net.Server; import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller; import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
Expand Down Expand Up @@ -147,7 +147,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
this.logProvider = logging.getInternalLogProvider(); this.logProvider = logging.getInternalLogProvider();
LogProvider userLogProvider = logging.getUserLogProvider(); LogProvider userLogProvider = logging.getUserLogProvider();


LifeSupport servicesToStopOnStoreCopy = new LifeSupport(); CompositeEnableable servicesToStopOnStoreCopy = new CompositeEnableable();


StateStorage<Long> lastFlushedStorage = platformModule.life.add( StateStorage<Long> lastFlushedStorage = platformModule.life.add(
new DurableStateStorage<>( platformModule.fileSystem, clusterStateDirectory, LAST_FLUSHED_NAME, new LongIndexMarshal(), new DurableStateStorage<>( platformModule.fileSystem, clusterStateDirectory, LAST_FLUSHED_NAME, new LongIndexMarshal(),
Expand Down Expand Up @@ -251,7 +251,7 @@ private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPip
return catchUpClient; return catchUpClient;
} }


private CoreStateDownloader createCoreStateDownloader( LifeSupport servicesToStopOnStoreCopy, CatchUpClient catchUpClient ) private CoreStateDownloader createCoreStateDownloader( Enableable servicesToStopOnStoreCopy, CatchUpClient catchUpClient )
{ {
ExponentialBackoffStrategy storeCopyBackoffStrategy = ExponentialBackoffStrategy storeCopyBackoffStrategy =
new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS ); new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS );
Expand Down
Expand Up @@ -37,9 +37,9 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException; import org.neo4j.causalclustering.catchup.storecopy.StoreIdDownloadFailedException;
import org.neo4j.causalclustering.core.state.CoreSnapshotService; import org.neo4j.causalclustering.core.state.CoreSnapshotService;
import org.neo4j.causalclustering.core.state.machines.CoreStateMachines; import org.neo4j.causalclustering.core.state.machines.CoreStateMachines;
import org.neo4j.causalclustering.helper.Enableable;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleException; import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
Expand All @@ -51,7 +51,7 @@
public class CoreStateDownloader public class CoreStateDownloader
{ {
private final LocalDatabase localDatabase; private final LocalDatabase localDatabase;
private final Lifecycle startStopOnStoreCopy; private final Enableable enableDisableOnStoreCopy;
private final RemoteStore remoteStore; private final RemoteStore remoteStore;
private final CatchUpClient catchUpClient; private final CatchUpClient catchUpClient;
private final Log log; private final Log log;
Expand All @@ -60,13 +60,13 @@ public class CoreStateDownloader
private final CoreSnapshotService snapshotService; private final CoreSnapshotService snapshotService;
private CommitStateHelper commitStateHelper; private CommitStateHelper commitStateHelper;


public CoreStateDownloader( LocalDatabase localDatabase, Lifecycle startStopOnStoreCopy, RemoteStore remoteStore, public CoreStateDownloader( LocalDatabase localDatabase, Enableable enableDisableOnStoreCopy, RemoteStore remoteStore,
CatchUpClient catchUpClient, LogProvider logProvider, StoreCopyProcess storeCopyProcess, CatchUpClient catchUpClient, LogProvider logProvider, StoreCopyProcess storeCopyProcess,
CoreStateMachines coreStateMachines, CoreSnapshotService snapshotService, CoreStateMachines coreStateMachines, CoreSnapshotService snapshotService,
CommitStateHelper commitStateHelper ) CommitStateHelper commitStateHelper )
{ {
this.localDatabase = localDatabase; this.localDatabase = localDatabase;
this.startStopOnStoreCopy = startStopOnStoreCopy; this.enableDisableOnStoreCopy = enableDisableOnStoreCopy;
this.remoteStore = remoteStore; this.remoteStore = remoteStore;
this.catchUpClient = catchUpClient; this.catchUpClient = catchUpClient;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -129,7 +129,7 @@ boolean downloadSnapshot( CatchupAddressProvider addressProvider )
return false; return false;
} }


ensure( startStopOnStoreCopy::stop, "stop auxiliary services before store copy" ); ensure( enableDisableOnStoreCopy::disable, "disable auxiliary services before store copy" );
ensure( localDatabase::stopForStoreCopy, "stop local database for store copy" ); ensure( localDatabase::stopForStoreCopy, "stop local database for store copy" );


log.info( "Downloading snapshot from core server at %s", primary ); log.info( "Downloading snapshot from core server at %s", primary );
Expand Down Expand Up @@ -211,7 +211,7 @@ else if ( catchupResult != SUCCESS_END_OF_STREAM )
ensure( localDatabase::start, "start local database after store copy" ); ensure( localDatabase::start, "start local database after store copy" );


coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() ); coreStateMachines.installCommitProcess( localDatabase.getCommitProcess() );
ensure( startStopOnStoreCopy::start, "start auxiliary services after store copy" ); ensure( enableDisableOnStoreCopy::enable, "start auxiliary services after store copy" );


return true; return true;
} }
Expand Down
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.helper;

import java.util.ArrayList;
import java.util.List;

public class CompositeEnableable implements Enableable
{
private final List<Enableable> enableables = new ArrayList<>();

public void add( Enableable enableable )
{
enableables.add( enableable );
}

@Override
public void enable()
{
enableables.forEach( Enableable::enable );
}

@Override
public void disable()
{
enableables.forEach( Enableable::disable );
}
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.helper;

public interface Enableable
{
void enable();

void disable();
}
Expand Up @@ -30,14 +30,15 @@
import java.net.BindException; import java.net.BindException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.neo4j.causalclustering.helper.Enableable;
import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


public class Server extends LifecycleAdapter public class Server extends LifecycleAdapter implements Enableable
{ {
private final Log debugLog; private final Log debugLog;
private final Log userLog; private final Log userLog;
Expand All @@ -50,15 +51,17 @@ public class Server extends LifecycleAdapter


private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
private Channel channel; private Channel channel;
private boolean enabled = true;
private boolean stoppedByLifeCycle = true;


public Server( ChildInitializer childInitializer, LogProvider debugLogProvider, LogProvider userLogProvider, ListenSocketAddress listenAddress, public Server( ChildInitializer childInitializer, LogProvider debugLogProvider, LogProvider userLogProvider, ListenSocketAddress listenAddress,
String serverName ) String serverName )
{ {
this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName ); this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName );
} }


public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHandler, LogProvider debugLogProvider, LogProvider userLogProvider, public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHandler, LogProvider debugLogProvider, LogProvider userLogProvider,
ListenSocketAddress listenAddress, String serverName ) ListenSocketAddress listenAddress, String serverName )
{ {
this.childInitializer = childInitializer; this.childInitializer = childInitializer;
this.parentHandler = parentHandler; this.parentHandler = parentHandler;
Expand All @@ -76,6 +79,32 @@ public Server( ChildInitializer childInitializer, ListenSocketAddress listenAddr


@Override @Override
public synchronized void start() public synchronized void start()
{
stoppedByLifeCycle = false;
if ( !enabled )
{
debugLog.info( "Start call from lifecycle is ignored because server is disabled." );
}
else
{
doStart();
}
}

@Override
public void stop()
{
stoppedByLifeCycle = true;
doStop();
}

@Override
public void shutdown()
{
stoppedByLifeCycle = true;
}

private void doStart()
{ {
if ( channel != null ) if ( channel != null )
{ {
Expand All @@ -84,8 +113,7 @@ public synchronized void start()


workerGroup = new NioEventLoopGroup( 0, threadFactory ); workerGroup = new NioEventLoopGroup( 0, threadFactory );


ServerBootstrap bootstrap = new ServerBootstrap() ServerBootstrap bootstrap = new ServerBootstrap().group( workerGroup )
.group( workerGroup )
.channel( NioServerSocketChannel.class ) .channel( NioServerSocketChannel.class )
.option( ChannelOption.SO_REUSEADDR, Boolean.TRUE ) .option( ChannelOption.SO_REUSEADDR, Boolean.TRUE )
.localAddress( listenAddress.socketAddress() ) .localAddress( listenAddress.socketAddress() )
Expand Down Expand Up @@ -114,8 +142,12 @@ public synchronized void start()
} }
} }


@Override public boolean isRunnig()
public synchronized void stop() {
return channel != null;
}

private void doStop()
{ {
if ( channel == null ) if ( channel == null )
{ {
Expand Down Expand Up @@ -145,4 +177,25 @@ public ListenSocketAddress address()
{ {
return listenAddress; return listenAddress;
} }

@Override
public synchronized void enable()
{
enabled = true;
if ( !stoppedByLifeCycle )
{
doStart();
}
else
{
debugLog.info( "Server will not start. It was enabled but is stopped by lifecycle" );
}
}

@Override
public synchronized void disable()
{
enabled = false;
doStop();
}
} }
Expand Up @@ -61,6 +61,7 @@
import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory; import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper; import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory; import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.helper.CompositeEnableable;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.net.InstalledProtocolHandler; import org.neo4j.causalclustering.net.InstalledProtocolHandler;
Expand Down Expand Up @@ -285,7 +286,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,


txPulling.add( copiedStoreRecovery ); txPulling.add( copiedStoreRecovery );


LifeSupport servicesToStopOnStoreCopy = new LifeSupport(); CompositeEnableable servicesToStopOnStoreCopy = new CompositeEnableable();


StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, pageCache, localDatabase, StoreCopyProcess storeCopyProcess = new StoreCopyProcess( fileSystem, pageCache, localDatabase,
copiedStoreRecovery, remoteStore, logProvider ); copiedStoreRecovery, remoteStore, logProvider );
Expand Down

0 comments on commit 22355d4

Please sign in to comment.