diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java index 6209cf32ac4d0..3b82623214742 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/ConsensusModule.java @@ -27,6 +27,8 @@ import org.neo4j.coreedge.core.consensus.log.MonitoredRaftLog; import org.neo4j.coreedge.core.consensus.log.RaftLog; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; +import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategy; +import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory; import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.coreedge.core.consensus.membership.MemberIdSetBuilder; @@ -59,6 +61,7 @@ import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.join_catch_up_timeout; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.log_shipping_max_lag; import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; +import static org.neo4j.time.Clocks.systemClock; public class ConsensusModule { @@ -118,19 +121,19 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ou SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound ); raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, - expectedClusterSize, electionTimeout, Clocks.systemClock(), + expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_timeout ), raftMembershipStorage ); life.add( raftMembershipManager ); RaftLogShippingManager logShipping = - new RaftLogShippingManager( outbound, logProvider, raftLog, Clocks.systemClock(), + new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself, raftMembershipManager, electionTimeout, config.get( catchup_batch_size ), config.get( log_shipping_max_lag ), inFlightMap ); - raftTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); + raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider ); raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, @@ -159,16 +162,11 @@ private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstra long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); int readerPoolSize = config.get( CoreEdgeClusterSettings.raft_log_reader_pool_size ); - String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); - - return life.add( new SegmentedRaftLog( - fileSystem, - new File( clusterStateDirectory, PHYSICAL_LOG_DIRECTORY_NAME ), - rotateAtSize, - marshal, - logProvider, - pruningStrategyConfig, - readerPoolSize, Clocks.systemClock(), scheduler ) ); + CoreLogPruningStrategy pruningStrategy = new CoreLogPruningStrategyFactory( + config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ), logProvider ).newInstance(); + File directory = new File( clusterStateDirectory, PHYSICAL_LOG_DIRECTORY_NAME ); + return life.add( new SegmentedRaftLog( fileSystem, directory, rotateAtSize, marshal, + logProvider, readerPoolSize, systemClock(), scheduler, pruningStrategy ) ); } default: throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategy.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategy.java index 6911e7131379f..9244a5e163be4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategy.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategy.java @@ -19,7 +19,7 @@ */ package org.neo4j.coreedge.core.consensus.log.segmented; -interface CoreLogPruningStrategy +public interface CoreLogPruningStrategy { /** * Returns the index to keep depending on the configuration strategy. diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategyFactory.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategyFactory.java new file mode 100644 index 0000000000000..c1ddfcbf08686 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/CoreLogPruningStrategyFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.core.consensus.log.segmented; + +import org.neo4j.function.Factory; +import org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser; +import org.neo4j.logging.LogProvider; + +import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse; + +public class CoreLogPruningStrategyFactory implements Factory +{ + private final String pruningStrategyConfig; + private final LogProvider logProvider; + + public CoreLogPruningStrategyFactory( String pruningStrategyConfig, LogProvider logProvider ) + { + this.pruningStrategyConfig = pruningStrategyConfig; + this.logProvider = logProvider; + } + + @Override + public CoreLogPruningStrategy newInstance() + { + ThresholdConfigParser.ThresholdConfigValue thresholdConfigValue = parse( pruningStrategyConfig ); + + String type = thresholdConfigValue.type; + long value = thresholdConfigValue.value; + switch ( type ) + { + case "size": + return new SizeBasedLogPruningStrategy( value ); + case "txs": + case "entries": // txs and entries are synonyms + return new EntryBasedLogPruningStrategy( value, logProvider ); + case "hours": // hours and days are currently not supported as such, default to no prune + case "days": + throw new IllegalArgumentException( + "Time based pruning not supported yet for the segmented raft log, got '" + type + "'." ); + case "false": + return new NoPruningPruningStrategy(); + default: + throw new IllegalArgumentException( "Invalid log pruning configuration value '" + value + + "'. Invalid type '" + type + "', valid are files, size, txs, entries, hours, days." ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLog.java index 8ae5e11e88518..0803d816f7233 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLog.java @@ -77,8 +77,8 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog private JobScheduler.JobHandle readerPoolPruner; public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, - ChannelMarshal contentMarshal, LogProvider logProvider, - String pruningConfig, int readerPoolSize, Clock clock, JobScheduler scheduler ) + ChannelMarshal contentMarshal, LogProvider logProvider, int readerPoolSize, Clock clock, + JobScheduler scheduler, CoreLogPruningStrategy pruningStrategy ) { this.fileSystem = fileSystem; this.directory = directory; @@ -89,7 +89,7 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long this.fileNames = new FileNames( directory ); this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock ); - this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider ); + this.pruner = new SegmentedRaftLogPruner( pruningStrategy ); this.log = logProvider.getLog( getClass() ); } @@ -101,8 +101,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException, throw new IOException( "Could not create: " + directory ); } - RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider ); - state = recoveryProtocol.run(); + state = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider ).run(); log.info( "log started with recovered state %s", state ); readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner", POOLED ), diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLogPruner.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLogPruner.java index 6ac5e7e6ef1e0..8c0fcbbf9660a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLogPruner.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedRaftLogPruner.java @@ -19,46 +19,17 @@ */ package org.neo4j.coreedge.core.consensus.log.segmented; -import org.neo4j.logging.LogProvider; - -import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.ThresholdConfigValue; -import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse; - class SegmentedRaftLogPruner { - private final ThresholdConfigValue parsedConfigOption; - private final CoreLogPruningStrategy pruneStrategy; - - SegmentedRaftLogPruner( String pruningStrategyConfig, LogProvider logProvider ) - { - parsedConfigOption = parse( pruningStrategyConfig ); - pruneStrategy = getPruneStrategy( parsedConfigOption, logProvider ); - } + private final CoreLogPruningStrategy pruningStrategy; - private CoreLogPruningStrategy getPruneStrategy( ThresholdConfigValue configValue, LogProvider logProvider ) + SegmentedRaftLogPruner( CoreLogPruningStrategy pruningStrategy ) { - String type = configValue.type; - switch ( type ) - { - case "size": - return new SizeBasedLogPruningStrategy( parsedConfigOption.value ); - case "txs": - case "entries": // txs and entries are synonyms - return new EntryBasedLogPruningStrategy( parsedConfigOption.value, logProvider ); - case "hours": // hours and days are currently not supported as such, default to no prune - case "days": - throw new IllegalArgumentException( - "Time based pruning not supported yet for the segmented raft log, got '" + type + "'." ); - case "false": - return new NoPruningPruningStrategy(); - default: - throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue.value + - "'. Invalid type '" + type + "', valid are files, size, txs, entries, hours, days." ); - } + this.pruningStrategy = pruningStrategy; } long getIndexToPruneFrom( long safeIndex, Segments segments ) { - return Math.min( safeIndex, pruneStrategy.getIndexToKeep( segments ) ); + return Math.min( safeIndex, pruningStrategy.getIndexToKeep( segments ) ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/SegmentedRaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/SegmentedRaftLogDurabilityTest.java index 70cd4c89f229b..fe7ecc8a801f8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/SegmentedRaftLogDurabilityTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/SegmentedRaftLogDurabilityTest.java @@ -21,21 +21,17 @@ import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import org.neo4j.coreedge.core.EnterpriseCoreEditionModule.RaftLogImplementation; import org.neo4j.coreedge.core.consensus.ReplicatedInteger; import org.neo4j.coreedge.core.consensus.ReplicatedString; +import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.logging.NullLogProvider; import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.time.Clocks; @@ -43,7 +39,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.RaftLogImplementation.SEGMENTED; import static org.neo4j.coreedge.core.consensus.ReplicatedInteger.valueOf; import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; import static org.neo4j.coreedge.core.consensus.log.RaftLogHelper.hasNoContent; @@ -63,9 +58,11 @@ public class SegmentedRaftLogDurabilityTest long rotateAtSizeBytes = 128; int readerPoolSize = 8; + NullLogProvider logProvider = getInstance(); SegmentedRaftLog log = new SegmentedRaftLog( fileSystem, directory, rotateAtSizeBytes, new DummyRaftableContentSerializer(), - getInstance(), "1 size", readerPoolSize, Clocks.fakeClock(), new OnDemandJobScheduler() ); + logProvider, readerPoolSize, Clocks.fakeClock(), new OnDemandJobScheduler(), + new CoreLogPruningStrategyFactory( "1 size", logProvider ).newInstance() ); log.start(); return log; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/debug/ReplayRaftLog.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/debug/ReplayRaftLog.java index ee137fb685374..88351c541d739 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/debug/ReplayRaftLog.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/debug/ReplayRaftLog.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; +import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategy; +import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.coreedge.core.replication.ReplicatedContent; import org.neo4j.coreedge.core.state.machines.tx.ReplicatedTransaction; @@ -30,6 +32,8 @@ import org.neo4j.helpers.Args; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; +import org.neo4j.logging.LogProvider; +import org.neo4j.logging.NullLogProvider; import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.time.Clocks; @@ -55,11 +59,14 @@ public static void main( String[] args ) throws IOException System.out.println( "logDirectory = " + logDirectory ); Config config = new Config( stringMap() ); + LogProvider logProvider = getInstance(); + CoreLogPruningStrategy pruningStrategy = + new CoreLogPruningStrategyFactory( config.get( raft_log_pruning_strategy ), logProvider ) + .newInstance(); SegmentedRaftLog log = new SegmentedRaftLog( new DefaultFileSystemAbstraction(), logDirectory, - config.get( raft_log_rotation_size ), new CoreReplicatedContentMarshal(), - getInstance(), config.get( raft_log_pruning_strategy ), - config.get( raft_log_reader_pool_size ), Clocks.systemClock(), - new OnDemandJobScheduler() ); + config.get( raft_log_rotation_size ), new CoreReplicatedContentMarshal(), logProvider, + config.get( raft_log_reader_pool_size ), Clocks.systemClock(), new OnDemandJobScheduler(), + pruningStrategy ); long totalCommittedEntries = log.appendIndex(); // Not really, but we need to have a way to pass in the commit index for ( int i = 0; i <= totalCommittedEntries; i++ ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedConcurrentStressIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedConcurrentStressIT.java index 6e76cbb888b26..a70dc10825c3b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedConcurrentStressIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/consensus/log/segmented/SegmentedConcurrentStressIT.java @@ -24,6 +24,8 @@ import org.neo4j.coreedge.core.consensus.log.ConcurrentStressIT; import org.neo4j.coreedge.core.consensus.log.DummyRaftableContentSerializer; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.logging.LogProvider; +import org.neo4j.logging.NullLogProvider; import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.time.Clocks; @@ -35,9 +37,15 @@ public class SegmentedConcurrentStressIT extends ConcurrentStressIT