Skip to content

Commit

Permalink
Inject CoreLogPruningStrategy into SegmentRaftLog
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Sep 27, 2016
1 parent 92ca753 commit 4391e9b
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 78 deletions.
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.coreedge.core.consensus.log.MonitoredRaftLog; import org.neo4j.coreedge.core.consensus.log.MonitoredRaftLog;
import org.neo4j.coreedge.core.consensus.log.RaftLog; import org.neo4j.coreedge.core.consensus.log.RaftLog;
import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry;
import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategy;
import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap; import org.neo4j.coreedge.core.consensus.log.segmented.InFlightMap;
import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog;
import org.neo4j.coreedge.core.consensus.membership.MemberIdSetBuilder; import org.neo4j.coreedge.core.consensus.membership.MemberIdSetBuilder;
Expand Down Expand Up @@ -59,6 +61,7 @@
import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.join_catch_up_timeout; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.join_catch_up_timeout;
import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.log_shipping_max_lag; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.log_shipping_max_lag;
import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME;
import static org.neo4j.time.Clocks.systemClock;


public class ConsensusModule public class ConsensusModule
{ {
Expand Down Expand Up @@ -118,19 +121,19 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule, Ou
SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound ); SendToMyself leaderOnlyReplicator = new SendToMyself( myself, outbound );


raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, raftMembershipManager = new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout, Clocks.systemClock(), expectedClusterSize, electionTimeout, systemClock(),
config.get( join_catch_up_timeout ), raftMembershipStorage config.get( join_catch_up_timeout ), raftMembershipStorage
); );


life.add( raftMembershipManager ); life.add( raftMembershipManager );


RaftLogShippingManager logShipping = RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, Clocks.systemClock(), new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(),
myself, raftMembershipManager, electionTimeout, myself, raftMembershipManager, electionTimeout,
config.get( catchup_batch_size ), config.get( catchup_batch_size ),
config.get( log_shipping_max_lag ), inFlightMap ); config.get( log_shipping_max_lag ), inFlightMap );


raftTimeoutService = new DelayedRenewableTimeoutService( Clocks.systemClock(), logProvider ); raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );


raftMachine = raftMachine =
new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, new RaftMachine( myself, termState, voteState, raftLog, electionTimeout,
Expand Down Expand Up @@ -159,16 +162,11 @@ private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstra
long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size );
int readerPoolSize = config.get( CoreEdgeClusterSettings.raft_log_reader_pool_size ); int readerPoolSize = config.get( CoreEdgeClusterSettings.raft_log_reader_pool_size );


String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); CoreLogPruningStrategy pruningStrategy = new CoreLogPruningStrategyFactory(

config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ), logProvider ).newInstance();
return life.add( new SegmentedRaftLog( File directory = new File( clusterStateDirectory, PHYSICAL_LOG_DIRECTORY_NAME );
fileSystem, return life.add( new SegmentedRaftLog( fileSystem, directory, rotateAtSize, marshal,
new File( clusterStateDirectory, PHYSICAL_LOG_DIRECTORY_NAME ), logProvider, readerPoolSize, systemClock(), scheduler, pruningStrategy ) );
rotateAtSize,
marshal,
logProvider,
pruningStrategyConfig,
readerPoolSize, Clocks.systemClock(), scheduler ) );
} }
default: default:
throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation ); throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation );
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/ */
package org.neo4j.coreedge.core.consensus.log.segmented; package org.neo4j.coreedge.core.consensus.log.segmented;


