Skip to content

Commit

Permalink
core-edge: move file creation from recovery to state storage
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski authored and davidegrohmann committed Aug 10, 2016
1 parent 875d072 commit f1262fd
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 52 deletions.
Expand Up @@ -26,7 +26,9 @@
import org.neo4j.coreedge.messaging.EndOfStreamException; import org.neo4j.coreedge.messaging.EndOfStreamException;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.Page;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel; import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel;
import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.ReadableChannel;


public class StateRecoveryManager<STATE> public class StateRecoveryManager<STATE>
Expand Down Expand Up @@ -72,38 +74,37 @@ public RecoveryStatus<STATE> recover( File fileA, File fileB ) throws IOExceptio
{ {
assert fileA != null && fileB != null; assert fileA != null && fileB != null;


ensureExists( fileA );
ensureExists( fileB );

STATE a = readLastEntryFrom( fileA ); STATE a = readLastEntryFrom( fileA );
STATE b = readLastEntryFrom( fileB ); STATE b = readLastEntryFrom( fileB );


if ( marshal.ordinal( a ) > marshal.ordinal( b ) ) if ( a == null && b == null)
{ {
return new RecoveryStatus<>( fileB, a ); throw new IllegalStateException( "no recoverable state" );
} }
else
if ( a == null )
{ {
return new RecoveryStatus<>( fileA, b ); return new RecoveryStatus<>( fileA, b );
} }
} else if ( b == null )

{
private void ensureExists( File file ) throws IOException return new RecoveryStatus<>( fileB, a );
{ }
if ( !fileSystem.fileExists( file ) ) else if ( marshal.ordinal( a ) > marshal.ordinal( b ) )
{
return new RecoveryStatus<>( fileB, a );
}
else
{ {
fileSystem.mkdirs( file.getParentFile() ); return new RecoveryStatus<>( fileA, b );
fileSystem.create( file ).close();
} }
} }


