Skip to content

Commit

Permalink
Import tool uses prepareForCommit
Browse files Browse the repository at this point in the history
and better handling of secondary unit id, by having AbstractBaseRecord
still reference a secondary unit id even if it's not required anymore,
so that it can be freed.
  • Loading branch information
tinwelint committed Feb 16, 2016
1 parent 111eb44 commit 9c42ba7
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 39 deletions.
Expand Up @@ -989,6 +989,12 @@ public void updateRecord( RECORD record )
{
freeId( id );
}
if ( (!record.inUse() || !record.requiresSecondaryUnit()) && record.hasSecondaryUnitId() )
{
// If record was just now deleted, or if the record used a secondary unit, but not anymore
// then free the id of that secondary unit.
freeId( record.getSecondaryUnitId() );
}
}
}
catch ( IOException e )
Expand Down
Expand Up @@ -126,11 +126,11 @@ protected final void doWrite( RECORD record, PageCursor primaryCursor, int recor
assert (headerByte & 0x7) == 0 : "Format-specific header bits (" + headerByte +
") collides with format-generic header bits";
headerByte = set( headerByte, IN_USE_BIT, record.inUse() );
headerByte = set( headerByte, HEADER_BIT_RECORD_UNIT, record.requiresTwoUnits() );
headerByte = set( headerByte, HEADER_BIT_RECORD_UNIT, record.requiresSecondaryUnit() );
headerByte = set( headerByte, HEADER_BIT_FIRST_RECORD_UNIT, true );
primaryCursor.putByte( headerByte );

if ( record.requiresTwoUnits() )
if ( record.requiresSecondaryUnit() )
{
doWriteInternal( record, primaryCursor,
recordIO.getWriteAdapter( record, primaryCursor, recordSize, storeFile ) );
Expand All @@ -151,9 +151,13 @@ public final void prepare( RECORD record, int recordSize, IdSequence idSequence
{
assert record.inUse();
int length = 1 + requiredDataLength( record );
if ( length > recordSize )
boolean requiresSecondaryUnit = length > recordSize;
record.setRequiresSecondaryUnit( requiresSecondaryUnit );
if ( record.requiresSecondaryUnit() && !record.hasSecondaryUnitId() )
{
record.setSecondaryId( idSequence.nextId() );
// Allocate a new id at this point, but this is not the time to free this ID the the case where
// this record doesn't need this secondary unit anymore... that needs to be done when applying to store.
record.setSecondaryUnitId( idSequence.nextId() );
}
}

Expand Down
Expand Up @@ -52,19 +52,19 @@ public String storeVersion()
@Override
public RecordFormat<NodeRecord> node()
{
return new NodeRecordFormat();
return new NodeRecordFormat( new RecordIO.CommunityRecordIO<>() );
}

@Override
public RecordFormat<RelationshipRecord> relationship()
{
return new RelationshipRecordFormat();
return new RelationshipRecordFormat( new RecordIO.CommunityRecordIO<>() );
}

@Override
public RecordFormat<RelationshipGroupRecord> relationshipGroup()
{
return new RelationshipGroupRecordFormat();
return new RelationshipGroupRecordFormat( new RecordIO.CommunityRecordIO<>() );
}

@Override
Expand Down
Expand Up @@ -39,10 +39,6 @@ public NodeRecordFormat( RecordIO<NodeRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
}
public NodeRecordFormat()
{
super( fixedRecordSize( RECORD_SIZE ), 0 );
}

@Override
public NodeRecord newRecord()
Expand Down
Expand Up @@ -32,11 +32,6 @@ class RelationshipGroupRecordFormat extends BaseHighLimitRecordFormat<Relationsh
private static final int HAS_LOOP_BIT = 0b0010_0000;
private static final int HAS_NEXT_BIT = 0b0100_0000;

public RelationshipGroupRecordFormat()
{
super( fixedRecordSize( RECORD_SIZE ), 0 );
}

public RelationshipGroupRecordFormat( RecordIO<RelationshipGroupRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
Expand Down
Expand Up @@ -33,10 +33,6 @@ class RelationshipRecordFormat extends BaseHighLimitRecordFormat<RelationshipRec
private static final int HAS_END_NEXT_BIT = 0b0100_0000;
private static final int HAS_PROPERTY_BIT = 0b1000_0000;

public RelationshipRecordFormat()
{
super( fixedRecordSize( RECORD_SIZE ), 0 );
}
public RelationshipRecordFormat( RecordIO<RelationshipRecord> recordIO )
{
super( fixedRecordSize( RECORD_SIZE ), 0, recordIO );
Expand Down
Expand Up @@ -35,21 +35,30 @@ public DefaultIdGeneratorFactory( FileSystemAbstraction fs )
this.fs = fs;
}

@Override
public IdGenerator open( File fileName, int grabSize, IdType idType, long highId )
{
long maxValue = idType.getMaxValue();
boolean aggressiveReuse = idType.allowAggressiveReuse();
IdGenerator generator = new IdGeneratorImpl( fs, fileName, grabSize, maxValue,
IdGenerator generator = instantiate( fs, fileName, grabSize, maxValue,
aggressiveReuse, highId );
generators.put( idType, generator );
return generator;
}

protected IdGenerator instantiate( FileSystemAbstraction fs, File fileName, int grabSize, long maxValue,
boolean aggressiveReuse, long highId )
{
return new IdGeneratorImpl( fs, fileName, grabSize, maxValue, aggressiveReuse, highId );
}

@Override
public IdGenerator get( IdType idType )
{
return generators.get( idType );
}

@Override
public void create( File fileName, long highId, boolean throwIfFileExists )
{
IdGeneratorImpl.createGenerator( fs, fileName, highId, throwIfFileExists );
Expand Down
Expand Up @@ -34,7 +34,11 @@ public abstract class AbstractBaseRecord implements CloneableInPublic
private long id;
// Used for the "record unit" feature where one logical record may span two physical records,
// as to still keep low and fixed record size, but support occasionally bigger records.
private long secondaryId;
private long secondaryUnitId;
// This flag is for when a record required a secondary unit, was changed, as a result of that change
// no longer requires that secondary unit and gets updated. In that scenario we still want to know
// about the secondary unit id so that we can free it when the time comes to apply the record to store.
private boolean requiresSecondaryUnit;
private boolean inUse;
private boolean created;

Expand All @@ -48,7 +52,8 @@ protected AbstractBaseRecord initialize( boolean inUse )
{
this.inUse = inUse;
this.created = false;
this.secondaryId = NO_ID;
this.secondaryUnitId = NO_ID;
this.requiresSecondaryUnit = false;
return this;
}

Expand All @@ -62,7 +67,8 @@ public void clear()
{
inUse = false;
created = false;
secondaryId = NO_ID;
secondaryUnitId = NO_ID;
requiresSecondaryUnit = false;
}

public long getId()
Expand All @@ -81,28 +87,40 @@ public void setId( long id )
}

/**
* Sets a secondary record unit ID for this record. If this is set to something other than {@link #NO_ID}
* then {@link #requiresTwoUnits()} will return {@code true}.
* Sets a secondary record unit ID for this record. If this is set to something other than {@code -1}
* then {@link #requiresSecondaryUnit()} will return {@code true}.
* Setting this id is separate from setting {@link #requiresSecondaryUnit()} since this secondary unit id
* may be used to just free that id at the time of updating in the store if a record goes from two to one unit.
*/
public void setSecondaryId( long id )
public void setSecondaryUnitId( long id )
{
this.secondaryId = id;
this.secondaryUnitId = id;
}

public boolean hasSecondaryUnitId()
{
return secondaryUnitId != NO_ID;
}

/**
* @return secondary record unit ID set by {@link #setSecondaryId(long)}.
* @return secondary record unit ID set by {@link #setSecondaryUnitId(long)}.
*/
public long getSecondaryId()
public long getSecondaryUnitId()
{
return this.secondaryUnitId;
}

public void setRequiresSecondaryUnit( boolean requires )
{
return this.secondaryId;
this.requiresSecondaryUnit = requires;
}

/**
* @return whether or not a secondary record unit ID has been assigned.
*/
public boolean requiresTwoUnits()
public boolean requiresSecondaryUnit()
{
return this.secondaryId != NO_ID;
return requiresSecondaryUnit;
}

public final boolean inUse()
Expand Down
Expand Up @@ -122,6 +122,7 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
}
}
highestId = max( highestId, record.getId() );
entityStore.prepareForCommit( record );
entityStore.updateRecord( record );
}
else
Expand Down
Expand Up @@ -80,6 +80,7 @@ public long visit( long nodeId, int type, long next, long out, long in, long loo
{
groupRecord.setNext( nextGroupId = relGroupStore.nextId() );
}
relGroupStore.prepareForCommit( groupRecord );
relGroupStore.updateRecord( groupRecord );
return id;
}
Expand Down
Expand Up @@ -72,6 +72,7 @@ record = (RECORD) record.clone();

protected void update( RECORD record ) throws Throwable
{
store.prepareForCommit( record );
store.updateRecord( record );
}

Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -56,6 +57,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import static java.util.Arrays.asList;

Expand All @@ -77,6 +81,7 @@ public class NodeStoreTest

private NodeStore nodeStore;
private NeoStores neoStores;
private IdGeneratorFactory idGeneratorFactory;

@After
public void tearDown()
Expand Down Expand Up @@ -302,6 +307,52 @@ public int read( ByteBuffer dst ) throws IOException
}
}