interface CoreLogPruningStrategy public interface CoreLogPruningStrategy
{ {
/** /**
* Returns the index to keep depending on the configuration strategy. * Returns the index to keep depending on the configuration strategy.
Expand Down
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.core.consensus.log.segmented;

import org.neo4j.function.Factory;
import org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse;

/**
 * Creates a {@link CoreLogPruningStrategy} from the textual pruning configuration
 * (e.g. {@code "1g size"}, {@code "100 entries"}, {@code "false"}), as parsed by
 * {@link ThresholdConfigParser}.
 */
public class CoreLogPruningStrategyFactory implements Factory<CoreLogPruningStrategy>
{
    private final String pruningStrategyConfig;
    private final LogProvider logProvider;

    /**
     * @param pruningStrategyConfig raw configuration value, in the format accepted by
     *        {@link ThresholdConfigParser#parse(String)}.
     * @param logProvider used by strategies that need to log (e.g. entry-based pruning).
     */
    public CoreLogPruningStrategyFactory( String pruningStrategyConfig, LogProvider logProvider )
    {
        this.pruningStrategyConfig = pruningStrategyConfig;
        this.logProvider = logProvider;
    }

    /**
     * @return a pruning strategy matching the configured threshold type.
     * @throws IllegalArgumentException if the configured type is unknown, or is a
     *         time-based type (hours/days), which the segmented raft log does not support.
     */
    @Override
    public CoreLogPruningStrategy newInstance()
    {
        ThresholdConfigParser.ThresholdConfigValue thresholdConfigValue = parse( pruningStrategyConfig );

        String type = thresholdConfigValue.type;
        long value = thresholdConfigValue.value;
        switch ( type )
        {
        case "size":
            return new SizeBasedLogPruningStrategy( value );
        case "txs":
        case "entries": // txs and entries are synonyms
            return new EntryBasedLogPruningStrategy( value, logProvider );
        case "hours": // time based pruning is not implemented for the segmented raft log, so reject it explicitly
        case "days":
            throw new IllegalArgumentException(
                    "Time based pruning not supported yet for the segmented raft log, got '" + type + "'." );
        case "false":
            return new NoPruningPruningStrategy();
        default:
            // Report the original configuration string (not the parsed numeric value) so the
            // user can see exactly what was rejected, and list only the types handled above.
            throw new IllegalArgumentException( "Invalid log pruning configuration value '" +
                    pruningStrategyConfig + "'. Invalid type '" + type +
                    "', valid are size, txs, entries, false." );
        }
    }
}
Expand Up @@ -77,8 +77,8 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
private JobScheduler.JobHandle readerPoolPruner; private JobScheduler.JobHandle readerPoolPruner;


public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, int readerPoolSize, Clock clock,
String pruningConfig, int readerPoolSize, Clock clock, JobScheduler scheduler ) JobScheduler scheduler, CoreLogPruningStrategy pruningStrategy )
{ {
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.directory = directory; this.directory = directory;
Expand All @@ -89,7 +89,7 @@ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long


this.fileNames = new FileNames( directory ); this.fileNames = new FileNames( directory );
this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock ); this.readerPool = new ReaderPool( readerPoolSize, logProvider, fileNames, fileSystem, clock );
this.pruner = new SegmentedRaftLogPruner( pruningConfig, logProvider ); this.pruner = new SegmentedRaftLogPruner( pruningStrategy );
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
} }


Expand All @@ -101,8 +101,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException,
throw new IOException( "Could not create: " + directory ); throw new IOException( "Could not create: " + directory );
} }


RecoveryProtocol recoveryProtocol = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider ); state = new RecoveryProtocol( fileSystem, fileNames, readerPool, contentMarshal, logProvider ).run();
state = recoveryProtocol.run();
log.info( "log started with recovered state %s", state ); log.info( "log started with recovered state %s", state );


readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner", POOLED ), readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner", POOLED ),
Expand Down
Expand Up @@ -19,46 +19,17 @@
*/ */
package org.neo4j.coreedge.core.consensus.log.segmented; package org.neo4j.coreedge.core.consensus.log.segmented;


import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.ThresholdConfigValue;
import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse;

class SegmentedRaftLogPruner class SegmentedRaftLogPruner
{ {
private final ThresholdConfigValue parsedConfigOption; private final CoreLogPruningStrategy pruningStrategy;
private final CoreLogPruningStrategy pruneStrategy;

SegmentedRaftLogPruner( String pruningStrategyConfig, LogProvider logProvider )
{
parsedConfigOption = parse( pruningStrategyConfig );
pruneStrategy = getPruneStrategy( parsedConfigOption, logProvider );
}


private CoreLogPruningStrategy getPruneStrategy( ThresholdConfigValue configValue, LogProvider logProvider ) SegmentedRaftLogPruner( CoreLogPruningStrategy pruningStrategy )
{ {
String type = configValue.type; this.pruningStrategy = pruningStrategy;
switch ( type )
{
case "size":
return new SizeBasedLogPruningStrategy( parsedConfigOption.value );
case "txs":
case "entries": // txs and entries are synonyms
return new EntryBasedLogPruningStrategy( parsedConfigOption.value, logProvider );
case "hours": // hours and days are currently not supported as such, default to no prune
case "days":
throw new IllegalArgumentException(
"Time based pruning not supported yet for the segmented raft log, got '" + type + "'." );
case "false":
return new NoPruningPruningStrategy();
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue.value +
"'. Invalid type '" + type + "', valid are files, size, txs, entries, hours, days." );
}
} }


long getIndexToPruneFrom( long safeIndex, Segments segments ) long getIndexToPruneFrom( long safeIndex, Segments segments )
{ {
return Math.min( safeIndex, pruneStrategy.getIndexToKeep( segments ) ); return Math.min( safeIndex, pruningStrategy.getIndexToKeep( segments ) );
} }
} }
Expand Up @@ -21,29 +21,24 @@


