From c78064829797f728b387b93ded17694f0aca7955 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 16 Jan 2017 16:19:37 +0100 Subject: [PATCH] Fix store file checking in StreamToDisk * Also clean up various causal clustering test code. * Also make sure that the Cluster and ClusterRule make sure that the cluster-wide and instance configurations are properly passed through to late coming cluster member additions. --- .../catchup/storecopy/StreamToDisk.java | 21 ++-- .../causalclustering/discovery/Cluster.java | 115 +++++++++--------- .../discovery/ReadReplica.java | 8 -- .../scenarios/ClusterIdentityIT.java | 4 + .../scenarios/CoreToCoreCopySnapshotIT.java | 11 +- .../scenarios/ReadReplicaReplicationIT.java | 29 +---- .../scenarios/SampleData.java | 23 ++-- 7 files changed, 102 insertions(+), 109 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java index 5352e9eb65805..3d4334e7b43ae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StreamToDisk.java @@ -32,6 +32,7 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PagedFile; +import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.monitoring.Monitors; class StreamToDisk implements StoreFileStreams @@ -50,8 +51,8 @@ class StreamToDisk implements StoreFileStreams this.pageCache = pageCache; fs.mkdirs( storeDir ); this.fileCopyMonitor = monitors.newMonitor( FileCopyMonitor.class ); - channels = new HashMap(); - pagedFiles = new HashMap(); + channels = new HashMap<>(); + pagedFiles = new HashMap<>(); } @@ -62,14 +63,7 @@ public void write( String destination, int requiredAlignment, byte[] data ) 
thro fs.mkdirs( fileName.getParentFile() ); fileCopyMonitor.copyFile( fileName ); - if ( destination.endsWith( ".id" ) ) - { - try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) ) - { - outputStream.write( data ); - } - } - else + if ( StoreType.typeOf( destination ).map( StoreType::isRecordStore ).orElse( false ) ) { WritableByteChannel channel = channels.get( destination ); if ( channel == null ) @@ -87,6 +81,13 @@ public void write( String destination, int requiredAlignment, byte[] data ) thro channel.write( buffer ); } } + else + { + try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) ) + { + outputStream.write( data ); + } + } } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java index 7e1d4dd8b34d3..80bc1b3715c78 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/Cluster.java @@ -60,7 +60,6 @@ import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException; import org.neo4j.test.DbRepresentation; -import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.neo4j.concurrent.Futures.combine; @@ -68,7 +67,6 @@ import static org.neo4j.function.Predicates.awaitEx; import static org.neo4j.function.Predicates.notNull; import static org.neo4j.helpers.collection.Iterables.firstOrNull; -import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired; public class Cluster @@ -77,19 +75,29 @@ public class Cluster private static final int DEFAULT_CLUSTER_SIZE = 3; private final File parentDir; + private final Map coreParams; + private final Map> 
instanceCoreParams; + private final Map readReplicaParams; + private final Map> instanceReadReplicaParams; + private final String recordFormat; private final DiscoveryServiceFactory discoveryServiceFactory; - private Map coreMembers = new ConcurrentHashMap<>(); + private Map coreMembers = new ConcurrentHashMap<>(); private Map readReplicas = new ConcurrentHashMap<>(); public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas, DiscoveryServiceFactory discoveryServiceFactory, - Map coreParams, Map> instanceCoreParams, - Map readReplicaParams, Map> instanceReadReplicaParams, + Map coreParams, Map> instanceCoreParams, + Map readReplicaParams, Map> instanceReadReplicaParams, String recordFormat ) { this.discoveryServiceFactory = discoveryServiceFactory; this.parentDir = parentDir; + this.coreParams = coreParams; + this.instanceCoreParams = instanceCoreParams; + this.readReplicaParams = readReplicaParams; + this.instanceReadReplicaParams = instanceReadReplicaParams; + this.recordFormat = recordFormat; HashSet coreServerIds = new HashSet<>(); for ( int i = 0; i < noOfCoreMembers; i++ ) { @@ -97,7 +105,8 @@ public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas, } List initialHosts = buildAddresses( coreServerIds ); createCoreMembers( noOfCoreMembers, initialHosts, coreParams, instanceCoreParams, recordFormat ); - createReadReplicas( noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams, recordFormat ); + createReadReplicas( noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams, + recordFormat ); } public void start() throws InterruptedException, ExecutionException @@ -114,15 +123,6 @@ public void start() throws InterruptedException, ExecutionException } } - private void waitForReadReplicas( CompletionService readReplicaGraphDatabaseCompletionService ) throws - InterruptedException, ExecutionException - { - for ( int i = 0; i < readReplicas.size(); i++ ) - { - 
readReplicaGraphDatabaseCompletionService.take().get(); - } - } - public Set healthyCoreMembers() { return coreMembers.values().stream() @@ -143,38 +143,50 @@ public ReadReplica getReadReplicaById( int memberId ) public CoreClusterMember addCoreMemberWithId( int memberId ) { - return addCoreMemberWithId( memberId, stringMap(), emptyMap(), Standard.LATEST_NAME ); + List advertisedAddress = buildAddresses( coreMembers.keySet() ); + return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, advertisedAddress ); + } + + public CoreClusterMember addCoreMemberWithIdAndInitialMembers( + int memberId, List initialMembers ) + { + return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, initialMembers ); } - public CoreClusterMember addCoreMemberWithIdAndInitialMembers( int memberId, - List initialMembers ) + private CoreClusterMember addCoreMemberWithId( + int memberId, + Map extraParams, + Map> instanceExtraParams, + String recordFormat, + List advertisedAddress ) { - CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, initialMembers, - discoveryServiceFactory, Standard.LATEST_NAME, parentDir, - emptyMap(), emptyMap() ); + CoreClusterMember coreClusterMember = new CoreClusterMember( + memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory, recordFormat, parentDir, + extraParams, instanceExtraParams ); coreMembers.put( memberId, coreClusterMember ); return coreClusterMember; } public ReadReplica addReadReplicaWithIdAndRecordFormat( int memberId, String recordFormat ) { - List hazelcastAddresses = buildAddresses( coreMembers.keySet() ); - ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory, - hazelcastAddresses, stringMap(), emptyMap(), recordFormat ); - readReplicas.put( memberId, member ); - return member; + return addReadReplica( memberId, recordFormat, new Monitors() ); } public ReadReplica addReadReplicaWithId( int memberId ) { 
- return addReadReplicaWithIdAndRecordFormat( memberId, Standard.LATEST_NAME ); + return addReadReplicaWithIdAndRecordFormat( memberId, recordFormat ); } public ReadReplica addReadReplicaWithIdAndMonitors( int memberId, Monitors monitors ) + { + return addReadReplica( memberId, recordFormat, monitors ); + } + + private ReadReplica addReadReplica( int memberId, String recordFormat, Monitors monitors ) { List hazelcastAddresses = buildAddresses( coreMembers.keySet() ); ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory, - hazelcastAddresses, stringMap(), emptyMap(), Standard.LATEST_NAME, monitors ); + hazelcastAddresses, readReplicaParams, instanceReadReplicaParams, recordFormat, monitors ); readReplicas.put( memberId, member ); return member; } @@ -292,7 +304,8 @@ public CoreClusterMember awaitLeader( long timeout, TimeUnit timeUnit ) throws T return awaitCoreMemberWithRole( Role.LEADER, timeout, timeUnit ); } - public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit ) throws TimeoutException + public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit ) + throws TimeoutException { return await( () -> getDbWithRole( role ), notNull(), timeout, timeUnit ); } @@ -309,29 +322,20 @@ public int numberOfCoreMembersReportedByTopology() /** * Perform a transaction against the core cluster, selecting the target and retrying as necessary. 
*/ - public CoreClusterMember coreTx( BiConsumer op ) throws Exception + public CoreClusterMember coreTx( BiConsumer op ) throws Exception { // this currently wraps the leader-only strategy, since it is the recommended and only approach return leaderTx( op, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); } - private CoreClusterMember addCoreMemberWithId( int memberId, Map extraParams, Map> instanceExtraParams, String recordFormat ) - { - List advertisedAddress = buildAddresses( coreMembers.keySet() ); - CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, - discoveryServiceFactory, recordFormat, parentDir, - extraParams, instanceExtraParams ); - coreMembers.put( memberId, coreClusterMember ); - return coreClusterMember; - } - /** * Perform a transaction against the leader of the core cluster, retrying as necessary. */ private CoreClusterMember leaderTx( BiConsumer op, int timeout, TimeUnit timeUnit ) throws Exception { - ThrowingSupplier supplier = () -> { + ThrowingSupplier supplier = () -> + { CoreClusterMember member = awaitLeader( timeout, timeUnit ); CoreGraphDatabase db = member.database(); if ( db == null ) @@ -367,29 +371,28 @@ private boolean isTransientFailure( Throwable e ) // TODO: This should really catch all cases of transient failures. Must be able to express that in a clearer // manner... 
return (e instanceof IdGenerationException) || isLockExpired( e ) || isLockOnFollower( e ) || - isWriteNotOnLeader( e ); - + isWriteNotOnLeader( e ); } private boolean isWriteNotOnLeader( Throwable e ) { return e instanceof WriteOperationsNotAllowedException && - e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) ); + e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) ); } private boolean isLockOnFollower( Throwable e ) { return e instanceof AcquireLockTimeoutException && - (e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) || - e.getCause() instanceof NoLeaderFoundException); + (e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) || + e.getCause() instanceof NoLeaderFoundException); } private boolean isLockExpired( Throwable e ) { return e instanceof TransactionFailureException && - e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException && - ((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() == - LockSessionExpired; + e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException && + ((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() == + LockSessionExpired; } public static List buildAddresses( Set coreServerIds ) @@ -403,8 +406,8 @@ public static AdvertisedSocketAddress socketAddressForServer( int id ) } private void createCoreMembers( final int noOfCoreMembers, - List addresses, Map extraParams, - Map> instanceExtraParams, String recordFormat ) + List addresses, Map extraParams, + Map> instanceExtraParams, String recordFormat ) { for ( int i = 0; i < noOfCoreMembers; i++ ) { @@ -453,15 +456,15 @@ private void startReadReplicas( ExecutorService executor ) throws InterruptedExc } private void createReadReplicas( int noOfReadReplicas, - final List coreMemberAddresses, - Map extraParams, - Map> instanceExtraParams, - String 
recordFormat ) + final List coreMemberAddresses, + Map extraParams, + Map> instanceExtraParams, + String recordFormat ) { for ( int i = 0; i < noOfReadReplicas; i++ ) { readReplicas.put( i, new ReadReplica( parentDir, i, discoveryServiceFactory, coreMemberAddresses, - extraParams, instanceExtraParams, recordFormat ) ); + extraParams, instanceExtraParams, recordFormat, new Monitors() ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java index 4237741e678c8..22121c20ee33d 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/discovery/ReadReplica.java @@ -52,14 +52,6 @@ public class ReadReplica implements ClusterMember private ReadReplicaGraphDatabase database; private Monitors monitors; - public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory, - List coreMemberHazelcastAddresses, Map extraParams, - Map> instanceExtraParams, String recordFormat ) - { - this( parentDir, memberId, discoveryServiceFactory, coreMemberHazelcastAddresses, extraParams, - instanceExtraParams, recordFormat, new Monitors() ); - } - public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory, List coreMemberHazelcastAddresses, Map extraParams, Map> instanceExtraParams, String recordFormat, Monitors monitors ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterIdentityIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterIdentityIT.java index 82bb5ef712bc8..3b23e5fcba0d0 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterIdentityIT.java +++ 
b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ClusterIdentityIT.java @@ -81,6 +81,10 @@ public void setup() throws Exception { fs = fileSystemRule.get(); cluster = clusterRule.startCluster(); + cluster.coreTx( (db,tx) -> { + SampleData.createSchema( db ); + tx.success(); + } ); } @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java index 7ffd53884da1c..72ee09b40544e 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreToCoreCopySnapshotIT.java @@ -22,6 +22,7 @@ import org.junit.Rule; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.time.Clock; import java.util.Map; @@ -76,14 +77,20 @@ public void shouldBeAbleToDownloadLargerFreshSnapshot() throws Exception // shutdown the follower, remove the store, restart follower.shutdown(); - FileUtils.deleteRecursively( follower.storeDir() ); - FileUtils.deleteRecursively( follower.clusterStateDirectory() ); + deleteDirectoryRecursively( follower.storeDir() ); + deleteDirectoryRecursively( follower.clusterStateDirectory() ); follower.start(); // then assertEquals( DbRepresentation.of( source.database() ), DbRepresentation.of( follower.database() ) ); } + protected void deleteDirectoryRecursively( File directory ) throws IOException + { + // Extracted so the inheriting test in the block device repository can override it. 
+ FileUtils.deleteRecursively( directory ); + } + @Test public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java index f7f1c5f85c113..d6a62f508c2c1 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ReadReplicaReplicationIT.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -71,7 +70,6 @@ import org.neo4j.test.DbRepresentation; import org.neo4j.test.causalclustering.ClusterRule; -import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -86,6 +84,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.neo4j.causalclustering.scenarios.SampleData.createData; import static org.neo4j.com.storecopy.StoreUtil.TEMP_COPY_DIRECTORY_NAME; import static org.neo4j.function.Predicates.awaitEx; import static org.neo4j.helpers.collection.Iterables.count; @@ -386,7 +385,7 @@ public void shouldBeAbleToPullTxAfterHavingDownloadedANewStoreAfterPruning() thr // when cluster.coreTx( (db, tx) -> { - createData( db ); + createData( db, 10 ); tx.success(); } ); @@ -526,29 +525,7 @@ private long versionBy( File raftLogDir, BinaryOperator operator ) throws private final BiConsumer createSomeData = ( db, tx ) -> { - createData( db ); + createData( db, 10 ); 
tx.success(); }; - - private void createData( GraphDatabaseService db ) - { - createData( db, 10 ); - } - - private void createData( GraphDatabaseService db, int amount ) - { - for ( int i = 0; i < amount; i++ ) - { - Node node = db.createNode(Label.label( "Foo" )); - node.setProperty( "foobar", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar1", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar2", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar3", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar4", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar5", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar6", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar7", format( "baz_bat%s", UUID.randomUUID() ) ); - node.setProperty( "foobar8", format( "baz_bat%s", UUID.randomUUID() ) ); - } - } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/SampleData.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/SampleData.java index c66e5636751cb..699b5de4bb97c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/SampleData.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/SampleData.java @@ -21,39 +21,48 @@ import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.RelationshipType; import static org.neo4j.graphdb.Label.label; -class SampleData +public class SampleData { - static void createSomeData( int items, Cluster cluster ) throws Exception + private static final Label LABEL = label( "ExampleNode" ); + private static final String PROPERTY_KEY = "prop"; + + public static void createSomeData( int items, 
Cluster cluster ) throws Exception { for ( int i = 0; i < items; i++ ) { cluster.coreTx( ( db, tx ) -> { - Node node = db.createNode( label( "boo" ) ); + Node node = db.createNode( LABEL ); node.setProperty( "foobar", "baz_bat" ); tx.success(); } ); } } - static void createData( GraphDatabaseService db, int size ) + public static void createData( GraphDatabaseService db, int size ) { for ( int i = 0; i < size; i++ ) { - Node node1 = db.createNode(); - Node node2 = db.createNode(); + Node node1 = db.createNode( LABEL ); + Node node2 = db.createNode( LABEL ); - node1.setProperty( "hej", "svej" ); + node1.setProperty( PROPERTY_KEY, "svej" + i ); node2.setProperty( "tjabba", "tjena" ); Relationship rel = node1.createRelationshipTo( node2, RelationshipType.withName( "halla" ) ); rel.setProperty( "this", "that" ); } } + + public static void createSchema( GraphDatabaseService db ) + { + db.schema().constraintFor( LABEL ).assertPropertyIsUnique( PROPERTY_KEY ).create(); + } }