Skip to content

Commit

Permalink
Introduced CoreLogPruningStrategy interface.
Browse files Browse the repository at this point in the history
Introduced an interface for pruning strategies and
two implementations of this interface: Size based and Entry based.
  • Loading branch information
Max Sumrall committed May 17, 2016
1 parent 1686964 commit 0dd4b85
Show file tree
Hide file tree
Showing 16 changed files with 516 additions and 38 deletions.
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log.segmented;

public interface CoreLogPruningStrategy
{
/**
* Returns the index to keep depending on the configuration strategy.
* This does not factor in the value of the safe index to prune to.
*
* It is worth noting that the returned value may be the first available value,
* rather than the first possible value. This signifies that no pruning is needed.
* @param segments
* @return The lowest index the pruning configuration allows to keep. It is a value in the same range as
* append indexes, starting from -1 all the way to {@link Long#MAX_VALUE}.
*/
long getIndexToKeep( Segments segments );
}
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log.segmented;

import java.util.ListIterator;

import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class EntryBasedLogPruningStrategy implements CoreLogPruningStrategy
{
private final long entriesToKeep;
private final Log log;

public EntryBasedLogPruningStrategy( long entriesToKeep, LogProvider logProvider )
{
this.entriesToKeep = entriesToKeep;
this.log = logProvider.getLog( getClass() );
}

@Override
public long getIndexToKeep( Segments segments )
{
ListIterator<SegmentFile> iterator = segments.getSegmentFileIteratorAtEnd();
SegmentFile segmentFile = null;
long nextPrevIndex = 0;
long accumulated = 0;
if ( !iterator.hasPrevious() )
{
log.warn( "No log files found during the prune operation. This state should resolve on its own, but" +
" if this warning continues, you may want to look for other errors in the user log." );
return -1; // -1 is the lowest possible append index and so always safe to return.
}
segmentFile = iterator.previous();
nextPrevIndex = segmentFile.header().prevIndex();
long prevIndex;
// Iterate backwards through the files, counting entries from the headers until the limit is reached.
while ( accumulated < entriesToKeep && iterator.hasPrevious() )
{
segmentFile = iterator.previous();
prevIndex = segmentFile.header().prevIndex();
accumulated += (nextPrevIndex - prevIndex);
nextPrevIndex = prevIndex;
}
return segmentFile.header().prevIndex();
}
}
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.coreedge.raft.log.segmented;

import static org.neo4j.coreedge.raft.log.EntryRecord.read;

import java.io.File;
import java.io.IOException;

Expand All @@ -40,6 +38,8 @@
import org.neo4j.storageengine.api.ReadPastEndException;
import org.neo4j.storageengine.api.WritableChannel;

import static org.neo4j.coreedge.raft.log.EntryRecord.read;

/**
* Keeps track of a segment of the RAFT log, i.e. a consecutive set of entries.
* A segment can have several concurrent readers but just a single writer.
Expand Down Expand Up @@ -257,4 +257,9 @@ public SegmentHeader header()
{
return header;
}

public long size()
{
return fileSystem.getFileSize( file );
}
}
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.coreedge.raft.log.segmented;

import static java.lang.String.format;

import java.io.File;
import java.io.IOException;

Expand All @@ -39,18 +37,20 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

/**
* The segmented RAFT log is an append only log supporting the operations required to support
* the RAFT consensus algorithm.
*
* <p>
* A RAFT log must be able to append new entries, but also truncate not yet committed entries,
* prune out old compacted entries and skip to a later starting point.
*
* <p>
* The RAFT log consists of a sequence of individual log files, called segments, with
* the following format:
*
* <p>
* [HEADER] [ENTRY]*
*
* <p>
* So a header with zero or more entries following it. Each segment file contains a consecutive
* sequence of appended entries. The operations of truncating and skipping in the log is implemented
* by switching to the next segment file, called the next version. A new segment file is also started
Expand All @@ -70,17 +70,14 @@ public class SegmentedRaftLog extends LifecycleAdapter implements RaftLog
private final LogProvider logProvider;
private final LruCache<Long,RaftLogEntry> entryCache; // TODO: replace with ring buffer, limit based on size
private EntryStore entryStore;
private final SegmentedRaftLogPruner segmentedRaftLogPruner;
// private TermCache termCache;

private State state;

public SegmentedRaftLog(
FileSystemAbstraction fileSystem,
File directory,
long rotateAtSize,
ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider,
int entryCacheSize )
public SegmentedRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, int entryCacheSize,
String pruningStrategyConfig )
{
this.fileSystem = fileSystem;
this.directory = directory;
Expand All @@ -91,6 +88,7 @@ public SegmentedRaftLog(
this.log = logProvider.getLog( getClass() );
this.logProvider = logProvider;
this.entryCache = entryCacheSize >= 1 ? new LruCache<>( "raft-log-entry-cache", entryCacheSize ) : null;
this.segmentedRaftLogPruner = new SegmentedRaftLogPruner( pruningStrategyConfig, logProvider);
}

@Override
Expand Down Expand Up @@ -157,8 +155,9 @@ private void updateTerm( RaftLogEntry entry )
}
else
{
throw new IllegalStateException( format( "Non-monotonic term %d for entry %s in term %d",
entry.term(), entry.toString(), state.currentTerm ) );
throw new IllegalStateException(
format( "Non-monotonic term %d for entry %s in term %d", entry.term(), entry.toString(),
state.currentTerm ) );
}
}

Expand Down Expand Up @@ -275,7 +274,8 @@ else if ( logIndex < state.prevIndex || logIndex > state.appendIndex )
@Override
public long prune( long safeIndex ) throws IOException
{
SegmentFile oldestNotDisposed = state.segments.prune( safeIndex );
long pruneIndex = segmentedRaftLogPruner.getIndexToPruneFrom( safeIndex, state.segments );
SegmentFile oldestNotDisposed = state.segments.prune( pruneIndex );
state.prevIndex = oldestNotDisposed.header().prevIndex();
state.prevTerm = oldestNotDisposed.header().prevTerm();
return state.prevIndex;
Expand Down
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log.segmented;


import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.ThresholdConfigValue;
import static org.neo4j.kernel.impl.transaction.log.pruning.ThresholdConfigParser.parse;

public class SegmentedRaftLogPruner
{
private final ThresholdConfigValue parsedConfigOption;
private final CoreLogPruningStrategy pruneStrategy;

public SegmentedRaftLogPruner( String pruningStrategyConfig, LogProvider logProvider )
{
parsedConfigOption = parse( pruningStrategyConfig );
pruneStrategy = getPruneStrategy( parsedConfigOption, logProvider );
}

private CoreLogPruningStrategy getPruneStrategy( ThresholdConfigValue configValue, LogProvider logProvider )
{
switch ( configValue.type )
{
case "size":
return new SizeBasedLogPruningStrategy( parsedConfigOption.value );
case "txs":
case "entries": // txs and entries are synonyms
return new EntryBasedLogPruningStrategy( parsedConfigOption.value, logProvider );
case "hours":
return new SizeBasedLogPruningStrategy( parsedConfigOption.value ); //NOT FINISHED
case "days":
return new SizeBasedLogPruningStrategy( parsedConfigOption.value ); //NOT FINISHED
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue.value +
"'. Invalid type '" + configValue.type + "', valid are files, size, txs, entries, hours, days." );
}
}

public long getIndexToPruneFrom( long safeIndex, Segments segments )
{
return Math.min( safeIndex, pruneStrategy.getIndexToKeep( segments ) );
}
}
Expand Up @@ -19,14 +19,13 @@
*/
package org.neo4j.coreedge.raft.log.segmented;

