Skip to content

Commit

Permalink
Changed to do sampling at SpatialFusion level
Browse files Browse the repository at this point in the history
this cleaned up the state handling in SpatialCRSSchemaIndex
  • Loading branch information
OliviaYtterbrink authored and fickludd committed Mar 1, 2018
1 parent 131cc0d commit 958a4a0
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 180 deletions.
Expand Up @@ -32,9 +32,10 @@ public class CombiningIterable<T> implements Iterable<T>
{
private Iterable<Iterable<T>> iterables;

public CombiningIterable( Iterable<Iterable<T>> iterables )
@SuppressWarnings( "unchecked" )
public <INNER extends Iterable<T>> CombiningIterable( Iterable<INNER> iterables )
{
this.iterables = iterables;
this.iterables = (Iterable) iterables;
}

@Override
Expand Down
Expand Up @@ -26,11 +26,11 @@
/**
* Utilities for implementing {@link IndexSampler sampling}.
*/
class SamplingUtil
public class SamplingUtil
{
private static final String DELIMITER = "\u001F";

static String encodedStringValuesForSampling( Object... values )
public static String encodedStringValuesForSampling( Object... values )
{
return StringUtils.join( values, DELIMITER );
}
Expand Down
Expand Up @@ -48,13 +48,10 @@
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.DefaultNonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler;
import org.neo4j.kernel.impl.index.schema.NativeSchemaIndexPopulator.IndexUpdateApply;
import org.neo4j.kernel.impl.index.schema.NativeSchemaIndexPopulator.IndexUpdateWork;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.values.storable.CoordinateReferenceSystem;

import static org.neo4j.helpers.collection.Iterators.asResourceIterator;
Expand All @@ -70,10 +67,8 @@
*/
public class SpatialCRSSchemaIndex
{
private UniqueIndexSampler uniqueSampler;
private final File indexFile;
private final PageCache pageCache;
private final IndexDescriptor descriptor;
private final CoordinateReferenceSystem crs;
private final FileSystemAbstraction fs;
private final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector;
Expand All @@ -89,7 +84,6 @@ public class SpatialCRSSchemaIndex
private Writer<SpatialSchemaKey,NativeSchemaValue> singleTreeWriter;
private NativeSchemaIndex<SpatialSchemaKey,NativeSchemaValue> schemaIndex;
private WorkSync<IndexUpdateApply<SpatialSchemaKey,NativeSchemaValue>,IndexUpdateWork<SpatialSchemaKey,NativeSchemaValue>> workSync;
private DefaultNonUniqueIndexSampler generalSampler;

public SpatialCRSSchemaIndex( IndexDescriptor descriptor,
IndexDirectoryStructure directoryStructure,
Expand All @@ -100,7 +94,6 @@ public SpatialCRSSchemaIndex( IndexDescriptor descriptor,
SchemaIndexProvider.Monitor monitor,
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector )
{
this.descriptor = descriptor;
this.crs = crs;
this.pageCache = pageCache;
this.fs = fs;
Expand All @@ -124,32 +117,19 @@ else if ( crs.getDimension() == 3 )
{
throw new IllegalArgumentException( "Cannot create spatial index with other than 2D or 3D coordinate reference system: " + crs );
}
state = State.NONE;
state = State.INIT;

layout = layout( descriptor );
treeKey = layout.newKey();
treeValue = layout.newValue();
schemaIndex = new NativeSchemaIndex<>( pageCache, fs, indexFile, layout, monitor, descriptor, indexId );

}

/**
* Makes sure that the index is initialized
*/
public void init( IndexSamplingConfig samplingConfig )
{
if ( state == State.NONE )
{
initialize( samplingConfig );
}
}

/**
* Makes sure that the index is ready to populate
*/
public void startPopulation( IndexSamplingConfig samplingConfig ) throws IOException
public void startPopulation() throws IOException
{
init( samplingConfig );
if ( state == State.INIT )
{
// First add to sub-index, make sure to create
Expand All @@ -164,9 +144,8 @@ public void startPopulation( IndexSamplingConfig samplingConfig ) throws IOExcep
/**
* Makes sure that the index is online
*/
public void takeOnline( IndexSamplingConfig samplingConfig ) throws IOException
public void takeOnline() throws IOException
{
init( samplingConfig );
if ( !indexExists() )
{
throw new IOException( "Index file does not exist." );
Expand All @@ -181,24 +160,22 @@ public void takeOnline( IndexSamplingConfig samplingConfig ) throws IOException
}
}

public IndexUpdater updaterWithCreate( IndexSamplingConfig samplingConfig, boolean populating ) throws IOException
public IndexUpdater updaterWithCreate( boolean populating ) throws IOException
{
if ( populating )
{
if ( state == State.NONE )
if ( state == State.INIT )
{
// sub-index didn't exist, create in populating mode
initialize( samplingConfig );
create();
}
return newPopulatingUpdater();
}
else
{
if ( state == State.NONE )
if ( state == State.INIT )
{
// sub-index didn't exist, create and make it online
initialize( samplingConfig );
create();
finishPopulation( true );
online();
Expand Down Expand Up @@ -291,62 +268,6 @@ public void add( Collection<IndexEntryUpdate<?>> updates ) throws IOException
applyWithWorkSync( updates );
}

public void includeSample( IndexEntryUpdate<?> update )
{
if ( uniqueSampler != null )
{
uniqueSampler.increment( 1 );
}
else if ( generalSampler != null )
{
generalSampler.include( SamplingUtil.encodedStringValuesForSampling( (Object[]) update.values() ) );
}
else
{
throw new UnsupportedOperationException();
}
}

public IndexSample sampleResult()
{
if ( uniqueSampler != null )
{
return uniqueSampler.result();
}
else if ( generalSampler != null )
{
// Close the writer before scanning
try
{
closeWriter();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}

try
{
return generalSampler.result();
}
finally
{
try
{
instantiateWriter();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}
}
else
{
throw new UnsupportedOperationException();
}
}

private IndexUpdater newPopulatingUpdater()
{
return new IndexUpdater()
Expand Down Expand Up @@ -391,7 +312,7 @@ public synchronized void drop() throws IOException
finally
{
dropped = true;
state = State.NONE;
state = State.INIT;
}
}

Expand Down Expand Up @@ -440,20 +361,6 @@ private synchronized void create() throws IOException
state = State.POPULATING;
}

private void initialize( IndexSamplingConfig samplingConfig )
{
assert state == State.NONE;
if ( isUnique( descriptor ) )
{
uniqueSampler = new UniqueIndexSampler();
}
else
{
generalSampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
}
state = State.INIT;
}

private void online() throws IOException
{
assert state == State.POPULATED || state == State.INIT;
Expand Down Expand Up @@ -544,7 +451,6 @@ private boolean isUnique( IndexDescriptor descriptor )

private enum State
{
NONE,
INIT,
POPULATING,
POPULATED,
Expand Down
Expand Up @@ -63,7 +63,7 @@ class SpatialFusionIndexAccessor implements IndexAccessor
this.indexFactory = indexFactory;
for ( SpatialCRSSchemaIndex index : indexMap.values() )
{
index.takeOnline( samplingConfig );
index.takeOnline();
}
}

Expand All @@ -77,7 +77,7 @@ public void drop() throws IOException
@Override
public IndexUpdater newUpdater( IndexUpdateMode mode )
{
return SpatialFusionIndexUpdater.updaterForAccessor( indexMap, indexId, indexFactory, descriptor, samplingConfig );
return SpatialFusionIndexUpdater.updaterForAccessor( indexMap, indexId, indexFactory, descriptor );
}

@Override
Expand Down Expand Up @@ -142,7 +142,7 @@ public void close() throws Exception
@Override
public Iterator<Long> iterator()
{
return new CombiningIterable( allEntriesReader ).iterator();
return new CombiningIterable<>( allEntriesReader ).iterator();
}
};
}
Expand Down
Expand Up @@ -31,32 +31,34 @@
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.DefaultNonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler;
import org.neo4j.kernel.impl.index.schema.SamplingUtil;
import org.neo4j.kernel.impl.index.schema.SpatialCRSSchemaIndex;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.values.storable.CoordinateReferenceSystem;
import org.neo4j.values.storable.PointValue;
import org.neo4j.values.storable.Value;

import static org.neo4j.kernel.impl.index.schema.fusion.FusionIndexUtils.forAll;
import static org.neo4j.kernel.impl.index.schema.fusion.FusionSchemaIndexProvider.combineSamples;

class SpatialFusionIndexPopulator implements IndexPopulator
{
private final long indexId;
private final IndexDescriptor descriptor;
private final IndexSamplingConfig samplingConfig;
private final SpatialCRSSchemaIndex.Supplier indexSupplier;
private final Map<CoordinateReferenceSystem,SpatialCRSSchemaIndex> indexMap;
private final IndexSamplerWrapper sampler;

SpatialFusionIndexPopulator( Map<CoordinateReferenceSystem,SpatialCRSSchemaIndex> indexMap, long indexId, IndexDescriptor descriptor,
IndexSamplingConfig samplingConfig, SpatialCRSSchemaIndex.Supplier indexSupplier )
{
this.indexMap = indexMap;
this.indexId = indexId;
this.descriptor = descriptor;
this.samplingConfig = samplingConfig;
this.indexSupplier = indexSupplier;
this.sampler = new IndexSamplerWrapper( descriptor, samplingConfig );
}

@Override
Expand Down Expand Up @@ -88,7 +90,7 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws Inde
for ( CoordinateReferenceSystem crs : batchMap.keySet() )
{
SpatialCRSSchemaIndex index = indexSupplier.get( descriptor, indexMap, indexId, crs );
index.startPopulation( samplingConfig );
index.startPopulation();
index.add( batchMap.get( crs ) );
}
}
Expand All @@ -110,7 +112,7 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
@Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
{
return SpatialFusionIndexUpdater.updaterForPopulator( indexMap, indexId, indexSupplier, descriptor, samplingConfig );
return SpatialFusionIndexUpdater.updaterForPopulator( indexMap, indexId, indexSupplier, descriptor );
}

@Override
Expand All @@ -128,17 +130,59 @@ public void markAsFailed( String failure )
@Override
public void includeSample( IndexEntryUpdate<?> update )
{
Value[] values = update.values();
assert values.length == 1;
CoordinateReferenceSystem crs = ((PointValue) values[0]).getCoordinateReferenceSystem();
SpatialCRSSchemaIndex index = indexSupplier.get( descriptor, indexMap, indexId, crs );
index.init( samplingConfig );
index.includeSample( update );
sampler.includeSample( update.values() );
}

@Override
public IndexSample sampleResult()
{
return combineSamples( indexMap.values().stream().map( SpatialCRSSchemaIndex::sampleResult ).toArray( IndexSample[]::new ) );
return sampler.sampleResult();
}

private static class IndexSamplerWrapper
{
private final DefaultNonUniqueIndexSampler generalSampler;
private final UniqueIndexSampler uniqueSampler;

IndexSamplerWrapper( IndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
{
switch ( descriptor.type() )
{
case GENERAL:
generalSampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
uniqueSampler = null;
break;
case UNIQUE:
generalSampler = null;
uniqueSampler = new UniqueIndexSampler();
break;
default:
throw new UnsupportedOperationException( "Unexpected index type " + descriptor.type() );
}
}

void includeSample( Value[] values )
{
if ( uniqueSampler != null )
{
uniqueSampler.increment( 1 );
}
else
{
generalSampler.include( SamplingUtil.encodedStringValuesForSampling( (Object[]) values ) );
}
}

IndexSample sampleResult()
{
if ( uniqueSampler != null )
{
return uniqueSampler.result();
}
else
{
return generalSampler.result();
}
}
}
}

0 comments on commit 958a4a0

Please sign in to comment.