private STATE readLastEntryFrom( File file ) throws IOException private STATE readLastEntryFrom( File file ) throws IOException
{ {
try ( StoreChannel storeChannel = fileSystem.open( file, "r" ) ) try ( ReadableClosableChannel channel = new ReadAheadChannel<>( fileSystem.open( file, "r" ) ) )
{ {
final ReadableChannel channel = new ReadAheadChannel<>( storeChannel ); STATE result = null;

STATE result = marshal.startState();
STATE lastRead; STATE lastRead;


try try
Expand Down
Expand Up @@ -24,8 +24,8 @@
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.coreedge.core.state.StateRecoveryManager; import org.neo4j.coreedge.core.state.StateRecoveryManager;
import org.neo4j.coreedge.messaging.marsalling.ChannelMarshal;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel; import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -34,11 +34,14 @@


public class DurableStateStorage<STATE> extends LifecycleAdapter implements StateStorage<STATE> public class DurableStateStorage<STATE> extends LifecycleAdapter implements StateStorage<STATE>
{ {
private final StateRecoveryManager<STATE> recoveryManager;
private final Log log;
private STATE initialState; private STATE initialState;
private final File fileA; private final File fileA;
private final File fileB; private final File fileB;
private final FileSystemAbstraction fileSystemAbstraction; private final FileSystemAbstraction fsa;
private final ChannelMarshal<STATE> marshal; private final String name;
private final StateMarshal<STATE> marshal;
private final Supplier<DatabaseHealth> databaseHealthSupplier; private final Supplier<DatabaseHealth> databaseHealthSupplier;
private final int numberOfEntriesBeforeRotation; private final int numberOfEntriesBeforeRotation;


Expand All @@ -47,36 +50,57 @@ public class DurableStateStorage<STATE> extends LifecycleAdapter implements Stat


private PhysicalFlushableChannel currentStoreChannel; private PhysicalFlushableChannel currentStoreChannel;


private File stateDir( File baseDir, String name ) static File stateDir( File baseDir, String name )
{ {
return new File( baseDir, name + "-state" ); return new File( baseDir, name + "-state" );
} }


public DurableStateStorage( FileSystemAbstraction fileSystemAbstraction, File baseDir, String name, public DurableStateStorage( FileSystemAbstraction fsa, File baseDir, String name,
StateMarshal<STATE> marshal, int numberOfEntriesBeforeRotation, StateMarshal<STATE> marshal, int numberOfEntriesBeforeRotation,
Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider ) Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider )
throws IOException throws IOException
// TODO Move file opening to start-time so that constructor doesn't need to throw exceptions // TODO Move file opening to start-time so that constructor doesn't need to throw exceptions
{ {
this.fileSystemAbstraction = fileSystemAbstraction; this.fsa = fsa;
this.name = name;
this.marshal = marshal; this.marshal = marshal;
this.numberOfEntriesBeforeRotation = numberOfEntriesBeforeRotation; this.numberOfEntriesBeforeRotation = numberOfEntriesBeforeRotation;
this.databaseHealthSupplier = databaseHealthSupplier; this.databaseHealthSupplier = databaseHealthSupplier;
this.log = logProvider.getLog( getClass() );
this.recoveryManager = new StateRecoveryManager<>( fsa, marshal );
this.fileA = new File( stateDir( baseDir, name ), name + ".a" );
this.fileB = new File( stateDir( baseDir, name ), name + ".b" );


fileA = new File( stateDir( baseDir, name ), name + ".a" ); create();
fileB = new File( stateDir( baseDir, name ), name + ".b" ); recover();
}


StateRecoveryManager<STATE> recoveryManager = private void create() throws IOException
new StateRecoveryManager<>( fileSystemAbstraction, marshal ); {
ensureExists( fileA );
ensureExists( fileB );
}

private void ensureExists( File file ) throws IOException
{
if ( !fsa.fileExists( file ) )
{
fsa.mkdirs( file.getParentFile() );
try ( FlushableChannel channel = new PhysicalFlushableChannel( fsa.create( file ) ) )
{
marshal.marshal( marshal.startState(), channel );
}
}
}


private void recover() throws IOException
{
final StateRecoveryManager.RecoveryStatus<STATE> recoveryStatus = recoveryManager.recover( fileA, fileB ); final StateRecoveryManager.RecoveryStatus<STATE> recoveryStatus = recoveryManager.recover( fileA, fileB );


this.currentStoreFile = recoveryStatus.activeFile(); this.currentStoreFile = recoveryStatus.activeFile();
this.currentStoreChannel = initialiseStoreFile( currentStoreFile ); this.currentStoreChannel = resetStoreFile( currentStoreFile );

this.initialState = recoveryStatus.recoveredState(); this.initialState = recoveryStatus.recoveredState();


Log log = logProvider.getLog( getClass() );
log.info( "%s state restored, up to ordinal %d", name, marshal.ordinal( initialState ) ); log.info( "%s state restored, up to ordinal %d", name, marshal.ordinal( initialState ) );
} }


Expand Down Expand Up @@ -120,28 +144,21 @@ void switchStoreFile() throws IOException
{ {
currentStoreChannel.close(); currentStoreChannel.close();


if ( currentStoreFile.getName().toLowerCase().endsWith( "a" ) ) if ( currentStoreFile.equals( fileA ) )
{ {
currentStoreChannel = initialiseStoreFile( fileB ); currentStoreChannel = resetStoreFile( fileB );
currentStoreFile = fileB; currentStoreFile = fileB;
} }
else if ( currentStoreFile.getName().toLowerCase().endsWith( "b" ) ) else
{ {
currentStoreChannel = initialiseStoreFile( fileA ); currentStoreChannel = resetStoreFile( fileA );
currentStoreFile = fileA; currentStoreFile = fileA;
} }
} }


private PhysicalFlushableChannel initialiseStoreFile( File nextStore ) throws IOException private PhysicalFlushableChannel resetStoreFile( File nextStore ) throws IOException
{ {
if ( fileSystemAbstraction.fileExists( nextStore ) ) fsa.truncate( nextStore, 0 );
{ return new PhysicalFlushableChannel( fsa.open( nextStore, "rw" ) );
fileSystemAbstraction.truncate( nextStore, 0 );
return new PhysicalFlushableChannel( fileSystemAbstraction.open( nextStore, "rw" ) );
}
else
{
return new PhysicalFlushableChannel( fileSystemAbstraction.create( nextStore ) );
}
} }
} }
Expand Up @@ -36,6 +36,7 @@
import org.neo4j.test.rule.TargetDirectory; import org.neo4j.test.rule.TargetDirectory;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;


public class StateRecoveryManagerTest public class StateRecoveryManagerTest
{ {
Expand All @@ -52,7 +53,7 @@ public void checkArgs()
} }


@Test @Test
public void shouldReturnFileAWhenNoStatePreviouslyStored() throws Exception public void shouldFailIfBothFilesAreEmpty() throws Exception
{ {
// given // given
EphemeralFileSystemAbstraction fsa = new EphemeralFileSystemAbstraction(); EphemeralFileSystemAbstraction fsa = new EphemeralFileSystemAbstraction();
Expand All @@ -66,11 +67,17 @@ public void shouldReturnFileAWhenNoStatePreviouslyStored() throws Exception


StateRecoveryManager<Long> manager = new StateRecoveryManager<>( fsa, new LongMarshal() ); StateRecoveryManager<Long> manager = new StateRecoveryManager<>( fsa, new LongMarshal() );


// when try
final StateRecoveryManager.RecoveryStatus recoveryStatus = manager.recover( fileA, fileB ); {

// when
// then StateRecoveryManager.RecoveryStatus recoveryStatus = manager.recover( fileA, fileB );
assertEquals( fileA, recoveryStatus.activeFile() ); fail();
}
catch ( IllegalStateException ex )
{
// then
// expected
}
} }


@Test @Test
Expand Down

0 comments on commit f1262fd

Please sign in to comment.