Skip to content

Commit

Permalink
Raft log pruning and skipping.
Browse files Browse the repository at this point in the history
Introduces pruning and skipping on the raft log interface and
implements it for all available raft logs.

Pruning is used for deleting a prefix of the log (due to local compaction).
Skipping is used for skipping a suffix of the log (due to remote compaction).

The physical raft log now puts prevLogIndex and prevLogTerm in the continuation
record to track all types of jumps (truncation, skipping) so that the history
for log matching always is available from that point.

An actual pruner that continually prunes according to some configured strategy
is not in place at this point.
  • Loading branch information
martinfurmanski committed Mar 14, 2016
1 parent 6e9c577 commit 165b405
Show file tree
Hide file tree
Showing 67 changed files with 1,470 additions and 573 deletions.
58 changes: 58 additions & 0 deletions community/kernel/src/main/java/org/neo4j/helpers/Reference.java
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.helpers;

public class Reference<T>
{
private T t;

public Reference( T initial )
{
this.t = initial;
}

public void set( T t )
{
this.t = t;
}

public T get()
{
return t;
}

@Override
public String toString()
{
return t.toString();
}

@Override
public boolean equals( Object obj )
{
return t.equals( obj );
}

@Override
public int hashCode()
{
return t.hashCode();
}
}
Expand Up @@ -21,5 +21,9 @@


public interface LogHeaderVisitor public interface LogHeaderVisitor
{ {
/***
* Used for visiting log headers in reverse order of age, meaning latest first.
* Stops visiting when false is returned.
*/
boolean visit( LogPosition position, long firstTransactionIdInLog, long lastTransactionIdInLog ); boolean visit( LogPosition position, long firstTransactionIdInLog, long lastTransactionIdInLog );
} }
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.Lifecycle;


import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;
Expand All @@ -39,7 +39,7 @@
/** /**
* {@link LogFile} backed by one or more files in a {@link FileSystemAbstraction}. * {@link LogFile} backed by one or more files in a {@link FileSystemAbstraction}.
*/ */
public class PhysicalLogFile extends LifecycleAdapter implements LogFile public class PhysicalLogFile implements LogFile, Lifecycle
{ {
public interface Monitor public interface Monitor
{ {
Expand Down Expand Up @@ -87,7 +87,7 @@ public PhysicalLogFile( FileSystemAbstraction fileSystem, PhysicalLogFiles logFi
} }


@Override @Override
public void init() throws Throwable public void init() throws IOException
{ {
// Make sure at least a bare bones log file is available before recovery // Make sure at least a bare bones log file is available before recovery
long lastLogVersionUsed = logVersionRepository.getCurrentLogVersion(); long lastLogVersionUsed = logVersionRepository.getCurrentLogVersion();
Expand All @@ -96,7 +96,7 @@ public void init() throws Throwable
} }


@Override @Override
public void start() throws Throwable public void start() throws IOException
{ {
// Recovery has taken place before this, so the log file has been truncated to last known good tx // Recovery has taken place before this, so the log file has been truncated to last known good tx
// Just read header and move to the end // Just read header and move to the end
Expand All @@ -109,11 +109,17 @@ public void start() throws Throwable
writer = new PositionAwarePhysicalFlushableChannel( channel ); writer = new PositionAwarePhysicalFlushableChannel( channel );
} }


@Override
public void stop()
{
// nothing to stop
}

// In order to be able to write into a logfile after life.stop during shutdown sequence // In order to be able to write into a logfile after life.stop during shutdown sequence
// we will close channel and writer only during shutdown phase when all pending changes (like last // we will close channel and writer only during shutdown phase when all pending changes (like last
// checkpoint) are already in // checkpoint) are already in
@Override @Override
public void shutdown() throws Throwable public void shutdown() throws IOException
{ {
if ( writer != null ) if ( writer != null )
{ {
Expand Down
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cursor;

public class CursorValue<T>
{
private boolean valid = false;
private T t;

public void set( T newT )
{
t = newT;
valid = true;
}

public T get()
{
if( valid )
{
return t;
}
throw new IllegalStateException();
}

public void invalidate()
{
valid = false;
t = null;
}
}
Expand Up @@ -19,6 +19,7 @@
*/ */
package org.neo4j.cursor; package org.neo4j.cursor;


import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;


/** /**
Expand All @@ -32,7 +33,7 @@
* {@link #next()} has been done, or if it returned false, then such accessor methods throw {@link * {@link #next()} has been done, or if it returned false, then such accessor methods throw {@link
* IllegalStateException}. * IllegalStateException}.
*/ */
public interface RawCursor<T,EXCEPTION extends Exception> extends Supplier<T>, AutoCloseable public interface RawCursor<T, EXCEPTION extends Exception> extends Supplier<T>, AutoCloseable
{ {
/** /**
* Move the cursor to the next row. * Move the cursor to the next row.
Expand All @@ -46,4 +47,19 @@ public interface RawCursor<T,EXCEPTION extends Exception> extends Supplier<T>, A
*/ */
@Override @Override
void close() throws EXCEPTION; void close() throws EXCEPTION;

default void forAll( Consumer<T> consumer ) throws EXCEPTION
{
try
{
while ( next() )
{
consumer.accept( get() );
}
}
finally
{
close();
}
}
} }
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.coreedge.helper.VolatileFuture; import org.neo4j.coreedge.helper.VolatileFuture;
import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.membership.RaftGroup; import org.neo4j.coreedge.raft.membership.RaftGroup;
import org.neo4j.coreedge.raft.membership.RaftMembershipManager; import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
Expand Down Expand Up @@ -164,7 +165,7 @@ Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> {
* *
* @param memberSet The other members. * @param memberSet The other members.
*/ */
public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSet ) throws BootstrapException public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSet ) throws BootstrapException, RaftLogCompactedException
{ {
if ( entryLog.appendIndex() >= 0 ) if ( entryLog.appendIndex() >= 0 )
{ {
Expand Down
Expand Up @@ -21,12 +21,13 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


public interface RaftMessageHandler public interface RaftMessageHandler
{ {
<MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message, ReadableRaftState<MEMBER> context, Log log ) <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message, ReadableRaftState<MEMBER> context, Log log )
throws IOException; throws IOException, RaftLogCompactedException;
} }

0 comments on commit 165b405

Please sign in to comment.