Skip to content

Commit

Permalink
Make the fulltext index provider kernel extension loadable by the con…
Browse files Browse the repository at this point in the history
…sistency checker.

Also add the first support for multi-token schemas in the consistency checker, which is extensively used by the fulltext indexes.
The fulltext indexes will appear to the consistency checker as any other index, and will be checked in largely the same way.
  • Loading branch information
chrisvest committed Sep 5, 2018
1 parent 1c0bce0 commit 9823b98
Show file tree
Hide file tree
Showing 27 changed files with 140 additions and 91 deletions.
Expand Up @@ -57,6 +57,8 @@
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.scheduler.CentralJobScheduler;
import org.neo4j.kernel.impl.scheduler.JobSchedulerFactory;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand Down Expand Up @@ -84,6 +86,7 @@
import org.neo4j.logging.NullLogProvider;
import org.neo4j.logging.internal.LogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.storageengine.api.schema.SchemaRule;
Expand Down Expand Up @@ -160,6 +163,7 @@ public DirectStoreAccess directStoreAccess()
if ( directStoreAccess == null )
{
life.start();
JobScheduler scheduler = life.add( JobSchedulerFactory.createInitialisedScheduler() );
fileSystem = new DefaultFileSystemAbstraction();
PageCache pageCache = getPageCache( fileSystem );
LogProvider logProvider = NullLogProvider.getInstance();
Expand Down Expand Up @@ -188,7 +192,7 @@ public DirectStoreAccess directStoreAccess()

Monitors monitors = new Monitors();
LabelScanStore labelScanStore = startLabelScanStore( pageCache, indexStoreView, monitors );
IndexProviderMap indexes = createIndexes( pageCache, fileSystem, directory.databaseDir(), config, logProvider, monitors);
IndexProviderMap indexes = createIndexes( pageCache, fileSystem, directory.databaseDir(), config, scheduler, logProvider, monitors);
directStoreAccess = new DirectStoreAccess( nativeStores, labelScanStore, indexes );
}
return directStoreAccess;
Expand All @@ -212,11 +216,11 @@ private LabelScanStore startLabelScanStore( PageCache pageCache, IndexStoreView
}

private IndexProviderMap createIndexes( PageCache pageCache, FileSystemAbstraction fileSystem, File storeDir,
Config config, LogProvider logProvider, Monitors monitors )
Config config, JobScheduler scheduler, LogProvider logProvider, Monitors monitors )
{
LogService logService = new SimpleLogService( logProvider, logProvider );
DatabaseKernelExtensions extensions = life.add( instantiateKernelExtensions( storeDir, fileSystem, config, logService,
pageCache, RecoveryCleanupWorkCollector.ignore(), DatabaseInfo.COMMUNITY, monitors ) );
pageCache, scheduler, RecoveryCleanupWorkCollector.ignore(), DatabaseInfo.COMMUNITY, monitors ) );
return life.add( new DefaultIndexProviderMap( extensions ) );
}

Expand Down
Expand Up @@ -150,10 +150,10 @@ private void verifyThatThereAreExactlyOneIndexEntryPerNodeInTheIndexes( int i, P
{
long nodeAId = data.first()[j];
assertEquals( 1, tx.schemaRead().nodesCountIndexed(
indexA, nodeAId, Values.of( nodeAId ) ) );
indexA, nodeAId, keyAId, Values.of( nodeAId ) ) );
long nodeBId = data.other()[j];
assertEquals( 1, tx.schemaRead().nodesCountIndexed(
indexB, nodeBId, Values.of( nodeBId ) ) );
indexB, nodeBId, keyBId, Values.of( nodeBId ) ) );
}
}
}
Expand Down
Expand Up @@ -178,13 +178,13 @@ public void applyConcurrentDeletesToPopulatedIndex() throws Throwable
try ( IndexReader indexReader = getIndexReader( propertyId, countryLabelId ) )
{
assertEquals("Should be removed by concurrent remove.",
0, indexReader.countIndexedNodes( 0, Values.of( "Sweden" )) );
0, indexReader.countIndexedNodes( 0, new int[] {propertyId}, Values.of( "Sweden" )) );
}

