Skip to content

Commit

Permalink
Raise availability guard when core or read replica copy store
Browse files Browse the repository at this point in the history
Instance might be forced to copy store from another instance if it requires to
pull a transaction log that has already been pruned away. This could happen
when transaction pulling (catch up) experience problems like network outage.
Database is not operational during store copy, most components including the
datasource are shutdown so no transactions can be served.

However it was possible for users (bolt or embedded) to attempt start new
transactions which would hang because of kernel components being stopped.
In similar situations availability guard must be raised to block new
transactions.

This commit makes `LocalDatabase` raise availability guard when it is stopped.
`LocalDatabase` is the component used by cores and read replicas to manage the
lifecycle of the datasource. It is restarted during store copying.
  • Loading branch information
lutovich committed Feb 20, 2017
1 parent bf2784c commit 3719737
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void start()
}

@Override
public void stop() throws Throwable
public void stop()
{
log.info( "CatchUpClient stopping" );
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
Expand All @@ -40,15 +42,20 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.StorageEngine;

import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement;

public class LocalDatabase implements Lifecycle
{
private static final AvailabilityRequirement NOT_STOPPED = availabilityRequirement( "Database is stopped" );

private final File storeDir;

private final StoreFiles storeFiles;
private final DataSourceManager dataSourceManager;
private final PageCache pageCache;
private final FileSystemAbstraction fileSystemAbstraction;
private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final AvailabilityGuard availabilityGuard;
private final Log log;

private volatile StoreId storeId;
Expand All @@ -60,15 +67,19 @@ public class LocalDatabase implements Lifecycle
public LocalDatabase( File storeDir, StoreFiles storeFiles,
DataSourceManager dataSourceManager,
PageCache pageCache, FileSystemAbstraction fileSystemAbstraction,
Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider )
Supplier<DatabaseHealth> databaseHealthSupplier, AvailabilityGuard availabilityGuard,
LogProvider logProvider )
{
this.storeDir = storeDir;
this.storeFiles = storeFiles;
this.dataSourceManager = dataSourceManager;
this.pageCache = pageCache;
this.fileSystemAbstraction = fileSystemAbstraction;
this.databaseHealthSupplier = databaseHealthSupplier;
this.availabilityGuard = availabilityGuard;
this.log = logProvider.getLog( getClass() );

raiseAvailabilityGuard();
}

@Override
Expand All @@ -85,6 +96,7 @@ public synchronized void start() throws Throwable

dataSourceManager.start();

dropAvailabilityGuard();
started = true;
}

Expand All @@ -96,6 +108,7 @@ public synchronized void stop() throws Throwable
localCommit = null;
dataSourceManager.stop();

raiseAvailabilityGuard();
started = false;
}

Expand Down Expand Up @@ -212,4 +225,14 @@ public TransactionCommitProcess getCommitProcess()
{
return localCommit;
}

private void raiseAvailabilityGuard()
{
availabilityGuard.require( NOT_STOPPED );
}

private void dropAvailabilityGuard()
{
availabilityGuard.fulfill( NOT_STOPPED );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke

LocalDatabase localDatabase = new LocalDatabase( platformModule.storeDir, new StoreFiles( fileSystem ),
platformModule.dataSourceManager, platformModule.pageCache, fileSystem, databaseHealthSupplier,
logProvider );
platformModule.availabilityGuard, logProvider );

IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.storecopy.CopiedStoreRecovery;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.RemoteStore;
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.discovery.procedures.ReadReplicaRoleProcedure;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.messaging.routing.ConnectToRandomCoreMember;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
Expand Down Expand Up @@ -200,6 +200,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
pageCache,
fileSystem,
databaseHealthSupplier,
platformModule.availabilityGuard,
logProvider );

