Skip to content

Commit

Permalink
Fix store file checking in StreamToDisk
Browse files Browse the repository at this point in the history
* Also clean up various causal clustering test code.
* Also make sure that the Cluster and ClusterRule make sure that the
  cluster-wide and instance configurations are properly passed through to late
  coming cluster member additions.
  • Loading branch information
chrisvest committed Jan 30, 2017
1 parent d25df80 commit c780648
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 109 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.monitoring.Monitors;

class StreamToDisk implements StoreFileStreams
Expand All @@ -50,8 +51,8 @@ class StreamToDisk implements StoreFileStreams
this.pageCache = pageCache;
fs.mkdirs( storeDir );
this.fileCopyMonitor = monitors.newMonitor( FileCopyMonitor.class );
channels = new HashMap<String,WritableByteChannel>();
pagedFiles = new HashMap<String,PagedFile>();
channels = new HashMap<>();
pagedFiles = new HashMap<>();

}

Expand All @@ -62,14 +63,7 @@ public void write( String destination, int requiredAlignment, byte[] data ) thro
fs.mkdirs( fileName.getParentFile() );

fileCopyMonitor.copyFile( fileName );
if ( destination.endsWith( ".id" ) )
{
try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) )
{
outputStream.write( data );
}
}
else
if ( StoreType.typeOf( destination ).map( StoreType::isRecordStore ).orElse( false ) )
{
WritableByteChannel channel = channels.get( destination );
if ( channel == null )
Expand All @@ -87,6 +81,13 @@ public void write( String destination, int requiredAlignment, byte[] data ) thro
channel.write( buffer );
}
}
else
{
try ( OutputStream outputStream = fs.openAsOutputStream( fileName, true ) )
{
outputStream.write( data );
}
}
}

@Override
Expand Down
Expand Up @@ -60,15 +60,13 @@
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.test.DbRepresentation;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.neo4j.concurrent.Futures.combine;
import static org.neo4j.function.Predicates.await;
import static org.neo4j.function.Predicates.awaitEx;
import static org.neo4j.function.Predicates.notNull;
import static org.neo4j.helpers.collection.Iterables.firstOrNull;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.LockSessionExpired;

public class Cluster
Expand All @@ -77,27 +75,38 @@ public class Cluster
private static final int DEFAULT_CLUSTER_SIZE = 3;

private final File parentDir;
private final Map<String,String> coreParams;
private final Map<String,IntFunction<String>> instanceCoreParams;
private final Map<String,String> readReplicaParams;
private final Map<String,IntFunction<String>> instanceReadReplicaParams;
private final String recordFormat;
private final DiscoveryServiceFactory discoveryServiceFactory;

private Map<Integer, CoreClusterMember> coreMembers = new ConcurrentHashMap<>();
private Map<Integer,CoreClusterMember> coreMembers = new ConcurrentHashMap<>();
private Map<Integer,ReadReplica> readReplicas = new ConcurrentHashMap<>();

public Cluster( File parentDir, int noOfCoreMembers, int noOfReadReplicas,
DiscoveryServiceFactory discoveryServiceFactory,
Map<String, String> coreParams, Map<String, IntFunction<String>> instanceCoreParams,
Map<String, String> readReplicaParams, Map<String, IntFunction<String>> instanceReadReplicaParams,
Map<String,String> coreParams, Map<String,IntFunction<String>> instanceCoreParams,
Map<String,String> readReplicaParams, Map<String,IntFunction<String>> instanceReadReplicaParams,
String recordFormat )
{
this.discoveryServiceFactory = discoveryServiceFactory;
this.parentDir = parentDir;
this.coreParams = coreParams;
this.instanceCoreParams = instanceCoreParams;
this.readReplicaParams = readReplicaParams;
this.instanceReadReplicaParams = instanceReadReplicaParams;
this.recordFormat = recordFormat;
HashSet<Integer> coreServerIds = new HashSet<>();
for ( int i = 0; i < noOfCoreMembers; i++ )
{
coreServerIds.add( i );
}
List<AdvertisedSocketAddress> initialHosts = buildAddresses( coreServerIds );
createCoreMembers( noOfCoreMembers, initialHosts, coreParams, instanceCoreParams, recordFormat );
createReadReplicas( noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams, recordFormat );
createReadReplicas( noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams,
recordFormat );
}

