From e78d1197173c1a5a54c9c0639ac65659381f6ce3 Mon Sep 17 00:00:00 2001 From: Mark Needham Date: Wed, 15 Jun 2016 12:19:56 +0100 Subject: [PATCH] Adding test for LogShipper / Pruning race condition fix If we were catching up a follower and the log was pruned away while we were in the process of doing that we could end up with an exception when trying to read from the Raft Log. This commit adds a test that simulates that scenario. --- .../org/neo4j/coreedge/raft/RaftInstance.java | 4 +- .../coreedge/raft/log/DelegatingRaftLog.java | 80 +++++++++++++ .../coreedge/raft/log/MonitoredRaftLog.java | 47 +------- .../raft/OutboundMessageCollector.java | 2 +- .../shipping/RaftLogShipperTest.java | 106 +++++++++++++++--- .../org/neo4j/test/matchers/Matchers.java | 83 ++++++++++++++ 6 files changed, 264 insertions(+), 58 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/DelegatingRaftLog.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 55d995a4af5d..250a48ff14d8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -50,6 +50,7 @@ import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy; import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; +import org.neo4j.kernel.impl.store.MismatchingStoreIdException; import org.neo4j.kernel.impl.store.kvstore.Rotation; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; @@ -322,7 +323,7 @@ public boolean validate( RaftMessages.RaftMessage incomingMessage ) return processable; } - catch ( Throwable e ) + catch ( MismatchingStoreIdException e ) { 
panicAndStop( incomingMessage, e ); throw e; @@ -331,7 +332,6 @@ public boolean validate( RaftMessages.RaftMessage incomingMessage ) private void panicAndStop( RaftMessages.RaftMessage incomingMessage, Throwable e ) { - // TODO: perhaps try to recover from some errors, like IllegalArgumentExceptions from the log log.error( "Failed to process Raft message " + incomingMessage, e ); databaseHealthSupplier.get().panic( e ); electionTimer.cancel(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/DelegatingRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/DelegatingRaftLog.java new file mode 100644 index 000000000000..b38162f0df10 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/DelegatingRaftLog.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; + +public class DelegatingRaftLog implements RaftLog +{ + private final RaftLog inner; + + public DelegatingRaftLog( RaftLog inner ) + { + this.inner = inner; + } + + @Override + public long append( RaftLogEntry... 
entry ) throws IOException + { + return inner.append( entry ); + } + + @Override + public void truncate( long fromIndex ) throws IOException + { + inner.truncate( fromIndex ); + } + + @Override + public long prune( long safeIndex ) throws IOException + { + return inner.prune( safeIndex ); + } + + @Override + public long skip( long index, long term ) throws IOException + { + return inner.skip( index, term ); + } + + @Override + public long appendIndex() + { + return inner.appendIndex(); + } + + @Override + public long prevIndex() + { + return inner.prevIndex(); + } + + @Override + public long readEntryTerm( long logIndex ) throws IOException + { + return inner.readEntryTerm( logIndex ); + } + + @Override + public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException + { + return inner.getEntryCursor( fromIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java index 909060b056b9..0d3bbe237167 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/MonitoredRaftLog.java @@ -24,21 +24,20 @@ import org.neo4j.coreedge.raft.log.monitoring.RaftLogAppendIndexMonitor; import org.neo4j.kernel.monitoring.Monitors; -public class MonitoredRaftLog implements RaftLog +public class MonitoredRaftLog extends DelegatingRaftLog { - private final RaftLog delegate; private final RaftLogAppendIndexMonitor appendIndexMonitor; public MonitoredRaftLog( RaftLog delegate, Monitors monitors ) { - this.delegate = delegate; + super( delegate ); this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass() ); } @Override public long append( RaftLogEntry... 
entries ) throws IOException { - long appendIndex = delegate.append( entries ); + long appendIndex = super.append( entries ); appendIndexMonitor.appendIndex( appendIndex ); return appendIndex; } @@ -46,43 +45,7 @@ public long append( RaftLogEntry... entries ) throws IOException @Override public void truncate( long fromIndex ) throws IOException { - delegate.truncate( fromIndex ); - appendIndexMonitor.appendIndex( delegate.appendIndex() ); - } - - @Override - public long prune( long safeIndex ) throws IOException - { - return delegate.prune( safeIndex ); - } - - @Override - public long appendIndex() - { - return delegate.appendIndex(); - } - - @Override - public long prevIndex() - { - return delegate.prevIndex(); - } - - @Override - public long readEntryTerm( long logIndex ) throws IOException - { - return delegate.readEntryTerm( logIndex ); - } - - @Override - public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException - { - return delegate.getEntryCursor( fromIndex ); - } - - @Override - public long skip( long index, long term ) throws IOException - { - return delegate.skip( index, term ); + super.truncate( fromIndex ); + appendIndexMonitor.appendIndex( super.appendIndex() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java index 4e3496be7b9d..858c41dc4d1a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/OutboundMessageCollector.java @@ -77,7 +77,7 @@ public boolean hasEntriesTo( RaftTestMember member, RaftLogEntry... 
expectedMess for ( Message message : sentTo( member ) ) { - if( message instanceof RaftMessages.AppendEntries.Request ) + if ( message instanceof RaftMessages.AppendEntries.Request ) { for ( RaftLogEntry actualEntry : ((RaftMessages.AppendEntries.Request) message).entries() ) { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java index c5add1158d51..fc64e626d909 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipperTest.java @@ -19,6 +19,9 @@ */ package org.neo4j.coreedge.raft.replication.shipping; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -27,13 +30,18 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.OutboundMessageCollector; +import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; import org.neo4j.coreedge.raft.ReplicatedInteger; import org.neo4j.coreedge.raft.ReplicatedString; +import org.neo4j.coreedge.raft.log.DelegatingRaftLog; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -43,12 +51,18 @@ import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.test.DoubleLatch; import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.helpers.collection.Iterators.asSet; +import static org.neo4j.test.matchers.Matchers.hasMessage; +import static org.neo4j.test.matchers.Matchers.hasRaftLogEntries; + public class RaftLogShipperTest { private OutboundMessageCollector outbound; @@ -70,6 +84,7 @@ public class RaftLogShipperTest private RaftLogEntry entry1 = new RaftLogEntry( 0, ReplicatedString.valueOf( "kedha" ) ); private RaftLogEntry entry2 = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 2000 ) ); private RaftLogEntry entry3 = new RaftLogEntry( 0, ReplicatedString.valueOf( "chupchick" ) ); + private StoreId storeId = new StoreId( 1, 2, 3, 4, 5 ); @Before public void setup() @@ -100,10 +115,13 @@ public void teardown() private void startLogShipper() { + LocalDatabase localDatabase = mock( LocalDatabase.class ); + when(localDatabase.storeId()).thenReturn( storeId ); + logShipper = new RaftLogShipper<>( outbound, logProvider, raftLog, clock, leader, follower, leaderTerm, leaderCommit, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, new InFlightMap<>(), - mock( LocalDatabase.class )); + localDatabase ); logShipper.start(); } @@ -118,7 +136,7 @@ public void shouldSendLastEntryOnStart() throws Throwable startLogShipper(); // then - assertTrue( outbound.hasEntriesTo( follower, entry1 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1 ) ); } @Test @@ -134,7 +152,7 @@ public void shouldSendPreviousEntryOnMismatch() throws Throwable logShipper.onMismatch( 0, new LeaderContext( 0, 0 ) ); // then - assertTrue( outbound.hasEntriesTo( follower, entry0 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry0 ) ); } @Test @@ -154,6 +172,7 @@ public void shouldKeepSendingFirstEntryAfterSeveralMismatches() throws Throwable // then assertTrue( 
outbound.hasEntriesTo( follower, entry0 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry0 ) ); } @Test @@ -173,7 +192,7 @@ public void shouldSendNextBatchAfterMatch() throws Throwable logShipper.onMatch( 0, new LeaderContext( 0, 0 ) ); // then - assertTrue( outbound.hasEntriesTo( follower, entry1, entry2, entry3 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2, entry3 ) ); } @Test @@ -189,12 +208,12 @@ public void shouldSendNewEntriesAfterMatchingLastEntry() throws Throwable outbound.clear(); raftLog.append( entry1 ); - logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{ entry1 }, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{entry1}, new LeaderContext( 0, 0 ) ); raftLog.append( entry2 ); - logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{ entry2 }, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{entry2}, new LeaderContext( 0, 0 ) ); // then - assertTrue( outbound.hasEntriesTo( follower, entry1, entry2 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry1, entry2) ); } @Test @@ -206,8 +225,8 @@ public void shouldNotSendNewEntriesWhenNotMatched() throws Throwable // when outbound.clear(); - logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{ entry1 }, new LeaderContext( 0, 0 ) ); - logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{ entry2 }, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{entry1}, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{entry2}, new LeaderContext( 0, 0 ) ); // then assertEquals( outbound.sentTo( follower ).size(), 0 ); @@ -223,15 +242,15 @@ public void shouldResendLastSentEntryOnFirstMismatch() throws Throwable raftLog.append( entry2 ); logShipper.onMatch( 0, new LeaderContext( 0, 0 ) ); - logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{ entry1 }, new LeaderContext( 0, 0 ) ); - logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{ 
entry2 }, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 0, 0, new RaftLogEntry[]{entry1}, new LeaderContext( 0, 0 ) ); + logShipper.onNewEntries( 1, 0, new RaftLogEntry[]{entry2}, new LeaderContext( 0, 0 ) ); // when outbound.clear(); logShipper.onMismatch( 1, new LeaderContext( 0, 0 ) ); // then - assertTrue( outbound.hasEntriesTo( follower, entry2 ) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry2) ); } @Test @@ -294,6 +313,67 @@ public void shouldSendMostRecentlyAvailableEntryIfPruningHappened() throws IOExc //then assertTrue( outbound.hasAnyEntriesTo( follower ) ); - assertTrue( outbound.hasEntriesTo( follower, entry3) ); + assertThat( outbound.sentTo( follower ), hasRaftLogEntries( entry3) ); + } + + @Test + public void shouldSendLogCompactionInfoToFollowerOnMatchIfEntryHasBeenPrunedAway() throws Exception + { + //given + AtomicBoolean afterInit = new AtomicBoolean(); + final DoubleLatch latch = new DoubleLatch(); + raftLog = new DelegatingRaftLog( raftLog ) + { + @Override + public long readEntryTerm( long logIndex ) throws IOException + { + if ( afterInit.get() ) + { + latch.start(); + latch.awaitFinish(); + } + return super.readEntryTerm( logIndex ); + } + }; + + raftLog.append( entry0 ); + raftLog.append( entry1 ); + raftLog.append( entry2 ); + raftLog.append( entry3 ); + + startLogShipper(); + + afterInit.set( true ); + + //when + outbound.clear(); + Thread pruningThread = new Thread( "Pruning" ) + { + @Override + public void run() + { + try + { + latch.awaitStart(); + raftLog.prune( 2 ); + latch.finish(); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + } + }; + + pruningThread.start(); + logShipper.onMatch( 1, new LeaderContext( 0, 0 ) ); + + //then + assertTrue( outbound.hasAnyEntriesTo( follower ) ); + assertThat( outbound.sentTo( follower ), + hasMessage( new RaftMessages.LogCompactionInfo<>( leader, 0, 1, storeId )) ); + + pruningThread.join(); } + } diff --git 
a/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java b/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java new file mode 100644 index 000000000000..65ae42a954f9 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/test/matchers/Matchers.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.test.matchers; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +import org.neo4j.coreedge.network.Message; +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.server.RaftTestMember; + +public final class Matchers +{ + private Matchers() + { + } + + public static Matcher> hasMessage( RaftMessages.BaseMessage message ) + { + return new TypeSafeMatcher>() + { + @Override + protected boolean matchesSafely( List messages ) + { + return messages.contains( message ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "has message " + message ); + } + }; + } + + public static Matcher> hasRaftLogEntries( RaftLogEntry... expectedEntries ) + { + return new TypeSafeMatcher>() + { + @Override + protected boolean matchesSafely( List messages ) + { + List entries = messages.stream() + .filter( message -> message instanceof RaftMessages.AppendEntries.Request ) + .map( m -> ((RaftMessages.AppendEntries.Request) m) ) + .flatMap( x -> Arrays.stream( x.entries() ) ) + .collect( Collectors.toList() ); + + return entries.containsAll( Arrays.asList( expectedEntries ) ); + + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "log entries " + Arrays.toString( expectedEntries ) ); + } + }; + } +}