diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index bbd8c15d9c195..6c0d18d449a3f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -205,7 +205,7 @@ void dump( NeoStoreDataSource source, Logger log ) for ( long logVersion = logFiles.getLowestLogVersion(); logFiles.versionExists( logVersion ); logVersion++ ) { - if ( logFiles.hasAnyTransaction( logVersion ) ) + if ( logFiles.hasAnyEntries( logVersion ) ) { LogHeader header = logFiles.extractHeader( logVersion ); long firstTransactionIdInThisLog = header.lastCommittedTxId + 1; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java index 59390bb202f4f..3460ea5383bac 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFileInformation.java @@ -58,7 +58,7 @@ public long getFirstExistingTxId() throws IOException // OK, so we now have the oldest existing log version here. Open it and see if there's any transaction // in there. If there is then that transaction is the first one that we have. - return logFiles.hasAnyTransaction( version ) ? candidateFirstTx : -1; + return logFiles.hasAnyEntries( version ) ? candidateFirstTx : -1; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java index e0e33ce422eb8..4f2e2713fe4cb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogFiles.java @@ -89,7 +89,7 @@ public LogHeader extractHeader( long version ) throws IOException return readLogHeader( fileSystem, getLogFileForVersion( version ) ); } - public boolean hasAnyTransaction( long version ) + public boolean hasAnyEntries( long version ) { return fileSystem.getFileSize( getLogFileForVersion( version ) ) > LOG_HEADER_SIZE; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java index bd0c688b3869a..0267153e666de 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/NeoStoreDataSourceTest.java @@ -213,7 +213,7 @@ private PhysicalLogFiles logWithTransactions( long logVersion, long headerTxId ) { PhysicalLogFiles files = mock( PhysicalLogFiles.class ); when( files.getLowestLogVersion() ).thenReturn( logVersion ); - when( files.hasAnyTransaction( logVersion ) ).thenReturn( true ); + when( files.hasAnyEntries( logVersion ) ).thenReturn( true ); when( files.versionExists( logVersion ) ).thenReturn( true ); when( files.extractHeader( logVersion ) ).thenReturn( new LogHeader( LogEntryVersion.CURRENT.byteCode(), logVersion, headerTxId ) ); @@ -225,7 +225,7 @@ private PhysicalLogFiles logWithTransactionsInNextToOldestLog( long logVersion, { PhysicalLogFiles files = logWithTransactions( logVersion + 1, prevLogLastTxId ); when( files.getLowestLogVersion() ).thenReturn( logVersion ); - when( files.hasAnyTransaction( logVersion ) ).thenReturn( false ); + when( files.hasAnyEntries( logVersion ) ).thenReturn( false ); when( files.versionExists( logVersion ) ).thenReturn( true ); return files; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java index 8058470579f6f..3bc07c6ce0103 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/PhysicalLogFileInformationTest.java @@ -25,7 +25,6 @@ import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; import static org.junit.Assert.assertEquals; @@ -90,7 +89,7 @@ public void shouldReadAndCacheFirstCommittedTransactionIdWhenNotCached() throws when( logFiles.extractHeader( version ) ).thenReturn( new LogHeader( (byte) -1/*ignored*/, -1L/*ignored*/, expected - 1L ) ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( true ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( true ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( expected, firstCommittedTxId ); @@ -108,7 +107,7 @@ public void shouldReadFirstCommittedTransactionIdWhenCached() throws Exception when( logFiles.getHighestLogVersion() ).thenReturn( version ); when( logFiles.versionExists( version ) ).thenReturn( true ); when( logHeaderCache.getLogHeader( version ) ).thenReturn( expected -1 ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( true ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( true ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( expected, firstCommittedTxId ); @@ -122,7 +121,7 @@ public void shouldReturnNothingWhenThereAreNoTransactions() throws Exception long version = 10L; when( logFiles.getHighestLogVersion() ).thenReturn( version ); - when( logFiles.hasAnyTransaction( version ) ).thenReturn( false ); + when( logFiles.hasAnyEntries( version ) ).thenReturn( false ); long firstCommittedTxId = info.getFirstExistingTxId(); assertEquals( -1, firstCommittedTxId ); diff --git a/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java b/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java index 5d9d4cccd57d6..7bd2dfb6d5679 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java +++ b/community/primitive-collections/src/main/java/org/neo4j/cursor/IOCursor.java @@ -23,4 +23,26 @@ public interface IOCursor extends RawCursor { + static IOCursor getEmpty() + { + return new IOCursor() + { + @Override + public boolean next() throws IOException + { + return false; + } + + @Override + public void close() throws IOException + { + } + + @Override + public M get() + { + return null; + } + }; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java new file mode 100644 index 0000000000000..cc1ca8288cf84 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepository.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.io.file.Files; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; + +public class FilenameBasedLogVersionRepository implements LogVersionRepository +{ + private long version = 0; + + public FilenameBasedLogVersionRepository( PhysicalLogFiles logFiles) + { + version = Math.max(logFiles.getHighestLogVersion(), 0); + } + + + @Override + public long getCurrentLogVersion() + { + return version; + } + + @Override + public long incrementAndGetVersion() throws IOException + { + return ++version; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java index 0b518f48be21a..0944e5003ed78 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/NaiveDurableRaftLog.java @@ -27,8 +27,6 @@ import io.netty.buffer.Unpooled; import org.neo4j.coreedge.raft.replication.MarshallingException; import org.neo4j.coreedge.raft.replication.ReplicatedContent; -import org.neo4j.coreedge.raft.state.ByteBufferMarshal; -import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.ByteBufMarshal; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.StoreChannel; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java new file mode 100644 index 0000000000000..e37530ddecfd7 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftEntryStore.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.kernel.impl.transaction.log.LogFile; +import org.neo4j.kernel.impl.transaction.log.LogHeaderVisitor; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; + +public class PhysicalRaftEntryStore implements RaftEntryStore +{ + private final LogFile logFile; + private final RaftLogMetadataCache metadataCache; + private final ChannelMarshal marshal; + + public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache, + ChannelMarshal marshal ) + { + this.logFile = logFile; + this.metadataCache = metadataCache; + this.marshal = marshal; + } + + @Override + public IOCursor getEntriesFrom( final long indexToStartFrom ) throws RaftStorageException + { + // look up in position cache + try + { + RaftLogMetadataCache.RaftLogEntryMetadata metadata = metadataCache.getMetadata( indexToStartFrom ); + LogPosition startPosition = null; + if ( metadata != null ) + { + startPosition = metadata.getStartPosition(); + } + else + { + // ask LogFile about the version it may be in + LogVersionLocator headerVisitor = new LogVersionLocator( indexToStartFrom ); + logFile.accept( headerVisitor ); + startPosition = headerVisitor.getLogPosition(); + } + if ( startPosition == null ) + { + return IOCursor.getEmpty(); + } + ReadableLogChannel reader = logFile.getReader( startPosition ); + + return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( reader, marshal ) ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + + public static final class LogVersionLocator implements LogHeaderVisitor + { + private final long logEntryIndex; + private LogPosition foundPosition; + + public LogVersionLocator( long logEntryIndex ) + { + this.logEntryIndex = logEntryIndex; + } + + @Override + public boolean visit( LogPosition position, long firstLogIndex, long lastLogIndex ) + { + boolean foundIt = logEntryIndex >= firstLogIndex && logEntryIndex <= lastLogIndex; + if ( foundIt ) + { + foundPosition = position; + } + return !foundIt; // continue as long we don't find it + } + + public LogPosition getLogPosition() throws RaftStorageException + { + return foundPosition; + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java new file mode 100644 index 0000000000000..dbfa7d043182f --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLog.java @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static java.lang.String.format; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel; +import org.neo4j.kernel.impl.transaction.log.LogHeaderCache; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.LogPositionMarker; +import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; +import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; +import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation; +import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl; +import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.Lifecycle; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +public class PhysicalRaftLog implements RaftLog, Lifecycle +{ + public static final String BASE_FILE_NAME = "raft.log"; + public static final String DIRECTORY_NAME = "raft-log"; + + private final PhysicalLogFile logFile; + private final ChannelMarshal marshal; + private final Supplier databaseHealthSupplier; + private final Log log; + private LogRotation logRotation; + private FlushablePositionAwareChannel writer; + private final RaftLogMetadataCache metadataCache; + private final AtomicLong appendIndex = new AtomicLong( -1 ); + private long commitIndex = -1; + private final LogPositionMarker positionMarker = new LogPositionMarker(); + private long term = -1; + + private final RaftEntryStore entryStore; + + private final Set listeners = new CopyOnWriteArraySet<>(); + private final PhysicalLogFiles logFiles; + + public PhysicalRaftLog( FileSystemAbstraction fileSystem, File directory, long rotateAtSize, + int entryCacheSize, PhysicalLogFile.Monitor monitor, + ChannelMarshal marshal, Supplier databaseHealthSupplier, + LogProvider logProvider ) + { + this.marshal = marshal; + this.databaseHealthSupplier = databaseHealthSupplier; + this.log = logProvider.getLog( getClass() ); + + directory.mkdirs(); + + logFiles = new PhysicalLogFiles( directory, BASE_FILE_NAME, fileSystem ); + LogVersionRepository logVersionRepository = new FilenameBasedLogVersionRepository( logFiles ); + + logFile = new PhysicalLogFile( fileSystem, logFiles, rotateAtSize, + appendIndex::get, logVersionRepository, monitor, new LogHeaderCache( 10 ) ); + + this.metadataCache = new RaftLogMetadataCache( entryCacheSize ); + this.entryStore = new PhysicalRaftEntryStore( logFile, metadataCache, marshal ); + } + + @Override + public void replay() throws Throwable + { + + } + + @Override + public void registerListener( Listener listener ) + { + listeners.add( listener ); + } + + @Override + public long append( RaftLogEntry entry ) throws RaftStorageException + { + if ( entry.term() >= term ) + { + term = entry.term(); + } + else + { + throw new RaftStorageException( format( "Non-monotonic term %d for in entry %s in term %d", + entry.term(), entry.toString(), term ) ); + } + + try + { + appendIndex.incrementAndGet(); + LogPositionMarker entryStartPosition = writer.getCurrentPosition( positionMarker ); + writer.put( RecordType.APPEND.value ); + writer.putLong( appendIndex.get() ); + writer.putLong( entry.term() ); + marshal.marshal( entry.content(), writer ); + metadataCache.cacheMetadata( appendIndex.get(), entry.term(), entryStartPosition.newPosition() ); + writer.prepareForFlush().flush(); + + for ( Listener listener : listeners ) + { + listener.onAppended( entry.content(), appendIndex.get() ); + } + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + return appendIndex.get(); + } + + @Override + public void truncate( long fromIndex ) throws RaftStorageException + { + if ( fromIndex <= commitIndex ) + { + throw new IllegalArgumentException( format( "cannot truncate (%d) before the commit index (%d)", + fromIndex, commitIndex ) ); + } + + if ( appendIndex.get() >= fromIndex ) + { + appendIndex.set( fromIndex - 1 ); + + for ( Listener listener : listeners ) + { + listener.onTruncated( fromIndex ); + } + try + { + writer.put( RecordType.TRUNCATE.value ); + writer.putLong( fromIndex ); + writer.prepareForFlush().flush(); + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + term = readEntryTerm( appendIndex.get() ); + } + + @Override + public void commit( long newCommitIndex ) throws RaftStorageException + { + if ( appendIndex.get() == -1 || commitIndex == appendIndex.get() ) + { + return; + } + + long actualNewCommitIndex = Math.min(newCommitIndex, appendIndex.get()); + + while ( commitIndex < actualNewCommitIndex ) + { + commitIndex++; + for ( Listener listener : listeners ) + { + ReplicatedContent content = readEntryContent( commitIndex ); + listener.onCommitted( content, commitIndex ); + } + } + try + { + writer.put( RecordType.COMMIT.value ); + writer.putLong( commitIndex ); + writer.prepareForFlush().flush(); + logRotation.rotateLogIfNeeded( LogAppendEvent.NULL ); + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + } + + @Override + public long appendIndex() + { + return appendIndex.get(); + } + + @Override + public long commitIndex() + { + return commitIndex; + } + + @Override + public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException + { + try ( IOCursor entriesFrom = entryStore.getEntriesFrom( logIndex ) ) + { + while ( entriesFrom.next() ) + { + RaftLogAppendRecord raftLogAppendRecord = entriesFrom.get(); + if ( raftLogAppendRecord.getLogIndex() == logIndex ) + { + return raftLogAppendRecord.getLogEntry(); + } + else if ( raftLogAppendRecord.getLogIndex() > logIndex ) + { + throw new IllegalStateException( format( "Asked for index %d but got up to %d without " + + "finding it.", logIndex, raftLogAppendRecord.getLogIndex() ) ); + } + } + } + catch ( IOException e ) + { + throw new RaftStorageException( e ); + } + return null; + } + + @Override + public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException + { + RaftLogEntry raftLogEntry = readLogEntry( logIndex ); + return raftLogEntry == null ? null : raftLogEntry.content(); + } + + @Override + public long readEntryTerm( long logIndex ) throws RaftStorageException + { + RaftLogEntry raftLogEntry = readLogEntry( logIndex ); + return raftLogEntry == null ? -1 : raftLogEntry.term(); + } + + @Override + public boolean entryExists( long logIndex ) throws RaftStorageException + { + return readLogEntry( logIndex ) != null; + } + + @Override + public void init() throws Throwable + { + logFile.init(); + } + + @Override + public void start() throws Throwable + { + this.logRotation = + new LogRotationImpl( new LoggingLogFileMonitor( log ), logFile, databaseHealthSupplier.get() ); + + logFile.start(); + restoreIndexes(); + this.writer = logFile.getWriter(); + } + + private void restoreIndexes() throws IOException + { + long lowestLogVersion = logFiles.getLowestLogVersion(); + ReadableLogChannel reader = logFile.getReader( new LogPosition( lowestLogVersion, LogHeader.LOG_HEADER_SIZE ) ); + try ( RaftRecordCursor recordCursor = new RaftRecordCursor<>( reader, marshal ) ) + { + while ( recordCursor.next() ) + { + RaftLogRecord record = recordCursor.get(); + switch ( record.getType() ) + { + case COMMIT: + commitIndex = record.getLogIndex(); + break; + case APPEND: + appendIndex.set( record.getLogIndex() ); + break; + default: + throw new IllegalStateException( "Record is of unknown type " + record ); + } + } + } + log.info( "Restored commit index at %d, append index at %d, started at version %d (largest is %d)", + commitIndex, appendIndex.get(), lowestLogVersion, logFiles.getHighestLogVersion() ); + } + + @Override + public void stop() throws Throwable + { + logFile.stop(); + this.writer = null; + } + + @Override + public void shutdown() throws Throwable + { + logFile.shutdown(); + } + + public enum RecordType + { + APPEND( (byte) 0 ), COMMIT( (byte) 1 ), TRUNCATE( (byte) 2 ); + + private final byte value; + + RecordType( byte value ) + { + this.value = value; + } + + public byte value() + { + return value; + } + + public static RecordType forValue( byte value ) + { + switch ( value ) + { + case 0: + return APPEND; + case 1: + return COMMIT; + case 2: + return TRUNCATE; + default: + throw new IllegalArgumentException( "Value " + value + " is not a known entry type" ); + } + } + } + +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java new file mode 100644 index 0000000000000..043a9879d9f91 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursor.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Queue; +import java.util.function.Predicate; + +import org.neo4j.cursor.IOCursor; + +public class PhysicalRaftLogEntryCursor implements IOCursor +{ + private final RaftRecordCursor recordCursor; + private RaftLogAppendRecord currentEntry; + private final Queue logicallyNext; + private final List staging; + + public PhysicalRaftLogEntryCursor( RaftRecordCursor recordCursor ) + { + this.recordCursor = recordCursor; + this.logicallyNext = new LinkedList<>(); + this.staging = new LinkedList<>(); + } + + @Override + public boolean next() throws IOException + { + while ( recordCursor.next() ) + { + RaftLogRecord record = recordCursor.get(); + switch ( record.getType() ) + { + case APPEND: + staging.add( (RaftLogAppendRecord) record ); + break; + case COMMIT: + { + moveStagingToLogicallyNextAndSetCurrentEntry( logIndex -> logIndex <= record.getLogIndex() ); + + if ( currentEntry != null ) + { + return true; + } + + break; + } + case TRUNCATE: + { + removeFromStaging( index -> index >= record.getLogIndex() ); + break; + } + } + } + moveStagingToLogicallyNextAndSetCurrentEntry( index -> true ); + return currentEntry != null; + } + + private void removeFromStaging( Predicate condition ) + { + ListIterator iterator = staging.listIterator(); + while ( iterator.hasNext() ) + { + if ( condition.test( iterator.next().getLogIndex() ) ) + { + iterator.remove(); + } + } + } + + private void moveStagingToLogicallyNextAndSetCurrentEntry( Predicate condition ) + { + ListIterator iterator = staging.listIterator(); + while ( iterator.hasNext() ) + { + RaftLogAppendRecord next = iterator.next(); + if ( condition.test( next.getLogIndex() ) ) + { + logicallyNext.offer( next ); + iterator.remove(); + } + } + currentEntry = logicallyNext.poll(); + } + + @Override + public void close() throws IOException + { + recordCursor.close(); + } + + @Override + public RaftLogAppendRecord get() + { + return currentEntry; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java new file mode 100644 index 0000000000000..f9ef0efef2ec8 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftCommitEntry.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftCommitEntry +{ + private final long commitIndex; + + public RaftCommitEntry( long commitIndex ) + { + this.commitIndex = commitIndex; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java new file mode 100644 index 0000000000000..082cb6bde9ee5 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftEntryStore.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import org.neo4j.cursor.IOCursor; + +public interface RaftEntryStore +{ + IOCursor getEntriesFrom( long logIndex ) throws RaftStorageException; +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java new file mode 100644 index 0000000000000..d245cf13eb53e --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogAppendRecord.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogAppendRecord extends RaftLogRecord +{ + private final RaftLogEntry logEntry; + + RaftLogAppendRecord( long logIndex, RaftLogEntry logEntry ) + { + super( PhysicalRaftLog.RecordType.APPEND, logIndex ); + this.logEntry = logEntry; + } + + public RaftLogEntry getLogEntry() + { + return logEntry; + } + +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java new file mode 100644 index 0000000000000..d012518c1fe5b --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogCommitRecord.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogCommitRecord extends RaftLogRecord +{ + public RaftLogCommitRecord( long logIndex ) + { + super( PhysicalRaftLog.RecordType.COMMIT, logIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java index 0a19ddba03992..4d9f6293ebef2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogEntry.java @@ -77,4 +77,5 @@ public String toString() { return String.format( "RaftLogEntry{term=%d, content=%s}", term, content ); } + } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java new file mode 100644 index 0000000000000..3b3971942c5d8 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCache.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + + +import java.util.Objects; + +import org.neo4j.helpers.collection.LruCache; +import org.neo4j.kernel.impl.transaction.log.LogPosition; + +public class RaftLogMetadataCache +{ + private final LruCache raftLogEntryCache; + + public RaftLogMetadataCache( int logEntryCacheSize ) + { + this.raftLogEntryCache = new LruCache<>( "Raft log entry cache", logEntryCacheSize ); + } + + public void clear() + { + raftLogEntryCache.clear(); + } + + /** + * Returns the metadata for the entry at position {@param logIndex}, null if the metadata is not present in the cache + */ + public RaftLogEntryMetadata getMetadata( long logIndex ) + { + return raftLogEntryCache.get( logIndex ); + } + + public RaftLogEntryMetadata cacheMetadata( long logIndex, long entryTerm, LogPosition position ) + { + if ( position.getByteOffset() == -1 ) + { + throw new RuntimeException( "StartEntry.position is " + position ); + } + + RaftLogEntryMetadata result = new RaftLogEntryMetadata( entryTerm, position ); + raftLogEntryCache.put( logIndex, result ); + return result; + } + + public static class RaftLogEntryMetadata + { + private final long entryTerm; + private final LogPosition startPosition; + + public RaftLogEntryMetadata( long entryTerm, LogPosition startPosition ) + { + Objects.requireNonNull( startPosition ); + this.entryTerm = entryTerm; + this.startPosition = startPosition; + } + + public long getEntryTerm() + { + return entryTerm; + } + + public LogPosition getStartPosition() + { + return startPosition; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + RaftLogEntryMetadata that = (RaftLogEntryMetadata) o; + + if ( entryTerm != that.entryTerm ) + { + return false; + } + return startPosition.equals( that.startPosition ); + + } + + @Override + public int hashCode() + { + int result = (int) (entryTerm ^ (entryTerm >>> 32)); + result = 31 * result + startPosition.hashCode(); + return result; + } + + @Override + public String toString() + { + return "RaftLogEntryMetadata{" + + "entryTerm=" + entryTerm + + ", startPosition=" + startPosition + + '}'; + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java new file mode 100644 index 0000000000000..f947633d4096a --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogRecord.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public abstract class RaftLogRecord +{ + private final long logIndex; + private final PhysicalRaftLog.RecordType type; + + RaftLogRecord( PhysicalRaftLog.RecordType type, long logIndex ) + { + this.type = type; + this.logIndex = logIndex; + } + + public long getLogIndex() + { + return logIndex; + } + + public PhysicalRaftLog.RecordType getType() + { + return type; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java new file mode 100644 index 0000000000000..6b5e36d3aa2b7 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftLogTruncateRecord.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +public class RaftLogTruncateRecord extends RaftLogRecord +{ + RaftLogTruncateRecord( long fromLogIndex ) + { + super( PhysicalRaftLog.RecordType.TRUNCATE, fromLogIndex ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java new file mode 100644 index 0000000000000..5f88f0d469388 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/log/RaftRecordCursor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import java.io.IOException; + +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ChannelMarshal; +import org.neo4j.cursor.IOCursor; +import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; +import org.neo4j.storageengine.api.ReadPastEndException; + +public class RaftRecordCursor implements IOCursor +{ + private final T channel; + private final ChannelMarshal marshal; + private RaftLogRecord lastFoundRecord; + + public RaftRecordCursor( T channel, ChannelMarshal marshal ) + { + this.channel = channel; + this.marshal = marshal; + } + + @Override + public boolean next() throws IOException + { + try + { + byte type = channel.get(); + switch ( PhysicalRaftLog.RecordType.forValue( type ) ) + { + case APPEND: + long appendIndex = channel.getLong(); + long term = channel.getLong(); + ReplicatedContent content = marshal.unmarshal( channel ); + if ( content == null ) + { + return false; + } + lastFoundRecord = new RaftLogAppendRecord( appendIndex, new RaftLogEntry( term, content ) ); + return true; + case COMMIT: + long commitIndex = channel.getLong(); + lastFoundRecord = new RaftLogCommitRecord( commitIndex ); + return true; + case TRUNCATE: + long truncateFromIndex = channel.getLong(); + lastFoundRecord = new RaftLogTruncateRecord( truncateFromIndex ); + return true; + default: + throw new IllegalStateException( "Not really sure how we got here. Read type value was " + type ); + } + } + catch ( ReadPastEndException notEnoughBytes ) + { + return false; + } + } + + @Override + public void close() throws IOException + { + channel.close(); + } + + @Override + public RaftLogRecord get() + { + return lastFoundRecord; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java index 3247d040a167e..9354146c4f698 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/codecs/RaftMessageDecoder.java @@ -28,7 +28,6 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftLogEntry; -import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.server.AdvertisedSocketAddress; import org.neo4j.coreedge.server.ByteBufMarshal; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 710ac28b9f092..cc033812a0d2b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -39,6 +39,7 @@ import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.NaiveDurableRaftLog; +import org.neo4j.coreedge.raft.log.PhysicalRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; import org.neo4j.coreedge.raft.membership.MembershipWaiter; @@ -114,6 +115,7 @@ import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.internal.DatabaseHealth; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java new file mode 100644 index 0000000000000..5e12eb243f225 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/FilenameBasedLogVersionRepositoryTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Rule; +import org.junit.Test; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.test.TargetDirectory; + +public class FilenameBasedLogVersionRepositoryTest +{ + @Rule + public TargetDirectory.TestDirectory testDir; + + @Test + public void shouldStartAtVersion0() throws Exception + { + PhysicalLogFiles logFiles = mock( PhysicalLogFiles.class ); + when( logFiles.getHighestLogVersion() ).thenReturn( -1L ); + + FilenameBasedLogVersionRepository repository = new FilenameBasedLogVersionRepository( logFiles ); + + assertEquals( 0L, repository.getCurrentLogVersion() ); + } + + @Test + public void shouldPickHighestVersionAvailable() throws Exception + { + PhysicalLogFiles logFiles = mock( PhysicalLogFiles.class ); + when( logFiles.getHighestLogVersion() ).thenReturn( 2L ); + + FilenameBasedLogVersionRepository repository = new FilenameBasedLogVersionRepository( logFiles ); + + assertEquals( 2L, repository.getCurrentLogVersion() ); + + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java new file mode 100644 index 0000000000000..54f367ea3a8cf --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogContractTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.NullLogProvider; + +public class PhysicalRaftLogContractTest extends RaftLogContractTest +{ + private PhysicalRaftLog raftLog; + private LifeSupport life = new LifeSupport(); + private FileSystemAbstraction fileSystem; + + @Override + public RaftLog createRaftLog() throws IOException + { + this.raftLog = createRaftLog( 100 ); + return raftLog; + } + + @After + public void tearDown() throws Throwable + { + life.stop(); + life.shutdown(); + } + + private PhysicalRaftLog createRaftLog( int cacheSize ) + { + if ( fileSystem == null ) + { + fileSystem = new EphemeralFileSystemAbstraction(); + } + File directory = new File( "raft-log" ); + fileSystem.mkdir( directory ); + + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, 1000000, cacheSize, + new PhysicalLogFile.Monitor.Adapter(), new DummyRaftableContentSerializer(), mock( Supplier.class ), + NullLogProvider.getInstance() ); + life.add( newRaftLog ); + life.init(); + life.start(); + return newRaftLog; + } + + @Test + public void shouldReadBackInCachedEntry() throws Throwable + { + // Given + PhysicalRaftLog raftLog = (PhysicalRaftLog) createRaftLog(); + int term = 0; + ReplicatedInteger content = ReplicatedInteger.valueOf( 4 ); + + // When + long entryIndex = raftLog.append( new RaftLogEntry( term, content ) ); + + // Then + assertTrue( raftLog.entryExists( entryIndex ) ); + assertEquals( content, raftLog.readEntryContent( entryIndex ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex ) ); + } + + @Test + public void shouldReadBackNonCachedEntry() throws Exception + { + // Given + int cacheSize = 1; + PhysicalRaftLog raftLog = createRaftLog( cacheSize ); + int term = 0; + ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); + ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); + + // When + long entryIndex1 = raftLog.append( new RaftLogEntry( term, content1 ) ); + long entryIndex2 = raftLog.append( new RaftLogEntry( term, content2 ) ); // this will push the first entry out of cache + + // Then + // entry 1 should be there + assertTrue( raftLog.entryExists( entryIndex1 ) ); + assertEquals( content1, raftLog.readEntryContent( entryIndex1 ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex1 ) ); + + // entry 2 should be there also + assertTrue( raftLog.entryExists( entryIndex2 ) ); + assertEquals( content2, raftLog.readEntryContent( entryIndex2 ) ); + assertEquals( term, raftLog.readEntryTerm( entryIndex2 ) ); + } + + @Test + public void shouldRestoreCommitIndexOnStartup() throws Throwable + { + // Given + PhysicalRaftLog raftLog = createRaftLog( 100 /* cache size */ ); + int term = 0; + ReplicatedInteger content1 = ReplicatedInteger.valueOf( 4 ); + ReplicatedInteger content2 = ReplicatedInteger.valueOf( 5 ); + long entryIndex1 = raftLog.append( new RaftLogEntry( term, content1 ) ); + long entryIndex2 = raftLog.append( new RaftLogEntry( term, content2 ) ); + + raftLog.commit( entryIndex1 ); + + // When + // we restart the raft log + life.remove( raftLog ); // stops the removed instance + raftLog = createRaftLog( 100 ); + + // Then + assertEquals( entryIndex1, raftLog.commitIndex() ); + assertEquals( entryIndex2, raftLog.appendIndex() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java new file mode 100644 index 0000000000000..57865431e3f80 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogEntryCursorTest.java @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; + +public class PhysicalRaftLogEntryCursorTest +{ + @Test + public void shouldReturnAllRecordsIfAllAreAppended() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogAppendRecord( 3, payload ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldReturnNonTruncatedRecords() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogTruncateRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + @Test + public void shouldNotReturnAnyRecordsIfTheyAreAllTruncated() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogTruncateRecord( 2 ) ) + .thenReturn( new RaftLogTruncateRecord( 1 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertFalse( entryCursor.next() ); + } + + @Test + public void shouldReturnCommittedRecords() throws Exception + { + // Given + ReplicatedInteger content = ReplicatedInteger.valueOf( 1 ); + RaftLogEntry payload = new RaftLogEntry( 0, content ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 1, payload ) ) + .thenReturn( new RaftLogAppendRecord( 2, payload ) ) + .thenReturn( new RaftLogCommitRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnCommittedRecords() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( new RaftLogCommitRecord( 7 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnOnEndOfFile() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldSkipTruncatedAndReturnCommittedRecordsAndRecordsAtEndOfFile() throws Exception + { + // Given + RaftLogEntry payloadTruncated = new RaftLogEntry( 0, ReplicatedInteger.valueOf( 4 ) ); + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftLogEntry payload6 = new RaftLogEntry( 6, ReplicatedInteger.valueOf( 6 ) ); + RaftLogEntry payload7 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 7 ) ); + RaftLogEntry payload8 = new RaftLogEntry( 7, ReplicatedInteger.valueOf( 8 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + // return 3 records that are just appended, then be done with it + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payloadTruncated ) ) + .thenReturn( new RaftLogAppendRecord( 7, payloadTruncated ) ) + .thenReturn( new RaftLogTruncateRecord( 6 ) ) + .thenReturn( new RaftLogAppendRecord( 6, payload6 ) ) + .thenReturn( new RaftLogAppendRecord( 7, payload7 ) ) + .thenReturn( new RaftLogCommitRecord( 7 ) ) + .thenReturn( new RaftLogAppendRecord( 8, payload8 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload6, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload7, entryCursor.get().getLogEntry() ); + + assertTrue( entryCursor.next() ); + assertEquals( payload8, entryCursor.get().getLogEntry() ); + + assertFalse( entryCursor.next() ); // record cursor is done, there should be no more entries/ + } + + @Test + public void shouldReturnUncommittedEntriesAtEndOfFileDespiteCommitEntryForNonEncounteredRecords() throws Exception + { + // Given + RaftLogEntry payload5 = new RaftLogEntry( 5, ReplicatedInteger.valueOf( 5 ) ); + RaftRecordCursor recordCursor = mock( RaftRecordCursor.class ); + when( recordCursor.next() ) + .thenReturn( true ) + .thenReturn( true ) + .thenReturn( false ); + when( recordCursor.get() ) + .thenReturn( new RaftLogAppendRecord( 5, payload5 ) ) + .thenReturn( new RaftLogCommitRecord( 2 ) ) + .thenReturn( null ); + + PhysicalRaftLogEntryCursor entryCursor = new PhysicalRaftLogEntryCursor( recordCursor ); + + // When - Then + assertTrue( entryCursor.next() ); + assertEquals( payload5, entryCursor.get().getLogEntry() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java new file mode 100644 index 0000000000000..549180a25c563 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/PhysicalRaftLogRotationTest.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Test; +import org.neo4j.coreedge.raft.ReplicatedInteger; +import org.neo4j.coreedge.raft.ReplicatedString; +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.NullLogProvider; + +public class PhysicalRaftLogRotationTest +{ + private LifeSupport life = new LifeSupport(); + private FileSystemAbstraction fileSystem; + + @After + public void tearDown() throws Throwable + { + life.stop(); + life.shutdown(); + } + + private PhysicalRaftLog createRaftLog( long rotateAtSize, PhysicalLogFile.Monitor logFileMonitor ) + { + if ( fileSystem == null ) + { + fileSystem = new EphemeralFileSystemAbstraction(); + } + File directory = new File( "raft-log" ); + fileSystem.mkdir( directory ); + + PhysicalRaftLog newRaftLog = new PhysicalRaftLog( fileSystem, directory, rotateAtSize, 100, + logFileMonitor, new DummyRaftableContentSerializer(), () -> mock( DatabaseHealth.class ), + NullLogProvider.getInstance() ); + life.add( newRaftLog ); + life.init(); + life.start(); + return newRaftLog; + } + + @Test + public void shouldRotateOnAppendWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatIsMoreThan100Bytes = new ReplicatedString( builder.toString() ); + log.append( new RaftLogEntry( 0, stringThatIsMoreThan100Bytes ) ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldRotateOnCommitWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + log.append( new RaftLogEntry( 0, stringThatGetsTheSizeToAlmost100Bytes ) ); + assertEquals( 0, currentVersion.get() ); + log.commit( 1 ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldRotateOnTruncateWhenRotateSizeIsReached() throws Exception + { + // Given + AtomicLong currentVersion = new AtomicLong(); + PhysicalLogFile.Monitor logFileMonitor = + ( logFile, logVersion, lastTransactionId, clean ) -> currentVersion.set( logVersion ); + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, logFileMonitor ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + // When + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + long indexToTruncate = log.append( new RaftLogEntry( 0, stringThatGetsTheSizeToAlmost100Bytes ) ); + assertEquals( 0, currentVersion.get() ); + log.truncate( indexToTruncate ); + + // Then + assertEquals( 1, currentVersion.get() ); + } + + @Test + public void shouldBeAbleToRecoverToLatestStateAfterRotation() throws Throwable + { + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + int term = 0; + long indexToCommit = log.append( new RaftLogEntry( term, stringThatGetsTheSizeToAlmost100Bytes ) ); + log.commit( indexToCommit ); + indexToCommit = log.append( new RaftLogEntry( term, ReplicatedInteger.valueOf( 1 ) ) ); + log.commit( indexToCommit ); + + // When + life.remove( log ); + log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + // Then + assertEquals( indexToCommit, log.commitIndex() ); + assertEquals( indexToCommit, log.appendIndex() ); + assertTrue( log.entryExists( indexToCommit ) ); + assertEquals( term, log.readEntryTerm( indexToCommit ) ); + } + + + @Test + public void shouldBeAbleToRecoverToLatestStateAfterRotationWhenEarlierFilesArePruned() throws Throwable + { + int rotateAtSize = 100; + PhysicalRaftLog log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + StringBuilder builder = new StringBuilder(); + for ( int i = 0; i < rotateAtSize - 40; i++ ) + { + builder.append( "i" ); + } + + ReplicatedString stringThatGetsTheSizeToAlmost100Bytes = new ReplicatedString( builder.toString() ); + int term = 0; + long indexToCommit = log.append( new RaftLogEntry( term, stringThatGetsTheSizeToAlmost100Bytes ) ); + log.commit( indexToCommit ); + indexToCommit = log.append( new RaftLogEntry( term, ReplicatedInteger.valueOf( 1 ) ) ); + log.commit( indexToCommit ); + + // When + life.remove( log ); + + fileSystem.deleteFile( new File( new File("raft-log"), "raft.log.0" ) ); // hackish, decorate it my great Tech Liege + + log = createRaftLog( rotateAtSize, new PhysicalLogFile.Monitor.Adapter() ); + + // Then + assertEquals( indexToCommit, log.commitIndex() ); + assertEquals( indexToCommit, log.appendIndex() ); + assertTrue( log.entryExists( indexToCommit ) ); + assertEquals( term, log.readEntryTerm( indexToCommit ) ); + } +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java index 7333ca4c58c84..99638edf719d7 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java @@ -28,9 +28,6 @@ import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftInstanceBuilder; import org.neo4j.coreedge.raft.ReplicatedInteger; -import org.neo4j.coreedge.raft.log.InMemoryRaftLog; -import org.neo4j.coreedge.raft.log.RaftLog; -import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.state.StateMachine; import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.coreedge.server.RaftTestMemberSetBuilder; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java index e2d45e6bcab7f..eb75ab304c992 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogAdversarialTest.java @@ -31,7 +31,6 @@ import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.SelectiveFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.CoreMatchers.is; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java index fcc6001c732dc..f3fd26346f0ca 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogContractTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -117,6 +118,26 @@ public void shouldCommitOutOfOrderAppend() throws Exception assertThat( log.entryExists( 0 ), is( true ) ); } + @Test + public void shouldCommitOnlyOnce() throws Exception + { + RaftLog log = createRaftLog(); + + RaftLog.Listener listener = mock( RaftLog.Listener.class ); + log.registerListener( listener ); + + RaftLogEntry logEntry = new RaftLogEntry( 1, ReplicatedInteger.valueOf( 1 ) ); + log.append( logEntry ); + + log.commit( 10 ); + log.commit( 10 ); + + assertThat( log.appendIndex(), is( 0L ) ); + assertThat( log.commitIndex(), is( 0L ) ); + assertThat( log.entryExists( 0 ), is( true ) ); + verify( listener, times( 1 ) ).onCommitted( eq( logEntry.content() ), anyLong() ); + } + @Test public void shouldTruncatePreviouslyAppendedEntries() throws Exception { @@ -224,4 +245,141 @@ public void shouldRejectNonMonotonicTermsForEntries() throws Exception // expected } } + + @Test + public void shouldCommitAndThenTruncateSubsequentEntry() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + log.commit( toCommit ); + log.truncate( toTruncate ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldTruncateAndThenCommitPreviousEntry() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + log.truncate( toTruncate ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldCommitAfterTruncatingAndAppending() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + /* + 0 1 2 Tr(2) 2 C*(1) + */ + + // when + log.truncate( toTruncate ); + long lastAppended = log.append( new RaftLogEntry( 2, ReplicatedInteger.valueOf( 3 ) ) ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( lastAppended ), is( true ) ); + assertThat( log.entryExists( toTruncate ), is( true ) ); // index is "reused" + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + assertThat( log.readEntryTerm( lastAppended ), is( 2L ) ); + } + + @Test + public void shouldCommitAfterAppendingAndTruncating() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toTruncate = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + // when + long lastAppended = log.append( new RaftLogEntry( 2, ReplicatedInteger.valueOf( 3 ) ) ); + log.truncate( toTruncate ); + log.commit( toCommit ); + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( lastAppended ), is( false ) ); + assertThat( log.entryExists( toTruncate ), is( false ) ); + assertThat( log.readEntryTerm( toCommit ), is( 0L ) ); + } + + @Test + public void shouldNotAllowTruncationAtLastCommit() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + log.commit( toCommit ); + + try + { + // when + log.truncate( toCommit ); + fail("Truncation at this point should have failed"); + } + catch( IllegalArgumentException truncationFailed ) + { + // awesome + } + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + } + + @Test + public void shouldNotAllowTruncationBeforeLastCommit() throws Exception + { + // given + RaftLog log = createRaftLog(); + log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) ); + long toTryToTruncate = log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) ); + long toCommit = log.append( new RaftLogEntry( 1, ReplicatedInteger.valueOf( 2 ) ) ); + + log.commit( toCommit ); + + try + { + // when + log.truncate( toTryToTruncate ); + fail("Truncation at this point should have failed"); + } + catch( IllegalArgumentException truncationFailed ) + { + // awesome + } + + // then + assertThat( log.entryExists( toCommit ), is( true ) ); + assertThat( log.entryExists( toTryToTruncate ), is( true ) ); + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java index 934bed51e8829..63391e51b51f3 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogDurabilityTest.java @@ -26,7 +26,6 @@ import org.neo4j.coreedge.raft.ReplicatedInteger; import org.neo4j.coreedge.raft.ReplicatedString; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java new file mode 100644 index 0000000000000..8d76274b4e7c3 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftLogMetadataCacheTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import org.junit.Test; +import org.neo4j.kernel.impl.transaction.log.LogPosition; + +public class RaftLogMetadataCacheTest +{ + @Test + public void shouldReturnNullWhenMissingAnEntryInTheCache() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + + // when + final RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( 42 ); + + // then + assertNull( metadata ); + } + + @Test + public void shouldReturnTheTxValueTIfInTheCached() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 12L; + final long term = 12L; + final LogPosition position = new LogPosition( 3, 4 ); + + // when + cache.cacheMetadata( index, term, position ); + final RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( index ); + + // then + assertEquals( new RaftLogMetadataCache.RaftLogEntryMetadata( term, position ), metadata ); + } + + @Test + public void shouldThrowWhenCachingATxWithNegativeOffsetPosition() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 42; + final long term= 42; + final LogPosition position = new LogPosition( 3, -1 ); + + // when + try + { + cache.cacheMetadata( index, term, position ); + fail(); + } catch (RuntimeException ex) { + assertEquals( "StartEntry.position is " + position, ex.getMessage() ); + } + } + + @Test + public void shouldClearTheCache() + { + // given + final RaftLogMetadataCache cache = new RaftLogMetadataCache( 2 ); + final long index = 12L; + final long term = 12L; + final LogPosition position = new LogPosition( 3, 4 ); + + // when + cache.cacheMetadata( index, term, position ); + cache.clear(); + RaftLogMetadataCache.RaftLogEntryMetadata metadata = cache.getMetadata( index ); + + // then + assertNull( metadata ); + } + +} \ No newline at end of file diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index a1f78e065c48a..cdeab2f129ef8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -58,9 +58,7 @@ import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.server.RaftTestMember.member; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesResponse; -import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; -import static org.neo4j.coreedge.raft.roles.Role.LEADER; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; import static org.neo4j.helpers.collection.IteratorUtil.asSet; import static org.neo4j.helpers.collection.IteratorUtil.count;