Skip to content

Commit

Permalink
Adding test for LogShipper / Pruning race condition fix
Browse files Browse the repository at this point in the history
If we were catching up a follower and the log was pruned away
while we were in the process of doing that we could end up with an
exception when trying to read from the Raft Log.

This commit adds a test that simulates that scenario.
  • Loading branch information
Mark Needham committed Jun 15, 2016
1 parent c037729 commit e78d119
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 58 deletions.
Expand Up @@ -50,6 +50,7 @@
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.store.kvstore.Rotation;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
Expand Down Expand Up @@ -322,7 +323,7 @@ public boolean validate( RaftMessages.RaftMessage<MEMBER> incomingMessage )

return processable;
}
catch ( Throwable e )
catch ( MismatchingStoreIdException e )
{
panicAndStop( incomingMessage, e );
throw e;
Expand All @@ -331,7 +332,6 @@ public boolean validate( RaftMessages.RaftMessage<MEMBER> incomingMessage )

private void panicAndStop( RaftMessages.RaftMessage<MEMBER> incomingMessage, Throwable e )
{
// TODO: perhaps try to recover from some errors, like IllegalArgumentExceptions from the log
log.error( "Failed to process Raft message " + incomingMessage, e );
databaseHealthSupplier.get().panic( e );
electionTimer.cancel();
Expand Down
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log;

import java.io.IOException;

public class DelegatingRaftLog implements RaftLog
{
private final RaftLog inner;

public DelegatingRaftLog( RaftLog inner )
{
this.inner = inner;
}

@Override
public long append( RaftLogEntry... entry ) throws IOException
{
return inner.append( entry );
}

@Override
public void truncate( long fromIndex ) throws IOException
{
inner.truncate( fromIndex );
}

@Override
public long prune( long safeIndex ) throws IOException
{
return inner.prune( safeIndex );
}

@Override
public long skip( long index, long term ) throws IOException
{
return inner.skip( index, term );
}

@Override
public long appendIndex()
{
return inner.appendIndex();
}

@Override
public long prevIndex()
{
return inner.prevIndex();
}

@Override
public long readEntryTerm( long logIndex ) throws IOException
{
return inner.readEntryTerm( logIndex );
}

@Override
public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
return inner.getEntryCursor( fromIndex );
}
}
Expand Up @@ -24,65 +24,28 @@
import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor;
import org.neo4j.kernel.monitoring.Monitors;

public class MonitoredRaftLog implements RaftLog
public class MonitoredRaftLog extends DelegatingRaftLog
{
private final RaftLog delegate;
private final RaftLogAppendIndexMonitor appendIndexMonitor;

public MonitoredRaftLog( RaftLog delegate, Monitors monitors )
{
this.delegate = delegate;
super( delegate );
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass() );
}

@Override
public long append( RaftLogEntry... entries ) throws IOException
{
long appendIndex = delegate.append( entries );
long appendIndex = super.append( entries );
appendIndexMonitor.appendIndex( appendIndex );
return appendIndex;
}

@Override
public void truncate( long fromIndex ) throws IOException
{
delegate.truncate( fromIndex );
appendIndexMonitor.appendIndex( delegate.appendIndex() );
}

@Override
public long prune( long safeIndex ) throws IOException
{
return delegate.prune( safeIndex );
}

@Override
public long appendIndex()
{
return delegate.appendIndex();
}

@Override
public long prevIndex()
{
return delegate.prevIndex();
}

@Override
public long readEntryTerm( long logIndex ) throws IOException
{
return delegate.readEntryTerm( logIndex );
}

@Override
public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
{
return delegate.getEntryCursor( fromIndex );
}

@Override
public long skip( long index, long term ) throws IOException
{
return delegate.skip( index, term );
super.truncate( fromIndex );
appendIndexMonitor.appendIndex( super.appendIndex() );
}
}
Expand Up @@ -77,7 +77,7 @@ public boolean hasEntriesTo( RaftTestMember member, RaftLogEntry... expectedMess

for ( Message message : sentTo( member ) )
{
if( message instanceof RaftMessages.AppendEntries.Request )
if ( message instanceof RaftMessages.AppendEntries.Request )
{
for ( RaftLogEntry actualEntry : ((RaftMessages.AppendEntries.Request) message).entries() )
{
Expand Down

0 comments on commit e78d119

Please sign in to comment.