Skip to content

Commit

Permalink
Implementation of PhysicalRaftLog
Browse files Browse the repository at this point in the history
This is the work on having an append-only, rotatable
 raft log, based on PhysicalLogFile. Some supporting classes,
 including a Raft log entry cursor, are also introduced.

PRL now has proper records that contain information about commit
 and truncation actions, besides the already existing append.
Iteration over the log happens via a two-layer cursor: one layer
 iterates over records and the other builds up state and determines
 whether entries are committed, truncated or appended.

Renames PhysicalLogFiles.hasAnyTransaction() to .hasAnyEntries()
This is to remove notional dependencies of PLF on transactional
 concepts, since we want to use it elsewhere

PRL can rotate and change versions via support from PhysicalLogFiles
Log versions are maintained by scanning the filename suffixes present
 on disk, instead of having a dedicated file storage like tx logs

Changes semantics of RaftEntryStore, cursors may be before requested index
Previously there was an implicit assumption that cursors returned
 from RaftEntryStore would be positioned at the requested index. This
 is not true for the current implementation, so instead PhysicalRaftLog
 assumes that the cursor must be iterated until the desired index
 is reached. This also requires the cursor to return RaftEntryRecords
 instead of RaftLogEntries, so that it's possible to compare indexes.
  • Loading branch information
digitalstain committed Feb 11, 2016
1 parent 51d12d0 commit 3e08266
Show file tree
Hide file tree
Showing 32 changed files with 2,036 additions and 19 deletions.
Expand Up @@ -205,7 +205,7 @@ void dump( NeoStoreDataSource source, Logger log )
for ( long logVersion = logFiles.getLowestLogVersion();
logFiles.versionExists( logVersion ); logVersion++ )
{
if ( logFiles.hasAnyTransaction( logVersion ) )
if ( logFiles.hasAnyEntries( logVersion ) )
{
LogHeader header = logFiles.extractHeader( logVersion );
long firstTransactionIdInThisLog = header.lastCommittedTxId + 1;
Expand Down
Expand Up @@ -58,7 +58,7 @@ public long getFirstExistingTxId() throws IOException

// OK, so we now have the oldest existing log version here. Open it and see if there's any transaction
// in there. If there is then that transaction is the first one that we have.
return logFiles.hasAnyTransaction( version ) ? candidateFirstTx : -1;
return logFiles.hasAnyEntries( version ) ? candidateFirstTx : -1;
}

@Override
Expand Down
Expand Up @@ -89,7 +89,7 @@ public LogHeader extractHeader( long version ) throws IOException
return readLogHeader( fileSystem, getLogFileForVersion( version ) );
}

