Skip to content

Commit

Permalink
Update store lock checker optimistic assumption and log file test
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed May 15, 2017
1 parent c8e228a commit 5ac14d3
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 80 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.StoreLockException;
import org.neo4j.kernel.internal.StoreLocker;

class StoreLockChecker implements Closeable
Expand Down Expand Up @@ -58,10 +59,24 @@ static Closeable check( Path databaseDirectory ) throws CannotWriteException
{
if ( Files.isWritable( lockFile ) )
{
StoreLockChecker storeLocker = new StoreLockChecker( new DefaultFileSystemAbstraction(),
databaseDirectory.toFile() );
storeLocker.checkLock();
return storeLocker;
StoreLockChecker storeLocker = new StoreLockChecker( new DefaultFileSystemAbstraction(), databaseDirectory.toFile() );
try
{
storeLocker.checkLock();
return storeLocker;
}
catch ( StoreLockException le )
{
try
{
storeLocker.close();
}
catch ( IOException e )
{
le.addSuppressed( e );
}
throw le;
}
}
else
{
Expand Down
Expand Up @@ -235,8 +235,7 @@ public BatchInserterImpl( final File storeDir, final FileSystemAbstraction fileS

StoreLogService logService = life.add( StoreLogService.inLogsDirectory( fileSystem, this.storeDir ) );
msgLog = logService.getInternalLog( getClass() );
storeLocker = new StoreLocker( fileSystem, this.storeDir );
storeLocker.checkLock();
storeLocker = tryLockStore( fileSystem );

boolean dump = config.get( GraphDatabaseSettings.dump_configuration );
this.idGeneratorFactory = new DefaultIdGeneratorFactory( fileSystem );
Expand Down Expand Up @@ -304,6 +303,28 @@ public BatchInserterImpl( final File storeDir, final FileSystemAbstraction fileS
.batch_inserter_batch_size ) );
}

private StoreLocker tryLockStore( FileSystemAbstraction fileSystem )
{
StoreLocker storeLocker = new StoreLocker( fileSystem, this.storeDir );
try
{
storeLocker.checkLock();
}
catch ( Exception e )
{
try
{
storeLocker.close();
}
catch ( IOException ce )
{
e.addSuppressed( ce );
}
throw e;
}
return storeLocker;
}

private Map<String, String> getDefaultParams()
{
Map<String, String> params = new HashMap<>();
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import java.io.File;
import java.io.IOException;
Expand All @@ -34,7 +35,7 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile.Monitor;
import org.neo4j.kernel.impl.transaction.log.entry.IncompleteLogHeaderException;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;

Expand All @@ -54,35 +55,36 @@

public class PhysicalLogFileTest
{
private final TestDirectory directory = TestDirectory.testDirectory();
private final DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule();
private final LifeRule life = new LifeRule( true );

@Rule
public final TestDirectory directory = TestDirectory.testDirectory();
@Rule
public final DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule();
public RuleChain ruleChain = RuleChain.outerRule( directory ).around( fileSystemRule ).around( life );

private final LogVersionRepository logVersionRepository = new DeadSimpleLogVersionRepository( 1L );
private final TransactionIdStore transactionIdStore =
new DeadSimpleTransactionIdStore( 5L, 0, BASE_TX_COMMIT_TIMESTAMP, 0, 0 );

@Test
public void skipLogFileWithoutHeader() throws IOException
{
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles physicalLogFiles = new PhysicalLogFiles( directory.directory(), "logs", fs );
PhysicalLogFile physicalLogFile = new PhysicalLogFile( fs, physicalLogFiles, 1000,
() -> 1L, logVersionRepository,
mock( Monitor.class ), new LogHeaderCache( 10 ) );
PhysicalLogFile physicalLogFile =
new PhysicalLogFile( fs, physicalLogFiles, 1000, () -> 1L, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) );
life.add( physicalLogFile );
life.start();

// simulate new file without header presence
PhysicalLogFile logFileToSearchFrom = new PhysicalLogFile( fs, physicalLogFiles, 1000,
() -> 10L, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) );
PhysicalLogFile logFileToSearchFrom =
new PhysicalLogFile( fs, physicalLogFiles, 1000, () -> 10L, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) );
logVersionRepository.incrementAndGetVersion();
fs.create( physicalLogFiles.getLogFileForVersion( logVersionRepository.getCurrentLogVersion() ) ).close();

PhysicalLogicalTransactionStore.LogVersionLocator versionLocator =
new PhysicalLogicalTransactionStore.LogVersionLocator( 4L );
PhysicalLogicalTransactionStore.LogVersionLocator versionLocator = new PhysicalLogicalTransactionStore.LogVersionLocator( 4L );
logFileToSearchFrom.accept( versionLocator );

LogPosition logPosition = versionLocator.getLogPosition();
Expand All @@ -94,7 +96,6 @@ public void shouldOpenInFreshDirectoryAndFinallyAddHeader() throws Exception
{
// GIVEN
String name = "log";
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), name, fs );
life.add( new PhysicalLogFile( fs, logFiles, 1000, transactionIdStore::getLastCommittedTransactionId,
Expand All @@ -116,7 +117,6 @@ public void shouldWriteSomeDataIntoTheLog() throws Exception
{
// GIVEN
String name = "log";
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), name, fs );
Monitor monitor = mock( Monitor.class );
Expand All @@ -125,29 +125,20 @@ public void shouldWriteSomeDataIntoTheLog() throws Exception
new LogHeaderCache( 10 ) ) );

