diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/CoreLogPruningStrategy.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/CoreLogPruningStrategy.java
new file mode 100644
index 0000000000000..aa2c7ba0e641d
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/CoreLogPruningStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.log.segmented;
+
+public interface CoreLogPruningStrategy
+{
+ /**
+ * Returns the index to keep depending on the configuration strategy.
+ * This does not factor in the value of the safe index to prune to.
+ *
+ * It is worth noting that the returned value may be the first available value,
+ * rather than the first possible value. This signifies that no pruning is needed.
+ * @param segments
+ * @return The lowest index the pruning configuration allows to keep. It is a value in the same range as
+ * append indexes, starting from -1 all the way to {@link Long#MAX_VALUE}.
+ */
+ long getIndexToKeep( Segments segments );
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/EntryBasedLogPruningStrategy.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/EntryBasedLogPruningStrategy.java
new file mode 100644
index 0000000000000..a06fbd7977b5f
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/EntryBasedLogPruningStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.log.segmented;
+
+import java.util.ListIterator;
+
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+public class EntryBasedLogPruningStrategy implements CoreLogPruningStrategy
+{
+ private final long entriesToKeep;
+ private final Log log;
+
+ public EntryBasedLogPruningStrategy( long entriesToKeep, LogProvider logProvider )
+ {
+ this.entriesToKeep = entriesToKeep;
+ this.log = logProvider.getLog( getClass() );
+ }
+
+ @Override
+ public long getIndexToKeep( Segments segments )
+ {
+ ListIterator iterator = segments.getSegmentFileIteratorAtEnd();
+ SegmentFile segmentFile = null;
+ long nextPrevIndex = 0;
+ long accumulated = 0;
+ if ( !iterator.hasPrevious() )
+ {
+ log.warn( "No log files found during the prune operation. This state should resolve on its own, but" +
+ " if this warning continues, you may want to look for other errors in the user log." );
+ return -1; // -1 is the lowest possible append index and so always safe to return.
+ }
+ segmentFile = iterator.previous();
+ nextPrevIndex = segmentFile.header().prevIndex();
+ long prevIndex;
+ // Iterate backwards through the files, counting entries from the headers until the limit is reached.
+ while ( accumulated < entriesToKeep && iterator.hasPrevious() )
+ {
+ segmentFile = iterator.previous();
+ prevIndex = segmentFile.header().prevIndex();
+ accumulated += (nextPrevIndex - prevIndex);
+ nextPrevIndex = prevIndex;
+ }
+ return segmentFile.header().prevIndex();
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java
index e52653d90d7c5..6dd42ec318477 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentFile.java
@@ -19,8 +19,6 @@
*/
package org.neo4j.coreedge.raft.log.segmented;
-import static org.neo4j.coreedge.raft.log.EntryRecord.read;
-
import java.io.File;
import java.io.IOException;
@@ -40,6 +38,8 @@
import org.neo4j.storageengine.api.ReadPastEndException;
import org.neo4j.storageengine.api.WritableChannel;
+import static org.neo4j.coreedge.raft.log.EntryRecord.read;
+
/**
* Keeps track of a segment of the RAFT log, i.e. a consecutive set of entries.
* A segment can have several concurrent readers but just a single writer.
@@ -257,4 +257,9 @@ public SegmentHeader header()
{
return header;
}
+
+ public long size()
+ {
+ return fileSystem.getFileSize( file );
+ }
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java
index 707baa5f8cc92..6ea1037cf4ada 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLog.java
@@ -19,8 +19,6 @@
*/
package org.neo4j.coreedge.raft.log.segmented;
-import static java.lang.String.format;
-
import java.io.File;
import java.io.IOException;
@@ -39,18 +37,20 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
+import static java.lang.String.format;
+
/**
* The segmented RAFT log is an append only log supporting the operations required to support
* the RAFT consensus algorithm.
- *
+ *
* A RAFT log must be able to append new entries, but also truncate not yet committed entries,
* prune out old compacted entries and skip to a later starting point.
- *
+ *
* The RAFT log consists of a sequence of individual log files, called segments, with
* the following format:
- *
+ *
* [HEADER] [ENTRY]*
- *
+ *
* So a header with zero or more entries following it. Each segment file contains a consecutive
* sequence of appended entries. The operations of truncating and skipping in the log is implemented
* by switching to the next segment file, called the next version. A new segment file is also started
@@ -70,17 +70,14 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
private final LogProvider logProvider;
private final LruCache entryCache; // TODO: replace with ring buffer, limit based on size
private EntryStore entryStore;
+ private final SegmentedRaftLogPruner segmentedRaftLogPruner;
// private TermCache termCache;
private State state;
- public SegmentedRaftLog(
- FileSystemAbstraction fileSystem,
- File directory,
- long rotateAtSize,
- ChannelMarshal contentMarshal,
- LogProvider logProvider,
- int entryCacheSize )
+ public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize,
+ ChannelMarshal contentMarshal, LogProvider logProvider, int entryCacheSize,
+ String pruningStrategyConfig )
{
this.fileSystem = fileSystem;
this.directory = directory;
@@ -91,6 +88,7 @@ public SegmentedRaftLog(
this.log = logProvider.getLog( getClass() );
this.logProvider = logProvider;
this.entryCache = entryCacheSize >= 1 ? new LruCache<>( "raft-log-entry-cache", entryCacheSize ) : null;
+ this.segmentedRaftLogPruner = new SegmentedRaftLogPruner( pruningStrategyConfig, logProvider);
}
@Override
@@ -157,8 +155,9 @@ private void updateTerm( RaftLogEntry entry )
}
else
{
- throw new IllegalStateException( format( "Non-monotonic term %d for entry %s in term %d",
- entry.term(), entry.toString(), state.currentTerm ) );
+ throw new IllegalStateException(
+ format( "Non-monotonic term %d for entry %s in term %d", entry.term(), entry.toString(),
+ state.currentTerm ) );
}
}
@@ -275,7 +274,8 @@ else if ( logIndex < state.prevIndex || logIndex > state.appendIndex )
@Override
public long prune( long safeIndex ) throws IOException
{
- SegmentFile oldestNotDisposed = state.segments.prune( safeIndex );
+ long pruneIndex = segmentedRaftLogPruner.getIndexToPruneFrom( safeIndex, state.segments );
+ SegmentFile oldestNotDisposed = state.segments.prune( pruneIndex );
state.prevIndex = oldestNotDisposed.header().prevIndex();
state.prevTerm = oldestNotDisposed.header().prevTerm();
return state.prevIndex;
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLogPruner.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLogPruner.java
new file mode 100644
index 0000000000000..5beb52bd277ca
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SegmentedRaftLogPruner.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.log.segmented;
+
+
+import org.neo4j.logging.LogProvider;
+
+import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.ThresholdConfigValue;
+import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse;
+
+public class SegmentedRaftLogPruner
+{
+ private final ThresholdConfigValue parsedConfigOption;
+ private final CoreLogPruningStrategy pruneStrategy;
+
+ public SegmentedRaftLogPruner( String pruningStrategyConfig, LogProvider logProvider )
+ {
+ parsedConfigOption = parse( pruningStrategyConfig );
+ pruneStrategy = getPruneStrategy( parsedConfigOption, logProvider );
+ }
+
+ private CoreLogPruningStrategy getPruneStrategy( ThresholdConfigValue configValue, LogProvider logProvider )
+ {
+ switch ( configValue.type )
+ {
+ case "size":
+ return new SizeBasedLogPruningStrategy( parsedConfigOption.value );
+ case "txs":
+ case "entries": // txs and entries are synonyms
+ return new EntryBasedLogPruningStrategy( parsedConfigOption.value, logProvider );
+ case "hours":
+ return new SizeBasedLogPruningStrategy( parsedConfigOption.value ); //NOT FINISHED
+ case "days":
+ return new SizeBasedLogPruningStrategy( parsedConfigOption.value ); //NOT FINISHED
+ default:
+ throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue.value +
+ "'. Invalid type '" + configValue.type + "', valid are files, size, txs, entries, hours, days." );
+ }
+ }
+
+ public long getIndexToPruneFrom( long safeIndex, Segments segments )
+ {
+ return Math.min( safeIndex, pruneStrategy.getIndexToKeep( segments ) );
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java
index 53a278591f8a7..2c299a315f686 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/Segments.java
@@ -19,14 +19,13 @@
*/
package org.neo4j.coreedge.raft.log.segmented;
-import static java.lang.String.format;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import org.neo4j.coreedge.raft.log.segmented.OpenEndRangeMap.ValueRange;
@@ -37,6 +36,8 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
+import static java.lang.String.format;
+
/**
* Keeps track of all the segments that the RAFT log consists of.
*/
@@ -167,10 +168,10 @@ SegmentFile last()
return rangeMap.last();
}
- public synchronized SegmentFile prune( long safeIndex )
+ public synchronized SegmentFile prune( long pruneIndex )
{
Collection forDisposal = new ArrayList<>();
- SegmentFile oldestNotDisposed = collectSegmentsForDisposal( safeIndex, forDisposal );
+ SegmentFile oldestNotDisposed = collectSegmentsForDisposal( pruneIndex, forDisposal );
for ( SegmentFile segment : forDisposal )
{
@@ -186,7 +187,7 @@ public synchronized SegmentFile prune( long safeIndex )
return oldestNotDisposed;
}
- private SegmentFile collectSegmentsForDisposal( long safeIndex, Collection forDisposal )
+ private SegmentFile collectSegmentsForDisposal( long pruneIndex, Collection forDisposal )
{
Iterator itr = segmentFiles.iterator();
SegmentFile prev = itr.next(); // there is always at least one segment
@@ -195,7 +196,7 @@ private SegmentFile collectSegmentsForDisposal( long safeIndex, Collection getSegmentFileIteratorAtEnd()
+ {
+ return segmentFiles.listIterator( segmentFiles.size() );
+ }
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SizeBasedLogPruningStrategy.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SizeBasedLogPruningStrategy.java
new file mode 100644
index 0000000000000..fb93ee4bca62a
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/segmented/SizeBasedLogPruningStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.log.segmented;
+
+import java.util.ListIterator;
+
+public class SizeBasedLogPruningStrategy implements CoreLogPruningStrategy
+{
+ private final long bytesToKeep;
+
+ public SizeBasedLogPruningStrategy( long bytesToKeep )
+ {
+ this.bytesToKeep = bytesToKeep;
+ }
+
+ public long getIndexToKeep( Segments segments )
+ {
+ long accumulatedSize = 0;
+ ListIterator iterator = segments.getSegmentFileIteratorAtEnd();
+ SegmentFile segmentFile = null;
+ while ( accumulatedSize < bytesToKeep && iterator.hasPrevious() )
+ {
+ segmentFile = iterator.previous();
+ accumulatedSize += sizeOf( segmentFile );
+ }
+
+ return segmentFile != null ? segmentFile.header().prevIndex() : -1;
+ }
+
+ private long sizeOf( SegmentFile value )
+ {
+ return value.size();
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
index 286d67fbcf98f..74886c34736e7 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
@@ -526,6 +526,8 @@ private RaftLog createRaftLog(
case SEGMENTED:
{
long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size );
+ int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size );
+ String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning );
int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_entry_cache_size );
return life.add( new SegmentedRaftLog(
@@ -534,7 +536,8 @@ private RaftLog createRaftLog(
rotateAtSize,
marshal,
logProvider,
- entryCacheSize ) );
+ entryCacheSize,
+ pruningStrategyConfig) );
}
case NAIVE:
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
index 25d2f89a0a504..204cbfabc39d8 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java
@@ -49,6 +49,7 @@
import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf;
import static org.neo4j.coreedge.raft.log.RaftLogHelper.hasNoContent;
import static org.neo4j.coreedge.raft.log.RaftLogHelper.readLogEntry;
+import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation.NAIVE;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation.PHYSICAL;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.RaftLogImplementation.SEGMENTED;
@@ -103,7 +104,7 @@ public static Collection