public boolean hasAnyTransaction( long version )
public boolean hasAnyEntries( long version )
{
return fileSystem.getFileSize( getLogFileForVersion( version ) ) > LOG_HEADER_SIZE;
}
Expand Down
Expand Up @@ -213,7 +213,7 @@ private PhysicalLogFiles logWithTransactions( long logVersion, long headerTxId )
{
PhysicalLogFiles files = mock( PhysicalLogFiles.class );
when( files.getLowestLogVersion() ).thenReturn( logVersion );
when( files.hasAnyTransaction( logVersion ) ).thenReturn( true );
when( files.hasAnyEntries( logVersion ) ).thenReturn( true );
when( files.versionExists( logVersion ) ).thenReturn( true );
when( files.extractHeader( logVersion ) ).thenReturn( new LogHeader( LogEntryVersion.CURRENT.byteCode(),
logVersion, headerTxId ) );
Expand All @@ -225,7 +225,7 @@ private PhysicalLogFiles logWithTransactionsInNextToOldestLog( long logVersion,
{
PhysicalLogFiles files = logWithTransactions( logVersion + 1, prevLogLastTxId );
when( files.getLowestLogVersion() ).thenReturn( logVersion );
when( files.hasAnyTransaction( logVersion ) ).thenReturn( false );
when( files.hasAnyEntries( logVersion ) ).thenReturn( false );
when( files.versionExists( logVersion ) ).thenReturn( true );
return files;
}
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void shouldReadAndCacheFirstCommittedTransactionIdWhenNotCached() throws
when( logFiles.extractHeader( version ) ).thenReturn(
new LogHeader( (byte) -1/*ignored*/, -1L/*ignored*/, expected - 1L )
);
when( logFiles.hasAnyTransaction( version ) ).thenReturn( true );
when( logFiles.hasAnyEntries( version ) ).thenReturn( true );

long firstCommittedTxId = info.getFirstExistingTxId();
assertEquals( expected, firstCommittedTxId );
Expand All @@ -108,7 +107,7 @@ public void shouldReadFirstCommittedTransactionIdWhenCached() throws Exception
when( logFiles.getHighestLogVersion() ).thenReturn( version );
when( logFiles.versionExists( version ) ).thenReturn( true );
when( logHeaderCache.getLogHeader( version ) ).thenReturn( expected -1 );
when( logFiles.hasAnyTransaction( version ) ).thenReturn( true );
when( logFiles.hasAnyEntries( version ) ).thenReturn( true );

long firstCommittedTxId = info.getFirstExistingTxId();
assertEquals( expected, firstCommittedTxId );
Expand All @@ -122,7 +121,7 @@ public void shouldReturnNothingWhenThereAreNoTransactions() throws Exception

long version = 10L;
when( logFiles.getHighestLogVersion() ).thenReturn( version );
when( logFiles.hasAnyTransaction( version ) ).thenReturn( false );
when( logFiles.hasAnyEntries( version ) ).thenReturn( false );

long firstCommittedTxId = info.getFirstExistingTxId();
assertEquals( -1, firstCommittedTxId );
Expand Down
Expand Up @@ -23,4 +23,26 @@

/**
 * A {@link RawCursor} specialization whose exception type is fixed to {@link IOException}.
 *
 * @param <T> type of the element exposed by the cursor
 */
public interface IOCursor<T> extends RawCursor<T,IOException>
{
    /**
     * Creates a cursor that has nothing to iterate over.
     * <p>
     * On the returned cursor {@code next()} always answers {@code false}, {@code get()} always
     * answers {@code null} and {@code close()} does nothing. A new instance is created on every call.
     *
     * @param <E> element type the caller wants the empty cursor typed as
     * @return an empty cursor
     */
    static <E> IOCursor<E> getEmpty()
    {
        return new IOCursor<E>()
        {
            @Override
            public E get()
            {
                // There is never a current element.
                return null;
            }

            @Override
            public boolean next() throws IOException
            {
                // Nothing to advance to.
                return false;
            }

            @Override
            public void close() throws IOException
            {
                // No underlying resource to release.
            }
        };
    }
}
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.file.Files;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;

/**
 * A {@link LogVersionRepository} that keeps the current log version in a plain field, seeded once
 * from the highest log version reported by a {@link PhysicalLogFiles} instance.
 * <p>
 * Nothing in this class writes the version anywhere; it only lives in memory for the lifetime of
 * the instance.
 */
public class FilenameBasedLogVersionRepository implements LogVersionRepository
{
    private long version;

    /**
     * @param logFiles source of the initial version; a negative highest version is treated as 0
     *                 (presumably the "no log files yet" case - verify against PhysicalLogFiles)
     */
    public FilenameBasedLogVersionRepository( PhysicalLogFiles logFiles )
    {
        long highestReported = logFiles.getHighestLogVersion();
        this.version = highestReported < 0 ? 0 : highestReported;
    }

    /**
     * @return the version currently held by this repository
     */
    @Override
    public long getCurrentLogVersion()
    {
        return this.version;
    }

    /**
     * Advances the held version by one.
     *
     * @return the version after the increment
     * @throws IOException declared by the interface; this implementation performs no I/O
     */
    @Override
    public long incrementAndGetVersion() throws IOException
    {
        this.version += 1;
        return this.version;
    }
}
Expand Up @@ -27,8 +27,6 @@
import io.netty.buffer.Unpooled;
import org.neo4j.coreedge.raft.replication.MarshallingException;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ByteBufferMarshal;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.server.ByteBufMarshal;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
Expand Down
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log;

import java.io.IOException;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.LogHeaderVisitor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;

/**
 * A {@link RaftEntryStore} that reads raft log records back from a {@link LogFile}.
 * <p>
 * The position to start reading from is taken from the {@link RaftLogMetadataCache} when it has an
 * entry for the requested index, and otherwise found by visiting the log headers.
 */
public class PhysicalRaftEntryStore implements RaftEntryStore
{
    private final LogFile logFile;
    private final RaftLogMetadataCache metadataCache;
    private final ChannelMarshal<ReplicatedContent> marshal;

    public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache,
            ChannelMarshal<ReplicatedContent> marshal )
    {
        this.marshal = marshal;
        this.metadataCache = metadataCache;
        this.logFile = logFile;
    }

    /**
     * Opens a cursor over the log, starting at the position resolved for the given index.
     * <p>
     * NOTE(review): when the position comes from a log header it is the position handed to the
     * header visitor, so the cursor may presumably begin before {@code indexToStartFrom};
     * callers should iterate until they reach the index they want - confirm against callers.
     *
     * @param indexToStartFrom raft log index to locate
     * @return a cursor reading from the resolved position, or an empty cursor if no position
     *         could be resolved for the index
     * @throws RaftStorageException wrapping any {@link IOException} raised while locating or
     *         opening the log
     */
    @Override
    public IOCursor<RaftLogAppendRecord> getEntriesFrom( final long indexToStartFrom ) throws RaftStorageException
    {
        try
        {
            LogPosition from = resolveStartPosition( indexToStartFrom );
            if ( from == null )
            {
                // Neither the cache nor any log header knows about this index.
                return IOCursor.getEmpty();
            }

            ReadableLogChannel channel = logFile.getReader( from );
            return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( channel, marshal ) );
        }
        catch ( IOException e )
        {
            throw new RaftStorageException( e );
        }
    }

    /**
     * Resolves where in the log to start reading for the given index: the metadata cache is
     * consulted first, and only on a miss are the log headers visited.
     *
     * @return the resolved position, or {@code null} if none was found
     */
    private LogPosition resolveStartPosition( long index ) throws IOException, RaftStorageException
    {
        RaftLogMetadataCache.RaftLogEntryMetadata cached = metadataCache.getMetadata( index );
        if ( cached != null )
        {
            return cached.getStartPosition();
        }

        // Cache miss: ask the log file which version's index range covers the index.
        LogVersionLocator locator = new LogVersionLocator( index );
        logFile.accept( locator );
        return locator.getLogPosition();
    }

    /**
     * Header visitor that remembers the position of the first visited header whose
     * {@code [firstLogIndex, lastLogIndex]} range (inclusive on both ends) contains the index
     * being looked for.
     */
    public static final class LogVersionLocator implements LogHeaderVisitor
    {
        private final long logEntryIndex;
        private LogPosition foundPosition;

        public LogVersionLocator( long logEntryIndex )
        {
            this.logEntryIndex = logEntryIndex;
        }

        /**
         * @return {@code true} to keep visiting while the index has not been found,
         *         {@code false} once a header covering the index has been seen
         */
        @Override
        public boolean visit( LogPosition position, long firstLogIndex, long lastLogIndex )
        {
            if ( logEntryIndex < firstLogIndex || logEntryIndex > lastLogIndex )
            {
                // Index is outside this header's range; keep looking.
                return true;
            }

            foundPosition = position;
            return false;
        }

        /**
         * @return the position recorded by {@link #visit}, or {@code null} if no visited header
         *         covered the index
         */
        public LogPosition getLogPosition() throws RaftStorageException
        {
            return foundPosition;
        }
    }
}

0 comments on commit 3e08266

Please sign in to comment.