Skip to content

Commit

Permalink
Hooks in call to RecordFormat.prepare
Browse files Browse the repository at this point in the history
as to properly make the decision about the secondary record unit
before writing commands to the log
  • Loading branch information
tinwelint committed Feb 5, 2016
1 parent 4de5f1c commit 936762e
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 9 deletions.
Expand Up @@ -81,6 +81,12 @@ protected void writeRecord( PageCursor cursor, RECORD record ) throws IOExceptio
recordFormat.write( record, cursor, recordSize, storeFile );
}

@Override
public void prepareForCommit( RECORD record )
{
recordFormat.prepare( record, recordSize, this );
}

@Override
public long getNextRecordReference( RECORD record )
{
Expand Down
Expand Up @@ -846,4 +846,9 @@ protected boolean isInUse( PageCursor cursor )
{
return cursor.getByte() == Record.IN_USE.byteValue();
}

@Override
public void prepareForCommit( NeoStoreActualRecord record )
{ // No need to do anything with these records before commit
}
}
Expand Up @@ -236,6 +236,8 @@ public interface RecordStore<RECORD extends AbstractBaseRecord> extends IdSequen
*/
int getStoreHeaderInt();

void prepareForCommit( RECORD record );

Predicate<AbstractBaseRecord> IN_USE = AbstractBaseRecord::inUse;

class Delegator<R extends AbstractBaseRecord> implements RecordStore<R>
Expand Down Expand Up @@ -372,6 +374,12 @@ public void ensureHeavy( R record )
{
actual.ensureHeavy( record );
}

@Override
public void prepareForCommit( R record )
{
actual.prepareForCommit( record );
}
}

@SuppressWarnings( "unchecked" )
Expand Down
Expand Up @@ -54,4 +54,10 @@ public <FAILURE extends Exception> void accept( Processor<FAILURE> processor, Re
{
processor.processRelationshipGroup( this, record );
}

@Override
public void updateRecord( RelationshipGroupRecord record )
{
super.updateRecord( record );
}
}
Expand Up @@ -30,7 +30,11 @@
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.SchemaStore;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.IndexRule;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
Expand Down Expand Up @@ -82,6 +86,9 @@ public class TransactionRecordState implements RecordState
private final NeoStores neoStores;
private final IntegrityValidator integrityValidator;
private final NodeStore nodeStore;
private final RelationshipStore relationshipStore;
private final PropertyStore propertyStore;
private final RecordStore<RelationshipGroupRecord> relationshipGroupStore;
private final MetaDataStore metaDataStore;
private final SchemaStore schemaStore;
private final RecordAccessSet recordChangeSet;
Expand All @@ -95,6 +102,7 @@ public class TransactionRecordState implements RecordState
private RecordChanges<Long,NeoStoreRecord, Void> neoStoreRecord;
private boolean prepared;


public TransactionRecordState( NeoStores neoStores, IntegrityValidator integrityValidator,
RecordChangeSet recordChangeSet, long lastCommittedTxWhenTransactionStarted,
ResourceLocker locks,
Expand All @@ -105,6 +113,9 @@ public TransactionRecordState( NeoStores neoStores, IntegrityValidator integrity
{
this.neoStores = neoStores;
this.nodeStore = neoStores.getNodeStore();
this.relationshipStore = neoStores.getRelationshipStore();
this.propertyStore = neoStores.getPropertyStore();
this.relationshipGroupStore = neoStores.getRelationshipGroupStore();
this.metaDataStore = neoStores.getMetaDataStore();
this.schemaStore = neoStores.getSchemaStore();
this.integrityValidator = integrityValidator;
Expand Down Expand Up @@ -157,7 +168,7 @@ public void extractCommands( Collection<StorageCommand> commands ) throws Transa
int i = 0;
for ( RecordProxy<Long, NodeRecord, Void> change : recordChangeSet.getNodeRecords().changes() )
{
NodeRecord record = change.forReadingLinkage();
NodeRecord record = prepared( change, nodeStore );
integrityValidator.validateNodeRecord( record );
nodeCommands[i++] = new Command.NodeCommand( change.getBefore(), record );
}
Expand All @@ -171,7 +182,8 @@ public void extractCommands( Collection<StorageCommand> commands ) throws Transa
int i = 0;
for ( RecordProxy<Long, RelationshipRecord, Void> change : recordChangeSet.getRelRecords().changes() )
{
relCommands[i++] = new Command.RelationshipCommand( change.getBefore(), change.forReadingLinkage() );
relCommands[i++] = new Command.RelationshipCommand( change.getBefore(),
prepared( change, relationshipStore ) );
}
Arrays.sort( relCommands, COMMAND_SORTER );
}
Expand All @@ -184,7 +196,8 @@ public void extractCommands( Collection<StorageCommand> commands ) throws Transa
for ( RecordProxy<Long, PropertyRecord, PrimitiveRecord> change :
recordChangeSet.getPropertyRecords().changes() )
{
propCommands[i++] = new Command.PropertyCommand( change.getBefore(), change.forReadingLinkage() );
propCommands[i++] = new Command.PropertyCommand( change.getBefore(),
prepared( change, propertyStore ) );
}
Arrays.sort( propCommands, COMMAND_SORTER );
}
Expand All @@ -197,8 +210,8 @@ public void extractCommands( Collection<StorageCommand> commands ) throws Transa
for ( RecordProxy<Long, RelationshipGroupRecord, Integer> change :
recordChangeSet.getRelGroupRecords().changes() )
{
relGroupCommands[i++] =
new Command.RelationshipGroupCommand( change.getBefore(), change.forReadingData() );
relGroupCommands[i++] = new Command.RelationshipGroupCommand( change.getBefore(),
prepared( change, relationshipGroupStore ) );
}
Arrays.sort( relGroupCommands, COMMAND_SORTER );
}
Expand Down Expand Up @@ -226,6 +239,14 @@ public void extractCommands( Collection<StorageCommand> commands ) throws Transa
prepared = true;
}

