Skip to content

Commit

Permalink
Index updates for relationship indexes
Browse files Browse the repository at this point in the history
wip
  • Loading branch information
ragadeeshu committed Jun 20, 2018
1 parent 0896f0a commit c5d3a89
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 90 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.kernel.impl.api.BatchTransactionApplier;
import org.neo4j.kernel.impl.api.TransactionApplier;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;
import org.neo4j.storageengine.api.CommandsToApply;
Expand All @@ -39,11 +40,13 @@
* Implements both BatchTransactionApplier and TransactionApplier in order to reduce garbage.
* Gathers node/property commands by node id, preparing for extraction of {@link EntityUpdates updates}.
*/
public class NodePropertyCommandsExtractor extends TransactionApplier.Adapter
public class PropertyCommandsExtractor extends TransactionApplier.Adapter
implements BatchTransactionApplier
{
private final MutableLongObjectMap<NodeCommand> nodeCommandsById = new LongObjectHashMap<>();
private final MutableLongObjectMap<Command.RelationshipCommand> relationshipCommandsById = new LongObjectHashMap<>();
private final MutableLongObjectMap<List<PropertyCommand>> propertyCommandsByNodeIds = new LongObjectHashMap<>();
private final MutableLongObjectMap<List<PropertyCommand>> propertyCommandsByRelationshipIds = new LongObjectHashMap<>();
private boolean hasUpdates;

@Override
Expand All @@ -62,7 +65,9 @@ public TransactionApplier startTx( CommandsToApply transaction, LockGroup lockGr
public void close()
{
nodeCommandsById.clear();
relationshipCommandsById.clear();
propertyCommandsByNodeIds.clear();
propertyCommandsByRelationshipIds.clear();
}

@Override
Expand All @@ -76,40 +81,50 @@ public boolean visitNodeCommand( NodeCommand command )
return false;
}

public static boolean mayResultInIndexUpdates( NodeCommand command )
@Override
public boolean visitRelationshipCommand( Command.RelationshipCommand command )
{
relationshipCommandsById.put( command.getKey(), command );
hasUpdates = true;
return false;
}

private static boolean mayResultInIndexUpdates( NodeCommand command )
{
long before = command.getBefore().getLabelField();
long after = command.getAfter().getLabelField();
return before != after ||
// Because we don't know here, there may have been changes to a dynamic label record
// even though it still points to the same one
fieldPointsToDynamicRecordOfLabels( before ) || fieldPointsToDynamicRecordOfLabels( after );

}

public static boolean mayResultInIndexUpdates( PropertyCommand command )
{
return command.getAfter().isNodeSet();
}

@Override
public boolean visitPropertyCommand( PropertyCommand command )
{
if ( mayResultInIndexUpdates( command ) )
if ( command.getAfter().isNodeSet() )
{
long nodeId = command.getAfter().getNodeId();
List<PropertyCommand> group = propertyCommandsByNodeIds.get( nodeId );
if ( group == null )
{
propertyCommandsByNodeIds.put( nodeId, group = new ArrayList<>() );
}
group.add( command );
hasUpdates = true;
createOrAddToGroup( command, command.getAfter().getNodeId(), propertyCommandsByNodeIds );
}
else if ( command.getAfter().isRelSet() )
{
createOrAddToGroup( command, command.getAfter().getRelId(), propertyCommandsByRelationshipIds );
}
return false;
}

public boolean containsAnyNodeOrPropertyUpdate()
private void createOrAddToGroup( PropertyCommand command, long entityId, MutableLongObjectMap<List<PropertyCommand>> propertyCommandsByEntityIds )
{
List<PropertyCommand> group = propertyCommandsByEntityIds.get( entityId );
if ( group == null )
{
this.propertyCommandsByNodeIds.put( entityId, group = new ArrayList<>() );
}
group.add( command );
hasUpdates = true;
}

public boolean containsAnyEntityOrPropertyUpdate()
{
return hasUpdates;
}
Expand All @@ -119,8 +134,18 @@ public LongObjectMap<NodeCommand> nodeCommandsById()
return nodeCommandsById;
}

