Skip to content

Commit

Permalink
Merge branch '3.3-gbptree-value-merger-fix' into 3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Mar 8, 2018
2 parents 84f6749 + 5487923 commit c42b3f9
Show file tree
Hide file tree
Showing 25 changed files with 501 additions and 258 deletions.
Expand Up @@ -548,7 +548,12 @@ private void flip() throws FlipFailedKernelException
IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(),
sample.indexSize() );
populator.close( true );
if ( populations.contains( IndexPopulation.this ) )
{
populator.close( true );
}
// else it has failed when applying the last updates from queue. This is done because a multi-populator
// may have multiple populators running and they should not affect each other
return null;
}, failedIndexProxyFactory );
log.info( "Index population completed. Index is now online: [%s]", indexUserDescription );
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

import org.neo4j.internal.kernel.api.InternalIndexState;
Expand All @@ -32,8 +33,10 @@
import org.neo4j.kernel.api.exceptions.schema.UniquePropertyValueValidationException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptorFactory;
import org.neo4j.kernel.impl.api.index.updater.DelegatingIndexUpdater;
import org.neo4j.kernel.impl.index.schema.DeferredConflictCheckingIndexUpdater;
import org.neo4j.storageengine.api.schema.IndexReader;

/**
Expand Down Expand Up @@ -73,7 +76,8 @@ public IndexUpdater newUpdater( IndexUpdateMode mode )
switch ( mode )
{
case ONLINE:
return new DelegatingIndexUpdater( target.accessor.newUpdater( mode ) )
return new DelegatingIndexUpdater( new DeferredConflictCheckingIndexUpdater(
target.accessor.newUpdater( mode ), target::newReader, target.getDescriptor() ) )
{
@Override
public void process( IndexEntryUpdate<?> update )
Expand Down Expand Up @@ -136,6 +140,22 @@ protected IndexProxy getDelegate()
return target;
}

@Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{
// If we've seen constraint violation failures in here when updates came in then fail immediately with those
if ( !failures.isEmpty() )
{
Iterator<IndexEntryConflictException> failureIterator = failures.iterator();
IndexEntryConflictException conflict = failureIterator.next();
failureIterator.forEachRemaining( conflict::addSuppressed );
throw conflict;
}

// Otherwise consolidate the usual verification
super.verifyDeferredConstraints( accessor );
}

@Override
public void validate() throws UniquePropertyValueValidationException
{
Expand Down
Expand Up @@ -19,8 +19,8 @@
*/
package org.neo4j.kernel.impl.index.schema;

import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.ValueMerger;
import org.neo4j.index.internal.gbptree.ValueMergers;
import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.values.storable.Value;
Expand All @@ -34,58 +34,49 @@
*
* @param <VALUE> type of values being merged.
*/
abstract class ConflictDetectingValueMerger<KEY extends NativeSchemaKey, VALUE extends NativeSchemaValue> implements ValueMerger<KEY,VALUE>
class ConflictDetectingValueMerger<KEY extends NativeSchemaKey, VALUE extends NativeSchemaValue> implements ValueMerger<KEY,VALUE>
{
/**
* @throw IndexEntryConflictException if merge conflicted with an existing key. This call also clears the conflict flag.
*/
abstract void checkConflict( Value[] values ) throws IndexEntryConflictException;
private final boolean compareEntityIds;

private static ConflictDetectingValueMerger DONT_CHECK = new ConflictDetectingValueMerger()
private boolean conflict;
private long existingNodeId;
private long addedNodeId;

ConflictDetectingValueMerger( boolean compareEntityIds )
{
@Override
public Object merge( Object existingKey, Object newKey, Object existingValue, Object newValue )
{
return null;
}
this.compareEntityIds = compareEntityIds;
}

@Override
void checkConflict( Value[] values )
@Override
public VALUE merge( KEY existingKey, KEY newKey, VALUE existingValue, VALUE newValue )
{
if ( existingKey.getEntityId() != newKey.getEntityId() )
{
// do nothing
conflict = true;
existingNodeId = existingKey.getEntityId();
addedNodeId = newKey.getEntityId();
}
};
return null;
}

static <KEY extends NativeSchemaKey, VALUE extends NativeSchemaValue> ConflictDetectingValueMerger<KEY, VALUE> dontCheck()
/**
* To be called for a populated key that is about to be sent off to a {@link Writer}.
* {@link GBPTree}'s ability to check for conflicts while applying updates is an opportunity,
* but also complicates some scenarios. This is why the strictness can be tweaked like this.
*
* @param key key to let know about conflict detection strictness.
*/
void controlConflictDetection( KEY key )
{
return DONT_CHECK;
key.setCompareId( compareEntityIds );
}

