Skip to content

Commit

Permalink
Restore temporary db during store copy correctly.
Browse files Browse the repository at this point in the history
Make logging during Netty test less verbose by default.
  • Loading branch information
MishaDemianenko committed Jul 23, 2018
1 parent eceb30c commit 015bf7f
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 22 deletions.
Expand Up @@ -56,10 +56,10 @@ public void replaceWithStoreFrom( CatchupAddressProvider addressProvider, StoreI
{
try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache, localDatabase.storeDir() ) )
{
remoteStore.copy( addressProvider, expectedStoreId, tempStore.storeDir(),
remoteStore.copy( addressProvider, expectedStoreId, tempStore.databaseDirectory(),
false );
copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() );
localDatabase.replaceWith( tempStore.storeDir() );
localDatabase.replaceWith( tempStore.databaseDirectory() );
}
log.info( "Replaced store with one downloaded from %s", addressProvider );
}
Expand Down
Expand Up @@ -29,20 +29,23 @@
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;

public class TemporaryStoreDirectory implements AutoCloseable
{
private static final String TEMP_COPY_DIRECTORY_NAME = "temp-copy";

private final File tempStoreDir;
private final File tempDatabaseDirectory;
private final StoreFiles storeFiles;
private LogFiles tempLogFiles;

public TemporaryStoreDirectory( FileSystemAbstraction fs, PageCache pageCache, File parent ) throws IOException
{
this.tempStoreDir = new File( parent, TEMP_COPY_DIRECTORY_NAME );
this.tempDatabaseDirectory = new File( tempStoreDir, DataSourceManager.DEFAULT_DATABASE_NAME );
storeFiles = new StoreFiles( fs, pageCache, ( directory, name ) -> true );
tempLogFiles = LogFilesBuilder.logFilesBasedOnlyBuilder( tempStoreDir, fs ).build();
tempLogFiles = LogFilesBuilder.logFilesBasedOnlyBuilder( tempDatabaseDirectory, fs ).build();
storeFiles.delete( tempStoreDir, tempLogFiles );
}

Expand All @@ -51,6 +54,11 @@ public File storeDir()
return tempStoreDir;
}

public File databaseDirectory()
{
return tempDatabaseDirectory;
}