@Test
public void shouldFreeSecondaryUnitIdOfDeletedRecord() throws Exception
{
// GIVEN
EphemeralFileSystemAbstraction fs = efs.get();
nodeStore = newNodeStore( fs );
NodeRecord record = new NodeRecord( 5L );
record.setRequiresSecondaryUnit( true );
record.setSecondaryUnitId( 10L );
record.setInUse( true );
nodeStore.updateRecord( record );
nodeStore.setHighestPossibleIdInUse( 10L );

// WHEN
record.setInUse( false );
nodeStore.updateRecord( record );

// THEN
IdGenerator idGenerator = idGeneratorFactory.get( IdType.NODE );
verify( idGenerator ).freeId( 5L );
verify( idGenerator ).freeId( 10L );
}

@Test
public void shouldFreeSecondaryUnitIdOfShrunkRecord() throws Exception
{
// GIVEN
EphemeralFileSystemAbstraction fs = efs.get();
nodeStore = newNodeStore( fs );
NodeRecord record = new NodeRecord( 5L );
record.setRequiresSecondaryUnit( true );
record.setSecondaryUnitId( 10L );
record.setInUse( true );
nodeStore.updateRecord( record );
nodeStore.setHighestPossibleIdInUse( 10L );

// WHEN
record.setRequiresSecondaryUnit( false );
nodeStore.updateRecord( record );

// THEN
IdGenerator idGenerator = idGeneratorFactory.get( IdType.NODE );
verify( idGenerator, times( 0 ) ).freeId( 5L );
verify( idGenerator ).freeId( 10L );
}

