Skip to content

Commit

Permalink
First draft at making the resumable store copy use index id to identi…
Browse files Browse the repository at this point in the history
…fy indexes, instead of IndexDescriptor.

It compiles, but tests are likely not passing at this point.
  • Loading branch information
chrisvest committed Mar 7, 2018
1 parent 5d33021 commit bc8bc15
Show file tree
Hide file tree
Showing 35 changed files with 220 additions and 255 deletions.
Expand Up @@ -19,9 +19,6 @@
*/
package org.neo4j.internal.kernel.api.schema;

/**
* Please note that this interface has dependencies in other modules. See {@see org.neo4j.causalclustering.catchup.storecopy.IndexDescriptorSerializer}
*/
public interface LabelSchemaDescriptor extends SchemaDescriptor, LabelSchemaSupplier
{
int getLabelId();
Expand Down
Expand Up @@ -36,8 +36,6 @@
/**
* Internal representation of a graph index, including the schema unit it targets (eg. label-property combination)
* and the type of index. UNIQUE indexes are used to back uniqueness constraints.
*
* Please note that this class has dependencies in other modules. See {@see org.neo4j.causalclustering.catchup.storecopy.IndexDescriptorSerializer}
*/
public class IndexDescriptor implements LabelSchemaSupplier
{
Expand Down
Expand Up @@ -152,6 +152,12 @@ public String toString()
return String.format( "%s -> %s", getClass().getSimpleName(), getDelegate().toString() );
}

@Override
public long getIndexId()
{
return getDelegate().getIndexId();
}

@Override
public ResourceIterator<File> snapshotFiles() throws IOException
{
Expand Down
Expand Up @@ -114,4 +114,10 @@ public IndexReader newReader()
{
throw new UnsupportedOperationException();
}

@Override
public long getIndexId()
{
return indexMeta.getIndexId();
}
}
Expand Up @@ -355,6 +355,20 @@ public void validate() throws IndexPopulationFailedKernelException, UniqueProper
}
}

@Override
public long getIndexId()
{
lock.readLock().lock();
try
{
return delegate.getIndexId();
}
finally
{
lock.readLock().unlock();
}
}

