Skip to content

Commit

Permalink
Generify Lucene index
Browse files Browse the repository at this point in the history
  • Loading branch information
ragadeeshu authored and chrisvest committed Jul 19, 2018
1 parent cebf756 commit 1301876
Show file tree
Hide file tree
Showing 27 changed files with 736 additions and 386 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.util.List;

import org.neo4j.helpers.collection.Iterators;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.internal.kernel.api.schema.SchemaUtil;
import org.neo4j.values.storable.ValueCategory;

Expand All @@ -49,6 +50,11 @@ public interface IndexReference extends IndexCapability
*/
int[] properties();

/**
 * Returns the schema of this index.
 *
 * @return the {@link SchemaDescriptor} this index is defined over.
 */
SchemaDescriptor schema();

/**
* Returns the key (or name) of the index provider that backs this index.
*/
Expand Down Expand Up @@ -117,6 +123,12 @@ public int[] properties()
return new int[0];
}

@Override
public SchemaDescriptor schema()
{
    // Null-object implementation: this reference represents the absence of an index,
    // so it reports the NO_SCHEMA sentinel rather than a real schema.
    return SchemaDescriptor.NO_SCHEMA;
}

@Override
public String providerKey()
{
Expand Down
Expand Up @@ -71,6 +71,14 @@ public interface SchemaRead
*/
Iterator<IndexReference> indexesGetForLabel( int labelId );

/**
 * Returns the index with the given name.
 *
 * @param name The name of the index you are looking for.
 * @return The index associated with the given name.
 * NOTE(review): nullability when no index matches is not specified here — the storage-backed
 * implementation can return {@code null}; confirm and document the intended contract.
 */
IndexReference indexGetForName( String name );

/**
* Returns all indexes used in the database
*
Expand Down
Expand Up @@ -38,6 +38,75 @@
*/
public interface SchemaDescriptor extends SchemaDescriptorSupplier
{
/**
 * Null-object {@link SchemaDescriptor}: a sentinel for index references that have no real
 * schema (e.g. the "no index" reference). All members return empty or neutral values.
 */
SchemaDescriptor NO_SCHEMA = new SchemaDescriptor()
{
    @Override
    public boolean isAffected( long[] entityIds )
    {
        // A non-existent schema is never affected by any entity.
        return false;
    }

    @Override
    public <R> R computeWith( SchemaComputer<R> computer )
    {
        // No schema to compute over; callers get no result.
        return null;
    }

    @Override
    public void processWith( SchemaProcessor processor )
    {
        // Intentional no-op: there is nothing to process.
    }

    @Override
    public String userDescription( TokenNameLookup tokenNameLookup )
    {
        return "NO_SCHEMA";
    }

    @Override
    public int[] getPropertyIds()
    {
        // No properties.
        return new int[0];
    }

    @Override
    public int[] getEntityTokenIds()
    {
        // No entity tokens (labels / relationship types).
        return new int[0];
    }

    @Override
    public int keyId()
    {
        // NOTE(review): 0 can be a valid token id elsewhere — callers must not treat
        // this sentinel's keyId as a real key; confirm no lookup path does.
        return 0;
    }

    @Override
    public ResourceType keyType()
    {
        return null;
    }

    @Override
    public EntityType entityType()
    {
        return null;
    }

    @Override
    public PropertySchemaType propertySchemaType()
    {
        return null;
    }

    @Override
    public SchemaDescriptor schema()
    {
        // Deliberately not self-referential; the sentinel has no schema.
        return null;
    }
};

int[] ANY_ENTITY_TOKEN = new int[0];

static boolean isAnyEntityTokenSchema( SchemaDescriptor schema )
Expand Down
Expand Up @@ -176,6 +176,11 @@ public Iterator<CapableIndexDescriptor> indexesByProperty( int propertyId )
return schemaCacheState.indexesByProperty( propertyId );
}

/**
 * Looks up an index by name in the current cached schema state.
 *
 * @param name the index name to look for.
 * @return the matching {@link CapableIndexDescriptor}, or {@code null} if no index with
 * that name is cached.
 */
public CapableIndexDescriptor indexDescriptorForName( String name )
{
    return schemaCacheState.indexDescriptorByName( name );
}

private static class SchemaCacheState
{
private final ConstraintSemantics constraintSemantics;
Expand All @@ -186,6 +191,7 @@ private static class SchemaCacheState

private final Map<SchemaDescriptor,CapableIndexDescriptor> indexDescriptors;
private final MutableIntObjectMap<Set<CapableIndexDescriptor>> indexDescriptorsByLabel;
private final Map<String,CapableIndexDescriptor> indexDescriptorsByName;

private final Map<Class<?>,Object> dependantState;
private final MutableIntObjectMap<List<CapableIndexDescriptor>> indexByProperty;
Expand All @@ -200,6 +206,7 @@ private static class SchemaCacheState

this.indexDescriptors = new HashMap<>();
this.indexDescriptorsByLabel = new IntObjectHashMap<>();
this.indexDescriptorsByName = new HashMap<>();
this.dependantState = new ConcurrentHashMap<>();
this.indexByProperty = new IntObjectHashMap<>();
load( rules );
Expand All @@ -214,6 +221,7 @@ private static class SchemaCacheState

this.indexDescriptors = new HashMap<>( schemaCacheState.indexDescriptors );
this.indexDescriptorsByLabel = IntObjectHashMap.newMap( schemaCacheState.indexDescriptorsByLabel );
this.indexDescriptorsByName = new HashMap<>( schemaCacheState.indexDescriptorsByName );
this.dependantState = new ConcurrentHashMap<>();
this.indexByProperty = IntObjectHashMap.newMap( schemaCacheState.indexByProperty );
this.indexProviderMap = schemaCacheState.indexProviderMap;
Expand Down Expand Up @@ -262,6 +270,11 @@ CapableIndexDescriptor indexDescriptor( SchemaDescriptor descriptor )
return indexDescriptors.get( descriptor );
}

// Name -> descriptor lookup; returns null when no index with the given name is cached,
// since indexDescriptorsByName is a plain HashMap.
CapableIndexDescriptor indexDescriptorByName( String name )
{
    return indexDescriptorsByName.get( name );
}

Iterator<CapableIndexDescriptor> indexesByProperty( int propertyId )
{
List<CapableIndexDescriptor> indexes = indexByProperty.get( propertyId );
Expand Down Expand Up @@ -293,6 +306,14 @@ else if ( rule instanceof StoreIndexDescriptor )
indexDescriptorById.put( index.getId(), index );
SchemaDescriptor schemaDescriptor = index.schema();
indexDescriptors.put( schemaDescriptor, index );
indexDescriptorsByName.put( rule.getName(), index );
for ( int entityTokenId : schemaDescriptor.getEntityTokenIds() )
{
Set<CapableIndexDescriptor> forLabel =
indexDescriptorsByLabel.getIfAbsent( entityTokenId, HashSet::new );
forLabel.add( index );
}


Set<CapableIndexDescriptor> forLabel =
indexDescriptorsByLabel.getIfAbsentPut( schemaDescriptor.keyId(), HashSet::new );
Expand All @@ -319,12 +340,16 @@ else if ( indexDescriptorById.containsKey( id ) )
CapableIndexDescriptor index = indexDescriptorById.remove( id );
SchemaDescriptor schema = index.schema();
indexDescriptors.remove( schema );
indexDescriptorsByName.remove( index.getName() );

Set<CapableIndexDescriptor> forLabel = indexDescriptorsByLabel.get( schema.keyId() );
forLabel.remove( index );
if ( forLabel.isEmpty() )
for ( int entityTokenId : schema.getEntityTokenIds() )
{
indexDescriptorsByLabel.remove( schema.keyId() );
Set<CapableIndexDescriptor> forLabel = indexDescriptorsByLabel.get( entityTokenId );
forLabel.remove( index );
if ( forLabel.isEmpty() )
{
indexDescriptorsByLabel.remove( entityTokenId );
}
}

for ( int propertyId : index.schema().getPropertyIds() )
Expand Down
Expand Up @@ -410,6 +410,18 @@ public Iterator<IndexReference> indexesGetForLabel( int labelId )
return (Iterator)iterator;
}

@Override
public IndexReference indexGetForName( String name )
{
    ktx.assertOpen();
    // Used for querying only, so it is "fine" (at least for now) that this lookup
    // is not transaction-state aware.
    // TODO locking
    return storageReader.indexGetForName( name );
}

@Override
public Iterator<IndexReference> indexesGetAll()
{
Expand Down
Expand Up @@ -140,6 +140,12 @@ public Iterator<CapableIndexDescriptor> indexesGetForLabel( int labelId )
return schemaCache.indexDescriptorsForLabel( labelId );
}

@Override
public CapableIndexDescriptor indexGetForName( String name )
{
    // Served straight from the schema cache; no store access required.
    return schemaCache.indexDescriptorForName( name );
}

@Override
public Iterator<CapableIndexDescriptor> indexesGetAll()
{
Expand Down
Expand Up @@ -126,6 +126,12 @@ public interface StorageReader extends AutoCloseable
*/
Iterator<CapableIndexDescriptor> indexesGetForLabel( int labelId );

/**
* @param name name of index to find
* @return {@link IndexDescriptor} associated with the given {@code name}.
*/
CapableIndexDescriptor indexGetForName( String name );

/**
* @return all {@link CapableIndexDescriptor} in storage.
*/
Expand Down
Expand Up @@ -187,6 +187,12 @@ public Iterator<CapableIndexDescriptor> indexesGetForLabel( int labelId )
throw new UnsupportedOperationException( "Not implemented yet" );
}

@Override
public CapableIndexDescriptor indexGetForName( String name )
{
    // Lookup by name is not supported by this reader implementation (yet).
    throw new UnsupportedOperationException( "Not implemented yet" );
}

@Override
public Iterator<CapableIndexDescriptor> indexesGetAll()
{
Expand Down
Expand Up @@ -43,7 +43,12 @@
import org.neo4j.kernel.api.impl.index.partition.IndexPartitionFactory;
import org.neo4j.kernel.api.impl.index.partition.PartitionSearcher;
import org.neo4j.kernel.api.impl.index.storage.PartitionedIndexStorage;
import org.neo4j.kernel.api.impl.schema.writer.LuceneIndexWriter;
import org.neo4j.kernel.api.impl.schema.writer.PartitionedIndexWriter;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.storageengine.api.schema.IndexReader;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;

/**
Expand All @@ -55,9 +60,13 @@
* @see WritableAbstractDatabaseIndex
* @see ReadOnlyAbstractDatabaseIndex
*/
public abstract class AbstractLuceneIndex
public abstract class AbstractLuceneIndex<READER extends IndexReader>
{
private static final String KEY_STATUS = "status";
private static final String ONLINE = "online";
private static final Map<String,String> ONLINE_COMMIT_USER_DATA = singletonMap( KEY_STATUS, ONLINE );
protected final PartitionedIndexStorage indexStorage;
protected final IndexDescriptor descriptor;
private final IndexPartitionFactory partitionFactory;

// Note that we rely on the thread-safe internal snapshot feature of the CopyOnWriteArrayList
Expand All @@ -66,10 +75,11 @@ public abstract class AbstractLuceneIndex

private volatile boolean open;

public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, IndexPartitionFactory partitionFactory )
/**
 * @param indexStorage storage abstraction for the partitions of this index.
 * @param partitionFactory factory used to create new index partitions.
 * @param descriptor descriptor of the index this Lucene index backs.
 */
public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, IndexPartitionFactory partitionFactory, IndexDescriptor descriptor )
{
    this.indexStorage = indexStorage;
    this.partitionFactory = partitionFactory;
    this.descriptor = descriptor;
}

/**
Expand Down Expand Up @@ -179,6 +189,24 @@ public boolean isValid()
return true;
}

/**
 * Creates a writer for this index that transparently distributes writes across partitions.
 *
 * @param writableAbstractDatabaseIndex the writable index to write through.
 * @return a {@link PartitionedIndexWriter} over the given index.
 * @throws IllegalStateException if the index is not open (via {@code ensureOpen()}).
 */
public LuceneIndexWriter getIndexWriter( WritableAbstractDatabaseIndex writableAbstractDatabaseIndex )
{
    ensureOpen();
    return new PartitionedIndexWriter( writableAbstractDatabaseIndex );
}

/**
 * Creates a reader for this index, choosing the cheaper single-partition reader when
 * the index has exactly one partition.
 *
 * @return a reader over the current partitions.
 * @throws IOException on failure to open the underlying Lucene index.
 */
public READER getIndexReader() throws IOException
{
    ensureOpen();
    List<AbstractIndexPartition> currentPartitions = getPartitions();
    if ( hasSinglePartition( currentPartitions ) )
    {
        return createSimpleReader( currentPartitions );
    }
    return createPartitionedReader( currentPartitions );
}

/**
 * @return the descriptor of the index this Lucene index backs.
 */
public IndexDescriptor getDescriptor()
{
    return descriptor;
}

/**
* Close index and deletes all it's partitions.
*/
Expand Down Expand Up @@ -399,4 +427,51 @@ private File createNewPartitionFolder() throws IOException
indexStorage.prepareFolder( partitionFolder );
return partitionFolder;
}

