Skip to content

Commit

Permalink
Fully read and write buffers from/into channels
Browse files Browse the repository at this point in the history
Update places where ByteBuffers were not fully written or read.
Most affected from change set was IdContainer that had both problems
and any of those can potentially cause id file corruption (real or "heisen")

Add additional checks into NativeLabelScanStoreMigrator and CountsMigrator
that will check that stores are in "store ok" state before performing
migration.
  • Loading branch information
MishaDemianenko committed Nov 26, 2017
1 parent 6dd106d commit 28efba6
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 18 deletions.
Expand Up @@ -295,7 +295,7 @@ private void write( File file )
try try
{ {
channel = fileSystem.open( file, "rw" ); channel = fileSystem.open( file, "rw" );
channel.write( ByteBuffer.wrap( MAGICK ) ); channel.writeAll( ByteBuffer.wrap( MAGICK ) );
IoPrimitiveUtils.writeInt( channel, buffer( 4 ), VERSION ); IoPrimitiveUtils.writeInt( channel, buffer( 4 ), VERSION );
writeMap( channel, nodeConfig ); writeMap( channel, nodeConfig );
writeMap( channel, relConfig ); writeMap( channel, relConfig );
Expand Down
Expand Up @@ -137,11 +137,11 @@ private long readAndValidateHeader() throws IOException
private static long readAndValidate( StoreChannel channel, File fileName ) throws IOException private static long readAndValidate( StoreChannel channel, File fileName ) throws IOException
{ {
ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE ); ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE );
int read = channel.read( buffer ); int totalBytesRead = readBuffer( channel, buffer );
if ( read != HEADER_SIZE ) if ( totalBytesRead != HEADER_SIZE )
{ {
throw new InvalidIdGeneratorException( throw new InvalidIdGeneratorException(
"Unable to read header, bytes read: " + read ); "Unable to read header, bytes read: " + totalBytesRead );
} }
buffer.flip(); buffer.flip();
byte storageStatus = buffer.get(); byte storageStatus = buffer.get();
Expand Down Expand Up @@ -174,7 +174,7 @@ private void markAsSticky() throws IOException
ByteBuffer buffer = ByteBuffer.allocate( Byte.BYTES ); ByteBuffer buffer = ByteBuffer.allocate( Byte.BYTES );
buffer.put( STICKY_GENERATOR ).flip(); buffer.put( STICKY_GENERATOR ).flip();
fileChannel.position( 0 ); fileChannel.position( 0 );
fileChannel.write( buffer ); fileChannel.writeAll( buffer );
fileChannel.force( false ); fileChannel.force( false );
} }


Expand All @@ -184,7 +184,7 @@ private void markAsCleanlyClosed( ) throws IOException
ByteBuffer buffer = ByteBuffer.allocate( Byte.BYTES ); ByteBuffer buffer = ByteBuffer.allocate( Byte.BYTES );
buffer.put( CLEAN_GENERATOR ).flip(); buffer.put( CLEAN_GENERATOR ).flip();
fileChannel.position( 0 ); fileChannel.position( 0 );
fileChannel.write( buffer ); fileChannel.writeAll( buffer );
} }


public void close( long highId ) public void close( long highId )
Expand Down Expand Up @@ -218,7 +218,7 @@ private void writeHeader( long highId ) throws IOException
ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE ); ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE );
buffer.put( STICKY_GENERATOR ).putLong( highId ).flip(); buffer.put( STICKY_GENERATOR ).putLong( highId ).flip();
fileChannel.position( 0 ); fileChannel.position( 0 );
fileChannel.write( buffer ); fileChannel.writeAll( buffer );
} }