@Override
public ResourceIterator<File> snapshotFiles() throws IOException
{
Expand Down
Expand Up @@ -25,17 +25,24 @@

public class IndexMeta
{
private final long ruleId;
private final IndexDescriptor indexDescriptor;
private final SchemaIndexProvider.Descriptor providerDescriptor;
private final IndexCapability indexCapability;

public IndexMeta( IndexDescriptor indexDescriptor, SchemaIndexProvider.Descriptor providerDescriptor, IndexCapability indexCapability )
public IndexMeta( long ruleId, IndexDescriptor indexDescriptor, SchemaIndexProvider.Descriptor providerDescriptor, IndexCapability indexCapability )
{
this.ruleId = ruleId;
this.indexDescriptor = indexDescriptor;
this.providerDescriptor = providerDescriptor;
this.indexCapability = indexCapability;
}

public long getIndexId()
{
return ruleId;
}

public IndexDescriptor indexDescriptor()
{
return indexDescriptor;
Expand Down
Expand Up @@ -116,6 +116,8 @@ public interface IndexProxy extends LabelSchemaSupplier

void validate() throws IndexPopulationFailedKernelException, UniquePropertyValueValidationException;

long getIndexId();

ResourceIterator<File> snapshotFiles() throws IOException;

default void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
Expand Down
Expand Up @@ -68,7 +68,7 @@ IndexProxy createPopulatingIndexProxy( final long ruleId,
// TODO: This is here because there is a circular dependency from PopulatingIndexProxy to FlippableIndexProxy
final String indexUserDescription = indexUserDescription( descriptor, providerDescriptor );
IndexPopulator populator = populatorFromProvider( providerDescriptor, ruleId, descriptor, samplingConfig );
IndexMeta indexMeta = indexMetaFromProvider( providerDescriptor, descriptor );
IndexMeta indexMeta = indexMetaFromProvider( ruleId, providerDescriptor, descriptor );

FailedIndexProxyFactory failureDelegateFactory = new FailedPopulatingIndexProxyFactory(
indexMeta,
Expand Down Expand Up @@ -104,10 +104,10 @@ IndexProxy createPopulatingIndexProxy( final long ruleId,
return new ContractCheckingIndexProxy( flipper, false );
}

IndexProxy createRecoveringIndexProxy( IndexDescriptor descriptor,
IndexProxy createRecoveringIndexProxy( long ruleId, IndexDescriptor descriptor,
SchemaIndexProvider.Descriptor providerDescriptor )
{
IndexMeta indexMeta = indexMetaFromProvider( providerDescriptor, descriptor );
IndexMeta indexMeta = indexMetaFromProvider( ruleId, providerDescriptor, descriptor );
IndexProxy proxy = new RecoveringIndexProxy( indexMeta );
return new ContractCheckingIndexProxy( proxy, true );
}
Expand All @@ -121,7 +121,7 @@ IndexProxy createOnlineIndexProxy( long ruleId,
{
IndexAccessor onlineAccessor =
onlineAccessorFromProvider( providerDescriptor, ruleId, descriptor, samplingConfig );
IndexMeta indexMeta = indexMetaFromProvider( providerDescriptor, descriptor );
IndexMeta indexMeta = indexMetaFromProvider( ruleId, providerDescriptor, descriptor );
IndexProxy proxy;
proxy = new OnlineIndexProxy( ruleId, indexMeta, onlineAccessor, storeView, false );
proxy = new ContractCheckingIndexProxy( proxy, true );
Expand All @@ -132,7 +132,7 @@ IndexProxy createOnlineIndexProxy( long ruleId,
logProvider.getLog( getClass() ).error( "Failed to open index: " + ruleId +
" (" + descriptor.userDescription( tokenNameLookup ) +
"), requesting re-population.", e );
return createRecoveringIndexProxy( descriptor, providerDescriptor );
return createRecoveringIndexProxy( ruleId, descriptor, providerDescriptor );
}
}

Expand All @@ -142,7 +142,7 @@ IndexProxy createFailedIndexProxy( long ruleId,
IndexPopulationFailure populationFailure )
{
IndexPopulator indexPopulator = populatorFromProvider( providerDescriptor, ruleId, descriptor, samplingConfig );
IndexMeta indexMeta = indexMetaFromProvider( providerDescriptor, descriptor );
IndexMeta indexMeta = indexMetaFromProvider( ruleId, providerDescriptor, descriptor );
String indexUserDescription = indexUserDescription( descriptor, providerDescriptor );
IndexProxy proxy;
proxy = new FailedIndexProxy(
Expand Down Expand Up @@ -178,9 +178,9 @@ private IndexAccessor onlineAccessorFromProvider( SchemaIndexProvider.Descriptor
return indexProvider.getOnlineAccessor( ruleId, descriptor, samplingConfig );
}

private IndexMeta indexMetaFromProvider( SchemaIndexProvider.Descriptor providerDescriptor, IndexDescriptor indexDescriptor )
private IndexMeta indexMetaFromProvider( long ruleId, SchemaIndexProvider.Descriptor providerDescriptor, IndexDescriptor indexDescriptor )
{
IndexCapability indexCapability = providerMap.apply( providerDescriptor ).getCapability( indexDescriptor );
return new IndexMeta( indexDescriptor, providerDescriptor, indexCapability );
return new IndexMeta( ruleId, indexDescriptor, providerDescriptor, indexCapability );
}
}
Expand Up @@ -24,19 +24,17 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor;
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.function.ThrowingFunction;
import org.neo4j.graphdb.ResourceIterator;
Expand Down Expand Up @@ -209,7 +207,8 @@ public void init()
break;
case POPULATING:
// The database was shut down during population, or a crash has occurred, or some other sad thing.
indexProxy = indexProxyCreator.createRecoveringIndexProxy( descriptor, providerDescriptor );
indexProxy = indexProxyCreator.createRecoveringIndexProxy(
indexId, descriptor, providerDescriptor );
break;
case FAILED:
IndexPopulationFailure failure = failure( provider.getPopulationFailure( indexId, descriptor ) );
Expand Down Expand Up @@ -638,35 +637,27 @@ private void closeAllIndexes()
} );
}

public Set<IndexDescriptor> getIndexDescriptors()
public PrimitiveLongSet getIndexIds()
{
Set<IndexDescriptor> indexDescriptors = new HashSet<>();
for ( IndexProxy indexProxy : indexMapRef.getAllIndexProxies() )
Iterable<IndexProxy> indexProxies = indexMapRef.getAllIndexProxies();
PrimitiveLongSet indexIds = Primitive.longSet();
for ( IndexProxy indexProxy : indexProxies )
{
indexDescriptors.add( indexProxy.getDescriptor() );
indexIds.add( indexProxy.getIndexId() );
}
return indexDescriptors;
return indexIds;
}

public ResourceIterator<File> snapshotIndexFiles( Predicate<IndexDescriptor> filter ) throws IOException
public ResourceIterator<File> snapshotIndexFiles() throws IOException
{
Collection<ResourceIterator<File>> snapshots = new ArrayList<>();
for ( IndexProxy indexProxy : indexMapRef.getAllIndexProxies() )
{
IndexDescriptor providerDescriptor = indexProxy.getDescriptor();
if ( filter.test( providerDescriptor ) )
{
snapshots.add( indexProxy.snapshotFiles() );
}
snapshots.add( indexProxy.snapshotFiles() );
}
return Iterators.concatResourceIterators( snapshots.iterator() );
}

public ResourceIterator<File> snapshotIndexFiles() throws IOException
{
return snapshotIndexFiles( d -> true );
}

private IndexPopulationJob newIndexPopulationJob()
{
MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider );
Expand Down Expand Up @@ -760,7 +751,7 @@ public IndexMap apply( IndexMap indexMap ) throws IOException
}
else
{
index = indexProxyCreator.createRecoveringIndexProxy( descriptor, providerDescriptor );
index = indexProxyCreator.createRecoveringIndexProxy( ruleId, descriptor, providerDescriptor );
}

indexMap.putIndexProxy( rule.getId(), index );
Expand Down
Expand Up @@ -204,6 +204,12 @@ public void validate()
// ok, it's online so it's valid
}

@Override
public long getIndexId()
{
return indexId;
}

@Override
public IndexPopulationFailure getPopulationFailure() throws IllegalStateException
{
Expand Down
Expand Up @@ -159,6 +159,12 @@ public void validate()
throw new IllegalStateException( "Cannot validate index while it is still populating: " + job );
}

@Override
public long getIndexId()
{
return indexMeta.getIndexId();
}

@Override
public ResourceIterator<File> snapshotFiles()
{
Expand Down
Expand Up @@ -51,19 +51,19 @@ public boolean awaitStoreScanCompleted()
@Override
public void activate()
{
throw unsupportedOperation( "Cannot activate recovering index." );
throw unsupportedOperation( "Cannot activate recovering index." );
}

@Override
public void validate()
{
throw unsupportedOperation( "Cannot validate recovering index." );
throw unsupportedOperation( "Cannot validate recovering index." );
}

@Override
public ResourceIterator<File> snapshotFiles()
{
throw unsupportedOperation( "Cannot snapshot a recovering index." );
throw unsupportedOperation( "Cannot snapshot a recovering index." );
}

@Override
Expand Down
Expand Up @@ -23,15 +23,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.graphdb.Resource;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.ExplicitIndexProviderLookup;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.store.format.RecordFormat;
Expand All @@ -56,9 +55,9 @@ public class NeoStoreFileIndexListing
this.explicitIndexProviders = explicitIndexProviders;
}

public Collection<IndexDescriptor> listIndexDescriptors()
public PrimitiveLongSet getIndexIds()
{
return indexingService.getIndexDescriptors();
return indexingService.getIndexIds();
}

Resource gatherSchemaIndexFiles( Collection<StoreFileMetadata> targetFiles ) throws IOException
Expand Down Expand Up @@ -98,11 +97,19 @@ private void getSnapshotFilesMetadata( ResourceIterator<File> snapshot, Collecti
snapshot.stream().map( toStoreFileMetatadata ).forEach( targetFiles::add );
}

public ResourceIterator<StoreFileMetadata> getSnapshot( IndexDescriptor descriptor ) throws IOException
public ResourceIterator<StoreFileMetadata> getSnapshot( long indexId ) throws IOException
{
ResourceIterator<File> snapshot = indexingService.snapshotIndexFiles( descriptor::equals );
ArrayList<StoreFileMetadata> files = new ArrayList<>();
getSnapshotFilesMetadata( snapshot, files );
return resourceIterator( files.iterator(), snapshot );
try
{
ResourceIterator<File> snapshot = indexingService.getIndexProxy( indexId ).snapshotFiles();
ArrayList<StoreFileMetadata> files = new ArrayList<>();
getSnapshotFilesMetadata( snapshot, files );
return resourceIterator( files.iterator(), snapshot );
}
catch ( IndexNotFoundKernelException e )
{
// Perhaps it got dropped after getIndexIds() was called.
return Iterators.emptyResourceIterator();
}
}
}
Expand Up @@ -350,7 +350,7 @@ private static IndexPopulator addPopulator( BatchingMultipleIndexPopulator batch

batchingPopulator.addPopulator(
populator, descriptor.schema().getLabelId(),
new IndexMeta( descriptor, new SchemaIndexProvider.Descriptor( "foo", "1" ), NO_CAPABILITY ),
new IndexMeta( 1, descriptor, new SchemaIndexProvider.Descriptor( "foo", "1" ), NO_CAPABILITY ),
flipper, failedIndexProxyFactory, "testIndex" );

return populator;
Expand Down
Expand Up @@ -80,6 +80,6 @@ public void shouldLogReasonForDroppingIndex() throws IOException

private IndexMeta indexMeta( IndexDescriptor descriptor )
{
return new IndexMeta( descriptor, providerDescriptor, indexCapability );
return new IndexMeta( 1, descriptor, providerDescriptor, indexCapability );
}
}
Expand Up @@ -25,7 +25,6 @@
import org.junit.Test;
import org.mockito.ArgumentMatchers;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -48,7 +47,6 @@
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater;
Expand Down Expand Up @@ -617,7 +615,7 @@ private IndexPopulationJob newIndexPopulationJob( FailedIndexProxyFactory failur

MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider );
IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR, stateHolder );
job.addPopulator( populator, indexId, new IndexMeta( descriptor, PROVIDER_DESCRIPTOR, NO_CAPABILITY ),
job.addPopulator( populator, indexId, new IndexMeta( indexId, descriptor, PROVIDER_DESCRIPTOR, NO_CAPABILITY ),
format( ":%s(%s)", FIRST.name(), name ), flipper, failureDelegateFactory );
return job;
}
Expand Down

0 comments on commit bc8bc15

Please sign in to comment.