public void start() throws InterruptedException, ExecutionException
Expand All @@ -114,15 +123,6 @@ public void start() throws InterruptedException, ExecutionException
}
}

private void waitForReadReplicas( CompletionService<ReadReplicaGraphDatabase> readReplicaGraphDatabaseCompletionService ) throws
InterruptedException, ExecutionException
{
for ( int i = 0; i < readReplicas.size(); i++ )
{
readReplicaGraphDatabaseCompletionService.take().get();
}
}

public Set<CoreClusterMember> healthyCoreMembers()
{
return coreMembers.values().stream()
Expand All @@ -143,38 +143,50 @@ public ReadReplica getReadReplicaById( int memberId )

public CoreClusterMember addCoreMemberWithId( int memberId )
{
return addCoreMemberWithId( memberId, stringMap(), emptyMap(), Standard.LATEST_NAME );
List<AdvertisedSocketAddress> advertisedAddress = buildAddresses( coreMembers.keySet() );
return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, advertisedAddress );
}

/**
 * Adds a core member with the given id, using the supplied addresses as the initial discovery members,
 * and reusing this cluster's configured core parameters, per-instance parameters and record format.
 *
 * @param memberId id to assign to the new core member.
 * @param initialMembers advertised addresses the new member uses for initial cluster discovery.
 * @return the newly created (not yet started) core cluster member.
 */
public CoreClusterMember addCoreMemberWithIdAndInitialMembers(
int memberId, List<AdvertisedSocketAddress> initialMembers )
{
return addCoreMemberWithId( memberId, coreParams, instanceCoreParams, recordFormat, initialMembers );
}

public CoreClusterMember addCoreMemberWithIdAndInitialMembers( int memberId,
List<AdvertisedSocketAddress> initialMembers )
private CoreClusterMember addCoreMemberWithId(
int memberId,
Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams,
String recordFormat,
List<AdvertisedSocketAddress> advertisedAddress )
{
CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, initialMembers,
discoveryServiceFactory, Standard.LATEST_NAME, parentDir,
emptyMap(), emptyMap() );
CoreClusterMember coreClusterMember = new CoreClusterMember(
memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress, discoveryServiceFactory, recordFormat, parentDir,
extraParams, instanceExtraParams );
coreMembers.put( memberId, coreClusterMember );
return coreClusterMember;
}

public ReadReplica addReadReplicaWithIdAndRecordFormat( int memberId, String recordFormat )
{
List<AdvertisedSocketAddress> hazelcastAddresses = buildAddresses( coreMembers.keySet() );
ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory,
hazelcastAddresses, stringMap(), emptyMap(), recordFormat );
readReplicas.put( memberId, member );
return member;
return addReadReplica( memberId, recordFormat, new Monitors() );
}

public ReadReplica addReadReplicaWithId( int memberId )
{
return addReadReplicaWithIdAndRecordFormat( memberId, Standard.LATEST_NAME );
return addReadReplicaWithIdAndRecordFormat( memberId, recordFormat );
}

/**
 * Adds a read replica with the given member id, wiring in the caller-supplied {@link Monitors}
 * and reusing this cluster's configured record format.
 *
 * @param memberId id to assign to the new read replica.
 * @param monitors monitors instance to install in the new read replica.
 * @return the newly created read replica.
 */
public ReadReplica addReadReplicaWithIdAndMonitors( int memberId, Monitors monitors )
{
return addReadReplica( memberId, recordFormat, monitors );
}