import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;


import org.neo4j.coreedge.core.EnterpriseCoreEditionModule.RaftLogImplementation;
import org.neo4j.coreedge.core.consensus.ReplicatedInteger; import org.neo4j.coreedge.core.consensus.ReplicatedInteger;
import org.neo4j.coreedge.core.consensus.ReplicatedString; import org.neo4j.coreedge.core.consensus.ReplicatedString;
import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.test.rule.fs.EphemeralFileSystemRule;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.RaftLogImplementation.SEGMENTED;
import static org.neo4j.coreedge.core.consensus.ReplicatedInteger.valueOf; import static org.neo4j.coreedge.core.consensus.ReplicatedInteger.valueOf;
import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME; import static org.neo4j.coreedge.core.consensus.log.RaftLog.PHYSICAL_LOG_DIRECTORY_NAME;
import static org.neo4j.coreedge.core.consensus.log.RaftLogHelper.hasNoContent; import static org.neo4j.coreedge.core.consensus.log.RaftLogHelper.hasNoContent;
Expand All @@ -63,9 +58,11 @@ public class SegmentedRaftLogDurabilityTest
long rotateAtSizeBytes = 128; long rotateAtSizeBytes = 128;
int readerPoolSize = 8; int readerPoolSize = 8;


NullLogProvider logProvider = getInstance();
SegmentedRaftLog log = SegmentedRaftLog log =
new SegmentedRaftLog( fileSystem, directory, rotateAtSizeBytes, new DummyRaftableContentSerializer(), new SegmentedRaftLog( fileSystem, directory, rotateAtSizeBytes, new DummyRaftableContentSerializer(),
getInstance(), "1 size", readerPoolSize, Clocks.fakeClock(), new OnDemandJobScheduler() ); logProvider, readerPoolSize, Clocks.fakeClock(), new OnDemandJobScheduler(),
new CoreLogPruningStrategyFactory( "1 size", logProvider ).newInstance() );
log.start(); log.start();


return log; return log;
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;


import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategy;
import org.neo4j.coreedge.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog; import org.neo4j.coreedge.core.consensus.log.segmented.SegmentedRaftLog;
import org.neo4j.coreedge.core.replication.ReplicatedContent; import org.neo4j.coreedge.core.replication.ReplicatedContent;
import org.neo4j.coreedge.core.state.machines.tx.ReplicatedTransaction; import org.neo4j.coreedge.core.state.machines.tx.ReplicatedTransaction;
Expand All @@ -30,6 +32,8 @@
import org.neo4j.helpers.Args; import org.neo4j.helpers.Args;
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


Expand All @@ -55,11 +59,14 @@ public static void main( String[] args ) throws IOException
System.out.println( "logDirectory = " + logDirectory ); System.out.println( "logDirectory = " + logDirectory );
Config config = new Config( stringMap() ); Config config = new Config( stringMap() );


LogProvider logProvider = getInstance();
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( config.get( raft_log_pruning_strategy ), logProvider )
.newInstance();
SegmentedRaftLog log = new SegmentedRaftLog( new DefaultFileSystemAbstraction(), logDirectory, SegmentedRaftLog log = new SegmentedRaftLog( new DefaultFileSystemAbstraction(), logDirectory,
config.get( raft_log_rotation_size ), new CoreReplicatedContentMarshal(), config.get( raft_log_rotation_size ), new CoreReplicatedContentMarshal(), logProvider,
getInstance(), config.get( raft_log_pruning_strategy ), config.get( raft_log_reader_pool_size ), Clocks.systemClock(), new OnDemandJobScheduler(),
config.get( raft_log_reader_pool_size ), Clocks.systemClock(), pruningStrategy );
new OnDemandJobScheduler() );


