Skip to content

Commit

Permalink
Process all property updates from particular node as single update du…
Browse files Browse the repository at this point in the history
…ring index population

Group all updates from all properties on single node into container that will be processed
in atomic fashion - all of the updates will be processed before check of population queue for current node.
All updates need to be processed as single unit to avoid duplicates in index that can appear
when queue already contains update for current node, but for some other property.
  • Loading branch information
MishaDemianenko committed Mar 9, 2016
1 parent 2caeb94 commit 9a78361
Show file tree
Hide file tree
Showing 16 changed files with 523 additions and 318 deletions.
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.kernel.api.index; package org.neo4j.kernel.api.index;


import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.Collection;


import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.impl.api.index.UpdateMode; import org.neo4j.kernel.impl.api.index.UpdateMode;
Expand Down Expand Up @@ -53,7 +53,7 @@ public interface IndexPopulator
* {@link NodePropertyUpdate#getNodeId()} method and property values will be retrieved using * {@link NodePropertyUpdate#getNodeId()} method and property values will be retrieved using
* {@link NodePropertyUpdate#getValueAfter()} method. * {@link NodePropertyUpdate#getValueAfter()} method.
*/ */
void add( List<NodePropertyUpdate> updates ) void add( Collection<NodePropertyUpdate> updates )
throws IndexEntryConflictException, IOException; throws IndexEntryConflictException, IOException;


/** /**
Expand All @@ -70,7 +70,7 @@ void add( List<NodePropertyUpdate> updates )
* Simultaneously as population progresses there might be incoming updates * Simultaneously as population progresses there might be incoming updates
* from committing transactions, which needs to be applied as well. This populator will only receive updates * from committing transactions, which needs to be applied as well. This populator will only receive updates
* for nodes that it already has seen. Updates coming in here must be applied idempotently as the same data * for nodes that it already has seen. Updates coming in here must be applied idempotently as the same data
* may have been {@link #add(List) added previously}. * may have been {@link #add(Collection) added previously}.
* Updates can come in two different {@link NodePropertyUpdate#getUpdateMode() modes}. * Updates can come in two different {@link NodePropertyUpdate#getUpdateMode() modes}.
* <ol> * <ol>
* <li>{@link UpdateMode#ADDED} means that there's an added property to a node already seen by this * <li>{@link UpdateMode#ADDED} means that there's an added property to a node already seen by this
Expand Down Expand Up @@ -130,7 +130,7 @@ public void drop() throws IOException
} }


@Override @Override
public void add( List<NodePropertyUpdate> updates ) throws IndexEntryConflictException, IOException public void add( Collection<NodePropertyUpdate> updates ) throws IndexEntryConflictException, IOException
{ {
} }


Expand Down
Expand Up @@ -21,6 +21,7 @@


import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -182,12 +183,12 @@ private void awaitCompletion()
* {@link IndexPopulation population}. Flushes all updates if {@link #BATCH_SIZE} is reached. * {@link IndexPopulation population}. Flushes all updates if {@link #BATCH_SIZE} is reached.
* *
* @param population the index population. * @param population the index population.
* @param update the update to add to the batch. * @param updates updates to add to the batch.
*/ */
private void batchUpdate( IndexPopulation population, NodePropertyUpdate update ) private void batchUpdate( IndexPopulation population, Collection<NodePropertyUpdate> updates )
{ {
List<NodePropertyUpdate> batch = batchedUpdates.computeIfAbsent( population, key -> newBatch() ); List<NodePropertyUpdate> batch = batchedUpdates.computeIfAbsent( population, key -> newBatch() );
batch.add( update ); batch.addAll( updates );
flushIfNeeded( population, batch ); flushIfNeeded( population, batch );
} }


Expand Down Expand Up @@ -342,9 +343,10 @@ private class BatchingIndexPopulation extends IndexPopulation
} }


@Override @Override
protected void addApplicable( NodePropertyUpdate update ) throws IOException, IndexEntryConflictException protected void addApplicable( Collection<NodePropertyUpdate> updates ) throws IOException,
IndexEntryConflictException
{ {
batchUpdate( this, update ); batchUpdate( this, updates );
} }
} }


Expand Down
Expand Up @@ -44,7 +44,7 @@ public interface IndexStoreView extends PropertyAccessor
*/ */
<FAILURE extends Exception> StoreScan<FAILURE> visitNodes( <FAILURE extends Exception> StoreScan<FAILURE> visitNodes(
IntPredicate labelIdFilter, IntPredicate propertyKeyIdFilter, IntPredicate labelIdFilter, IntPredicate propertyKeyIdFilter,
Visitor<NodePropertyUpdate, FAILURE> propertyUpdateVisitor, Visitor<NodePropertyUpdates, FAILURE> propertyUpdateVisitor,
Visitor<NodeLabelUpdate, FAILURE> labelUpdateVisitor ); Visitor<NodeLabelUpdate, FAILURE> labelUpdateVisitor );