private <RECORD extends AbstractBaseRecord> RECORD prepared(
RecordProxy<?,RECORD,?> proxy, RecordStore<RECORD> store )
{
RECORD after = proxy.forReadingLinkage();
store.prepareForCommit( after );
return after;
}

public void relCreate( long id, int typeId, long startNodeId, long endNodeId )
{
relationshipCreator.relationshipCreate( id, typeId, startNodeId, endNodeId, recordChangeSet, locks );
Expand Down
Expand Up @@ -132,5 +132,10 @@ protected boolean isInUse( PageCursor cursor )
{
return false;
}

@Override
public void prepareForCommit( AbstractBaseRecord record )
{
}
}
}
@@ -0,0 +1,191 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.state;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.store.StoreHeader;
import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.LabelTokenRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.PropertyKeyTokenRecord;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.store.record.RelationshipTypeTokenRecord;

public class PrepareTrackingRecordFormats implements RecordFormats
{
private final RecordFormats actual;
private final Set<NodeRecord> nodePrepare = new HashSet<>();
private final Set<RelationshipRecord> relationshipPrepare = new HashSet<>();
private final Set<RelationshipGroupRecord> relationshipGroupPrepare = new HashSet<>();
private final Set<PropertyRecord> propertyPrepare = new HashSet<>();
private final Set<DynamicRecord> dynamicPrepare = new HashSet<>();
private final Set<PropertyKeyTokenRecord> propertyKeyTokenPrepare = new HashSet<>();
private final Set<LabelTokenRecord> labelTokenPrepare = new HashSet<>();
private final Set<RelationshipTypeTokenRecord> relationshipTypeTokenPrepare = new HashSet<>();

public PrepareTrackingRecordFormats( RecordFormats actual )
{
this.actual = actual;
}

@Override
public String storeVersion()
{
return actual.storeVersion();
}

@Override
public PrepareTrackingRecordFormat<NodeRecord> node()
{
return new PrepareTrackingRecordFormat<>( actual.node(), nodePrepare );
}

@Override
public PrepareTrackingRecordFormat<RelationshipGroupRecord> relationshipGroup()
{
return new PrepareTrackingRecordFormat<>( actual.relationshipGroup(), relationshipGroupPrepare );
}

@Override
public PrepareTrackingRecordFormat<RelationshipRecord> relationship()
{
return new PrepareTrackingRecordFormat<>( actual.relationship(), relationshipPrepare );
}

@Override
public PrepareTrackingRecordFormat<PropertyRecord> property()
{
return new PrepareTrackingRecordFormat<>( actual.property(), propertyPrepare );
}

@Override
public PrepareTrackingRecordFormat<LabelTokenRecord> labelToken()
{
return new PrepareTrackingRecordFormat<>( actual.labelToken(), labelTokenPrepare );
}

@Override
public PrepareTrackingRecordFormat<PropertyKeyTokenRecord> propertyKeyToken()
{
return new PrepareTrackingRecordFormat<>( actual.propertyKeyToken(), propertyKeyTokenPrepare );
}

@Override
public PrepareTrackingRecordFormat<RelationshipTypeTokenRecord> relationshipTypeToken()
{
return new PrepareTrackingRecordFormat<>( actual.relationshipTypeToken(), relationshipTypeTokenPrepare );
}

@Override
public PrepareTrackingRecordFormat<DynamicRecord> dynamic()
{
return new PrepareTrackingRecordFormat<>( actual.dynamic(), dynamicPrepare );
}

public class PrepareTrackingRecordFormat<RECORD extends AbstractBaseRecord> implements RecordFormat<RECORD>
{
private final RecordFormat<RECORD> actual;
private final Set<RECORD> prepare;

PrepareTrackingRecordFormat( RecordFormat<RECORD> actual, Set<RECORD> prepare )
{
this.actual = actual;
this.prepare = prepare;
}

@Override
public RECORD newRecord()
{
return actual.newRecord();
}

@Override
public int getRecordSize( StoreHeader storeHeader )
{
return actual.getRecordSize( storeHeader );
}

@Override
public int getRecordHeaderSize()
{
return actual.getRecordHeaderSize();
}

@Override
public boolean isInUse( PageCursor cursor )
{
return actual.isInUse( cursor );
}

@Override
public void read( RECORD record, PageCursor cursor, RecordLoad mode, int recordSize, PagedFile storeFile )
throws IOException
{
actual.read( record, cursor, mode, recordSize, storeFile );
}

@Override
public void prepare( RECORD record, int recordSize, IdSequence idSequence )
{
prepare.add( record );
actual.prepare( record, recordSize, idSequence );
}

@Override
public void write( RECORD record, PageCursor cursor, int recordSize, PagedFile storeFile ) throws IOException
{
actual.write( record, cursor, recordSize, storeFile );
}

@Override
public long getNextRecordReference( RECORD record )
{
return actual.getNextRecordReference( record );
}

@Override
public boolean equals( Object otherFormat )
{
return actual.equals( otherFormat );
}

@Override
public int hashCode()
{
return actual.hashCode();
}

public boolean prepared( RECORD record )
{
return prepare.contains( record );
}
}
}

0 comments on commit 936762e

Please sign in to comment.