static class Check<KEY extends NativeSchemaKey, VALUE extends NativeSchemaValue> extends ConflictDetectingValueMerger<KEY, VALUE>
void checkConflict( Value[] values ) throws IndexEntryConflictException
{
private boolean conflict;
private long existingNodeId;
private long addedNodeId;

@Override
public VALUE merge( KEY existingKey, KEY newKey, VALUE existingValue, VALUE newValue )
{
if ( existingKey.getEntityId() != newKey.getEntityId() )
{
conflict = true;
existingNodeId = existingKey.getEntityId();
addedNodeId = newKey.getEntityId();
}
return null;
}

void checkConflict( Value[] values ) throws IndexEntryConflictException
if ( conflict )
{
if ( conflict )
{
conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
}
conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
}
}
}
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.exceptions.index.IndexNotApplicableKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.storageengine.api.schema.IndexReader;
import org.neo4j.values.storable.ValueTuple;

import static org.neo4j.internal.kernel.api.IndexQuery.exact;
import static org.neo4j.kernel.impl.api.index.UpdateMode.REMOVED;

/**
* This deferring conflict checker solves e.g. a problem of applying updates to an index that is aware of,
* and also prevents, duplicates while applying. Consider this scenario:
*
* <pre>
* GIVEN:
* Node A w/ property value P
* Node B w/ property value Q
*
* WHEN Applying a transaction that:
* Sets A property value to Q
* Deletes B
* </pre>
*
* Then an index that is conscious about conflicts when applying may see intermediary conflicts,
* depending on the order in which updates are applied. Remembering which value tuples have been altered and
* checking conflicts for those in {@link #close()} works around that problem.
*
* This updater wrapping should only be used in specific places to solve specific problems, not generally
* when applying updates to online indexes.
*/
public class DeferredConflictCheckingIndexUpdater implements IndexUpdater
{
private final IndexUpdater actual;
private final Supplier<IndexReader> readerSupplier;
private final IndexDescriptor indexDescriptor;
private final Set<ValueTuple> touchedTuples = new HashSet<>();

public DeferredConflictCheckingIndexUpdater( IndexUpdater actual, Supplier<IndexReader> readerSupplier, IndexDescriptor indexDescriptor )
{
this.actual = actual;
this.readerSupplier = readerSupplier;
this.indexDescriptor = indexDescriptor;
}

@Override
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
actual.process( update );
if ( update.updateMode() != REMOVED )
{
touchedTuples.add( ValueTuple.of( update.values() ) );
}
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
actual.close();
try ( IndexReader reader = readerSupplier.get() )
{
for ( ValueTuple tuple : touchedTuples )
{
PrimitiveLongIterator results = reader.query( queryOf( tuple ) );
if ( results.hasNext() )
{
long firstEntityId = results.next();
if ( results.hasNext() )
{
long secondEntityId = results.next();
throw new IndexEntryConflictException( firstEntityId, secondEntityId, tuple );
}
}
}
}
catch ( IndexNotApplicableKernelException e )
{
throw new IllegalArgumentException( "Unexpectedly the index reader couldn't handle this query", e );
}
}

private IndexQuery[] queryOf( ValueTuple tuple )
{
IndexQuery[] predicates = new IndexQuery[tuple.size()];
for ( int i = 0; i < predicates.length; i++ )
{
predicates[i] = exact( indexDescriptor.schema().getPropertyIds()[i], tuple.valueAt( i ) );
}
return predicates;
}
}
Expand Up @@ -20,8 +20,6 @@
package org.neo4j.kernel.impl.index.schema;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -58,30 +56,6 @@ public void includeSample( IndexEntryUpdate<?> update )
@Override
public IndexSample sampleResult()
{
// Close the writer before scanning
try
{
closeWriter();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}

try
{
return sampler.result();
}
finally
{
try
{
instantiateWriter();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}
return sampler.result();
}
}

This file was deleted.

Expand Up @@ -79,7 +79,7 @@ public NativeSchemaIndexUpdater<KEY, VALUE> newUpdater( IndexUpdateMode mode )
assertOpen();
try
{
return singleUpdater.initialize( tree.writer(), true );
return singleUpdater.initialize( tree.writer() );
}
catch ( IOException e )
{
Expand Down

0 comments on commit c42b3f9

Please sign in to comment.