try ( IndexReader indexReader = getIndexReader( propertyId, colorLabelId ) )
{
assertEquals("Should be removed by concurrent remove.",
0, indexReader.countIndexedNodes( 3, Values.of( "green" )) );
0, indexReader.countIndexedNodes( 3, new int[] {propertyId}, Values.of( "green" )) );
}
}
}
Expand All @@ -208,13 +208,13 @@ public void applyConcurrentAddsToPopulatedIndex() throws Throwable
try ( IndexReader indexReader = getIndexReader( propertyId, countryLabelId ) )
{
assertEquals("Should be added by concurrent add.", 1,
indexReader.countIndexedNodes( otherNodes[0].getId(), Values.of( "Denmark" ) ) );
indexReader.countIndexedNodes( otherNodes[0].getId(), new int[] {propertyId}, Values.of( "Denmark" ) ) );
}

try ( IndexReader indexReader = getIndexReader( propertyId, carLabelId ) )
{
assertEquals("Should be added by concurrent add.", 1,
indexReader.countIndexedNodes( otherNodes[1].getId(), Values.of( "BMW" ) ) );
indexReader.countIndexedNodes( otherNodes[1].getId(), new int[] {propertyId}, Values.of( "BMW" ) ) );
}
}
}
Expand All @@ -238,18 +238,18 @@ public void applyConcurrentChangesToPopulatedIndex() throws Exception
try ( IndexReader indexReader = getIndexReader( propertyId, colorLabelId ) )
{
assertEquals( format( "Should be deleted by concurrent change. Reader is: %s, ", indexReader ), 0,
indexReader.countIndexedNodes( color2.getId(), Values.of( "green" ) ) );
indexReader.countIndexedNodes( color2.getId(), new int[] {propertyId}, Values.of( "green" ) ) );
}
try ( IndexReader indexReader = getIndexReader( propertyId, colorLabelId ) )
{
assertEquals("Should be updated by concurrent change.", 1,
indexReader.countIndexedNodes( color2.getId(), Values.of( "pink" ) ) );
indexReader.countIndexedNodes( color2.getId(), new int[] {propertyId}, Values.of( "pink" ) ) );
}