import static java.lang.String.format;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;

import org.neo4j.coreedge.raft.log.segmented.OpenEndRangeMap.ValueRange;
Expand All @@ -37,6 +36,8 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;

/**
* Keeps track of all the segments that the RAFT log consists of.
*/
Expand Down Expand Up @@ -167,10 +168,10 @@ SegmentFile last()
return rangeMap.last();
}

public synchronized SegmentFile prune( long safeIndex )
public synchronized SegmentFile prune( long pruneIndex )
{
Collection<SegmentFile> forDisposal = new ArrayList<>();
SegmentFile oldestNotDisposed = collectSegmentsForDisposal( safeIndex, forDisposal );
SegmentFile oldestNotDisposed = collectSegmentsForDisposal( pruneIndex, forDisposal );

for ( SegmentFile segment : forDisposal )
{
Expand All @@ -186,7 +187,7 @@ public synchronized SegmentFile prune( long safeIndex )
return oldestNotDisposed;
}

private SegmentFile collectSegmentsForDisposal( long safeIndex, Collection<SegmentFile> forDisposal )
private SegmentFile collectSegmentsForDisposal( long pruneIndex, Collection<SegmentFile> forDisposal )
{
Iterator<SegmentFile> itr = segmentFiles.iterator();
SegmentFile prev = itr.next(); // there is always at least one segment
Expand All @@ -195,7 +196,7 @@ private SegmentFile collectSegmentsForDisposal( long safeIndex, Collection<Segme
while ( itr.hasNext() )
{
SegmentFile segment = itr.next();
if ( segment.header().prevFileLastIndex() <= safeIndex )
if ( segment.header().prevFileLastIndex() <= pruneIndex )
{
forDisposal.add( prev );
oldestNotDisposed = segment;
Expand Down Expand Up @@ -235,4 +236,9 @@ private synchronized void truncateHandler()
filesItr.remove();
}
}

public ListIterator<SegmentFile> getSegmentFileIteratorAtEnd()
{
return segmentFiles.listIterator( segmentFiles.size() );
}
}
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log.segmented;

import java.util.ListIterator;

public class SizeBasedLogPruningStrategy implements CoreLogPruningStrategy
{
private final long bytesToKeep;

public SizeBasedLogPruningStrategy( long bytesToKeep )
{
this.bytesToKeep = bytesToKeep;
}

public long getIndexToKeep( Segments segments )
{
long accumulatedSize = 0;
ListIterator<SegmentFile> iterator = segments.getSegmentFileIteratorAtEnd();
SegmentFile segmentFile = null;
while ( accumulatedSize < bytesToKeep && iterator.hasPrevious() )
{
segmentFile = iterator.previous();
accumulatedSize += sizeOf( segmentFile );
}

return segmentFile != null ? segmentFile.header().prevIndex() : -1;
}

private long sizeOf( SegmentFile value )
{
return value.size();
}
}

0 comments on commit 0dd4b85

Please sign in to comment.