private ReadReplica addReadReplica( int memberId, String recordFormat, Monitors monitors )
{
List<AdvertisedSocketAddress> hazelcastAddresses = buildAddresses( coreMembers.keySet() );
ReadReplica member = new ReadReplica( parentDir, memberId, discoveryServiceFactory,
hazelcastAddresses, stringMap(), emptyMap(), Standard.LATEST_NAME, monitors );
hazelcastAddresses, readReplicaParams, instanceReadReplicaParams, recordFormat, monitors );
readReplicas.put( memberId, member );
return member;
}
Expand Down Expand Up @@ -292,7 +304,8 @@ public CoreClusterMember awaitLeader( long timeout, TimeUnit timeUnit ) throws T
return awaitCoreMemberWithRole( Role.LEADER, timeout, timeUnit );
}

public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit ) throws TimeoutException
public CoreClusterMember awaitCoreMemberWithRole( Role role, long timeout, TimeUnit timeUnit )
throws TimeoutException
{
return await( () -> getDbWithRole( role ), notNull(), timeout, timeUnit );
}
Expand All @@ -309,29 +322,20 @@ public int numberOfCoreMembersReportedByTopology()
/**
* Perform a transaction against the core cluster, selecting the target and retrying as necessary.
*/
public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase, Transaction> op ) throws Exception
public CoreClusterMember coreTx( BiConsumer<CoreGraphDatabase,Transaction> op ) throws Exception
{
// this currently wraps the leader-only strategy, since it is the recommended and only approach
return leaderTx( op, DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
}

/**
 * Creates a new {@link CoreClusterMember}, advertising the addresses of the currently known
 * core members, and registers it in {@code coreMembers}. The member is constructed here but
 * not started.
 *
 * @param memberId id to assign to the new core member.
 * @param extraParams cluster-wide configuration to pass to the new member.
 * @param instanceExtraParams per-instance (id-keyed) configuration to pass to the new member.
 * @param recordFormat store record format for the new member.
 * @return the newly created core cluster member.
 */
private CoreClusterMember addCoreMemberWithId( int memberId, Map<String,String> extraParams, Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
List<AdvertisedSocketAddress> advertisedAddress = buildAddresses( coreMembers.keySet() );
CoreClusterMember coreClusterMember = new CoreClusterMember( memberId, DEFAULT_CLUSTER_SIZE, advertisedAddress,
discoveryServiceFactory, recordFormat, parentDir,
extraParams, instanceExtraParams );
coreMembers.put( memberId, coreClusterMember );
return coreClusterMember;
}

/**
* Perform a transaction against the leader of the core cluster, retrying as necessary.
*/
private CoreClusterMember leaderTx( BiConsumer<CoreGraphDatabase,Transaction> op, int timeout, TimeUnit timeUnit )
throws Exception
{
ThrowingSupplier<CoreClusterMember,Exception> supplier = () -> {
ThrowingSupplier<CoreClusterMember,Exception> supplier = () ->
{
CoreClusterMember member = awaitLeader( timeout, timeUnit );
CoreGraphDatabase db = member.database();
if ( db == null )
Expand Down Expand Up @@ -367,29 +371,28 @@ private boolean isTransientFailure( Throwable e )
// TODO: This should really catch all cases of transient failures. Must be able to express that in a clearer
// manner...
return (e instanceof IdGenerationException) || isLockExpired( e ) || isLockOnFollower( e ) ||
isWriteNotOnLeader( e );

isWriteNotOnLeader( e );
}

private boolean isWriteNotOnLeader( Throwable e )
{
return e instanceof WriteOperationsNotAllowedException &&
e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) );
e.getMessage().startsWith( String.format( LeaderCanWrite.NOT_LEADER_ERROR_MSG, "" ) );
}

private boolean isLockOnFollower( Throwable e )
{
return e instanceof AcquireLockTimeoutException &&
(e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) ||
e.getCause() instanceof NoLeaderFoundException);
(e.getMessage().equals( LeaderOnlyLockManager.LOCK_NOT_ON_LEADER_ERROR_MESSAGE ) ||
e.getCause() instanceof NoLeaderFoundException);
}