long totalCommittedEntries = log.appendIndex(); // Not really, but we need to have a way to pass in the commit index long totalCommittedEntries = log.appendIndex(); // Not really, but we need to have a way to pass in the commit index
for ( int i = 0; i <= totalCommittedEntries; i++ ) for ( int i = 0; i <= totalCommittedEntries; i++ )
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.neo4j.coreedge.core.consensus.log.ConcurrentStressIT; import org.neo4j.coreedge.core.consensus.log.ConcurrentStressIT;
import org.neo4j.coreedge.core.consensus.log.DummyRaftableContentSerializer; import org.neo4j.coreedge.core.consensus.log.DummyRaftableContentSerializer;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


Expand All @@ -35,9 +37,15 @@ public class SegmentedConcurrentStressIT extends ConcurrentStressIT<SegmentedRaf
@Override @Override
public SegmentedRaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable public SegmentedRaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable
{ {
SegmentedRaftLog raftLog = new SegmentedRaftLog( fsa, dir, 8 * 1024 * 1024, long rotateAtSize = 8 * 1024 * 1024;
new DummyRaftableContentSerializer(), getInstance(), LogProvider logProvider = getInstance();
raft_log_pruning_strategy.getDefaultValue(), 8, Clocks.fakeClock(), new OnDemandJobScheduler() ); int readerPoolSize = 8;
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( raft_log_pruning_strategy.getDefaultValue(), logProvider )
.newInstance();
SegmentedRaftLog raftLog =
new SegmentedRaftLog( fsa, dir, rotateAtSize, new DummyRaftableContentSerializer(), logProvider,
readerPoolSize, Clocks.fakeClock(), new OnDemandJobScheduler(), pruningStrategy );
raftLog.start(); raftLog.start();
return raftLog; return raftLog;
} }
Expand Down
Expand Up @@ -29,6 +29,8 @@
import org.neo4j.coreedge.core.consensus.log.RaftLogContractTest; import org.neo4j.coreedge.core.consensus.log.RaftLogContractTest;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifeRule; import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.test.rule.fs.EphemeralFileSystemRule;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;
Expand All @@ -51,7 +53,10 @@ public RaftLog createRaftLog()
FileSystemAbstraction fileSystem = fsRule.get(); FileSystemAbstraction fileSystem = fsRule.get();
fileSystem.mkdir( directory ); fileSystem.mkdir( directory );


LogProvider logProvider = getInstance();
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( "1 entries", logProvider ).newInstance();
return life.add( new SegmentedRaftLog( fileSystem, directory, 1024, new DummyRaftableContentSerializer(), return life.add( new SegmentedRaftLog( fileSystem, directory, 1024, new DummyRaftableContentSerializer(),
getInstance(), "1 entries", 8, Clocks.fakeClock(), new OnDemandJobScheduler() ) ); logProvider, 8, Clocks.fakeClock(), new OnDemandJobScheduler(), pruningStrategy ) );
} }
} }
Expand Up @@ -31,6 +31,8 @@
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


Expand All @@ -42,7 +44,7 @@


public class SegmentedRaftLogCursorIT public class SegmentedRaftLogCursorIT
{ {
private LifeSupport life = new LifeSupport(); private final LifeSupport life = new LifeSupport();
private FileSystemAbstraction fileSystem; private FileSystemAbstraction fileSystem;


@After @After
Expand All @@ -62,10 +64,14 @@ private SegmentedRaftLog createRaftLog( long rotateAtSize, String pruneStrategy
File directory = new File( PHYSICAL_LOG_DIRECTORY_NAME ); File directory = new File( PHYSICAL_LOG_DIRECTORY_NAME );
fileSystem.mkdir( directory ); fileSystem.mkdir( directory );


LogProvider logProvider = getInstance();
CoreLogPruningStrategy pruningStrategy =
new CoreLogPruningStrategyFactory( pruneStrategy, logProvider ).newInstance();
SegmentedRaftLog newRaftLog = SegmentedRaftLog newRaftLog =
new SegmentedRaftLog( fileSystem, directory, rotateAtSize, new DummyRaftableContentSerializer(), new SegmentedRaftLog( fileSystem, directory, rotateAtSize, new DummyRaftableContentSerializer(),
getInstance(), pruneStrategy, 8, Clocks.systemClock(), logProvider, 8, Clocks.systemClock(),
new OnDemandJobScheduler() ); new OnDemandJobScheduler(),
pruningStrategy );


life.add( newRaftLog ); life.add( newRaftLog );
life.init(); life.init();
Expand Down

0 comments on commit 4391e9b

Please sign in to comment.