try ( IndexReader indexReader = getIndexReader( propertyId, carLabelId ) )
{
assertEquals("Should be added by concurrent change.", 1,
indexReader.countIndexedNodes( car2.getId(), Values.of( "SAAB" ) ) );
indexReader.countIndexedNodes( car2.getId(), new int[] {propertyId}, Values.of( "SAAB" ) ) );
}
}
}
Expand Down
Expand Up @@ -232,8 +232,9 @@ public Result runFullConsistencyCheck( DatabaseLayout databaseLayout, Config con
// Bootstrap kernel extensions
Monitors monitors = new Monitors();
LifeSupport life = new LifeSupport();
JobScheduler jobScheduler = life.add( JobSchedulerFactory.createInitialisedScheduler() );
DatabaseKernelExtensions extensions = life.add( instantiateKernelExtensions( databaseLayout.databaseDirectory(),
fileSystem, config, new SimpleLogService( logProvider, logProvider ), pageCache,
fileSystem, config, new SimpleLogService( logProvider, logProvider ), pageCache, jobScheduler,
RecoveryCleanupWorkCollector.ignore(),
// May be enterprise edition, but in consistency checker we only care about the operational mode
COMMUNITY,
Expand Down
Expand Up @@ -19,29 +19,36 @@
*/
package org.neo4j.consistency.checking.full;

import java.util.Arrays;

import org.neo4j.consistency.checking.CheckerEngine;
import org.neo4j.consistency.checking.RecordCheck;
import org.neo4j.consistency.report.ConsistencyReport;
import org.neo4j.consistency.store.RecordAccess;
import org.neo4j.consistency.store.synthetic.IndexEntry;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;

public class IndexCheck implements RecordCheck<IndexEntry, ConsistencyReport.IndexConsistencyReport>
{
private final StoreIndexDescriptor indexRule;
private final long[] entityTokenLongIds;
private final SchemaDescriptor.PropertySchemaType propertySchemaType;

public IndexCheck( StoreIndexDescriptor indexRule )
{
this.indexRule = indexRule;
SchemaDescriptor schema = indexRule.schema();
int[] entityTokenIntIds = schema.getEntityTokenIds();
entityTokenLongIds = new long[entityTokenIntIds.length];
for ( int i = 0; i < entityTokenIntIds.length; i++ )
{
entityTokenLongIds[i] = entityTokenIntIds[i];
}
propertySchemaType = schema.propertySchemaType();
}

@Override
public void check( IndexEntry record, CheckerEngine<IndexEntry, ConsistencyReport.IndexConsistencyReport> engine, RecordAccess records )
{
int[] entityTokenIds = indexRule.schema().getEntityTokenIds();
engine.comparativeCheck( records.node( record.getId() ),
new NodeInUseWithCorrectLabelsCheck<>( Arrays.stream( entityTokenIds ).asLongStream().toArray(), false ) );
NodeInUseWithCorrectLabelsCheck<IndexEntry,ConsistencyReport.IndexConsistencyReport> checker =
new NodeInUseWithCorrectLabelsCheck<>( entityTokenLongIds, propertySchemaType, false );
engine.comparativeCheck( records.node( record.getId() ), checker );
}
}
Expand Up @@ -21,13 +21,16 @@

import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;

import org.neo4j.collection.PrimitiveLongCollections;
import org.neo4j.consistency.checking.CheckerEngine;
import org.neo4j.consistency.checking.ComparativeRecordChecker;
import org.neo4j.consistency.checking.LabelChainWalker;
import org.neo4j.consistency.report.ConsistencyReport;
import org.neo4j.consistency.store.RecordAccess;
import org.neo4j.consistency.store.RecordReference;
import org.neo4j.internal.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.impl.store.DynamicNodeLabels;
import org.neo4j.kernel.impl.store.NodeLabels;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand All @@ -42,12 +45,14 @@ public class NodeInUseWithCorrectLabelsCheck
implements ComparativeRecordChecker<RECORD, NodeRecord, REPORT>
{
private final long[] indexLabels;
private final SchemaDescriptor.PropertySchemaType propertySchemaType;
private final boolean checkStoreToIndex;

public NodeInUseWithCorrectLabelsCheck( long[] expectedLabels, boolean checkStoreToIndex )
public NodeInUseWithCorrectLabelsCheck( long[] expectedEntityTokenIds, SchemaDescriptor.PropertySchemaType propertySchemaType, boolean checkStoreToIndex )
{
this.propertySchemaType = propertySchemaType;
this.checkStoreToIndex = checkStoreToIndex;
this.indexLabels = sortAndDeduplicate( expectedLabels );
this.indexLabels = sortAndDeduplicate( expectedEntityTokenIds );
}

private static long[] sortAndDeduplicate( long[] labels )
Expand Down Expand Up @@ -94,38 +99,63 @@ private void validateLabelIds( NodeRecord nodeRecord, long[] storeLabels, REPORT
{
storeLabels = sortAndDeduplicate( storeLabels );

int indexLabelsCursor = 0;
int storeLabelsCursor = 0;

while ( indexLabelsCursor < indexLabels.length && storeLabelsCursor < storeLabels.length )
if ( propertySchemaType == SchemaDescriptor.PropertySchemaType.COMPLETE_ALL_TOKENS )
{
long indexLabel = indexLabels[indexLabelsCursor];
long storeLabel = storeLabels[storeLabelsCursor];
if ( indexLabel < storeLabel )
{ // node store has a label which isn't in label scan store
report.nodeDoesNotHaveExpectedLabel( nodeRecord, indexLabel );
indexLabelsCursor++;
// The node must have all of the labels specified by the index.
int indexLabelsCursor = 0;
int storeLabelsCursor = 0;

while ( indexLabelsCursor < indexLabels.length && storeLabelsCursor < storeLabels.length )
{
long indexLabel = indexLabels[indexLabelsCursor];
long storeLabel = storeLabels[storeLabelsCursor];
if ( indexLabel < storeLabel )
{ // node store has a label which isn't in label scan store
report.nodeDoesNotHaveExpectedLabel( nodeRecord, indexLabel );
indexLabelsCursor++;
}
else if ( indexLabel > storeLabel )
{ // label scan store has a label which isn't in node store
reportNodeLabelNotInIndex( report, nodeRecord, storeLabel );
storeLabelsCursor++;
}
else
{ // both match
indexLabelsCursor++;
storeLabelsCursor++;
}
}
else if ( indexLabel > storeLabel )
{ // label scan store has a label which isn't in node store
reportNodeLabelNotInIndex( report, nodeRecord, storeLabel );
storeLabelsCursor++;

while ( indexLabelsCursor < indexLabels.length )
{
report.nodeDoesNotHaveExpectedLabel( nodeRecord, indexLabels[indexLabelsCursor++] );
}
else
{ // both match
indexLabelsCursor++;
while ( storeLabelsCursor < storeLabels.length )
{
reportNodeLabelNotInIndex( report, nodeRecord, storeLabels[storeLabelsCursor] );
storeLabelsCursor++;
}
}

while ( indexLabelsCursor < indexLabels.length )
else if ( propertySchemaType == SchemaDescriptor.PropertySchemaType.PARTIAL_ANY_TOKEN )
{
report.nodeDoesNotHaveExpectedLabel( nodeRecord, indexLabels[indexLabelsCursor++] );
// The node must have at least one label in the index.
for ( long storeLabel : storeLabels )
{
if ( Arrays.binarySearch( indexLabels, storeLabel ) >= 0 )
{
// The node has one of the indexed labels, so we're good.
return;
}
}
// The node had none of the indexed labels, so we report all of them as missing.
for ( long indexLabel : indexLabels )
{
report.nodeDoesNotHaveExpectedLabel( nodeRecord, indexLabel );
}
}
while ( storeLabelsCursor < storeLabels.length )
else
{
reportNodeLabelNotInIndex( report, nodeRecord, storeLabels[storeLabelsCursor] );
storeLabelsCursor++;
throw new IllegalStateException( "Unknown property schema type '" + propertySchemaType + "'." );
}
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.neo4j.consistency.checking.ChainCheck;
import org.neo4j.consistency.checking.CheckerEngine;
Expand Down Expand Up @@ -93,12 +94,11 @@ private void matchIndexesToNode(
RecordAccess records,
Collection<PropertyRecord> propertyRecs )
{
Set<Long> labels = NodeLabelReader.getListOfLabels( record, records, engine );
long[] labels = NodeLabelReader.getListOfLabels( record, records, engine ).stream().mapToLong( Long::longValue ).toArray();
IntObjectMap<PropertyBlock> nodePropertyMap = null;
for ( StoreIndexDescriptor indexRule : indexes.onlineRules() )
{
long labelId = indexRule.schema().keyId();
if ( labels.contains( labelId ) )
if ( indexRule.schema().isAffected( labels ) )
{
if ( nodePropertyMap == null )
{
Expand All @@ -119,7 +119,7 @@ private void matchIndexesToNode(
}
else
{
long count = reader.countIndexedNodes( nodeId, values );
long count = reader.countIndexedNodes( nodeId, indexPropertyIds, values );
reportIncorrectIndexCount( values, engine, indexRule, count );
}
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.consistency.store.synthetic.LabelScanDocument;
import org.neo4j.kernel.api.labelscan.NodeLabelRange;

import static org.neo4j.internal.kernel.api.schema.SchemaDescriptor.PropertySchemaType.COMPLETE_ALL_TOKENS;

public class LabelScanCheck implements RecordCheck<LabelScanDocument, ConsistencyReport.LabelScanConsistencyReport>
{
@Override
Expand All @@ -36,8 +38,9 @@ public void check( LabelScanDocument record, CheckerEngine<LabelScanDocument,
NodeLabelRange range = record.getNodeLabelRange();
for ( long nodeId : range.nodes() )
{
long[] labels = record.getNodeLabelRange().labels( nodeId );
engine.comparativeCheck( records.node( nodeId ),
new NodeInUseWithCorrectLabelsCheck<>( record.getNodeLabelRange().labels( nodeId ), true ) );
new NodeInUseWithCorrectLabelsCheck<>( labels, COMPLETE_ALL_TOKENS, true ) );
}
}
}
Expand Up @@ -36,6 +36,7 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.internal.LogService;
import org.neo4j.scheduler.JobScheduler;

/**
* Utility for loading {@link IndexProvider} instances from {@link DatabaseKernelExtensions}.
Expand All @@ -45,11 +46,11 @@ public class SchemaIndexExtensionLoader

@SuppressWarnings( "unchecked" )
public static DatabaseKernelExtensions instantiateKernelExtensions( File databaseDirectory, FileSystemAbstraction fileSystem,
Config config, LogService logService, PageCache pageCache,
Config config, LogService logService, PageCache pageCache, JobScheduler jobScheduler,
RecoveryCleanupWorkCollector recoveryCollector, DatabaseInfo databaseInfo, Monitors monitors )
{
Dependencies deps = new Dependencies();
deps.satisfyDependencies( fileSystem, config, logService, pageCache, recoveryCollector, monitors );
deps.satisfyDependencies( fileSystem, config, logService, pageCache, recoveryCollector, monitors, jobScheduler );
@SuppressWarnings( "rawtypes" )
Iterable kernelExtensions = Service.load( KernelExtensionFactory.class );
KernelContext kernelContext = new SimpleKernelContext( databaseDirectory, databaseInfo, deps );
Expand Down

0 comments on commit 9823b98

Please sign in to comment.