revert a few superfluous changes in test-cluster code
martinfurmanski committed Feb 27, 2017
1 parent cdb7592 commit 8f44a73
Showing 1 changed file with 38 additions and 41 deletions.
@@ -54,21 +54,18 @@
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.test.DbRepresentation;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.neo4j.concurrent.Futures.combine;
import static org.neo4j.function.Predicates.await;
import static org.neo4j.function.Predicates.awaitEx;
import static org.neo4j.function.Predicates.notNull;
import static org.neo4j.helpers.collection.Iterables.firstOrNull;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired;

public class Cluster
@@ -77,19 +74,28 @@ public class Cluster
private static final int DEFAULT_CLUSTER_SIZE = 3;

private final File parentDir;
private final Map<String,String> coreParams;
private final Map<String,IntFunction<String>> instanceCoreParams;
private final Map<String,String> readReplicaParams;
private final Map<String,IntFunction<String>> instanceReadReplicaParams;
private final String recordFormat;
private final DiscoveryServiceFactory discoveryServiceFactory;

private Map<Integer, CoreClusterMember> coreMembers = new ConcurrentHashMap<>();
private Map<Integer,CoreClusterMember> coreMembers = new ConcurrentHashMap<>();
private Map<Integer,ReadReplica> readReplicas = new ConcurrentHashMap<>();

public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas,
DiscoveryServiceFactory discoveryServiceFactory,
Map<String, String> coreParams, Map<String, IntFunction<String>> instanceCoreParams,
Map<String, String> readReplicaParams, Map<String, IntFunction<String>> instanceReadReplicaParams,
String recordFormat )
DiscoveryServiceFactory discoveryServiceFactory, Map<String,String> coreParams,
Map<String,IntFunction<String>> instanceCoreParams, Map<String,String> readReplicaParams,
Map<String,IntFunction<String>> instanceReadReplicaParams, String recordFormat )
{
this.discoveryServiceFactory = discoveryServiceFactory;
this.parentDir = parentDir;
this.coreParams = coreParams;
this.instanceCoreParams = instanceCoreParams;
this.readReplicaParams = readReplicaParams;
this.instanceReadReplicaParams = instanceReadReplicaParams;
this.recordFormat = recordFormat;
HashSet<Integer> coreServerIds = new HashSet<>();
for ( int i = 0; i < noOfCoreMembers; i++ )
{
@@ -114,15 +120,6 @@ public void start() throws InterruptedException, ExecutionException
}
}

private void waitForReadReplicas( CompletionService<ReadReplicaGraphDatabase> readReplicaGraphDatabaseCompletionService ) throws
InterruptedException, ExecutionException
{
for ( int i = 0; i < readReplicas.size(); i++ )
{
readReplicaGraphDatabaseCompletionService.take().get();
}
}

public Set<CoreClusterMember> healthyCoreMembers()
{
return coreMembers.values().stream()
@@ -143,38 +140,48 @@ public ReadReplica getReadReplicaById( int memberId )

public CoreClusterMember addCoreMemberWithId( int memberId )
{
return addCoreMemberWithId( memberId, stringMap(), emptyMap(), StandardV3_0.NAME );
List<AdvertisedSocketAddress> advertisedAddress = buildAddresses( coreMembers.keySet() );
return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, advertisedAddress );
}

public CoreClusterMember addCoreMemberWithIdAndInitialMembers( int memberId,
List<AdvertisedSocketAddress> initialMembers )
{
CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, initialMembers,
discoveryServiceFactory, StandardV3_0.NAME, parentDir,
emptyMap(), emptyMap() );
return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, initialMembers );
}

private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat,
List<AdvertisedSocketAddress> advertisedAddress )
{
CoreClusterMember coreClusterMember =
new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory,
recordFormat, parentDir, extraParams, instanceExtraParams );
coreMembers.put( memberId, coreClusterMember );
return coreClusterMember;
}

public ReadReplica addReadReplicaWithIdAndRecordFormat( int memberId, String recordFormat )
{
List<AdvertisedSocketAddress> hazelcastAddresses = buildAddresses( coreMembers.keySet() );
ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory,
hazelcastAddresses, stringMap(), emptyMap(), recordFormat, new Monitors() );
readReplicas.put( memberId, member );
return member;
return addReadReplica( memberId, recordFormat, new Monitors() );
}

public ReadReplica addReadReplicaWithId( int memberId )
{
return addReadReplicaWithIdAndRecordFormat( memberId, StandardV3_0.NAME );
return addReadReplicaWithIdAndRecordFormat( memberId, recordFormat );
}

public ReadReplica addReadReplicaWithIdAndMonitors( int memberId, Monitors monitors )
{
return addReadReplica( memberId, recordFormat, monitors );
}

private ReadReplica addReadReplica( int memberId, String recordFormat, Monitors monitors )
{
List<AdvertisedSocketAddress> hazelcastAddresses = buildAddresses( coreMembers.keySet() );
ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory,
hazelcastAddresses, stringMap(), emptyMap(), StandardV3_0.NAME, monitors );
ReadReplica member =
new ReadReplica( parentDir, memberId, discoveryServiceFactory, hazelcastAddresses, readReplicaParams,
instanceReadReplicaParams, recordFormat, monitors );
readReplicas.put( memberId, member );
return member;
}
@@ -309,22 +316,12 @@ public int numberOfCoreMembersReportedByTopology()
/**
* Perform a transaction against the core cluster, selecting the target and retrying as necessary.
*/
public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase, Transaction> op ) throws Exception
public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase,Transaction> op ) throws Exception
{
// this currently wraps the leader-only strategy, since it is the recommended and only approach
return leaderTx( op, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
}

private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams, Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
List<AdvertisedSocketAddress> advertisedAddress = buildAddresses( coreMembers.keySet() );
CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress,
discoveryServiceFactory, recordFormat, parentDir,
extraParams, instanceExtraParams );
coreMembers.put( memberId, coreClusterMember );
return coreClusterMember;
}

/**
* Perform a transaction against the leader of the core cluster, retrying as necessary.
*/
Expand Down Expand Up @@ -392,7 +389,7 @@ private boolean isLockExpired( Throwable e )
LockSessionExpired;
}

public static List<AdvertisedSocketAddress> buildAddresses( Set<Integer> coreServerIds )
private static List<AdvertisedSocketAddress> buildAddresses( Set<Integer> coreServerIds )
{
return coreServerIds.stream().map( Cluster::socketAddressForServer ).collect( toList() );
}
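
For reference, a minimal sketch of how a test might drive the Cluster harness through the parameterized constructor and helpers shown in this diff. It is not part of the commit: the concrete HazelcastDiscoveryServiceFactory (and its package, whose import is omitted here), the target directory, and the shutdown() call are assumptions about the surrounding test framework.

// A minimal usage sketch, assuming it lives alongside Cluster in the test sources.
// HazelcastDiscoveryServiceFactory and shutdown() are assumptions, not part of this diff.

import java.io.File;

import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;

import static java.util.Collections.emptyMap;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

public class ClusterUsageSketch
{
    public static void main( String[] args ) throws Exception
    {
        Cluster cluster = new Cluster( new File( "target/test-cluster" ), 3, 2,
                new HazelcastDiscoveryServiceFactory(),  // assumed DiscoveryServiceFactory implementation
                stringMap(), emptyMap(),                 // coreParams, instanceCoreParams
                stringMap(), emptyMap(),                 // readReplicaParams, instanceReadReplicaParams
                StandardV3_0.NAME );                     // record format shared by every member

        try
        {
            cluster.start();                             // boots core members and read replicas
            cluster.coreTx( ( db, tx ) ->
            {
                db.createNode();                         // executed against the current leader, retried as needed
                tx.success();
            } );
        }
        finally
        {
            cluster.shutdown();                          // assumed teardown method
        }
    }
}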