public void delete() public void delete()
Expand Down Expand Up @@ -305,7 +305,7 @@ public static void createEmptyIdFile( FileSystemAbstraction fs, File file, long
channel.truncate( 0 ); channel.truncate( 0 );
ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE ); ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE );
buffer.put( CLEAN_GENERATOR ).putLong( highId ).flip(); buffer.put( CLEAN_GENERATOR ).putLong( highId ).flip();
channel.write( buffer ); channel.writeAll( buffer );
channel.force( false ); channel.force( false );
} }
catch ( IOException e ) catch ( IOException e )
Expand All @@ -314,6 +314,22 @@ public static void createEmptyIdFile( FileSystemAbstraction fs, File file, long
} }
} }


private static int readBuffer( StoreChannel channel, ByteBuffer buffer ) throws IOException
{
int totalBytesRead = 0;
int currentBytesRead;
do
{
currentBytesRead = channel.read( buffer );
if ( currentBytesRead > 0 )
{
totalBytesRead += currentBytesRead;
}
}
while ( buffer.hasRemaining() && currentBytesRead >= 0 );
return totalBytesRead;
}

@Override @Override
public String toString() public String toString()
{ {
Expand Down
Expand Up @@ -163,6 +163,7 @@ private void rebuildCountsFromScratch( File storeDirToReadFrom, File migrationDi
.openNeoStores( StoreType.NODE, StoreType.RELATIONSHIP, StoreType.LABEL_TOKEN, .openNeoStores( StoreType.NODE, StoreType.RELATIONSHIP, StoreType.LABEL_TOKEN,
StoreType.RELATIONSHIP_TYPE_TOKEN ) ) StoreType.RELATIONSHIP_TYPE_TOKEN ) )
{ {
neoStores.verifyStoreOk();
NodeStore nodeStore = neoStores.getNodeStore(); NodeStore nodeStore = neoStores.getNodeStore();
RelationshipStore relationshipStore = neoStores.getRelationshipStore(); RelationshipStore relationshipStore = neoStores.getRelationshipStore();
try ( Lifespan life = new Lifespan() ) try ( Lifespan life = new Lifespan() )
Expand Down
Expand Up @@ -73,6 +73,7 @@ public void migrate( File storeDir, File migrationDir, MigrationProgressMonitor.
try ( NeoStores neoStores = storeFactory.openAllNeoStores(); try ( NeoStores neoStores = storeFactory.openAllNeoStores();
Lifespan lifespan = new Lifespan() ) Lifespan lifespan = new Lifespan() )
{ {
neoStores.verifyStoreOk();
// Remove any existing file to ensure we always do migration // Remove any existing file to ensure we always do migration
deleteNativeIndexFile( migrationDir ); deleteNativeIndexFile( migrationDir );


Expand Down
Expand Up @@ -66,7 +66,7 @@ public static void writeLogHeader( StoreChannel channel, long logVersion, long p
{ {
ByteBuffer buffer = ByteBuffer.allocate( LOG_HEADER_SIZE ); ByteBuffer buffer = ByteBuffer.allocate( LOG_HEADER_SIZE );
writeLogHeader( buffer, logVersion, previousLastCommittedTxId ); writeLogHeader( buffer, logVersion, previousLastCommittedTxId );
channel.write( buffer ); channel.writeAll( buffer );
} }


public static long encodeLogVersion( long logVersion ) public static long encodeLogVersion( long logVersion )
Expand Down
Expand Up @@ -19,13 +19,17 @@
*/ */
package org.neo4j.kernel.impl.store.id; package org.neo4j.kernel.impl.store.id;


import java.io.File;

import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreFileChannel;
import org.neo4j.kernel.impl.store.InvalidIdGeneratorException; import org.neo4j.kernel.impl.store.InvalidIdGeneratorException;
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule; import org.neo4j.test.rule.fs.DefaultFileSystemRule;
Expand Down Expand Up @@ -171,8 +175,61 @@ public void shouldReturnTrueOnInitIfAProperFileWasThere() throws Exception
assertTrue( idContainer.init() ); assertTrue( idContainer.init() );
} }


@Test
public void idContainerReadWriteBySingleByte() throws IOException
{
SingleByteFileSystemAbstraction fileSystem = new SingleByteFileSystemAbstraction();
IdContainer idContainer = new IdContainer( fileSystem, file, 100, false );
idContainer.init();
idContainer.close( 100 );

idContainer = new IdContainer( fileSystem, file, 100, false );
idContainer.init();
assertEquals( 100, idContainer.getInitialHighId() );
fileSystem.close();
}

private void createEmptyFile() private void createEmptyFile()
{ {
IdContainer.createEmptyIdFile( fs, file, 42, false ); IdContainer.createEmptyIdFile( fs, file, 42, false );
} }

private static class SingleByteFileSystemAbstraction extends DefaultFileSystemAbstraction
{
@Override
public StoreFileChannel open( File fileName, String mode ) throws IOException
{
return new SingleByteBufferChannel( super.open( fileName, mode ) );
}
}

private static class SingleByteBufferChannel extends StoreFileChannel
{

SingleByteBufferChannel( StoreFileChannel channel )
{
super( channel );
}

@Override
public int write( ByteBuffer src ) throws IOException
{
byte b = src.get();
ByteBuffer byteBuffer = ByteBuffer.wrap( new byte[]{b} );
return super.write( byteBuffer );
}

@Override
public int read( ByteBuffer dst ) throws IOException
{
ByteBuffer byteBuffer = ByteBuffer.allocate( 1 );
int read = super.read( byteBuffer );
if ( read > 0 )
{
byteBuffer.flip();
dst.put( byteBuffer.get() );
}
return read;
}
}
} }
Expand Up @@ -43,7 +43,9 @@
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream; import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore; import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore;
import org.neo4j.kernel.impl.store.InvalidIdGeneratorException;
import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.StoreFile;
import org.neo4j.kernel.impl.store.format.standard.StandardV2_3; import org.neo4j.kernel.impl.store.format.standard.StandardV2_3;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_2; import org.neo4j.kernel.impl.store.format.standard.StandardV3_2;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
Expand Down Expand Up @@ -100,7 +102,7 @@ public void setUp() throws Exception
@Test @Test
public void skipMigrationIfNativeIndexExist() throws Exception public void skipMigrationIfNativeIndexExist() throws Exception
{ {
ByteBuffer sourceBuffer = writeNativeIndexFile( nativeLabelIndex, new byte[]{1, 2, 3} ); ByteBuffer sourceBuffer = writeFile( nativeLabelIndex, new byte[]{1, 2, 3} );


indexMigrator.migrate( storeDir, migrationDir, progressMonitor, StandardV3_2.STORE_VERSION, StandardV3_2.STORE_VERSION ); indexMigrator.migrate( storeDir, migrationDir, progressMonitor, StandardV3_2.STORE_VERSION, StandardV3_2.STORE_VERSION );
indexMigrator.moveMigratedFiles( migrationDir, storeDir, StandardV3_2.STORE_VERSION, StandardV3_2.STORE_VERSION ); indexMigrator.moveMigratedFiles( migrationDir, storeDir, StandardV3_2.STORE_VERSION, StandardV3_2.STORE_VERSION );
Expand All @@ -110,6 +112,16 @@ public void skipMigrationIfNativeIndexExist() throws Exception
assertTrue( fileSystem.fileExists( luceneLabelScanStore ) ); assertTrue( fileSystem.fileExists( luceneLabelScanStore ) );
} }


@Test( expected = InvalidIdGeneratorException.class )
public void failMigrationWhenNodeIdFileIsBroken() throws Exception
{
prepareEmpty23Database();
File nodeIdFile = new File( storeDir, StoreFile.NODE_STORE.storeFileName() + ".id" );
writeFile( nodeIdFile, new byte[]{1, 2, 3} );

indexMigrator.migrate( storeDir, migrationDir, progressMonitor, StandardV3_2.STORE_VERSION, StandardV3_2.STORE_VERSION );
}

@Test @Test
public void clearMigrationDirFromAnyLabelScanStoreBeforeMigrating() throws Exception public void clearMigrationDirFromAnyLabelScanStoreBeforeMigrating() throws Exception
{ {
Expand Down Expand Up @@ -143,7 +155,7 @@ public void moveCreatedNativeLabelIndexBackToStoreDirectory() throws IOException
prepareEmpty23Database(); prepareEmpty23Database();
indexMigrator.migrate( storeDir, migrationDir, progressMonitor, StandardV2_3.STORE_VERSION, StandardV3_2.STORE_VERSION ); indexMigrator.migrate( storeDir, migrationDir, progressMonitor, StandardV2_3.STORE_VERSION, StandardV3_2.STORE_VERSION );
File migrationNativeIndex = new File( migrationDir, NativeLabelScanStore.FILE_NAME ); File migrationNativeIndex = new File( migrationDir, NativeLabelScanStore.FILE_NAME );
ByteBuffer migratedFileContent = writeNativeIndexFile( migrationNativeIndex, new byte[]{5, 4, 3, 2, 1} ); ByteBuffer migratedFileContent = writeFile( migrationNativeIndex, new byte[]{5, 4, 3, 2, 1} );


indexMigrator.moveMigratedFiles( migrationDir, storeDir, StandardV2_3.STORE_VERSION, StandardV3_2.STORE_VERSION ); indexMigrator.moveMigratedFiles( migrationDir, storeDir, StandardV2_3.STORE_VERSION, StandardV3_2.STORE_VERSION );


Expand Down Expand Up @@ -219,7 +231,7 @@ private void assertNoContentInNativeLabelScanStore( File dir )
} }
} }


private ByteBuffer writeNativeIndexFile( File file, byte[] content ) throws IOException private ByteBuffer writeFile( File file, byte[] content ) throws IOException
{ {
ByteBuffer sourceBuffer = ByteBuffer.wrap( content ); ByteBuffer sourceBuffer = ByteBuffer.wrap( content );
storeFileContent( file, sourceBuffer ); storeFileContent( file, sourceBuffer );
Expand Down Expand Up @@ -271,9 +283,9 @@ private ByteBuffer readFileContent( File nativeLabelIndex, int length ) throws I
} }
} }