public LongObjectMap<Command.RelationshipCommand> relationshipCommandsById()
{
return relationshipCommandsById;
}

public LongObjectMap<List<PropertyCommand>> propertyCommandsByNodeIds()
{
return propertyCommandsByNodeIds;
}

public LongObjectMap<List<PropertyCommand>> propertyCommandsByRelationshipIds()
{
return propertyCommandsByRelationshipIds;
}
}
Expand Up @@ -42,14 +42,14 @@ public PropertyPhysicalToLogicalConverter( PropertyStore propertyStore )
}

/**
* Converts physical changes to PropertyRecords for a node into logical updates
* Converts physical changes to PropertyRecords for a entity into logical updates
*/
public void convertPropertyRecord( long nodeId, Iterable<PropertyRecordChange> changes,
public void convertPropertyRecord( long entityId, Iterable<PropertyRecordChange> changes,
EntityUpdates.Builder properties )
{
MutableIntObjectMap<PropertyBlock> beforeMap = new IntObjectHashMap<>();
MutableIntObjectMap<PropertyBlock> afterMap = new IntObjectHashMap<>();
mapBlocks( nodeId, changes, beforeMap, afterMap );
mapBlocks( entityId, changes, beforeMap, afterMap );

final IntIterator uniqueIntIterator = uniqueIntIterator( beforeMap, afterMap );
while ( uniqueIntIterator.hasNext() )
Expand Down Expand Up @@ -95,21 +95,21 @@ private IntIterator uniqueIntIterator( IntObjectMap<PropertyBlock> beforeMap, In
return keys.intIterator();
}

private void mapBlocks( long nodeId, Iterable<PropertyRecordChange> changes,
private void mapBlocks( long entityId, Iterable<PropertyRecordChange> changes,
MutableIntObjectMap<PropertyBlock> beforeMap, MutableIntObjectMap<PropertyBlock> afterMap )
{
for ( PropertyRecordChange change : changes )
{
equalCheck( change.getBefore().getNodeId(), nodeId );
equalCheck( change.getAfter().getNodeId(), nodeId );
equalCheck( change.getBefore().getEntityId(), entityId );
equalCheck( change.getAfter().getEntityId(), entityId );
mapBlocks( change.getBefore(), beforeMap );
mapBlocks( change.getAfter(), afterMap );
}
}

private void equalCheck( long nodeId, long expectedNodeId )
private void equalCheck( long entityId, long expectedEntityId )
{
assert nodeId == expectedNodeId : "Node id differs expected " + expectedNodeId + ", but was " + nodeId;
assert entityId == expectedEntityId : "Entity id differs expected " + expectedEntityId + ", but was " + entityId;
}

private void mapBlocks( PropertyRecord record, MutableIntObjectMap<PropertyBlock> blocks )
Expand Down
Expand Up @@ -196,11 +196,15 @@ public RecordStorageEngine(
labelScanStore = new NativeLabelScanStore( pageCache, storeDir, fs, new FullLabelStream( neoStoreIndexStoreView ),
readOnly, monitors, recoveryCleanupWorkCollector );

// We need to load the property tokens here, since we need them before we load the indexes.
propertyKeyTokenHolder.setInitialTokens(
neoStores.getPropertyKeyTokenStore().getTokens( Integer.MAX_VALUE ) );

indexStoreView = new DynamicIndexStoreView( neoStoreIndexStoreView, labelScanStore, lockService, neoStores, logProvider );
this.indexProviderMap = indexProviderMap;
indexingService = IndexingServiceFactory.createIndexingService( config, scheduler, this.indexProviderMap,
indexingService = IndexingServiceFactory.createIndexingService( config, scheduler, indexProviderMap,
indexStoreView, tokenNameLookup,
Iterators.asList( new SchemaStorage( neoStores.getSchemaStore() ).indexesGetAll() ), logProvider,
Iterators.asList( schemaStorage.indexesGetAll() ), logProvider,
indexingServiceMonitor, schemaState );

integrityValidator = new IntegrityValidator( neoStores, indexingService );
Expand Down Expand Up @@ -332,7 +336,7 @@ protected BatchTransactionApplierFacade applier( TransactionApplicationMode mode

// Schema index application
appliers.add( new IndexBatchTransactionApplier( indexingService, labelScanStoreSync, indexUpdatesSync,
neoStores.getNodeStore(),
neoStores.getNodeStore(), neoStores.getRelationshipStore(),
indexUpdatesConverter ) );

// Explicit index application
Expand Down
Expand Up @@ -140,6 +140,11 @@ public long getRelId()
return -1;
}

public long getEntityId()
{
return entityId;
}

/**
* Gets the sum of the sizes of the blocks in this record, in bytes.
*/
Expand Down
Expand Up @@ -30,15 +30,16 @@
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor;
import org.neo4j.kernel.impl.api.BatchTransactionApplier;
import org.neo4j.kernel.impl.api.TransactionApplier;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.IndexingUpdateService;
import org.neo4j.kernel.impl.api.index.NodePropertyCommandsExtractor;
import org.neo4j.kernel.impl.api.index.PropertyCommandsExtractor;
import org.neo4j.kernel.impl.api.index.PropertyPhysicalToLogicalConverter;
import org.neo4j.kernel.impl.store.NodeLabels;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;
import org.neo4j.kernel.impl.transaction.state.IndexUpdates;
Expand All @@ -64,17 +65,15 @@ public class IndexBatchTransactionApplier extends BatchTransactionApplier.Adapte
private List<NodeLabelUpdate> labelUpdates;
private IndexUpdates indexUpdates;

public IndexBatchTransactionApplier( IndexingService indexingService,
WorkSync<Supplier<LabelScanWriter>,LabelUpdateWork> labelScanStoreSync,
WorkSync<IndexingUpdateService,IndexUpdatesWork> indexUpdatesSync,
NodeStore nodeStore,
public IndexBatchTransactionApplier( IndexingService indexingService, WorkSync<Supplier<LabelScanWriter>,LabelUpdateWork> labelScanStoreSync,
WorkSync<IndexingUpdateService,IndexUpdatesWork> indexUpdatesSync, NodeStore nodeStore, RelationshipStore relationshipStore,
PropertyPhysicalToLogicalConverter indexUpdateConverter )
{
this.indexingService = indexingService;
this.labelScanStoreSync = labelScanStoreSync;
this.indexUpdatesSync = indexUpdatesSync;
this.indexUpdateConverter = indexUpdateConverter;
this.transactionApplier = new SingleTransactionApplier( nodeStore );
this.transactionApplier = new SingleTransactionApplier( nodeStore, relationshipStore );
}

@Override
Expand Down Expand Up @@ -133,23 +132,25 @@ public void close() throws Exception
private class SingleTransactionApplier extends TransactionApplier.Adapter
{
private final NodeStore nodeStore;
private final NodePropertyCommandsExtractor indexUpdatesExtractor = new NodePropertyCommandsExtractor();
private RelationshipStore relationshipStore;
private final PropertyCommandsExtractor indexUpdatesExtractor = new PropertyCommandsExtractor();
private List<StoreIndexDescriptor> createdIndexes;

SingleTransactionApplier( NodeStore nodeStore )
SingleTransactionApplier( NodeStore nodeStore, RelationshipStore relationshipStore )
{
this.nodeStore = nodeStore;
this.relationshipStore = relationshipStore;
}

@Override
public void close() throws Exception
{
if ( indexUpdatesExtractor.containsAnyNodeOrPropertyUpdate() )
if ( indexUpdatesExtractor.containsAnyEntityOrPropertyUpdate() )
{
// Queue the index updates. When index updates from all transactions in this batch have been accumulated
// we'll feed them to the index updates work sync at the end of the batch
indexUpdates().feed( indexUpdatesExtractor.propertyCommandsByNodeIds(),
indexUpdatesExtractor.nodeCommandsById() );
indexUpdates().feed( indexUpdatesExtractor.propertyCommandsByNodeIds(), indexUpdatesExtractor.propertyCommandsByRelationshipIds(),
indexUpdatesExtractor.nodeCommandsById(), indexUpdatesExtractor.relationshipCommandsById() );
indexUpdatesExtractor.close();
}

Expand All @@ -165,7 +166,7 @@ private IndexUpdates indexUpdates()
{
if ( indexUpdates == null )
{
indexUpdates = new OnlineIndexUpdates( nodeStore, indexingService, indexUpdateConverter );
indexUpdates = new OnlineIndexUpdates( nodeStore, relationshipStore, indexingService, indexUpdateConverter );
}
return indexUpdates;
}
Expand Down Expand Up @@ -198,6 +199,12 @@ public boolean visitNodeCommand( Command.NodeCommand command )
return indexUpdatesExtractor.visitNodeCommand( command );
}

@Override
public boolean visitRelationshipCommand( Command.RelationshipCommand command ) throws IOException
{
return indexUpdatesExtractor.visitRelationshipCommand( command );
}

@Override
public boolean visitPropertyCommand( PropertyCommand command )
{
Expand Down
Expand Up @@ -87,8 +87,9 @@ protected Iterator<IndexEntryUpdate<SchemaDescriptor>> createNestedIterator( Ind
}

@Override
public void feed( LongObjectMap<List<PropertyCommand>> propCommands,
LongObjectMap<NodeCommand> nodeCommands )
public void feed( LongObjectMap<List<PropertyCommand>> propCommandsByNodeId,
LongObjectMap<List<PropertyCommand>> propCommandsByRelationshipId, LongObjectMap<NodeCommand> nodeCommands,
LongObjectMap<Command.RelationshipCommand> relationshipCommandPrimitiveLongObjectMap )
{
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -26,6 +26,7 @@

import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;

Expand All @@ -48,7 +49,8 @@ public Iterator<IndexEntryUpdate<SchemaDescriptor>> iterator()
}

@Override
public void feed( LongObjectMap<List<PropertyCommand>> propCommands, LongObjectMap<NodeCommand> nodeCommands )
public void feed( LongObjectMap<List<PropertyCommand>> propCommandsByNodeId, LongObjectMap<List<PropertyCommand>> propCommandsByRelationshipId,
LongObjectMap<NodeCommand> nodeCommands, LongObjectMap<Command.RelationshipCommand> relationshipCommandPrimitiveLongObjectMap )
{
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -25,6 +25,7 @@

import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand;
import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand;

Expand All @@ -35,11 +36,13 @@ public interface IndexUpdates extends Iterable<IndexEntryUpdate<SchemaDescriptor
{
/**
* Feeds updates raw material in the form of node/property commands, to create updates from.
*
* @param propCommands {@link PropertyCommand} grouped by node id.
* @param propCommandsByNodeId {@link PropertyCommand} grouped by node id.
* @param propCommandsByRelationshipId
* @param nodeCommands {@link NodeCommand} by node id.
* @param relationshipCommandPrimitiveLongObjectMap
*/
void feed( LongObjectMap<List<PropertyCommand>> propCommands, LongObjectMap<NodeCommand> nodeCommands );
void feed( LongObjectMap<List<PropertyCommand>> propCommandsByNodeId, LongObjectMap<List<PropertyCommand>> propCommandsByRelationshipId,
LongObjectMap<NodeCommand> nodeCommands, LongObjectMap<Command.RelationshipCommand> relationshipCommandPrimitiveLongObjectMap );

boolean hasUpdates();
}

0 comments on commit c5d3a89

Please sign in to comment.