@Override
public void close() throws IOException
{
Expand Down
Expand Up @@ -417,8 +417,7 @@ private static TopologyServiceRetryStrategy resolveStrategy( Config config, LogP
return new TopologyServiceMultiRetryStrategy( refreshPeriodMillis / pollingFrequencyWithinRefreshWindow, numberOfRetries, logProvider );
}

private LogFiles buildLocalDatabaseLogFiles( PlatformModule platformModule, FileSystemAbstraction fileSystem,
File storeDir, Config config )
private static LogFiles buildLocalDatabaseLogFiles( PlatformModule platformModule, FileSystemAbstraction fileSystem, File storeDir, Config config )
{
try
{
Expand Down
Expand Up @@ -70,7 +70,7 @@
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.stream.Streams;
Expand All @@ -87,16 +87,15 @@
@RunWith( Parameterized.class )
public class NettyInstalledProtocolsIT
{
private static final int TIMEOUT_SECONDS = 10;
private Parameters parameters;
private AssertableLogProvider logProvider;

public NettyInstalledProtocolsIT( Parameters parameters )
{
this.parameters = parameters;
}

private static final int TIMEOUT_SECONDS = 10;
private static final LogProvider logProvider = FormattedLogProvider.toOutputStream( System.out );

@Parameterized.Parameters( name = "{0}" )
public static Collection<Parameters> data()
{
Expand Down Expand Up @@ -149,6 +148,7 @@ public void shouldSuccessfullySendAndReceiveAMessage() throws Throwable
@Before
public void setUp()
{
logProvider = new AssertableLogProvider( true );
ApplicationProtocolRepository applicationProtocolRepository =
new ApplicationProtocolRepository( Protocol.ApplicationProtocols.values(), parameters.applicationSupportedProtocol );
ModifierProtocolRepository modifierProtocolRepository =
Expand All @@ -158,11 +158,11 @@ public void setUp()
NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( VoidPipelineWrapperFactory.VOID_WRAPPER );

server = new Server( serverPipelineBuilderFactory );
server.start( applicationProtocolRepository, modifierProtocolRepository );
server.start( applicationProtocolRepository, modifierProtocolRepository, logProvider );

Config config = Config.builder().withSetting( CausalClusteringSettings.handshake_timeout, TIMEOUT_SECONDS + "s" ).build();

client = new Client( applicationProtocolRepository, modifierProtocolRepository, clientPipelineBuilderFactory, config );
client = new Client( applicationProtocolRepository, modifierProtocolRepository, clientPipelineBuilderFactory, config, logProvider );

client.connect( server.port() );
}
Expand All @@ -172,6 +172,7 @@ public void tearDown()
{
client.disconnect();
server.stop();
logProvider.clear();
}

private static class Parameters
Expand Down Expand Up @@ -216,7 +217,8 @@ protected void channelRead0( ChannelHandlerContext ctx, Object msg )
this.pipelineBuilderFactory = pipelineBuilderFactory;
}

void start( final ApplicationProtocolRepository applicationProtocolRepository, final ModifierProtocolRepository modifierProtocolRepository )
void start( final ApplicationProtocolRepository applicationProtocolRepository, final ModifierProtocolRepository modifierProtocolRepository,
LogProvider logProvider )
{
RaftProtocolServerInstallerV2.Factory raftFactoryV2 =
new RaftProtocolServerInstallerV2.Factory( nettyHandler, pipelineBuilderFactory, logProvider );
Expand Down Expand Up @@ -262,7 +264,7 @@ static class Client
private HandshakeClientInitializer handshakeClientInitializer;

Client( ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository,
NettyPipelineBuilderFactory pipelineBuilderFactory, Config config )
NettyPipelineBuilderFactory pipelineBuilderFactory, Config config, LogProvider logProvider )
{
RaftProtocolClientInstallerV2.Factory raftFactoryV2 = new RaftProtocolClientInstallerV2.Factory( pipelineBuilderFactory, logProvider );
RaftProtocolClientInstallerV1.Factory raftFactoryV1 =
Expand Down
Expand Up @@ -110,8 +110,8 @@

public class ReadReplicaReplicationIT
{
protected static final int NR_CORE_MEMBERS = 3;
protected static final int NR_READ_REPLICAS = 1;
private static final int NR_CORE_MEMBERS = 3;
private static final int NR_READ_REPLICAS = 1;

@Rule
public final ClusterRule clusterRule = new ClusterRule().withNumberOfCoreMembers( NR_CORE_MEMBERS )
Expand Down Expand Up @@ -185,13 +185,9 @@ public void shouldEventuallyPullTransactionDownToAllReadReplicas() throws Except
AtomicBoolean labelScanStoreCorrectlyPlaced = new AtomicBoolean( false );
Monitors monitors = new Monitors();
ReadReplica rr = cluster.addReadReplicaWithIdAndMonitors( 0, monitors );
Path readReplicateStoreDir = rr.databaseDirectory().toPath().toAbsolutePath();

monitors.addMonitorListener( (FileCopyMonitor) file ->
{
Path relativPath = readReplicateStoreDir.relativize( file.toPath().toAbsolutePath() );
relativPath = relativPath.subpath( 1, relativPath.getNameCount() );
if ( labelScanStoreFiles.contains( relativPath ) )
if ( labelScanStoreFiles.contains( file.toPath().getFileName() ) )
{
labelScanStoreCorrectlyPlaced.set( true );
}
Expand Down
Expand Up @@ -53,7 +53,7 @@ public static void createSomeData( int items, Cluster cluster ) throws Exception
}
}

public static void createData( GraphDatabaseService db, int size )
static void createData( GraphDatabaseService db, int size )
{
for ( int i = 0; i < size; i++ )
{
Expand All @@ -70,7 +70,7 @@ public static void createData( GraphDatabaseService db, int size )
}
}

public static void createSchema( GraphDatabaseService db )
static void createSchema( GraphDatabaseService db )
{
db.schema().constraintFor( LABEL ).assertPropertyIsUnique( PROPERTY_KEY ).create();
}
Expand Down

0 comments on commit 015bf7f

Please sign in to comment.