Skip to content

Commit

Permalink
Combined unique/non-unique native index populator into one
Browse files Browse the repository at this point in the history
so that it matches index accessor design and more easily
can provide reader for the added deferred constraint checking
  • Loading branch information
tinwelint committed Mar 8, 2018
1 parent 37099b1 commit 1b07e82
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 164 deletions.
Expand Up @@ -42,9 +42,16 @@
import org.neo4j.kernel.api.index.PropertyAccessor; import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.DefaultNonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.NonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.UniqueIndexSampler;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample;


import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.kernel.api.schema.index.IndexDescriptor.Type.GENERAL; import static org.neo4j.kernel.api.schema.index.IndexDescriptor.Type.GENERAL;
import static org.neo4j.kernel.api.schema.index.IndexDescriptor.Type.UNIQUE;


/** /**
* {@link IndexPopulator} backed by a {@link GBPTree}. * {@link IndexPopulator} backed by a {@link GBPTree}.
Expand All @@ -61,18 +68,36 @@ abstract class NativeSchemaIndexPopulator<KEY extends NativeSchemaKey, VALUE ext


private final KEY treeKey; private final KEY treeKey;
private final VALUE treeValue; private final VALUE treeValue;
private final UniqueIndexSampler uniqueSampler;
private final NonUniqueIndexSampler nonUniqueSampler;
final IndexSamplingConfig samplingConfig;

private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> additionsWorkSync; private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> additionsWorkSync;
private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> updatesWorkSync; private WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdateWork<KEY,VALUE>> updatesWorkSync;


private byte[] failureBytes; private byte[] failureBytes;
private boolean dropped; private boolean dropped;


NativeSchemaIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout, NativeSchemaIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId ) SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId, IndexSamplingConfig samplingConfig )
{ {
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId ); super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.treeKey = layout.newKey(); this.treeKey = layout.newKey();
this.treeValue = layout.newValue(); this.treeValue = layout.newValue();
this.samplingConfig = samplingConfig;
switch ( descriptor.type() )
{
case GENERAL:
uniqueSampler = null;
nonUniqueSampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
break;
case UNIQUE:
uniqueSampler = new UniqueIndexSampler();
nonUniqueSampler = null;
break;
default:
throw new IllegalArgumentException( "Unexpected index type " + descriptor.type() );
}
} }


public void clear() throws IOException public void clear() throws IOException
Expand All @@ -89,11 +114,11 @@ public synchronized void create() throws IOException
// true: tree uniqueness is (value,entityId) // true: tree uniqueness is (value,entityId)
// false: tree uniqueness is (value) <-- i.e. more strict // false: tree uniqueness is (value) <-- i.e. more strict
boolean compareIds = descriptor.type() == GENERAL; boolean compareIds = descriptor.type() == GENERAL;
additionsWorkSync = new WorkSync<>( new IndexUpdateApply( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( compareIds ) ) ); additionsWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( compareIds ) ) );


// for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates. // for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates.
// there are added conflict checks after updates have been applied. // there are added conflict checks after updates have been applied.
updatesWorkSync = new WorkSync<>( new IndexUpdateApply( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( true ) ) ); updatesWorkSync = new WorkSync<>( new IndexUpdateApply<>( tree, treeKey, treeValue, new ConflictDetectingValueMerger<>( true ) ) );
} }


@Override @Override
Expand Down Expand Up @@ -125,7 +150,7 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
@Override @Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor )
{ {
return new IndexUpdater() IndexUpdater updater = new IndexUpdater()
{ {
private boolean closed; private boolean closed;
private final Collection<IndexEntryUpdate<?>> updates = new ArrayList<>(); private final Collection<IndexEntryUpdate<?>> updates = new ArrayList<>();
Expand All @@ -152,8 +177,18 @@ private void assertOpen()
} }
} }
}; };

if ( descriptor.type() == UNIQUE )
{
// The index population detects conflicts on the fly, however for updates coming in we're in a position
// where we cannot detect conflicts while applying, but instead afterwards.
updater = new DeferredConflictCheckingIndexUpdater( updater, this::newReader, descriptor );
}
return updater;
} }


abstract IndexReader newReader();

@Override @Override
public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException public synchronized void close( boolean populationCompletedSuccessfully ) throws IOException
{ {
Expand Down Expand Up @@ -187,7 +222,7 @@ private void applyWithWorkSync( WorkSync<IndexUpdateApply<KEY,VALUE>,IndexUpdate
{ {
try try
{ {
workSync.apply( new IndexUpdateWork( updates ) ); workSync.apply( new IndexUpdateWork<>( updates ) );
} }
catch ( ExecutionException e ) catch ( ExecutionException e )
{ {
Expand Down Expand Up @@ -286,17 +321,47 @@ static class IndexUpdateWork<KEY extends NativeSchemaKey, VALUE extends NativeSc
} }


@Override @Override
public IndexUpdateWork combine( IndexUpdateWork work ) public IndexUpdateWork<KEY,VALUE> combine( IndexUpdateWork work )
{ {
ArrayList<IndexEntryUpdate<?>> combined = new ArrayList<>( updates ); ArrayList<IndexEntryUpdate<?>> combined = new ArrayList<>( updates );
combined.addAll( work.updates ); combined.addAll( work.updates );
return new IndexUpdateWork( combined ); return new IndexUpdateWork<>( combined );
} }


@Override @Override
public void apply( IndexUpdateApply indexUpdateApply ) throws Exception public void apply( IndexUpdateApply<KEY,VALUE> indexUpdateApply ) throws Exception
{ {
indexUpdateApply.process( updates ); indexUpdateApply.process( updates );
} }
} }

@Override
public void includeSample( IndexEntryUpdate<?> update )
{
switch ( descriptor.type() )
{
case GENERAL:
nonUniqueSampler.include( SamplingUtil.encodedStringValuesForSampling( (Object[]) update.values() ) );
break;
case UNIQUE:
uniqueSampler.increment( 1 );
break;
default:
throw new IllegalArgumentException( "Unexpected index type " + descriptor.type() );
}
}

@Override
public IndexSample sampleResult()
{
switch ( descriptor.type() )
{
case GENERAL:
return nonUniqueSampler.result();
case UNIQUE:
return uniqueSampler.result();
default:
throw new IllegalArgumentException( "Unexpected index type " + descriptor.type() );
}
}
} }
Expand Up @@ -19,15 +19,6 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import org.neo4j.kernel.api.index.IndexDirectoryStructure.Factory;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;

import static org.neo4j.kernel.impl.index.schema.NativeSchemaIndexPopulator.BYTE_FAILED;
import static org.neo4j.kernel.impl.index.schema.NativeSchemaIndexPopulator.BYTE_ONLINE;
import static org.neo4j.kernel.impl.index.schema.NativeSchemaIndexPopulator.BYTE_POPULATING;

import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;


Expand All @@ -38,8 +29,12 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexDirectoryStructure.Factory;
import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.storemigration.StoreMigrationParticipant;


/** /**
* Base class for native indexes on top of {@link GBPTree}. * Base class for native indexes on top of {@link GBPTree}.
Expand Down Expand Up @@ -75,19 +70,13 @@ public IndexPopulator getPopulator( long indexId, IndexDescriptor descriptor, In
} }


File storeFile = nativeIndexFileFromIndexId( indexId ); File storeFile = nativeIndexFileFromIndexId( indexId );
switch ( descriptor.type() ) Layout<KEY,VALUE> layout = layout( descriptor );
{ return newIndexPopulator( storeFile, layout, descriptor, indexId, samplingConfig );
case GENERAL:
return new NativeNonUniqueSchemaIndexPopulator<>( pageCache, fs, storeFile, layoutNonUnique(), samplingConfig,
monitor, descriptor, indexId );
case UNIQUE:
return new NativeUniqueSchemaIndexPopulator<>( pageCache, fs, storeFile, layoutUnique(), samplingConfig, monitor, descriptor,
indexId );
default:
throw new UnsupportedOperationException( "Can not create index populator of type " + descriptor.type() );
}
} }


protected abstract IndexPopulator newIndexPopulator( File storeFile, Layout<KEY, VALUE> layout, IndexDescriptor descriptor, long indexId,
IndexSamplingConfig samplingConfig );

@Override @Override
public IndexAccessor getOnlineAccessor( public IndexAccessor getOnlineAccessor(
long indexId, IndexDescriptor descriptor, IndexSamplingConfig samplingConfig ) throws IOException long indexId, IndexDescriptor descriptor, IndexSamplingConfig samplingConfig ) throws IOException
Expand Down

This file was deleted.

Expand Up @@ -24,38 +24,22 @@
import org.neo4j.index.internal.gbptree.Layout; import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.DefaultNonUniqueIndexSampler;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.NonUniqueIndexSampler; import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.storageengine.api.schema.IndexSample;


/** class NumberSchemaIndexPopulator extends NativeSchemaIndexPopulator<NumberSchemaKey,NativeSchemaValue>
* {@link NativeSchemaIndexPopulator} which can accept duplicate values (for different entity ids).
*/
class NativeNonUniqueSchemaIndexPopulator<KEY extends NativeSchemaKey, VALUE extends NativeSchemaValue>
extends NativeSchemaIndexPopulator<KEY,VALUE>
{ {
private NonUniqueIndexSampler sampler; NumberSchemaIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<NumberSchemaKey,NativeSchemaValue> layout,

SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId, IndexSamplingConfig samplingConfig )
NativeNonUniqueSchemaIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<KEY,VALUE> layout,
IndexSamplingConfig samplingConfig, SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId )
{
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.sampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
}