private NodeStore newNodeStore( FileSystemAbstraction fs ) throws IOException
{
return newNodeStore( fs, pageCacheRule.getPageCache( fs ) );
Expand All @@ -311,7 +362,15 @@ private NodeStore newNodeStore( FileSystemAbstraction fs, PageCache pageCache )
{
File storeDir = new File( "dir" );
fs.mkdirs( storeDir );
IdGeneratorFactory idGeneratorFactory = new DefaultIdGeneratorFactory( fs );
idGeneratorFactory = spy( new DefaultIdGeneratorFactory( fs )
{
@Override
protected IdGenerator instantiate( FileSystemAbstraction fs, File fileName, int grabSize, long maxValue,
boolean aggressiveReuse, long highId )
{
return spy( super.instantiate( fs, fileName, grabSize, maxValue, aggressiveReuse, highId ) );
}
} );
StoreFactory factory = new StoreFactory( storeDir, new Config(), idGeneratorFactory, pageCache, fs,
NullLogProvider.getInstance() );
neoStores = factory.openAllNeoStores( true );
Expand Down
Expand Up @@ -192,7 +192,7 @@ private <R extends AbstractBaseRecord> void verifyWriteAndRead(
{
assertEquals( written.inUse(), read.inUse() );
assertEquals( written.getId(), read.getId() );
assertEquals( written.getSecondaryId(), read.getSecondaryId() );
assertEquals( written.getSecondaryUnitId(), read.getSecondaryUnitId() );
key.assertRecordsEquals( written, read );
}
else
Expand Down
Expand Up @@ -64,7 +64,7 @@ public void read( RECORD record, PageCursor primaryCursor, int recordSize, Paged
}
while ( secondaryPageCursorControl.shouldRetry() );

record.setSecondaryId( secondaryId );
record.setSecondaryUnitId( secondaryId );
}
}

Expand All @@ -76,10 +76,10 @@ public DataAdapter<PageCursor> getWriteAdapter( RECORD record, PageCursor primar

// Write using the normal adapter since the first reference we write cannot really overflow
// into the secondary record
Reference.encode( record.getSecondaryId(), primaryCursor, PAGE_CURSOR_ADAPTER );
Reference.encode( record.getSecondaryUnitId(), primaryCursor, PAGE_CURSOR_ADAPTER );

return new SecondaryPageCursorWriteDataAdapter(
pageIdForRecord( record.getSecondaryId(), storeFile.pageSize(), recordSize ),
offsetForId( record.getSecondaryId(), storeFile.pageSize(), recordSize ), primaryEndOffset );
pageIdForRecord( record.getSecondaryUnitId(), storeFile.pageSize(), recordSize ),
offsetForId( record.getSecondaryUnitId(), storeFile.pageSize(), recordSize ), primaryEndOffset );
}
}

0 comments on commit 9c42ba7

Please sign in to comment.