Skip to content

Commit

Permalink
Extract report behaviour from ConflictDetectingValueMerger
Browse files Browse the repository at this point in the history
Make it abstract and let subclass determine how conflict report should
be done. Also make ConflictDetectingValueMerge generic in REPORT_TYPE
so that subclass can decide what it needs to be able to correctly
report conflict.

Specifically, REPORT_TYPE is there so that we don't convert between KEY
and Value[] unnecessarily but instead adapt reporting to what we happen
to have available at insert time.
  • Loading branch information
burqen committed Mar 28, 2019
1 parent 149e602 commit f30ff66
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 24 deletions.
Expand Up @@ -23,18 +23,16 @@
import org.neo4j.index.internal.gbptree.ValueMerger; import org.neo4j.index.internal.gbptree.ValueMerger;
import org.neo4j.index.internal.gbptree.Writer; import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueTuple;


/** /**
* {@link ValueMerger} which will merely detect conflict, not change any value if conflict, i.e. if the * {@link ValueMerger} which will merely detect conflict, not change any value if conflict, i.e. if the
* key already exists. After this merge has been used in a call to {@link Writer#merge(Object, Object, ValueMerger)} * key already exists. After this merge has been used in a call to {@link Writer#merge(Object, Object, ValueMerger)}
* {@link #checkConflict(Value[])} can be called to check whether or not that call conflicted with * {@link #checkConflict(REPORT_TYPE)} can be called to check whether or not that call conflicted with
* an existing key. A call to {@link #checkConflict(Value[])} will also initialize the conflict flag. * an existing key. A call to {@link #checkConflict(REPORT_TYPE)} will also initialize the conflict flag.
* *
* @param <VALUE> type of values being merged. * @param <VALUE> type of values being merged.
*/ */
class ConflictDetectingValueMerger<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> implements ValueMerger<KEY,VALUE> abstract class ConflictDetectingValueMerger<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue, REPORT_TYPE> implements ValueMerger<KEY,VALUE>
{ {
private final boolean compareEntityIds; private final boolean compareEntityIds;


Expand Down Expand Up @@ -76,17 +74,19 @@ boolean wasConflicting()
return conflict; return conflict;
} }


void reportConflict( Value[] values ) throws IndexEntryConflictException void reportConflict( REPORT_TYPE toReport ) throws IndexEntryConflictException
{ {
conflict = false; conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) ); doReportConflict( existingNodeId, addedNodeId, toReport );
} }


void checkConflict( Value[] values ) throws IndexEntryConflictException void checkConflict( REPORT_TYPE toReport ) throws IndexEntryConflictException
{ {
if ( wasConflicting() ) if ( wasConflicting() )
{ {
reportConflict( values ); reportConflict( toReport );
} }
} }

abstract void doReportConflict( long existingNodeId, long addedNodeId, REPORT_TYPE toReport ) throws IndexEntryConflictException;
} }
Expand Up @@ -45,6 +45,7 @@
import org.neo4j.storageengine.api.NodePropertyAccessor; import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.values.storable.Value;


import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER; import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.storageengine.api.schema.IndexDescriptor.Type.GENERAL; import static org.neo4j.storageengine.api.schema.IndexDescriptor.Type.GENERAL;
Expand All @@ -68,8 +69,8 @@ public abstract class NativeIndexPopulator<KEY extends NativeIndexKey<KEY>, VALU
private final UniqueIndexSampler uniqueSampler; private final UniqueIndexSampler uniqueSampler;
private final Consumer<PageCursor> additionalHeaderWriter; private final Consumer<PageCursor> additionalHeaderWriter;


private ConflictDetectingValueMerger<KEY,VALUE> mainConflictDetector; private ConflictDetectingValueMerger<KEY,VALUE,Value[]> mainConflictDetector;
private ConflictDetectingValueMerger<KEY,VALUE> updatesConflictDetector; private ConflictDetectingValueMerger<KEY,VALUE,Value[]> updatesConflictDetector;


private byte[] failureBytes; private byte[] failureBytes;
private boolean dropped; private boolean dropped;
Expand Down Expand Up @@ -119,12 +120,12 @@ protected synchronized void create( Consumer<PageCursor> headerWriter )
mainConflictDetector = getMainConflictDetector(); mainConflictDetector = getMainConflictDetector();
// for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates. // for updates we have to have uniqueness on (value,entityId) to allow for intermediary violating updates.
// there are added conflict checks after updates have been applied. // there are added conflict checks after updates have been applied.
updatesConflictDetector = new ConflictDetectingValueMerger<>( true ); updatesConflictDetector = new ThrowingConflictDetector<>( true );
} }


ConflictDetectingValueMerger<KEY,VALUE> getMainConflictDetector() ConflictDetectingValueMerger<KEY,VALUE,Value[]> getMainConflictDetector()
{ {
return new ConflictDetectingValueMerger<>( descriptor.type() == GENERAL ); return new ThrowingConflictDetector<>( descriptor.type() == GENERAL );
} }