@Override
public void includeSample( IndexEntryUpdate<?> update )
{ {
sampler.include( SamplingUtil.encodedStringValuesForSampling( (Object[]) update.values() ) ); super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId, samplingConfig );
} }


@Override @Override
public IndexSample sampleResult() IndexReader newReader()
{ {
return sampler.result(); return new NumberSchemaIndexReader<>( tree, layout, samplingConfig, descriptor );
} }
} }
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexDirectoryStructure; import org.neo4j.kernel.api.index.IndexDirectoryStructure;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.values.storable.ValueGroup; import org.neo4j.values.storable.ValueGroup;
Expand Down Expand Up @@ -64,6 +65,13 @@ protected NumberLayoutNonUnique layoutNonUnique()
return new NumberLayoutNonUnique(); return new NumberLayoutNonUnique();
} }


@Override
protected IndexPopulator newIndexPopulator( File storeFile, Layout<NumberSchemaKey,NativeSchemaValue> layout, IndexDescriptor descriptor, long indexId,
IndexSamplingConfig samplingConfig )
{
return new NumberSchemaIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, indexId, samplingConfig );
}

@Override @Override
protected IndexAccessor newIndexAccessor( File storeFile, Layout<NumberSchemaKey,NativeSchemaValue> layout, IndexDescriptor descriptor, protected IndexAccessor newIndexAccessor( File storeFile, Layout<NumberSchemaKey,NativeSchemaValue> layout, IndexDescriptor descriptor,
long indexId, IndexSamplingConfig samplingConfig ) throws IOException long indexId, IndexSamplingConfig samplingConfig ) throws IOException
Expand Down
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.File;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.storageengine.api.schema.IndexReader;

public class StringSchemaIndexPopulator extends NativeSchemaIndexPopulator<StringSchemaKey,NativeSchemaValue>
{
StringSchemaIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File storeFile, Layout<StringSchemaKey,NativeSchemaValue> layout,
SchemaIndexProvider.Monitor monitor, IndexDescriptor descriptor, long indexId, IndexSamplingConfig samplingConfig )
{
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId, samplingConfig );
}

@Override
IndexReader newReader()
{
return new StringSchemaIndexReader( tree, layout, samplingConfig, descriptor );
}
}

0 comments on commit 1b07e82

Please sign in to comment.