From 3e08266de400be85e9310aab4d402d9ec5b805f4 Mon Sep 17 00:00:00 2001 From: Chris Gioran Date: Fri, 5 Feb 2016 17:03:51 +0200 Subject: [PATCH] Implementation of PhysicalRaftLog This is the work on having an append only, rotateable raft log, based on PhysicalLogFile. Some supporting classes, including a Raft log entry cursor, are also introduced. PRL now has proper records that contain information about commit and truncation actions, besides the already existing append. Iteration over the log happens via a two layer cursor, one that iterates over records and another that builds up state and determines if entries are committed, truncated or appended Renames PhysicalLogFiles.hasAnyTransaction() to .hasAnyEntry() This is to remove notional dependencies of PLF to transactional concepts, since we want to use it elsewhere PRL can rotate and change versions via support from PhysicalLogFiles Log versions are maintained by scanning the filename suffixes present on disk, instead of having a dedicated file storage like tx logs Changes semantics of RaftEntryStore, cursors may be before requested index Previously there was an implicit assumption that cursors returned from RaftEntryStore would be positioned at the requested index. This is not true for the current implementation, so instead PhysicalRaftLog assumes that the cursor must be iterated until the desired index is reached. This also requires the cursor to return RaftEntryRecords instead of RaftLogEntries, so that it's possible to compare indexes. --- .../org/neo4j/kernel/NeoStoreDataSource.java | 2 +- .../log/PhysicalLogFileInformation.java | 2 +- .../transaction/log/PhysicalLogFiles.java | 2 +- .../neo4j/kernel/NeoStoreDataSourceTest.java | 4 +- .../PhysicalLogFileInformationTest.java | 7 +- .../main/java/org/neo4j/cursor/IOCursor.java | 22 ++ .../FilenameBasedLogVersionRepository.java | 51 +++ .../raft/log/NaiveDurableRaftLog.java | 2 - .../raft/log/PhysicalRaftEntryStore.java | 105 ++++++ .../coreedge/raft/log/PhysicalRaftLog.java | 355 ++++++++++++++++++ .../raft/log/PhysicalRaftLogEntryCursor.java | 116 ++++++ .../coreedge/raft/log/RaftCommitEntry.java | 30 ++ .../coreedge/raft/log/RaftEntryStore.java | 27 ++ .../raft/log/RaftLogAppendRecord.java | 37 ++ .../raft/log/RaftLogCommitRecord.java | 28 ++ .../neo4j/coreedge/raft/log/RaftLogEntry.java | 1 + .../raft/log/RaftLogMetadataCache.java | 123 ++++++ .../coreedge/raft/log/RaftLogRecord.java | 42 +++ .../raft/log/RaftLogTruncateRecord.java | 28 ++ .../coreedge/raft/log/RaftRecordCursor.java | 89 +++++ .../raft/net/codecs/RaftMessageDecoder.java | 1 - .../core/EnterpriseCoreEditionModule.java | 2 + ...FilenameBasedLogVersionRepositoryTest.java | 58 +++ .../raft/log/PhysicalRaftLogContractTest.java | 142 +++++++ .../log/PhysicalRaftLogEntryCursorTest.java | 307 +++++++++++++++ .../raft/log/PhysicalRaftLogRotationTest.java | 209 +++++++++++ .../raft/log/RaftInstanceLogTest.java | 3 - .../raft/log/RaftLogAdversarialTest.java | 1 - .../raft/log/RaftLogContractTest.java | 158 ++++++++ .../raft/log/RaftLogDurabilityTest.java | 1 - .../raft/log/RaftLogMetadataCacheTest.java | 98 +++++ .../neo4j/coreedge/raft/roles/LeaderTest.java | 2 - 32 files changed, 2036 insertions(+), 19 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index bbd8c15d9c195..6c0d18d449a3f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -205,7 +205,7 @@ void dump( NeoStoreDataSource source, Logger log ) for ( long logVersion = logFiles.getLowestLogVersion(); logFiles.versionExists( logVersion ); logVersion++ ) { - if ( logFiles.hasAnyTransaction( logVersion ) ) + if ( logFiles.hasAnyEntries( logVersion ) ) { LogHeader header = logFiles.extractHeader( logVersion ); long firstTransactionIdInThisLog = header.lastCommittedTxId + 1; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java index 59390bb202f4f..3460ea5383bac 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java @@ -58,7 +58,7 @@ public long getFirstExistingTxId() throws IOException // OK, so we now have the oldest existing log version here. Open it and see if there's any transaction // in there. If there is then that transaction is the first one that we have. - return logFiles.hasAnyTransaction( version ) ? candidateFirstTx : -1; + return logFiles.hasAnyEntries( version ) ? candidateFirstTx : -1; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java index e0e33ce422eb8..4f2e2713fe4cb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java @@ -89,7 +89,7 @@ public LogHeader extractHeader( long version ) throws IOException return readLogHeader( fileSystem, getLogFileForVersion( version ) ); } - public boolean hasAnyTransaction( long version ) + public boolean hasAnyEntries( long version ) { return fileSystem.getFileSize( getLogFileForVersion( version ) ) > LOG_HEADER_SIZE; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java index bd0c688b3869a..0267153e666de 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java @@ -213,7 +213,7 @@ private PhysicalLogFiles logWithTransactions( long logVersion, long headerTxId ) { PhysicalLogFiles files = mock( PhysicalLogFiles.class ); when( files.getLowestLogVersion() ).thenReturn( logVersion ); - when( files.hasAnyTransaction( logVersion ) ).thenReturn( true ); + when( files.hasAnyEntries( logVersion ) ).thenReturn( true ); when( files.versionExists( logVersion ) ).thenReturn( true ); when( files.extractHeader( logVersion ) ).thenReturn( new LogHeader( LogEntryVersion.CURRENT.byteCode(), logVersion, headerTxId ) ); @@ -225,7 +225,7 @@ private PhysicalLogFiles logWithTransactionsInNextToOldestLog( long logVersion, { PhysicalLogFiles files = logWithTransactions( logVersion + 1, prevLogLastTxId ); when( files.getLowestLogVersion() ).thenReturn( logVersion ); - when( files.hasAnyTransaction( logVersion ) ).thenReturn( false ); + when( files.hasAnyEntries( logVersion ) ).thenReturn( false ); when( files.versionExists( logVersion ) ).thenReturn( true ); return files; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java index 8058470579f6f..3bc07c6ce0103 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java @@ -25,7 +25,6 @@ import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; import static org.junit.Assert.assertEquals; @@ -90,7 +89,7 @@ public void shouldReadAndCacheFirstCommittedTransactionIdWhenNotCached() throws when( logFiles.extractHeader( version ) ).thenReturn( new LogHeader( (byte) -1/*ignored*/, -1L/*ignored*/, expected - 1L ) ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( true ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( true ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( expected, firstCommittedTxId ); @@ -108,7 +107,7 @@ public void shouldReadFirstCommittedTransactionIdWhenCached() throws Exception when( logFiles.getHighestLogVersion() ).thenReturn( version ); when( logFiles.versionExists( version ) ).thenReturn( true ); when( logHeaderCache.getLogHeader( version ) ).thenReturn( expected -1 ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( true ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( true ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( expected, firstCommittedTxId ); @@ -122,7 +121,7 @@ public void shouldReturnNothingWhenThereAreNoTransactions() throws Exception long version = 10L; when( logFiles.getHighestLogVersion() ).thenReturn( version ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( false ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( false ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( -1, firstCommittedTxId ); diff --git a/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java b/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java index 5d9d4cccd57d6..7bd2dfb6d5679 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java +++ b/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java @@ -23,4 +23,26 @@ public interface IOCursor extends RawCursor { + static IOCursor getEmpty() + { + return new IOCursor() + { + @Override + public boolean next() throws IOException + { + return false; + } + + @Override + public void close() throws IOException + { + } + + @Override + public M get() + { + return null; + } + }; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java new file mode 100644 index 0000000000000..cc1ca8288cf84 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.io.file.Files; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; + +public class FilenameBasedLogVersionRepository implements LogVersionRepository +{ + private long version = 0; + + public FilenameBasedLogVersionRepository( PhysicalLogFiles logFiles) + { + version = Math.max(logFiles.getHighestLogVersion(), 0); + } + + + @Override + public long getCurrentLogVersion() + { + return version; + } + + @Override + public long incrementAndGetVersion() throws IOException + { + return ++version; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java index 0b518f48be21a..0944e5003ed78 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java @@ -27,8 +27,6 @@ import io.netty.buffer.Unpooled; import org.neo4j.coreedge.raft.replication.MarshallingException; import org.neo4j.coreedge.raft.replication.ReplicatedContent; -import org.neo4j.coreedge.raft.state.ByteBufferMarshal; -import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.ByteBufMarshal; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.StoreChannel; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java new file mode 100644 index 0000000000000..e37530ddecfd7 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.kernel.impl.transaction.log.LogFile; +import org.neo4j.kernel.impl.transaction.log.LogHeaderVisitor; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; + +public class PhysicalRaftEntryStore implements RaftEntryStore +{ + private final LogFile logFile; + private final RaftLogMetadataCache metadataCache; + private final ChannelMarshal marshal; + + public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache, + ChannelMarshal marshal ) + { + this.logFile = logFile; + this.metadataCache = metadataCache; + this.marshal = marshal; + } + + @Override + public IOCursor getEntriesFrom( final long indexToStartFrom ) throws RaftStorageException + { + // look up in position cache + try + { + RaftLogMetadataCache.RaftLogEntryMetadata metadata = metadataCache.getMetadata( indexToStartFrom ); + LogPosition startPosition = null; + if ( metadata != null ) + { + startPosition = metadata.getStartPosition(); + } + else + { + // ask LogFile about the version it may be in + LogVersionLocator headerVisitor = new LogVersionLocator( indexToStartFrom ); + logFile.accept( headerVisitor ); + startPosition = headerVisitor.getLogPosition(); + } + if ( startPosition == null ) + { + return IOCursor.getEmpty(); + } + ReadableLogChannel reader = logFile.getReader( startPosition ); + + return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( reader, marshal ) ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + + public static final class LogVersionLocator implements LogHeaderVisitor + { + private final long logEntryIndex; + private LogPosition foundPosition; + + public LogVersionLocator( long logEntryIndex ) + { + this.logEntryIndex = logEntryIndex; + } + + @Override + public boolean visit( LogPosition position, long firstLogIndex, long lastLogIndex ) + { + boolean foundIt = logEntryIndex >= firstLogIndex && logEntryIndex <= lastLogIndex; + if ( foundIt ) + { + foundPosition = position; + } + return !foundIt; // continue as long we don't find it + } + + public LogPosition getLogPosition() throws RaftStorageException + { + return foundPosition; + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java new file mode 100644 index 0000000000000..dbfa7d043182f --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static java.lang.String.format; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel; +import org.neo4j.kernel.impl.transaction.log.LogHeaderCache; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.LogPositionMarker; +import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; +import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; +import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl; +import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.Lifecycle; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +public class PhysicalRaftLog implements RaftLog, Lifecycle +{ + public static final String BASE_FILE_NAME = "raft.log"; + public static final String DIRECTORY_NAME = "raft-log"; + + private final PhysicalLogFile logFile; + private final ChannelMarshal marshal; + private final Supplier databaseHealthSupplier; + private final Log log; + private LogRotation logRotation; + private FlushablePositionAwareChannel writer; + private final RaftLogMetadataCache metadataCache; + private final AtomicLong appendIndex = new AtomicLong( -1 ); + private long commitIndex = -1; + private final LogPositionMarker positionMarker = new LogPositionMarker(); + private long term = -1; + + private final RaftEntryStore entryStore; + + private final Set listeners = new CopyOnWriteArraySet<>(); + private final PhysicalLogFiles logFiles; + + public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, + int entryCacheSize, PhysicalLogFile.Monitor monitor, + ChannelMarshal marshal, Supplier databaseHealthSupplier, + LogProvider logProvider ) + { + this.marshal = marshal; + this.databaseHealthSupplier = databaseHealthSupplier; + this.log = logProvider.getLog( getClass() ); + + directory.mkdirs(); + + logFiles = new PhysicalLogFiles( directory, BASE_FILE_NAME, fileSystem ); + LogVersionRepository logVersionRepository = new FilenameBasedLogVersionRepository( logFiles ); + + logFile = new PhysicalLogFile( fileSystem, logFiles, rotateAtSize, + appendIndex::get, logVersionRepository, monitor, new LogHeaderCache( 10 ) ); + + this.metadataCache = new RaftLogMetadataCache( entryCacheSize ); + this.entryStore = new PhysicalRaftEntryStore( logFile, metadataCache, marshal ); + } + + @Override + public void replay() throws Throwable + { + + } + + @Override + public void registerListener( Listener listener ) + { + listeners.add( listener ); + } + + @Override + public long append( RaftLogEntry entry ) throws RaftStorageException + { + if ( entry.term() >= term ) + { + term = entry.term(); + } + else + { + throw new RaftStorageException( format( "Non-monotonic term %d for in entry %s in term %d", + entry.term(), entry.toString(), term ) ); + } + + try + { + appendIndex.incrementAndGet(); + LogPositionMarker entryStartPosition = writer.getCurrentPosition( positionMarker ); + writer.put( RecordType.APPEND.value ); + writer.putLong( appendIndex.get() ); + writer.putLong( entry.term() ); + marshal.marshal( entry.content(), writer ); + metadataCache.cacheMetadata( appendIndex.get(), entry.term(), entryStartPosition.newPosition() ); + writer.prepareForFlush().flush(); + + for ( Listener listener : listeners ) + { + listener.onAppended( entry.content(), appendIndex.get() ); + } + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + return appendIndex.get(); + } + + @Override + public void truncate( long fromIndex ) throws RaftStorageException + { + if ( fromIndex <= commitIndex ) + { + throw new IllegalArgumentException( format( "cannot truncate (%d) before the commit index (%d)", + fromIndex, commitIndex ) ); + } + + if ( appendIndex.get() >= fromIndex ) + { + appendIndex.set( fromIndex - 1 ); + + for ( Listener listener : listeners ) + { + listener.onTruncated( fromIndex ); + } + try + { + writer.put( RecordType.TRUNCATE.value ); + writer.putLong( fromIndex ); + writer.prepareForFlush().flush(); + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + term = readEntryTerm( appendIndex.get() ); + } + + @Override + public void commit( long newCommitIndex ) throws RaftStorageException + { + if ( appendIndex.get() == -1 || commitIndex == appendIndex.get() ) + { + return; + } + + long actualNewCommitIndex = Math.min(newCommitIndex, appendIndex.get()); + + while ( commitIndex < actualNewCommitIndex ) + { + commitIndex++; + for ( Listener listener : listeners ) + { + ReplicatedContent content = readEntryContent( commitIndex ); + listener.onCommitted( content, commitIndex ); + } + } + try + { + writer.put( RecordType.COMMIT.value ); + writer.putLong( commitIndex ); + writer.prepareForFlush().flush(); + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + + @Override + public long appendIndex() + { + return appendIndex.get(); + } + + @Override + public long commitIndex() + { + return commitIndex; + } + + @Override + public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException + { + try ( IOCursor entriesFrom = entryStore.getEntriesFrom( logIndex ) ) + { + while ( entriesFrom.next() ) + { + RaftLogAppendRecord raftLogAppendRecord = entriesFrom.get(); + if ( raftLogAppendRecord.getLogIndex() == logIndex ) + { + return raftLogAppendRecord.getLogEntry(); + } + else if ( raftLogAppendRecord.getLogIndex() > logIndex ) + { + throw new IllegalStateException( format( "Asked for index %d but got up to %d without " + + "finding it.", logIndex, raftLogAppendRecord.getLogIndex() ) ); + } + } + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + return null; + } + + @Override + public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException + { + RaftLogEntry raftLogEntry = readLogEntry( logIndex ); + return raftLogEntry == null ? null : raftLogEntry.content(); + } + + @Override + public long readEntryTerm( long logIndex ) throws RaftStorageException + { + RaftLogEntry raftLogEntry = readLogEntry( logIndex ); + return raftLogEntry == null ? -1 : raftLogEntry.term(); + } + + @Override + public boolean entryExists( long logIndex ) throws RaftStorageException + { + return readLogEntry( logIndex ) != null; + } + + @Override + public void init() throws Throwable + { + logFile.init(); + } + + @Override + public void start() throws Throwable + { + this.logRotation = + new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() ); + + logFile.start(); + restoreIndexes(); + this.writer = logFile.getWriter(); + } + + private void restoreIndexes() throws IOException + { + long lowestLogVersion = logFiles.getLowestLogVersion(); + ReadableLogChannel reader = logFile.getReader( new LogPosition( lowestLogVersion, LogHeader.LOG_HEADER_SIZE ) ); + try ( RaftRecordCursor recordCursor = new RaftRecordCursor<>( reader, marshal ) ) + { + while ( recordCursor.next() ) + { + RaftLogRecord record = recordCursor.get(); + switch ( record.getType() ) + { + case COMMIT: + commitIndex = record.getLogIndex(); + break; + case APPEND: + appendIndex.set( record.getLogIndex() ); + break; + default: + throw new IllegalStateException( "Record is of unknown type " + record ); + } + } + } + log.info( "Restored commit index at %d, append index at %d, started at version %d (largest is %d)", + commitIndex, appendIndex.get(), lowestLogVersion, logFiles.getHighestLogVersion() ); + } + + @Override + public void stop() throws Throwable + { + logFile.stop(); + this.writer = null; + } + + @Override + public void shutdown() throws Throwable + { + logFile.shutdown(); + } + + public enum RecordType + { + APPEND( (byte) 0 ), COMMIT( (byte) 1 ), TRUNCATE( (byte) 2 ); + + private final byte value; + + RecordType( byte value ) + { + this.value = value; + } + + public byte value() + { + return value; + } + + public static RecordType forValue( byte value ) + { + switch ( value ) + { + case 0: + return APPEND; + case 1: + return COMMIT; + case 2: + return TRUNCATE; + default: + throw new IllegalArgumentException( "Value " + value + " is not a known entry type" ); + } + } + } + +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java new file mode 100644 index 0000000000000..043a9879d9f91 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Queue; +import java.util.function.Predicate; + +import org.neo4j.cursor.IOCursor; + +public class PhysicalRaftLogEntryCursor implements IOCursor +{ + private final RaftRecordCursor recordCursor; + private RaftLogAppendRecord currentEntry; + private final Queue logicallyNext; + private final List staging; + + public PhysicalRaftLogEntryCursor( RaftRecordCursor recordCursor ) + { + this.recordCursor = recordCursor; + this.logicallyNext = new LinkedList<>(); + this.staging = new LinkedList<>(); + } + + @Override + public boolean next() throws IOException + { + while ( recordCursor.next() ) + { + RaftLogRecord record = recordCursor.get(); + switch ( record.getType() ) + { + case APPEND: + staging.add( (RaftLogAppendRecord) record ); + break; + case COMMIT: + { + moveStagingToLogicallyNextAndSetCurrentEntry( logIndex -> logIndex <= record.getLogIndex() ); + + if ( currentEntry != null ) + { + return true; + } + + break; + } + case TRUNCATE: + { + removeFromStaging( index -> index >= record.getLogIndex() ); + break; + } + } + } + moveStagingToLogicallyNextAndSetCurrentEntry( index -> true ); + return currentEntry != null; + } + + private void removeFromStaging( Predicate condition ) + { + ListIterator iterator = staging.listIterator(); + while ( iterator.hasNext() ) + { + if ( condition.test( iterator.next().getLogIndex() ) ) + { + iterator.remove(); + } + } + } + + private void moveStagingToLogicallyNextAndSetCurrentEntry( Predicate condition ) + { + ListIterator iterator = staging.listIterator(); + while ( iterator.hasNext() ) + { + RaftLogAppendRecord next = iterator.next(); + if ( condition.test( next.getLogIndex() ) ) + { + logicallyNext.offer( next ); + iterator.remove(); + } + } + currentEntry = logicallyNext.poll(); + } + + @Override + public void close() throws IOException + { + recordCursor.close(); + } + + @Override + public RaftLogAppendRecord get() + { + return currentEntry; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java new file mode 100644 index 0000000000000..f9ef0efef2ec8 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftCommitEntry +{ + private final long commitIndex; + + public RaftCommitEntry( long commitIndex ) + { + this.commitIndex = commitIndex; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java new file mode 100644 index 0000000000000..082cb6bde9ee5 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import org.neo4j.cursor.IOCursor; + +public interface RaftEntryStore +{ + IOCursor getEntriesFrom( long logIndex ) throws RaftStorageException; +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java new file mode 100644 index 0000000000000..d245cf13eb53e --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogAppendRecord extends RaftLogRecord +{ + private final RaftLogEntry logEntry; + + RaftLogAppendRecord( long logIndex, RaftLogEntry logEntry ) + { + super( PhysicalRaftLog.RecordType.APPEND, logIndex ); + this.logEntry = logEntry; + } + + public RaftLogEntry getLogEntry() + { + return logEntry; + } + +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java new file mode 100644 index 0000000000000..d012518c1fe5b --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogCommitRecord extends RaftLogRecord +{ + public RaftLogCommitRecord( long logIndex ) + { + super( PhysicalRaftLog.RecordType.COMMIT, logIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java index 0a19ddba03992..4d9f6293ebef2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java @@ -77,4 +77,5 @@ public String toString() { return String.format( "RaftLogEntry{term=%d, content=%s}", term, content ); } + } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java new file mode 100644 index 0000000000000..3b3971942c5d8 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + + +import java.util.Objects; + +import org.neo4j.helpers.collection.LruCache; +import org.neo4j.kernel.impl.transaction.log.LogPosition; + +public class RaftLogMetadataCache +{ + private final LruCache raftLogEntryCache; + + public RaftLogMetadataCache( int logEntryCacheSize ) + { + this.raftLogEntryCache = new LruCache<>( "Raft log entry cache", logEntryCacheSize ); + } + + public void clear() + { + raftLogEntryCache.clear(); + } + + /** + * Returns the metadata for the entry at position {@param logIndex}, null if the metadata is not present in the cache + */ + public RaftLogEntryMetadata getMetadata( long logIndex ) + { + return raftLogEntryCache.get( logIndex ); + } + + public RaftLogEntryMetadata cacheMetadata( long logIndex, long entryTerm, LogPosition position ) + { + if ( position.getByteOffset() == -1 ) + { + throw new RuntimeException( "StartEntry.position is " + position ); + } + + RaftLogEntryMetadata result = new RaftLogEntryMetadata( entryTerm, position ); + raftLogEntryCache.put( logIndex, result ); + return result; + } + + public static class RaftLogEntryMetadata + { + private final long entryTerm; + private final LogPosition startPosition; + + public RaftLogEntryMetadata( long entryTerm, LogPosition startPosition ) + { + Objects.requireNonNull( startPosition ); + this.entryTerm = entryTerm; + this.startPosition = startPosition; + } + + public long getEntryTerm() + { + return entryTerm; + } + + public LogPosition getStartPosition() + { + return startPosition; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + RaftLogEntryMetadata that = (RaftLogEntryMetadata) o; + + if ( entryTerm != that.entryTerm ) + { + return false; + } + return startPosition.equals( that.startPosition ); + + } + + @Override + public int hashCode() + { + int result = (int) (entryTerm ^ (entryTerm >>> 32)); + result = 31 * result + startPosition.hashCode(); + return result; + } + + @Override + public String toString() + { + return "RaftLogEntryMetadata{" + + "entryTerm=" + entryTerm + + ", startPosition=" + startPosition + + '}'; + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java new file mode 100644 index 0000000000000..f947633d4096a --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public abstract class RaftLogRecord +{ + private final long logIndex; + private final PhysicalRaftLog.RecordType type; + + RaftLogRecord( PhysicalRaftLog.RecordType type, long logIndex ) + { + this.type = type; + this.logIndex = logIndex; + } + + public long getLogIndex() + { + return logIndex; + } + + public PhysicalRaftLog.RecordType getType() + { + return type; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java new file mode 100644 index 0000000000000..6b5e36d3aa2b7 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogTruncateRecord extends RaftLogRecord +{ + RaftLogTruncateRecord( long fromLogIndex ) + { + super( PhysicalRaftLog.RecordType.TRUNCATE, fromLogIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java new file mode 100644 index 0000000000000..5f88f0d469388 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; +import org.neo4j.storageengine.api.ReadPastEndException; + +public class RaftRecordCursor implements IOCursor +{ + private final T channel; + private final ChannelMarshal marshal; + private RaftLogRecord lastFoundRecord; + + public RaftRecordCursor( T channel, ChannelMarshal marshal ) + { + this.channel = channel; + this.marshal = marshal; + } + + @Override + public boolean next() throws IOException + { + try + { + byte type = channel.get(); + switch ( PhysicalRaftLog.RecordType.forValue( type ) ) + { + case APPEND: + long appendIndex = channel.getLong(); + long term = channel.getLong(); + ReplicatedContent content = marshal.unmarshal( channel ); + if ( content == null ) + { + return false; + } + lastFoundRecord = new RaftLogAppendRecord( appendIndex, new RaftLogEntry( term, content ) ); + return true; + case COMMIT: + long commitIndex = channel.getLong(); + lastFoundRecord = new RaftLogCommitRecord( commitIndex ); + return true; + case TRUNCATE: + long truncateFromIndex = channel.getLong(); + lastFoundRecord = new RaftLogTruncateRecord( truncateFromIndex ); + return true; + default: + throw new IllegalStateException( "Not really sure how we got here. Read type value was " + type ); + } + } + catch ( ReadPastEndException notEnoughBytes ) + { + return false; + } + } + + @Override + public void close() throws IOException + { + channel.close(); + } + + @Override + public RaftLogRecord get() + { + return lastFoundRecord; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java index 3247d040a167e..9354146c4f698 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java @@ -28,7 +28,6 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftLogEntry; -import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.ByteBufMarshal; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 710ac28b9f092..cc033812a0d2b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -39,6 +39,7 @@ import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; +import org.neo4j.coreedge.raft.log.PhysicalRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; import org.neo4j.coreedge.raft.membership.MembershipWaiter; @@ -114,6 +115,7 @@ import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.internal.DatabaseHealth; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java new file mode 100644 index 0000000000000..5e12eb243f225 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Rule; +import org.junit.Test; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.test.TargetDirectory; + +public class FilenameBasedLogVersionRepositoryTest +{ + @Rule + public TargetDirectory.TestDirectory testDir; + + @Test + public void shouldStartAtVersion0() throws Exception + { + PhysicalLogFiles logFiles = mock( PhysicalLogFiles.class ); + when( logFiles.getHighestLogVersion() ).thenReturn( -1L ); + + FilenameBasedLogVersionRepository repository = new FilenameBasedLogVersionRepository( logFiles ); + + assertEquals( 0L, repository.getCurrentLogVersion() ); + } + + @Test + public void shouldPickHighestVersionAvailable() throws Exception + { + PhysicalLogFiles logFiles = mock( PhysicalLogFiles.class ); + when( logFiles.getHighestLogVersion() ).thenReturn( 2L ); + + FilenameBasedLogVersionRepository repository = new FilenameBasedLogVersionRepository( logFiles ); + + assertEquals( 2L, repository.getCurrentLogVersion() ); + + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java new file mode 100644 index 0000000000000..54f367ea3a8cf --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.NullLogProvider; + +public class PhysicalRaftLogContractTest extends RaftLogContractTest +{ + private PhysicalRaftLog raftLog; + private LifeSupport life = new LifeSupport(); + private FileSystemAbstraction fileSystem; + + @Override + public RaftLog createRaftLog() throws IOException + { + this.raftLog = createRaftLog( 100 ); + return raftLog; + } + + @After + public void tearDown() throws Throwable + { + life.stop(); + life.shutdown(); + } + + private PhysicalRaftLog createRaftLog( int cacheSize ) + { + if ( fileSystem == null ) + { + fileSystem = new EphemeralFileSystemAbstraction(); + } + File directory = new File( "raft-log" ); + fileSystem.mkdir( directory ); + + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 1000000, cacheSize, + new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), mock( Supplier.class ), + NullLogProvider.getInstance() ); + life.add( newRaftLog ); + life.init(); + life.start(); + return newRaftLog; + } + + @Test + public void shouldReadBackInCachedEntry() throws Throwable + { + // Given + PhysicalRaftLog raftLog = (PhysicalRaftLog) createRaftLog(); + int term = 0; + ReplicatedInteger content = ReplicatedInteger.valueOf( 4 ); + + // When + long entryIndex = raftLog.append( new RaftLogEntry( term, content ) ); + + // Then + assertTrue( raftLog.entryExists( entryIndex ) ); + assertEquals( content, raftLog.readEntryContent( entryIndex ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex ) ); + } + + @Test + public void shouldReadBackNonCachedEntry() throws Exception + { + // Given + int cacheSize = 1; + PhysicalRaftLog raftLog = createRaftLog( cacheSize ); + int term = 0; + ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); + ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); + + // When + long entryIndex1 = raftLog.append( new RaftLogEntry( term, content1 ) ); + long entryIndex2 = raftLog.append( new RaftLogEntry( term, content2 ) ); // this will push the first entry out of cache + + // Then + // entry 1 should be there + assertTrue( raftLog.entryExists( entryIndex1 ) ); + assertEquals( content1, raftLog.readEntryContent( entryIndex1 ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex1 ) ); + + // entry 2 should be there also + assertTrue( raftLog.entryExists( entryIndex2 ) ); + assertEquals( content2, raftLog.readEntryContent( entryIndex2 ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex2 ) ); + } + + @Test + public void shouldRestoreCommitIndexOnStartup() throws Throwable + { + // Given + PhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); + int term = 0; + ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); + ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); + long entryIndex1 = raftLog.append( new RaftLogEntry( term, content1 ) ); + long entryIndex2 = raftLog.append( new RaftLogEntry( term, content2 ) ); + + raftLog.commit( entryIndex1 ); + + // When + // we restart the raft log + life.remove( raftLog ); // stops the removed instance + raftLog = createRaftLog( 100 ); + + // Then + assertEquals( entryIndex1, raftLog.commitIndex() ); + assertEquals( entryIndex2, raftLog.appendIndex() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java new file mode 100644 index 0000000000000..57865431e3f80 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; + +public class PhysicalRaftLogEntryCursorTest +{ + @Test + public void shouldReturnAllRecordsIfAllAreAppended() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogAppendRecord( 3, payload ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldReturnNonTruncatedRecords() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogTruncateRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + @Test + public void shouldNotReturnAnyRecordsIfTheyAreAllTruncated() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogTruncateRecord( 2 ) ) + .thenReturn( new RaftLogTruncateRecord( 1 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertFalse( entryCursor.next() ); + } + + @Test + public void shouldReturnCommittedRecords() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogCommitRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnCommittedRecords() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( new RaftLogCommitRecord( 7 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnOnEndOfFile() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnCommittedRecordsAndRecordsAtEndOfFile() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftLogEntry payload8 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 8 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( new RaftLogCommitRecord( 7 ) ) + .thenReturn( new RaftLogAppendRecord( 8, payload8 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload8, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldReturnUncommittedEntriesAtEndOfFileDespiteCommitEntryForNonEncounteredRecords() throws Exception + { + // Given + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogCommitRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java new file mode 100644 index 0000000000000..549180a25c563 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.coreedge.raft.ReplicatedString; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.NullLogProvider; + +public class PhysicalRaftLogRotationTest +{ + private LifeSupport life = new LifeSupport(); + private FileSystemAbstraction fileSystem; + + @After + public void tearDown() throws Throwable + { + life.stop(); + life.shutdown(); + } + + private PhysicalRaftLog createRaftLog( long rotateAtSize, PhysicalLogFile.Monitor logFileMonitor ) + { + if ( fileSystem == null ) + { + fileSystem = new EphemeralFileSystemAbstraction(); + } + File directory = new File( "raft-log" ); + fileSystem.mkdir( directory ); + + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, rotateAtSize, 100, + logFileMonitor, new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ), + NullLogProvider.getInstance() ); + life.add( newRaftLog ); + life.init(); + life.start(); + return newRaftLog; + } + + @Test + public void shouldRotateOnAppendWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatIsMoreThan100Bytes = new ReplicatedString( builder.toString() ); + log.append( new RaftLogEntry( 0, stringThatIsMoreThan100Bytes ) ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldRotateOnCommitWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + log.append( new RaftLogEntry( 0, stringThatGetsTheSizeToAlmost100Bytes ) ); + assertEquals( 0, currentVersion.get() ); + log.commit( 1 ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldRotateOnTruncateWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + long indexToTruncate = log.append( new RaftLogEntry( 0, stringThatGetsTheSizeToAlmost100Bytes ) ); + assertEquals( 0, currentVersion.get() ); + log.truncate( indexToTruncate ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldBeAbleToRecoverToLatestStateAfterRotation() throws Throwable + { + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + int term = 0; + long indexToCommit = log.append( new RaftLogEntry( term, stringThatGetsTheSizeToAlmost100Bytes ) ); + log.commit( indexToCommit ); + indexToCommit = log.append( new RaftLogEntry( term, ReplicatedInteger.valueOf( 1 ) ) ); + log.commit( indexToCommit ); + + // When + life.remove( log ); + log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + // Then + assertEquals( indexToCommit, log.commitIndex() ); + assertEquals( indexToCommit, log.appendIndex() ); + assertTrue( log.entryExists( indexToCommit ) ); + assertEquals( term, log.readEntryTerm( indexToCommit ) ); + } + + + @Test + public void shouldBeAbleToRecoverToLatestStateAfterRotationWhenEarlierFilesArePruned() throws Throwable + { + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + int term = 0; + long indexToCommit = log.append( new RaftLogEntry( term, stringThatGetsTheSizeToAlmost100Bytes ) ); + log.commit( indexToCommit ); + indexToCommit = log.append( new RaftLogEntry( term, ReplicatedInteger.valueOf( 1 ) ) ); + log.commit( indexToCommit ); + + // When + life.remove( log ); + + fileSystem.deleteFile( new File( new File("raft-log"), "raft.log.0" ) ); // hackish, decorate it my great Tech Liege + + log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + // Then + assertEquals( indexToCommit, log.commitIndex() ); + assertEquals( indexToCommit, log.appendIndex() ); + assertTrue( log.entryExists( indexToCommit ) ); + assertEquals( term, log.readEntryTerm( indexToCommit ) ); + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java index 7333ca4c58c84..99638edf719d7 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java @@ -28,9 +28,6 @@ import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftInstanceBuilder; import org.neo4j.coreedge.raft.ReplicatedInteger; -import org.neo4j.coreedge.raft.log.InMemoryRaftLog; -import org.neo4j.coreedge.raft.log.RaftLog; -import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.state.StateMachine; import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java index e2d45e6bcab7f..eb75ab304c992 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java @@ -31,7 +31,6 @@ import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.SelectiveFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.CoreMatchers.is; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java index fcc6001c732dc..f3fd26346f0ca 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -117,6 +118,26 @@ public void shouldCommitOutOfOrderAppend() throws Exception assertThat( log.entryExists( 0 ), is( true ) ); } + @Test + public void shouldCommitOnlyOnce() throws Exception + { + RaftLog log = createRaftLog(); + + RaftLog.Listener listener = mock( RaftLog.Listener.class ); + log.registerListener( listener ); + + RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) ); + log.append( logEntry ); + + log.commit( 10 ); + log.commit( 10 ); + + assertThat( log.appendIndex(), is( 0L ) ); + assertThat( log.commitIndex(), is( 0L ) ); + assertThat( log.entryExists( 0 ), is( true ) ); + verify( listener, times( 1 ) ).onCommitted( eq( logEntry.content() ), anyLong() ); + } + @Test public void shouldTruncatePreviouslyAppendedEntries() throws Exception { @@ -224,4 +245,141 @@ public void shouldRejectNonMonotonicTermsForEntries() throws Exception // expected } } + + @Test + public void shouldCommitAndThenTruncateSubsequentEntry() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + log.commit( toCommit ); + log.truncate( toTruncate ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldTruncateAndThenCommitPreviousEntry() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + log.truncate( toTruncate ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldCommitAfterTruncatingAndAppending() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + /* + 0 1 2 Tr(2) 2 C*(1) + */ + + // when + log.truncate( toTruncate ); + long lastAppended = log.append( new RaftLogEntry( 2, ReplicatedInteger.valueOf( 3 ) ) ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( lastAppended ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( true ) ); // index is "reused" + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + assertThat( log.readEntryTerm( lastAppended ), is( 2L ) ); + } + + @Test + public void shouldCommitAfterAppendingAndTruncating() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + long lastAppended = log.append( new RaftLogEntry( 2, ReplicatedInteger.valueOf( 3 ) ) ); + log.truncate( toTruncate ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( lastAppended ), is( false ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldNotAllowTruncationAtLastCommit() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + log.commit( toCommit ); + + try + { + // when + log.truncate( toCommit ); + fail("Truncation at this point should have failed"); + } + catch( IllegalArgumentException truncationFailed ) + { + // awesome + } + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + } + + @Test + public void shouldNotAllowTruncationBeforeLastCommit() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toTryToTruncate = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + log.commit( toCommit ); + + try + { + // when + log.truncate( toTryToTruncate ); + fail("Truncation at this point should have failed"); + } + catch( IllegalArgumentException truncationFailed ) + { + // awesome + } + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTryToTruncate ), is( true ) ); + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java index 934bed51e8829..63391e51b51f3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java @@ -26,7 +26,6 @@ import org.neo4j.coreedge.raft.ReplicatedInteger; import org.neo4j.coreedge.raft.ReplicatedString; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java new file mode 100644 index 0000000000000..8d76274b4e7c3 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import org.junit.Test; +import org.neo4j.kernel.impl.transaction.log.LogPosition; + +public class RaftLogMetadataCacheTest +{ + @Test + public void shouldReturnNullWhenMissingAnEntryInTheCache() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + + // when + final RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( 42 ); + + // then + assertNull( metadata ); + } + + @Test + public void shouldReturnTheTxValueTIfInTheCached() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 12L; + final long term = 12L; + final LogPosition position = new LogPosition( 3, 4 ); + + // when + cache.cacheMetadata( index, term, position ); + final RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( index ); + + // then + assertEquals( new RaftLogMetadataCache.RaftLogEntryMetadata( term, position ), metadata ); + } + + @Test + public void shouldThrowWhenCachingATxWithNegativeOffsetPosition() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 42; + final long term= 42; + final LogPosition position = new LogPosition( 3, -1 ); + + // when + try + { + cache.cacheMetadata( index, term, position ); + fail(); + } catch (RuntimeException ex) { + assertEquals( "StartEntry.position is " + position, ex.getMessage() ); + } + } + + @Test + public void shouldClearTheCache() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 12L; + final long term = 12L; + final LogPosition position = new LogPosition( 3, 4 ); + + // when + cache.cacheMetadata( index, term, position ); + cache.clear(); + RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( index ); + + // then + assertNull( metadata ); + } + +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index a1f78e065c48a..cdeab2f129ef8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -58,9 +58,7 @@ import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.server.RaftTestMember.member; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesResponse; -import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; -import static org.neo4j.coreedge.raft.roles.Role.LEADER; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; import static org.neo4j.helpers.collection.IteratorUtil.asSet; import static org.neo4j.helpers.collection.IteratorUtil.count;