/** /**
Expand Down Expand Up @@ -93,7 +93,7 @@ public Property getProperty( long nodeId, int propertyKeyId ) throws EntityNotFo


@Override @Override
public <FAILURE extends Exception> StoreScan<FAILURE> visitNodes( IntPredicate labelIdFilter, public <FAILURE extends Exception> StoreScan<FAILURE> visitNodes( IntPredicate labelIdFilter,
IntPredicate propertyKeyIdFilter, Visitor<NodePropertyUpdate,FAILURE> propertyUpdateVisitor, IntPredicate propertyKeyIdFilter, Visitor<NodePropertyUpdates,FAILURE> propertyUpdateVisitor,
Visitor<NodeLabelUpdate,FAILURE> labelUpdateVisitor ) Visitor<NodeLabelUpdate,FAILURE> labelUpdateVisitor )
{ {
return EMPTY_SCAN; return EMPTY_SCAN;
Expand Down
Expand Up @@ -20,7 +20,8 @@
package org.neo4j.kernel.impl.api.index; package org.neo4j.kernel.impl.api.index;


import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -143,7 +144,7 @@ public void drop() throws IOException
} }


@Override @Override
public void add( List<NodePropertyUpdate> updates ) public void add( Collection<NodePropertyUpdate> updates )
{ {
throw new UnsupportedOperationException( "Can't populate directly using this populator implementation. " ); throw new UnsupportedOperationException( "Can't populate directly using this populator implementation. " );
} }
Expand Down Expand Up @@ -437,6 +438,7 @@ protected class IndexPopulation
final FlippableIndexProxy flipper; final FlippableIndexProxy flipper;
final FailedIndexProxyFactory failedIndexProxyFactory; final FailedIndexProxyFactory failedIndexProxyFactory;
final String indexUserDescription; final String indexUserDescription;
private Collection<NodePropertyUpdate> applicableUpdates = new ArrayList<>();


IndexPopulation( IndexPopulation(
IndexPopulator populator, IndexPopulator populator,
Expand All @@ -463,20 +465,28 @@ private void flipToFailed( Throwable t )
populator, failure( t ), indexCountsRemover, logProvider ) ); populator, failure( t ), indexCountsRemover, logProvider ) );
} }


private void add( NodePropertyUpdate update ) private void addAll( Collection<NodePropertyUpdate> updates )
throws IndexEntryConflictException, IOException throws IndexEntryConflictException, IOException
{ {
if ( isApplicable( update ) ) for ( NodePropertyUpdate update : updates )
{ {
populator.includeSample( update ); if ( isApplicable( update ) )
addApplicable( update ); {
populator.includeSample( update );
applicableUpdates.add( update );
}
}
if ( !applicableUpdates.isEmpty() )
{
addApplicable( applicableUpdates );
applicableUpdates.clear();
} }
} }


protected void addApplicable( NodePropertyUpdate update ) void addApplicable( Collection<NodePropertyUpdate> updates )
throws IOException, IndexEntryConflictException throws IOException, IndexEntryConflictException
{ {
populator.add( Collections.singletonList( update ) ); populator.add( updates );
} }


private boolean isApplicable( NodePropertyUpdate update ) private boolean isApplicable( NodePropertyUpdate update )
Expand All @@ -500,19 +510,20 @@ private void flip() throws FlipFailedKernelException
} }
} }


private class NodePopulationVisitor implements Visitor<NodePropertyUpdate,IndexPopulationFailedKernelException> private class NodePopulationVisitor implements Visitor<NodePropertyUpdates,
IndexPopulationFailedKernelException>
{ {
@Override @Override
public boolean visit( NodePropertyUpdate update ) throws IndexPopulationFailedKernelException public boolean visit( NodePropertyUpdates updates ) throws IndexPopulationFailedKernelException
{ {
add( update ); add( updates );
populateFromQueue( update.getNodeId() ); populateFromQueue( updates.getNodeId() );
return false; return false;
} }


private void add( NodePropertyUpdate update ) private void add( NodePropertyUpdates updates )
{ {
forEachPopulation( population -> population.add( update ) ); forEachPopulation( population -> population.addAll( updates.getPropertyUpdates() ) );
} }
} }
} }
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.index;

import java.util.ArrayList;
import java.util.Collection;

import org.neo4j.kernel.api.index.NodePropertyUpdate;

import static org.neo4j.kernel.impl.store.record.AbstractBaseRecord.NO_ID;

/**
* Container for all properties updates for one node identified by {@linkplain #nodeId}
* Used to group node properties updates and and pass in processing chain, so all updates can be processed together.
*/
public class NodePropertyUpdates
{
private Collection<NodePropertyUpdate> propertyUpdates = new ArrayList<>();
private long nodeId;

public NodePropertyUpdates()
{
}

public void reset()
{
propertyUpdates.clear();
nodeId = NO_ID;
}

public void initForNodeId( long nodeId )
{
if ( containsUpdates() )
{
throw new AssertionError( "Please clear updates fom previous node." );
}
this.nodeId = nodeId;
}

public long getNodeId()
{
return nodeId;
}

public void add( NodePropertyUpdate update )
{
propertyUpdates.add( update );
}

public void add( int propertyKeyId, Object value, long[] labels )
{
if ( nodeId == NO_ID )
{
throw new AssertionError( "Please init property updates container for specific node." );
}
propertyUpdates.add( NodePropertyUpdate.add( nodeId, propertyKeyId, value, labels ) );
}

public boolean containsUpdates()
{
return !propertyUpdates.isEmpty();
}

public Collection<NodePropertyUpdate> getPropertyUpdates()
{
return propertyUpdates;
}

public void addAll( Collection<NodePropertyUpdate> propertyUpdates )
{
this.propertyUpdates.addAll( propertyUpdates );
}
}

0 comments on commit 9a78361

Please sign in to comment.