@Override @Override
Expand Down Expand Up @@ -260,7 +261,7 @@ void markTreeAsOnline()
tree.checkpoint( IOLimiter.UNLIMITED, new NativeIndexHeaderWriter( BYTE_ONLINE, additionalHeaderWriter ) ); tree.checkpoint( IOLimiter.UNLIMITED, new NativeIndexHeaderWriter( BYTE_ONLINE, additionalHeaderWriter ) );
} }


private void processUpdates( Iterable<? extends IndexEntryUpdate<?>> indexEntryUpdates, ConflictDetectingValueMerger<KEY,VALUE> conflictDetector ) private void processUpdates( Iterable<? extends IndexEntryUpdate<?>> indexEntryUpdates, ConflictDetectingValueMerger<KEY,VALUE,Value[]> conflictDetector )
throws IndexEntryConflictException throws IndexEntryConflictException
{ {
try ( Writer<KEY,VALUE> writer = tree.writer() ) try ( Writer<KEY,VALUE> writer = tree.writer() )
Expand Down
Expand Up @@ -33,7 +33,7 @@ class NativeIndexUpdater<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIn
{ {
private final KEY treeKey; private final KEY treeKey;
private final VALUE treeValue; private final VALUE treeValue;
private final ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger = new ConflictDetectingValueMerger<>( true ); private final ConflictDetectingValueMerger<KEY,VALUE,Value[]> conflictDetectingValueMerger = new ThrowingConflictDetector<>( true );
private Writer<KEY,VALUE> writer; private Writer<KEY,VALUE> writer;


private boolean closed = true; private boolean closed = true;
Expand Down Expand Up @@ -79,7 +79,7 @@ private void assertOpen()
} }


static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processUpdate( KEY treeKey, VALUE treeValue, static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processUpdate( KEY treeKey, VALUE treeValue,
IndexEntryUpdate<?> update, Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger ) IndexEntryUpdate<?> update, Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE,Value[]> conflictDetectingValueMerger )
throws IndexEntryConflictException throws IndexEntryConflictException
{ {
switch ( update.updateMode() ) switch ( update.updateMode() )
Expand Down Expand Up @@ -109,7 +109,7 @@ private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>


private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processChange( KEY treeKey, VALUE treeValue, private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processChange( KEY treeKey, VALUE treeValue,
IndexEntryUpdate<?> update, Writer<KEY,VALUE> writer, IndexEntryUpdate<?> update, Writer<KEY,VALUE> writer,
ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger ) ConflictDetectingValueMerger<KEY,VALUE,Value[]> conflictDetectingValueMerger )
throws IndexEntryConflictException throws IndexEntryConflictException
{ {
// Remove old entry // Remove old entry
Expand All @@ -124,7 +124,7 @@ private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
} }


private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processAdd( KEY treeKey, VALUE treeValue, IndexEntryUpdate<?> update, private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void processAdd( KEY treeKey, VALUE treeValue, IndexEntryUpdate<?> update,
Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger ) Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE,Value[]> conflictDetectingValueMerger )
throws IndexEntryConflictException throws IndexEntryConflictException
{ {
initializeKeyAndValueFromUpdate( treeKey, treeValue, update.getEntityId(), update.values() ); initializeKeyAndValueFromUpdate( treeKey, treeValue, update.getEntityId(), update.values() );
Expand Down
Expand Up @@ -34,8 +34,8 @@
import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettings; import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettings;
import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor; import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.values.storable.CoordinateReferenceSystem; import org.neo4j.values.storable.CoordinateReferenceSystem;
Expand Down Expand Up @@ -170,12 +170,12 @@ boolean canCheckConflictsWithoutStoreAccess()
} }


@Override @Override
ConflictDetectingValueMerger<SpatialIndexKey,NativeIndexValue> getMainConflictDetector() ConflictDetectingValueMerger<SpatialIndexKey,NativeIndexValue,Value[]> getMainConflictDetector()
{ {
// Because of lossy point representation in index we need to always compare on node id, // Because of lossy point representation in index we need to always compare on node id,
// even for unique indexes. If we don't we risk throwing constraint violation exception // even for unique indexes. If we don't we risk throwing constraint violation exception
// for points that are in fact unique. // for points that are in fact unique.
return new ConflictDetectingValueMerger<>( true ); return new ThrowingConflictDetector<>( true );
} }


@Override @Override
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueTuple;

class ThrowingConflictDetector<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> extends ConflictDetectingValueMerger<KEY,VALUE,Value[]>
{
ThrowingConflictDetector( boolean compareEntityIds )
{
super( compareEntityIds );
}

@Override
void doReportConflict( long existingNodeId, long addedNodeId, Value[] values ) throws IndexEntryConflictException
{
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
}
}
Expand Up @@ -30,9 +30,9 @@
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.helpers.ArrayUtil.array;


public class ConflictDetectingValueMergerTest public class ThrowingConflictDetectorTest
{ {
private final ConflictDetectingValueMerger<NumberIndexKey,NativeIndexValue> detector = new ConflictDetectingValueMerger<>( true ); private final ThrowingConflictDetector<NumberIndexKey,NativeIndexValue> detector = new ThrowingConflictDetector<>( true );


@Test @Test
public void shouldReportConflictOnSameValueAndDifferentEntityIds() public void shouldReportConflictOnSameValueAndDifferentEntityIds()
Expand Down

0 comments on commit f30ff66

Please sign in to comment.