private boolean isLockExpired( Throwable e )
{
return e instanceof TransactionFailureException &&
e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException &&
((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() ==
LockSessionExpired;
e.getCause() instanceof org.neo4j.kernel.api.exceptions.TransactionFailureException &&
((org.neo4j.kernel.api.exceptions.TransactionFailureException) e.getCause()).status() ==
LockSessionExpired;
}

public static List<AdvertisedSocketAddress> buildAddresses( Set<Integer> coreServerIds )
Expand All @@ -403,8 +406,8 @@ public static AdvertisedSocketAddress socketAddressForServer( int id )
}

private void createCoreMembers( final int noOfCoreMembers,
List<AdvertisedSocketAddress> addresses, Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams, String recordFormat )
List<AdvertisedSocketAddress> addresses, Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams, String recordFormat )
{
for ( int i = 0; i < noOfCoreMembers; i++ )
{
Expand Down Expand Up @@ -453,15 +456,15 @@ private void startReadReplicas( ExecutorService executor ) throws InterruptedExc
}

private void createReadReplicas( int noOfReadReplicas,
final List<AdvertisedSocketAddress> coreMemberAddresses,
Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams,
String recordFormat )
final List<AdvertisedSocketAddress> coreMemberAddresses,
Map<String,String> extraParams,
Map<String,IntFunction<String>> instanceExtraParams,
String recordFormat )
{
for ( int i = 0; i < noOfReadReplicas; i++ )
{
readReplicas.put( i, new ReadReplica( parentDir, i, discoveryServiceFactory, coreMemberAddresses,
extraParams, instanceExtraParams, recordFormat ) );
extraParams, instanceExtraParams, recordFormat, new Monitors() ) );
}
}

Expand Down
Expand Up @@ -52,14 +52,6 @@ public class ReadReplica implements ClusterMember
private ReadReplicaGraphDatabase database;
private Monitors monitors;

/**
 * Convenience constructor that installs a fresh {@link Monitors} instance;
 * delegates to the constructor that takes an explicit {@code Monitors} argument.
 */
public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory,
List<AdvertisedSocketAddress> coreMemberHazelcastAddresses, Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams, String recordFormat )
{
this( parentDir, memberId, discoveryServiceFactory, coreMemberHazelcastAddresses, extraParams,
instanceExtraParams, recordFormat, new Monitors() );
}

public ReadReplica( File parentDir, int memberId, DiscoveryServiceFactory discoveryServiceFactory,
List<AdvertisedSocketAddress> coreMemberHazelcastAddresses, Map<String, String> extraParams,
Map<String, IntFunction<String>> instanceExtraParams, String recordFormat, Monitors monitors )
Expand Down
Expand Up @@ -81,6 +81,10 @@ public void setup() throws Exception
{
fs = fileSystemRule.get();
cluster = clusterRule.startCluster();
cluster.coreTx( (db,tx) -> {
SampleData.createSchema( db );
tx.success();
} );
}

@Test
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.junit.Rule;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
Expand Down Expand Up @@ -76,14 +77,20 @@ public void shouldBeAbleToDownloadLargerFreshSnapshot() throws Exception

// shutdown the follower, remove the store, restart
follower.shutdown();
FileUtils.deleteRecursively( follower.storeDir() );
FileUtils.deleteRecursively( follower.clusterStateDirectory() );
deleteDirectoryRecursively( follower.storeDir() );
deleteDirectoryRecursively( follower.clusterStateDirectory() );
follower.start();

// then
assertEquals( DbRepresentation.of( source.database() ), DbRepresentation.of( follower.database() ) );
}

/**
 * Deletes the given directory and everything in it.
 *
 * @param directory the directory to delete recursively.
 * @throws IOException if the deletion fails.
 */
protected void deleteDirectoryRecursively( File directory ) throws IOException
{
// Extracted into a method so that the inheriting test in the block device repository can override it.
FileUtils.deleteRecursively( directory );
}

@Test
public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception
{
Expand Down

0 comments on commit c780648

Please sign in to comment.