Skip to content

Commit

Permalink
Don't share state between parts
Browse files Browse the repository at this point in the history
  • Loading branch information
OliviaYtterbrink committed Mar 26, 2018
1 parent f88d6e8 commit f940254
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 33 deletions.
Expand Up @@ -22,7 +22,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -78,14 +80,16 @@ public synchronized void drop() throws IOException
@Override
public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws IOException, IndexEntryConflictException
{
Map<CoordinateReferenceSystem,List<IndexEntryUpdate<?>>> batchMap = new HashMap<>();
for ( IndexEntryUpdate<?> update : updates )
{
PointValue point = (PointValue) update.values()[0];
select( point.getCoordinateReferenceSystem() ).batchUpdate( update );
List<IndexEntryUpdate<?>> batch = batchMap.computeIfAbsent( point.getCoordinateReferenceSystem(), k -> new ArrayList<>() );
batch.add( update );
}
for ( PartPopulator part : this )
for ( Map.Entry<CoordinateReferenceSystem,List<IndexEntryUpdate<?>>> entry : batchMap.entrySet() )
{
part.applyUpdateBatch();
select( entry.getKey() ).add( entry.getValue() );
}
}

Expand Down Expand Up @@ -183,7 +187,6 @@ static class PartPopulator extends NativeSchemaIndexPopulator<SpatialSchemaKey,
{
private final SpaceFillingCurveConfiguration configuration;
private final SpaceFillingCurveSettings settings;
List<IndexEntryUpdate<?>> updates = new ArrayList<>();

PartPopulator( PageCache pageCache, FileSystemAbstraction fs, SpatialIndexFiles.SpatialFileLayout fileLayout,
IndexProvider.Monitor monitor, SchemaIndexDescriptor descriptor, long indexId, IndexSamplingConfig samplingConfig,
Expand All @@ -194,18 +197,6 @@ static class PartPopulator extends NativeSchemaIndexPopulator<SpatialSchemaKey,
this.settings = fileLayout.settings;
}

void batchUpdate( IndexEntryUpdate<?> update )
{
updates.add( update );
}

void applyUpdateBatch() throws IOException, IndexEntryConflictException
{
List<IndexEntryUpdate<?>> batch = updates;
updates = new ArrayList<>();
add( batch );
}

@Override
IndexReader newReader()
{
Expand Down
Expand Up @@ -22,7 +22,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
Expand All @@ -39,6 +42,7 @@
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueGroup;

import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexBase.forAll;

Expand Down Expand Up @@ -73,13 +77,16 @@ public synchronized void drop() throws IOException
@Override
public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws IndexEntryConflictException, IOException
{
Map<ValueGroup,List<IndexEntryUpdate<?>>> batchMap = new HashMap<>();
for ( IndexEntryUpdate<?> update : updates )
{
select( update.values()[0].valueGroup() ).batchUpdate( update );
ValueGroup valueGroup = update.values()[0].valueGroup();
List<IndexEntryUpdate<?>> batch = batchMap.computeIfAbsent( valueGroup, k -> new ArrayList<>() );
batch.add( update );
}
for ( PartPopulator part : this )
for ( Map.Entry<ValueGroup,List<IndexEntryUpdate<?>>> entry : batchMap.entrySet() )
{
part.applyUpdateBatch();
select( entry.getKey() ).add( entry.getValue() );
}
}

Expand Down Expand Up @@ -175,26 +182,12 @@ IndexSample sampleResult()

static class PartPopulator<KEY extends NativeSchemaKey> extends NativeSchemaIndexPopulator<KEY, NativeSchemaValue>
{
List<IndexEntryUpdate<?>> updates = new ArrayList<>();

PartPopulator( PageCache pageCache, FileSystemAbstraction fs, TemporalIndexFiles.FileLayout<KEY> fileLayout,
IndexProvider.Monitor monitor, SchemaIndexDescriptor descriptor, long indexId, IndexSamplingConfig samplingConfig )
{
super( pageCache, fs, fileLayout.indexFile, fileLayout.layout, monitor, descriptor, indexId, samplingConfig );
}

void batchUpdate( IndexEntryUpdate<?> update )
{
updates.add( update );
}

void applyUpdateBatch() throws IOException, IndexEntryConflictException
{
List<IndexEntryUpdate<?>> batch = updates;
updates = new ArrayList<>();
add( batch );
}

@Override
IndexReader newReader()
{
Expand Down

0 comments on commit f940254

Please sign in to comment.