Skip to content

Commit

Permalink
Internal restarts no longer break bolt connections
Browse files Browse the repository at this point in the history
The Bolt server was not properly refreshing internal components
 it got access to and this caused internal restarts to try and
 access TransactionIdStore after it was closed, leading to errors
 about the neostore file being closed.
Passing instead a Supplier of the same solves the issue.

This is a partial backport of #9114
  • Loading branch information
digitalstain committed May 30, 2017
1 parent 9861cf1 commit 0163d19
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 23 deletions.
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -47,7 +46,6 @@ public class LifecycleManagedBoltFactory extends LifecycleAdapter implements Bol

private QueryExecutionEngine queryExecutionEngine;
private GraphDatabaseQueryService queryService;
private TransactionIdStore transactionIdStore;
private AvailabilityGuard availabilityGuard;

public LifecycleManagedBoltFactory( GraphDatabaseAPI gds, UsageData usageData, LogService logging,
Expand All @@ -74,7 +72,6 @@ public void start() throws Throwable
DependencyResolver dependencyResolver = gds.getDependencyResolver();
queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class );
queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class );
transactionIdStore = dependencyResolver.resolveDependency( TransactionIdStore.class );
availabilityGuard = dependencyResolver.resolveDependency( AvailabilityGuard.class );
life.start();
}
Expand All @@ -95,8 +92,8 @@ public void shutdown() throws Throwable
public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClose, Clock clock )
{
TransactionStateMachine.SPI transactionSPI =
new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, transactionIdStore,
availabilityGuard, queryService, clock );
new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, availabilityGuard, queryService,
clock );
BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData,
logging, authentication, connectionTracker, transactionSPI );
return new BoltStateMachine( boltSPI, onClose, Clock.systemUTC() );
Expand Down
Expand Up @@ -63,15 +63,15 @@ class TransactionStateMachineSPI implements TransactionStateMachine.SPI
TransactionStateMachineSPI( GraphDatabaseAPI db,
ThreadToStatementContextBridge txBridge,
QueryExecutionEngine queryExecutionEngine,
TransactionIdStore transactionIdStoreSupplier,
AvailabilityGuard availabilityGuard,
GraphDatabaseQueryService queryService,
Clock clock )
{
this.db = db;
this.txBridge = txBridge;
this.queryExecutionEngine = queryExecutionEngine;
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );
this.transactionIdTracker = new TransactionIdTracker(
db.getDependencyResolver().provideDependency( TransactionIdStore.class ), availabilityGuard );
this.contextFactory = Neo4jTransactionalContextFactory.create( queryService, locker );
this.queryService = queryService;
this.clock = clock;
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.Status;
Expand All @@ -39,13 +40,13 @@ public class TransactionIdTracker
private static final int POLL_INTERVAL = 25;
private static final TimeUnit POLL_UNIT = TimeUnit.MILLISECONDS;

private final TransactionIdStore transactionIdStore;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
private final AvailabilityGuard availabilityGuard;

public TransactionIdTracker( TransactionIdStore transactionIdStore, AvailabilityGuard availabilityGuard )
public TransactionIdTracker( Supplier<TransactionIdStore> transactionIdStoreSupplier, AvailabilityGuard availabilityGuard )
{
this.availabilityGuard = availabilityGuard;
this.transactionIdStore = transactionIdStore;
this.transactionIdStoreSupplier = transactionIdStoreSupplier;
}

/**
Expand Down Expand Up @@ -82,7 +83,7 @@ public void awaitUpToDate( long oldestAcceptableTxId, Duration timeout ) throws
{
throw new TransactionFailureException( Status.Transaction.InstanceStateChanged,
"Database not up to the requested version: %d. Latest database version is %d", oldestAcceptableTxId,
transactionIdStore.getLastClosedTransactionId() );
transactionIdStore().getLastClosedTransactionId() );
}
}

Expand All @@ -93,7 +94,7 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx
throw new TransactionFailureException( Status.General.DatabaseUnavailable,
"Database had become unavailable while waiting for requested version %d.", oldestAcceptableTxId );
}
return oldestAcceptableTxId <= transactionIdStore.getLastClosedTransactionId();
return oldestAcceptableTxId <= transactionIdStore().getLastClosedTransactionId();
}

/**
Expand All @@ -105,6 +106,11 @@ private boolean isReady( long oldestAcceptableTxId ) throws TransactionFailureEx
*/
public long newestEncounteredTxId()
{
return transactionIdStore.getLastClosedTransactionId();
return transactionIdStore().getLastClosedTransactionId();
}