private void storeFileContent( File nativeLabelIndex, ByteBuffer sourceBuffer ) throws IOException private void storeFileContent( File file, ByteBuffer sourceBuffer ) throws IOException
{ {
try ( StoreChannel storeChannel = fileSystem.create( nativeLabelIndex ) ) try ( StoreChannel storeChannel = fileSystem.create( file ) )
{ {
storeChannel.write( sourceBuffer ); storeChannel.write( sourceBuffer );
} }
Expand Down
Expand Up @@ -70,7 +70,7 @@ public synchronized void reserveForIndex() throws IOException
File failureFile = failureFile(); File failureFile = failureFile();
try ( StoreChannel channel = fs.create( failureFile ) ) try ( StoreChannel channel = fs.create( failureFile ) )
{ {
channel.write( ByteBuffer.wrap( new byte[MAX_FAILURE_SIZE] ) ); channel.writeAll( ByteBuffer.wrap( new byte[MAX_FAILURE_SIZE] ) );
channel.force( true ); channel.force( true );
} }
} }
Expand Down Expand Up @@ -120,7 +120,7 @@ public synchronized void storeIndexFailure( String failure ) throws IOException
channel.position( lengthOf( existingData ) ); channel.position( lengthOf( existingData ) );


byte[] data = UTF8.encode( failure ); byte[] data = UTF8.encode( failure );
channel.write( ByteBuffer.wrap( data, 0, Math.min( data.length, MAX_FAILURE_SIZE ) ) ); channel.writeAll( ByteBuffer.wrap( data, 0, Math.min( data.length, MAX_FAILURE_SIZE ) ) );


channel.force( true ); channel.force( true );
channel.close(); channel.close();
Expand Down

0 comments on commit 28efba6

Please sign in to comment.