Skip to content

Commit

Permalink
Read unused dynamic records when reading chain
Browse files Browse the repository at this point in the history
Without short-lived locks reading of property records and corresponding dynamic
records is not atomic. First, property record is read and only then
corresponding dynamic records are read. This might result in reading
inconsistent property values under concurrent load.

This commit allows reading of unused dynamic records and their data when they
are reachable from the chain.

Co-authored-by: @MishaDemianenko
  • Loading branch information
lutovich committed May 10, 2016
1 parent 36add50 commit 4a29e2b
Show file tree
Hide file tree
Showing 8 changed files with 463 additions and 80 deletions.
Expand Up @@ -278,7 +278,7 @@ private void readFromStore( AbstractDynamicStore store, AbstractDynamicStore.Dyn
{
buffer.clear();
long startBlockId = PropertyBlock.fetchLong( currentHeader() );
try ( GenericCursor<DynamicRecord> records = store.getRecordsCursor( startBlockId, true, cursor ) )
try ( GenericCursor<DynamicRecord> records = store.getRecordsCursor( startBlockId, cursor ) )
{
while ( records.next() )
{
Expand Down
Expand Up @@ -67,16 +67,10 @@
public abstract class AbstractDynamicStore extends CommonAbstractStore implements RecordStore<DynamicRecord>,
DynamicBlockSize, DynamicRecordAllocator
{
public static final byte[] NO_DATA = new byte[0];
private static final byte[] NO_DATA = new byte[0];
// (in_use+next high)(1 byte)+nr_of_bytes(3 bytes)+next_block(int)
public static final int BLOCK_HEADER_SIZE = 1 + 3 + 4; // = 8

// Return signals for the readRecordHeader() method:
private static int hasDataSignal = 0;
private static int hasNoDataSignal = 1;
private static int notInUseSignal = 2;
private static int illegalSizeSignal = 3;

private final int blockSizeFromConfiguration;
private int blockSize;

Expand Down Expand Up @@ -393,12 +387,12 @@ private Collection<DynamicRecord> getRecords( long startBlockId, boolean readBot
while ( blockId != noNextBlock && cursor.next( pageIdForRecord( blockId ) ) )
{
DynamicRecord record = new DynamicRecord( blockId );
int headerReadResult;
HeaderReadResult headerReadResult;
do
{
cursor.setOffset( offsetForId( blockId ) );
headerReadResult = readRecordHeader( cursor, record, false );
if ( headerReadResult == hasDataSignal && readBothHeaderAndData )
if ( headerReadResult == HeaderReadResult.DATA && readBothHeaderAndData )
{
readRecordData( cursor, record );
}
Expand All @@ -423,20 +417,17 @@ public DynamicRecordCursor newDynamicRecordCursor()
return new DynamicRecordCursor();
}

public DynamicRecordCursor getRecordsCursor( final long startBlockId,
final boolean readBothHeaderAndData )
DynamicRecordCursor getRecordsCursor( long startBlockId )
{
return getRecordsCursor( startBlockId, readBothHeaderAndData, newDynamicRecordCursor() );
return getRecordsCursor( startBlockId, newDynamicRecordCursor() );
}

public DynamicRecordCursor getRecordsCursor( final long startBlockId,
final boolean readBothHeaderAndData, DynamicRecordCursor dynamicRecordCursor )
public DynamicRecordCursor getRecordsCursor( long startBlockId, DynamicRecordCursor dynamicRecordCursor )
{
try
{
final PageCursor cursor = storeFile.io( 0, PF_SHARED_LOCK );

dynamicRecordCursor.init( startBlockId, cursor, readBothHeaderAndData );
PageCursor cursor = storeFile.io( 0, PF_SHARED_LOCK );
dynamicRecordCursor.init( startBlockId, cursor );
return dynamicRecordCursor;
}
catch ( IOException e )
Expand All @@ -445,17 +436,17 @@ public DynamicRecordCursor getRecordsCursor( final long startBlockId,
}
}

private void checkForInUse( int headerReadResult, DynamicRecord record )
private void checkForInUse( HeaderReadResult headerReadResult, DynamicRecord record )
{
if ( headerReadResult == notInUseSignal )
if ( headerReadResult == HeaderReadResult.NOT_IN_USE )
{
throw new InvalidRecordException( "DynamicRecord Not in use, blockId[" + record.getId() + "]" );
throw new InvalidRecordException( "DynamicRecord not in use, blockId[" + record.getId() + "]" );
}
}

private void checkForIllegalSize( int headerReadResult, DynamicRecord record )
private void checkForIllegalSize( HeaderReadResult headerReadResult, DynamicRecord record )
{
if ( headerReadResult == illegalSizeSignal )
if ( headerReadResult == HeaderReadResult.ILLEGAL_SIZE )
{
int dataSize = getBlockSize() - AbstractDynamicStore.BLOCK_HEADER_SIZE;
throw new InvalidRecordException( "Next block set[" + record.getNextBlock()
Expand All @@ -467,7 +458,7 @@ private void checkForIllegalSize( int headerReadResult, DynamicRecord record )
/**
* Reads data from the cursor into the given record, and returns one of the signals specified above.
*/
private int readRecordHeader( PageCursor cursor, DynamicRecord record, boolean force )
private HeaderReadResult readRecordHeader( PageCursor cursor, DynamicRecord record, boolean force )
{
/*
* First 4b
Expand All @@ -484,7 +475,7 @@ private int readRecordHeader( PageCursor cursor, DynamicRecord record, boolean f
boolean inUse = highNibbleInMaskedInteger == Record.IN_USE.intValue();
if ( !inUse && !force )
{
return notInUseSignal;
return HeaderReadResult.NOT_IN_USE;
}
int dataSize = getBlockSize() - AbstractDynamicStore.BLOCK_HEADER_SIZE;

Expand All @@ -508,10 +499,10 @@ private int readRecordHeader( PageCursor cursor, DynamicRecord record, boolean f
hasDataToRead = false;
if ( !force )
{
return illegalSizeSignal;
return HeaderReadResult.ILLEGAL_SIZE;
}
}
return hasDataToRead ? hasDataSignal : hasNoDataSignal;
return hasDataToRead ? HeaderReadResult.DATA : HeaderReadResult.NO_DATA;
}

private void readRecordData( PageCursor cursor, DynamicRecord record )
Expand Down Expand Up @@ -566,15 +557,15 @@ public DynamicRecord getRecord( long id )
long pageId = pageIdForRecord( id );
try ( PageCursor cursor = storeFile.io( pageId, PF_SHARED_LOCK ) )
{
int headerReadResult = notInUseSignal;
HeaderReadResult headerReadResult = HeaderReadResult.NOT_IN_USE;
if ( cursor.next() )
{
int offset = offsetForId( record.getId() );
do
{
cursor.setOffset( offset );
headerReadResult = readRecordHeader( cursor, record, false );
if ( headerReadResult == hasDataSignal )
if ( headerReadResult == HeaderReadResult.DATA )
{
readRecordData( cursor, record );
}
Expand Down Expand Up @@ -602,12 +593,12 @@ public DynamicRecord forceGetRecord( long id )
if ( cursor.next() )
{
int offset = offsetForId( record.getId() );
int headerReadResult;
HeaderReadResult headerReadResult;
do
{
cursor.setOffset( offset );
headerReadResult = readRecordHeader( cursor, record, true );
if ( headerReadResult == hasDataSignal )
if ( headerReadResult == HeaderReadResult.DATA )
{
readRecordData( cursor, record );
}
Expand Down Expand Up @@ -656,16 +647,14 @@ public String toString()
public class DynamicRecordCursor extends GenericCursor<DynamicRecord>
{
private PageCursor cursor;
private boolean readBothHeaderAndData;
long blockId;
int noNextBlock;

private final DynamicRecord record = new DynamicRecord( blockId );

public void init( long startBlockId, PageCursor cursor, boolean readBothHeaderAndData )
public void init( long startBlockId, PageCursor cursor )
{
this.cursor = cursor;
this.readBothHeaderAndData = readBothHeaderAndData;
blockId = startBlockId;
noNextBlock = Record.NO_NEXT_BLOCK.intValue();
}
Expand All @@ -679,19 +668,18 @@ public boolean next()
{
record.setId( blockId );

int headerReadResult;
HeaderReadResult headerReadResult;
do
{
cursor.setOffset( offsetForId( blockId ) );
headerReadResult = readRecordHeader( cursor, record, false );
if ( headerReadResult == hasDataSignal && readBothHeaderAndData )
headerReadResult = readRecordHeader( cursor, record, true );
if ( headerReadResult == HeaderReadResult.DATA )
{
readRecordData( cursor, record );
}
}
while ( cursor.shouldRetry() );

checkForInUse( headerReadResult, record );
checkForIllegalSize( headerReadResult, record );
current = record;
blockId = record.getNextBlock();
Expand All @@ -715,4 +703,9 @@ public void close()
cursor = null;
}
}

private enum HeaderReadResult
{
DATA, NO_DATA, NOT_IN_USE, ILLEGAL_SIZE
}
}
Expand Up @@ -265,38 +265,33 @@ public void ensureHeavy( PropertyBlock block )
{
if ( block.getType() == PropertyType.STRING )
{
if ( block.isLight() )
{
try ( GenericCursor<DynamicRecord> stringRecords = stringPropertyStore.getRecordsCursor(
block.getSingleValueLong(), false ) )
{
while ( stringRecords.next() )
{
stringRecords.get().setType( PropertyType.STRING.intValue() );
block.addValueRecord( stringRecords.get().clone() );
}
}
}
for ( DynamicRecord stringRecord : block.getValueRecords() )
{
stringPropertyStore.ensureHeavy( stringRecord );
}
loadPropertyBlock( block, stringPropertyStore );
}
else if ( block.getType() == PropertyType.ARRAY )
{
if ( block.isLight() )
loadPropertyBlock( block, arrayPropertyStore );
}
}

private static void loadPropertyBlock( PropertyBlock block, AbstractDynamicStore dynamicStore )
{
if ( block.isLight() )
{
long startBlockId = block.getSingleValueLong();
try ( GenericCursor<DynamicRecord> cursor = dynamicStore.getRecordsCursor( startBlockId ) )
{
Collection<DynamicRecord> arrayRecords = arrayPropertyStore.getLightRecords(
block.getSingleValueLong() );
for ( DynamicRecord arrayRecord : arrayRecords )
while ( cursor.next() )
{
arrayRecord.setType( PropertyType.ARRAY.intValue() );
block.addValueRecord( arrayRecord );
cursor.get().setType( block.getType().intValue() );
block.addValueRecord( cursor.get().clone() );
}
}
for ( DynamicRecord arrayRecord : block.getValueRecords() )
}
else
{
for ( DynamicRecord dynamicRecord : block.getValueRecords() )
{
arrayPropertyStore.ensureHeavy( arrayRecord );
dynamicStore.ensureHeavy( dynamicRecord );
}
}
}
Expand Down

0 comments on commit 4a29e2b

Please sign in to comment.