private TransactionIdStore transactionIdStore()
{
return transactionIdStoreSupplier.get();
}
}
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.api.txtracking;

import java.util.function.Supplier;

import org.junit.Test;

import org.neo4j.kernel.AvailabilityGuard;
Expand All @@ -36,17 +38,19 @@

public class TransactionIdTrackerTest
{
private final TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
private final Supplier<TransactionIdStore> transactionIdStoreSupplier = mock( Supplier.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );

@Test( timeout = 500 )
public void shouldAlwaysReturnIfTheRequestVersionIsBaseTxIdOrLess() throws Exception
{
// given
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( -1L );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStore, availabilityGuard );
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
transactionIdTracker.awaitUpToDate( BASE_TX_ID, ofSeconds( 5 ) );
Expand All @@ -59,10 +63,12 @@ public void shouldReturnIfTheVersionIsUpToDate() throws Exception
{
// given
long version = 5L;
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
TransactionIdStore transactionIdStore = mock(TransactionIdStore.class);
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId()).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStore, availabilityGuard );
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
transactionIdTracker.awaitUpToDate( version, ofSeconds( 5 ) );
Expand All @@ -75,10 +81,12 @@ public void shouldTimeoutIfTheVersionIsTooHigh() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( true );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStore, availabilityGuard );
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
try
Expand All @@ -98,9 +106,12 @@ public void shouldGiveUpWaitingIfTheDatabaseIsUnavailable() throws Exception
{
// given
long version = 5L;
TransactionIdStore transactionIdStore = mock( TransactionIdStore.class );
when( transactionIdStoreSupplier.get() ).thenReturn( transactionIdStore );
when( transactionIdStore.getLastClosedTransactionId() ).thenReturn( version );
when( availabilityGuard.isAvailable() ).thenReturn( false );
TransactionIdTracker transactionIdTracker = new TransactionIdTracker( transactionIdStore, availabilityGuard );
TransactionIdTracker transactionIdTracker =
new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

// when
try
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.causalclustering.scenarios;

import java.util.function.Supplier;

import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -98,10 +100,10 @@ public void transactionsShouldNotAppearOnTheReadReplicaWhilePollingIsPaused() th

private TransactionIdTracker transactionIdTracker( GraphDatabaseAPI database )
{
TransactionIdStore transactionIdStore =
database.getDependencyResolver().resolveDependency( TransactionIdStore.class );
Supplier<TransactionIdStore> transactionIdStoreSupplier = database.getDependencyResolver().provideDependency(
TransactionIdStore.class );
AvailabilityGuard availabilityGuard =
database.getDependencyResolver().resolveDependency( AvailabilityGuard.class );
return new TransactionIdTracker( transactionIdStore, availabilityGuard );
return new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );
}
}
Expand Up @@ -69,6 +69,7 @@
import org.neo4j.graphdb.factory.GraphDatabaseBuilder;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.factory.HighlyAvailableGraphDatabaseFactory;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.io.pagecache.IOLimiter;
Expand Down Expand Up @@ -98,7 +99,6 @@
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableMap;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector;
import static org.neo4j.helpers.ArrayUtil.contains;
import static org.neo4j.helpers.collection.Iterables.count;
Expand Down Expand Up @@ -1152,6 +1152,12 @@ public boolean isAvailable( ClusterMember clusterMember )
}
}

public String getBoltAddress( HighlyAvailableGraphDatabase db )
{
return "bolt://" + db.getDependencyResolver().resolveDependency( Config.class ).get(
new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address ).toString();
}