RemoteStore remoteStore = new RemoteStore( platformModule.logging.getInternalLogProvider(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup.storecopy;

import org.junit.Test;

import java.io.File;
import java.time.Clock;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

public class LocalDatabaseTest
{
@Test
public void availabilityGuardRaisedOnCreation() throws Throwable
{
AvailabilityGuard guard = newAvailabilityGuard();
assertTrue( guard.isAvailable() );
LocalDatabase localDatabase = newLocalDatabase( guard );

assertNotNull( localDatabase );
assertDatabaseIsStoppedAndUnavailable( guard );
}

@Test
public void availabilityGuardDroppedOnStart() throws Throwable
{
AvailabilityGuard guard = newAvailabilityGuard();
assertTrue( guard.isAvailable() );

LocalDatabase localDatabase = newLocalDatabase( guard );
assertFalse( guard.isAvailable() );

localDatabase.start();
assertTrue( guard.isAvailable() );
}

@Test
public void availabilityGuardRaisedOnStop() throws Throwable
{
AvailabilityGuard guard = newAvailabilityGuard();
assertTrue( guard.isAvailable() );

LocalDatabase localDatabase = newLocalDatabase( guard );
assertFalse( guard.isAvailable() );

localDatabase.start();
assertTrue( guard.isAvailable() );

localDatabase.stop();
assertDatabaseIsStoppedAndUnavailable( guard );
}

private static LocalDatabase newLocalDatabase( AvailabilityGuard availabilityGuard )
{
return new LocalDatabase( mock( File.class ), mock( StoreFiles.class ), mock( DataSourceManager.class ),
mock( PageCache.class ), mock( FileSystemAbstraction.class ), () -> mock( DatabaseHealth.class ),
availabilityGuard, NullLogProvider.getInstance() );
}

private static AvailabilityGuard newAvailabilityGuard()
{
return new AvailabilityGuard( Clock.systemUTC(), NullLog.getInstance() );
}

private static void assertDatabaseIsStoppedAndUnavailable( AvailabilityGuard guard )
{
assertFalse( guard.isAvailable() );
assertThat( guard.describeWhoIsBlocking(), containsString( "Database is stopped" ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public String routingURI()
}

@Override

public void start()
{
database = new ReadReplicaGraphDatabase( storeDir, config,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.scenarios;

import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.causalclustering.ClusterRule;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.neo4j.kernel.configuration.Settings.FALSE;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class ReadReplicaStoreCopyIT
{
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withSharedCoreParam( GraphDatabaseSettings.keep_logical_logs, FALSE )
.withNumberOfCoreMembers( 3 )
.withNumberOfReadReplicas( 1 );

@Test( timeout = 120_000 )
public void shouldNotBePossibleToStartTransactionsWhenReadReplicaCopiesStore() throws Throwable
{
Cluster cluster = clusterRule.startCluster();

ReadReplica readReplica = cluster.findAnyReadReplica();

readReplica.txPollingClient().stop();

writeSomeDataAndForceLogRotations( cluster );
Semaphore storeCopyBlockingSemaphore = addStoreCopyBlockingMonitor( readReplica );
try
{
readReplica.txPollingClient().start();
waitForStoreCopyToStartAndBlock( storeCopyBlockingSemaphore );

ReadReplicaGraphDatabase replicaGraphDatabase = readReplica.database();
try
{
replicaGraphDatabase.beginTx();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( TransactionFailureException.class ) );
}
}
finally
{
// release all waiters of the semaphore
storeCopyBlockingSemaphore.release( Integer.MAX_VALUE );
}
}

private static void writeSomeDataAndForceLogRotations( Cluster cluster ) throws Exception
{
for ( int i = 0; i < 20; i++ )
{
cluster.coreTx( ( db, tx ) ->
{
db.execute( "CREATE ()" );
tx.success();
} );

forceLogRotationOnAllCores( cluster );
}
}

private static void forceLogRotationOnAllCores( Cluster cluster )
{
for ( CoreClusterMember core : cluster.coreMembers() )
{
forceLogRotationAndPruning( core );
}
}

private static void forceLogRotationAndPruning( CoreClusterMember core )
{
try
{
DependencyResolver dependencyResolver = core.database().getDependencyResolver();
dependencyResolver.resolveDependency( LogRotation.class ).rotateLogFile();
SimpleTriggerInfo info = new SimpleTriggerInfo( "test" );
dependencyResolver.resolveDependency( CheckPointer.class ).forceCheckPoint( info );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

private static Semaphore addStoreCopyBlockingMonitor( ReadReplica readReplica )
{
DependencyResolver dependencyResolver = readReplica.database().getDependencyResolver();
Monitors monitors = dependencyResolver.resolveDependency( Monitors.class );

Semaphore semaphore = new Semaphore( 0 );

monitors.addMonitorListener( (FileCopyMonitor) file ->
{
try
{
semaphore.acquire();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new RuntimeException( e );
}
} );

return semaphore;
}

private static void waitForStoreCopyToStartAndBlock( Semaphore storeCopyBlockingSemaphore ) throws Exception
{
assertEventually( "Read replica did not copy files", storeCopyBlockingSemaphore::hasQueuedThreads,
is( true ), 60, TimeUnit.SECONDS );
}
}

0 comments on commit 3719737

Please sign in to comment.