Skip to content

Commit

Permalink
Introducing an on-disk store for ID allocation.
Browse files Browse the repository at this point in the history
The implemention is not yet wired in.
  • Loading branch information
jimwebber committed Dec 17, 2015
1 parent cab0cc8 commit d957ec7
Show file tree
Hide file tree
Showing 12 changed files with 1,137 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,58 @@

public interface IdAllocationState
{
int lastIdRangeLengthForMe( IdType idType );
void lastIdRangeLengthForMe( IdType idType, int idRangeLength );
/**
*
* @param idType the type of graph object whose ID is under allocation
* @return the first unallocated entry for idType
*/
long firstUnallocated( IdType idType );

long firstNotAllocated( IdType idType );
void firstNotAllocated( IdType idType, long idRangeEnd );
/**
*
* @param idType the type of graph object whose ID is under allocation
* @param idRangeEnd the first unallocated entry for idType
*/
void firstUnallocated( IdType idType, long idRangeEnd );

long lastIdRangeStartForMe( IdType idType );
void lastIdRangeStartForMe( IdType idType, long idRangeStart );

/**
*
* @param idType The type of graph object whose ID is under allocation
* @return start position of allocation
*/
long lastIdRangeStart( IdType idType );

/**
*
* @param idType The type of graph object whose ID is under allocation
* @param idRangeStart start position of allocation
*/
void lastIdRangeStart( IdType idType, long idRangeStart );

/**
*
* @param idType The type of graph object whose ID is under allocation
* @return the length of the last ID range allocated
*/
int lastIdRangeLength( IdType idType );

/**
*
* @param idType The type of graph object whose ID is under allocation
* @param idRangeLength the length of the ID range to be allocated
*/
void lastIdRangeLength( IdType idType, int idRangeLength );

/**
* @return The last set log index, which is the value last passed to {@link #logIndex(long)}
*/
long logIndex();

/**
* Sets the last seen log index, which is the last log index at which a replicated value that updated this state
* was encountered.
* @param logIndex The value to set as the last log index at which this state was updated
*/
void logIndex( long logIndex );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package org.neo4j.coreedge.raft.replication.id;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;

import static org.neo4j.coreedge.raft.replication.id.InMemoryIdAllocationState.Serializer
.NUMBER_OF_BYTES_PER_WRITE;

public class IdAllocationStoreRecoveryManager
{
private static final long EMPTY = -1;

private final FileSystemAbstraction fileSystem;

public enum RecoveryStatus
{
NEW, RECOVERABLE, UNRECOVERABLE;

private File active;

public File getActive()
{
return active;
}

public void setActive( File active )
{
this.active = active;
}
}

public IdAllocationStoreRecoveryManager( final FileSystemAbstraction fsa )
{
this.fileSystem = fsa;
}

public File recover( File fileA, File fileB ) throws IOException
{
assert fileA != null && fileB != null;

ensureExists( fileA );
ensureExists( fileB );

RecoveryStatus recoveryStatus;

long a = getLogIndex( fileA );
long b = getLogIndex( fileB );

if ( a > b )
{
RecoveryStatus.RECOVERABLE.setActive( fileA );
recoveryStatus = RecoveryStatus.RECOVERABLE;
}
else if ( a < b )
{
RecoveryStatus.RECOVERABLE.setActive( fileB );
recoveryStatus = RecoveryStatus.RECOVERABLE;
}
else if ( a == b && a == EMPTY )
{
recoveryStatus = RecoveryStatus.NEW;
}
else
{
recoveryStatus = RecoveryStatus.UNRECOVERABLE;
}

File toReturn = null;

switch ( recoveryStatus )
{
case NEW:
toReturn = fileA;
break;

case RECOVERABLE:
toReturn = trimGarbage( recoveryStatus.getActive() );
break;

case UNRECOVERABLE:
throw new RuntimeException( "Developer Alistair says a lot of things" );
}

return toReturn;
}

private void ensureExists( File file ) throws IOException
{
if ( !fileSystem.fileExists( file ) )
{
fileSystem.mkdirs( file.getParentFile() );
fileSystem.create( file );
}
}

private File trimGarbage( File storeFile ) throws IOException
{
long fileSize = fileSystem.getFileSize( storeFile );
long extraneousBytes = fileSize % NUMBER_OF_BYTES_PER_WRITE;
if ( extraneousBytes != 0 )
{
fileSystem.truncate( storeFile, fileSize - extraneousBytes );
}

return storeFile;
}

private long getLogIndex( File storeFile ) throws IOException
{
long newPosition = beginningOfLastCompleteEntry( storeFile );

if ( newPosition < 0 )
{
return newPosition;
}

ByteBuffer buffer = ByteBuffer.allocate(
NUMBER_OF_BYTES_PER_WRITE );

StoreChannel channel = fileSystem.open( storeFile, "r" );

channel.position( newPosition );

channel.read( buffer );

buffer.flip();

InMemoryIdAllocationState inMemoryIdAllocationState =
new InMemoryIdAllocationState.Serializer().deserialize( buffer );

channel.close();

return inMemoryIdAllocationState.logIndex();
}

/*
* This method sets the position of the current channel to point to the beginning of the last complete entry.
* It integer-divides the file size by the entry size (thus finding the number of complete entries), it then
* subtracts one (which is the index of the next-to-last entry) and then multiplies by the entry size, which
* finds the end of the next-to-last entry and therefore the beginning of the last complete entry.
* It is assumed that the currentChannel contains at least one complete entry.
*/
private long beginningOfLastCompleteEntry( File storeFile ) throws IOException
{
if ( storeFile == null )
{
return -1;
}
if ( fileSystem.getFileSize( storeFile ) < NUMBER_OF_BYTES_PER_WRITE )
{
return -1;
}

long fileSize = fileSystem.getFileSize( storeFile );
long positionOfLastCompleteEntry =
((fileSize / NUMBER_OF_BYTES_PER_WRITE) - 1)
* NUMBER_OF_BYTES_PER_WRITE;
return positionOfLastCompleteEntry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.replication.id;

import java.io.Serializable;
import java.nio.ByteBuffer;

import org.neo4j.kernel.IdType;

/**
* An in-memory representation of the IDs allocated to this core instance.
*/
public class InMemoryIdAllocationState implements IdAllocationState, Serializable
{
private final long[] firstUnallocated;
private final long[] lastIdRangeStartForMe;
private final int[] lastIdRangeLengthForMe;
private long logIndex;

public InMemoryIdAllocationState()
{
this( new long[IdType.values().length],
new long[IdType.values().length],
new int[IdType.values().length],
-1L );
}

private InMemoryIdAllocationState( long[] firstUnallocated,
long[] lastIdRangeStartForMe,
int[] lastIdRangeLengthForMe,
long logIndex )
{
this.firstUnallocated = firstUnallocated;
this.lastIdRangeStartForMe = lastIdRangeStartForMe;
this.lastIdRangeLengthForMe = lastIdRangeLengthForMe;
this.logIndex = logIndex;
}

@Override
public int lastIdRangeLength( IdType idType )
{
return lastIdRangeLengthForMe[idType.ordinal()];
}

@Override
public void lastIdRangeLength( IdType idType, int idRangeLength )
{
lastIdRangeLengthForMe[idType.ordinal()] = idRangeLength;
}

@Override
public long logIndex()
{
return logIndex;
}

@Override
public void logIndex( long logIndex )
{
this.logIndex = logIndex;
}

@Override
public long firstUnallocated( IdType idType )
{
return firstUnallocated[idType.ordinal()];
}

@Override
public void firstUnallocated( IdType idType, long idRangeEnd )
{
firstUnallocated[idType.ordinal()] = idRangeEnd;
}

@Override
public long lastIdRangeStart( IdType idType )
{
return lastIdRangeStartForMe[idType.ordinal()];
}

@Override
public void lastIdRangeStart( IdType idType, long idRangeStart )
{
lastIdRangeStartForMe[idType.ordinal()] = idRangeStart;
}

public static class Serializer
{
public static final int NUMBER_OF_BYTES_PER_WRITE =
3 * IdType.values().length * 8 // 3 arrays of IdType enum value length storing longs
+ 8 * 3 // the length (as long) for each array
+ 8; // the raft log index

public void serialize( InMemoryIdAllocationState store, ByteBuffer buffer )
{
buffer.putLong( (long) store.firstUnallocated.length );
for ( long l : store.firstUnallocated )
{
buffer.putLong( l );
}

buffer.putLong( (long) store.lastIdRangeStartForMe.length );
for ( long l : store.lastIdRangeStartForMe )
{
buffer.putLong( l );
}

buffer.putLong( store.lastIdRangeLengthForMe.length );
for ( int i : store.lastIdRangeLengthForMe )
{
buffer.putLong( i );
}
buffer.putLong( store.logIndex );
}

public InMemoryIdAllocationState deserialize( ByteBuffer buffer )
{
long[] firstNotAllocated = new long[(int) buffer.getLong()];

for ( int i = 0; i < firstNotAllocated.length; i++ )
{
firstNotAllocated[i] = buffer.getLong();
}

long[] lastIdRangeStartForMe = new long[(int) buffer.getLong()];
for ( int i = 0; i < lastIdRangeStartForMe.length; i++ )
{
lastIdRangeStartForMe[i] = buffer.getLong();
}

int[] lastIdRangeLengthForMe = new int[(int) buffer.getLong()];
for ( int i = 0; i < lastIdRangeLengthForMe.length; i++ )
{
lastIdRangeLengthForMe[i] = (int) buffer.getLong();
}

long logIndex = buffer.getLong();

return new InMemoryIdAllocationState( firstNotAllocated, lastIdRangeStartForMe, lastIdRangeLengthForMe, logIndex );
}
}
}

0 comments on commit d957ec7

Please sign in to comment.