/**
 * Checks whether this index has been marked as online.
 *
 * @return {@code true} when the commit user data of the first partition carries the
 * "status" -> "online" marker, {@code false} otherwise.
 * @throws IOException on failure to read the underlying Lucene directory.
 */
public boolean isOnline() throws IOException
{
    ensureOpen();
    // The online marker lives in the commit metadata of the first partition only.
    Directory directory = getFirstPartition( getPartitions() ).getDirectory();
    try ( DirectoryReader reader = DirectoryReader.open( directory ) )
    {
        String status = reader.getIndexCommit().getUserData().get( KEY_STATUS );
        return ONLINE.equals( status );
    }
}

/**
 * Marks this index as online by attaching the "status" -> "online" entry to the commit
 * user data of the first partition, then flushing.
 *
 * @throws IOException on failure to write to the underlying Lucene index.
 */
public void markAsOnline() throws IOException
{
    ensureOpen();
    IndexWriter firstPartitionWriter = getFirstPartition( getPartitions() ).getIndexWriter();
    firstPartitionWriter.setCommitData( ONLINE_COMMIT_USER_DATA );
    flush( false );
}

/**
 * Writes the given failure message to the failure storage.
 *
 * @param failure the failure message.
 * @throws IOException if the failure message could not be stored.
 */
public void markAsFailed( String failure ) throws IOException
{
    indexStorage.storeIndexFailure( failure );
}

/**
 * Creates a reader over a single index partition.
 *
 * @param partitions the current partitions; implementations read the single partition.
 * @return a reader of type {@code READER} over that partition.
 * @throws IOException on failure to open the underlying Lucene index.
 */
protected abstract READER createSimpleReader( List<AbstractIndexPartition> partitions ) throws IOException;

/**
 * Creates a reader that spans all the given index partitions.
 *
 * @param partitions the current partitions to read across.
 * @return a reader of type {@code READER} over all partitions.
 * @throws IOException on failure to open the underlying Lucene index.
 */
protected abstract READER createPartitionedReader( List<AbstractIndexPartition> partitions ) throws IOException;
}

0 comments on commit 1301876

Please sign in to comment.