// WHEN
try
{
life.start();

FlushablePositionAwareChannel writer = logFile.getWriter();
LogPositionMarker positionMarker = new LogPositionMarker();
writer.getCurrentPosition( positionMarker );
int intValue = 45;
long longValue = 4854587;
writer.putInt( intValue );
writer.putLong( longValue );
writer.prepareForFlush().flush();
FlushablePositionAwareChannel writer = logFile.getWriter();
LogPositionMarker positionMarker = new LogPositionMarker();
writer.getCurrentPosition( positionMarker );
int intValue = 45;
long longValue = 4854587;
writer.putInt( intValue );
writer.putLong( longValue );
writer.prepareForFlush().flush();

// THEN
try ( ReadableClosableChannel reader = logFile.getReader( positionMarker.newPosition() ) )
{
assertEquals( intValue, reader.getInt() );
assertEquals( longValue, reader.getLong() );
}
}
finally
// THEN
try ( ReadableClosableChannel reader = logFile.getReader( positionMarker.newPosition() ) )
{
life.shutdown();
assertEquals( intValue, reader.getInt() );
assertEquals( longValue, reader.getLong() );
}
}

Expand All @@ -156,51 +147,42 @@ public void shouldReadOlderLogs() throws Exception
{
// GIVEN
String name = "log";
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), name, fs );
LogFile logFile = life.add( new PhysicalLogFile( fs, logFiles, 50,
transactionIdStore::getLastCommittedTransactionId, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) ) );

// WHEN
life.start();
try
{
FlushablePositionAwareChannel writer = logFile.getWriter();
LogPositionMarker positionMarker = new LogPositionMarker();
writer.getCurrentPosition( positionMarker );
LogPosition position1 = positionMarker.newPosition();
int intValue = 45;
long longValue = 4854587;
byte[] someBytes = someBytes( 40 );
writer.putInt( intValue );
writer.putLong( longValue );
writer.put( someBytes, someBytes.length );
writer.prepareForFlush().flush();
writer.getCurrentPosition( positionMarker );
LogPosition position2 = positionMarker.newPosition();
long longValue2 = 123456789L;
writer.putLong( longValue2 );
writer.put( someBytes, someBytes.length );
writer.prepareForFlush().flush();
FlushablePositionAwareChannel writer = logFile.getWriter();
LogPositionMarker positionMarker = new LogPositionMarker();
writer.getCurrentPosition( positionMarker );
LogPosition position1 = positionMarker.newPosition();
int intValue = 45;
long longValue = 4854587;
byte[] someBytes = someBytes( 40 );
writer.putInt( intValue );
writer.putLong( longValue );
writer.put( someBytes, someBytes.length );
writer.prepareForFlush().flush();
writer.getCurrentPosition( positionMarker );
LogPosition position2 = positionMarker.newPosition();
long longValue2 = 123456789L;
writer.putLong( longValue2 );
writer.put( someBytes, someBytes.length );
writer.prepareForFlush().flush();

// THEN
try ( ReadableClosableChannel reader = logFile.getReader( position1 ) )
{
assertEquals( intValue, reader.getInt() );
assertEquals( longValue, reader.getLong() );
assertArrayEquals( someBytes, readBytes( reader, 40 ) );
}
try ( ReadableClosableChannel reader = logFile.getReader( position2 ) )
{
assertEquals( longValue2, reader.getLong() );
assertArrayEquals( someBytes, readBytes( reader, 40 ) );
}
// THEN
try ( ReadableClosableChannel reader = logFile.getReader( position1 ) )
{
assertEquals( intValue, reader.getInt() );
assertEquals( longValue, reader.getLong() );
assertArrayEquals( someBytes, readBytes( reader, 40 ) );
}
finally
try ( ReadableClosableChannel reader = logFile.getReader( position2 ) )
{
life.shutdown();
assertEquals( longValue2, reader.getLong() );
assertArrayEquals( someBytes, readBytes( reader, 40 ) );
}
}

Expand All @@ -209,13 +191,11 @@ public void shouldVisitLogFile() throws Exception
{
// GIVEN
String name = "log";
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), name, fs );
LogFile logFile = life.add( new PhysicalLogFile( fs, logFiles, 50,
transactionIdStore::getLastCommittedTransactionId, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) ) );
life.start();
FlushablePositionAwareChannel writer = logFile.getWriter();
LogPositionMarker mark = new LogPositionMarker();
writer.getCurrentPosition( mark );
Expand All @@ -237,7 +217,6 @@ public void shouldVisitLogFile() throws Exception
return true;
}, mark.newPosition() );
assertTrue( called.get() );
life.shutdown();
}

@Test
Expand Down

0 comments on commit 5ac14d3

Please sign in to comment.