/**
* @return the current master in the cluster.
* @throws IllegalStateException if there's no current master.
Expand Down Expand Up @@ -1317,6 +1323,11 @@ private RepairKit wrap( final RepairKit actual )
};
}

private AdvertisedSocketAddress socketAddressForServer( String advertisedAddress, InstanceId id )
{
return new AdvertisedSocketAddress( advertisedAddress, ( 5000 + id.toIntegerIndex() ) );
}

private void startMember( InstanceId serverId ) throws URISyntaxException, IOException
{
Cluster.Member member = spec.getMembers().get( serverId.toIntegerIndex() - firstInstanceId );
Expand Down Expand Up @@ -1351,12 +1362,25 @@ private void startMember( InstanceId serverId ) throws URISyntaxException, IOExc
storeDirInitializer.initializeStoreDir( serverId.toIntegerIndex(), storeDir );
}
GraphDatabaseBuilder builder = dbFactory.newEmbeddedDatabaseBuilder( storeDir.getAbsoluteFile() );

builder.setConfig( ClusterSettings.cluster_name, name );
builder.setConfig( ClusterSettings.initial_hosts, initialHosts.toString() );
builder.setConfig( ClusterSettings.server_id, serverId + "" );
builder.setConfig( ClusterSettings.cluster_server, "0.0.0.0:" + clusterPort );
builder.setConfig( HaSettings.ha_server, clusterUri.getHost() + ":" + haPort );
builder.setConfig( OnlineBackupSettings.online_backup_enabled, Settings.FALSE );

String listenAddress = "127.0.0.1";
int boltPort = 8000 + serverId.toIntegerIndex();
AdvertisedSocketAddress advertisedSocketAddress = socketAddressForServer( listenAddress, serverId );
String advertisedAddress = advertisedSocketAddress.getHostname();
String boltAdvertisedAddress = advertisedAddress + ":" + boltPort;

builder.setConfig( new GraphDatabaseSettings.BoltConnector( "bolt" ).type, "BOLT" );
builder.setConfig( new GraphDatabaseSettings.BoltConnector( "bolt" ).enabled, "true" );
builder.setConfig( new GraphDatabaseSettings.BoltConnector( "bolt" ).listen_address, listenAddress + ":" + boltPort );
builder.setConfig( new GraphDatabaseSettings.BoltConnector( "bolt" ).advertised_address.name(), boltAdvertisedAddress );

for ( Map.Entry<String,IntFunction<String>> conf : commonConfig.entrySet() )
{
builder.setConfig( conf.getKey(), conf.getValue().apply( serverId.toIntegerIndex() ) );
Expand Down
101 changes: 101 additions & 0 deletions integrationtests/src/test/java/org/neo4j/ha/BoltHAIT.java
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.ha;

import org.junit.Rule;
import org.junit.Test;

import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.test.ha.ClusterRule;

import static org.junit.Assert.assertEquals;
import static org.neo4j.driver.v1.Values.parameters;
import static org.neo4j.kernel.impl.ha.ClusterManager.clusterOfSize;
import static org.neo4j.kernel.impl.ha.ClusterManager.entireClusterSeesMemberAsNotAvailable;
import static org.neo4j.kernel.impl.ha.ClusterManager.masterSeesMembers;

public class BoltHAIT
{
@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() ).withCluster( clusterOfSize( 3 ) );

@Test
public void shouldContinueServingBoltRequestsBetweenInteralRestarts() throws Throwable
{
// given
/*
* Interestingly, it is enough to simply start a slave and then direct sessions to it. The problem seems
* to arise immediately, since simply from startup to being into SLAVE at least one internal restart happens
* and that seems sufficient to break the bolt server.
* However, that would make the test really weird, so we'll start the cluster, make sure we can connect and
* then isolate the slave, make it shutdown internally, then have it rejoin and it will switch to slave.
* At the end of this process, it must still be possible to open and execute transactions against the instance.
*/
ClusterManager.ManagedCluster cluster = clusterRule.startCluster();
HighlyAvailableGraphDatabase slave1 = cluster.getAnySlave();

Driver driver = GraphDatabase.driver( cluster.getBoltAddress( slave1 ),
AuthTokens.basic( "neo4j", "neo4j" ) );

/*
* We'll use a bookmark to enforce use of kernel internals by the bolt server, to make sure that parts that are
* switched during an internal restart are actually refreshed. Technically, this is not necessary, since the
* bolt server makes such use for every request. But this puts a nice bow on top of it.
*/
String lastBookmark;

try ( Session session = driver.session() )
{
try ( Transaction tx = session.beginTransaction() )
{
tx.run( "CREATE (person:Person {name: {name}, title: {title}})",
parameters( "name", "Webber", "title", "Mr" ) );
tx.success();
}
lastBookmark = session.lastBookmark();
}

// when
ClusterManager.RepairKit slaveFailRK = cluster.fail( slave1 );

cluster.await( entireClusterSeesMemberAsNotAvailable( slave1 ) );
slaveFailRK.repair();
cluster.await( masterSeesMembers( 3 ) );

// then
try ( Session session = driver.session( AccessMode.READ ) )
{
try ( Transaction tx = session.beginTransaction( lastBookmark ) )
{
Record record = tx.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
tx.success();
assertEquals( 1, record.get( "count" ).asInt() );
}
}
}
}

0 comments on commit 0163d19

Please sign in to comment.