Skip to content

Commit

Permalink
Fixes as issue with applying index updates to native unique indexes
Browse files Browse the repository at this point in the history
Given a uniqueness contraint on Foo:bar. If a transaction make changes
such that in the end, the constraint is not violated but some intermediate
individual step violates the constraint the index would still throw an
exception.

The reason this happens is that native index validates the uniqueness
constraint on every insert (since doing so is for free).

Although actually doing so when applying online updates has never been done
in the lucene indexes because all that is handled in the kernel layer
before committing a transaction anyway. Detecting a conflict while applying
updates to a unique index means that there's a bug inside the kernel layer
and would result in a kernel panic.

Therefore we avoid this problem and do what we do for lucene indexes,
namely by not checking conclicts when applying online updates.
  • Loading branch information
tinwelint committed Mar 1, 2018
1 parent 493cfe5 commit b9b302d
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 79 deletions.
Expand Up @@ -20,54 +20,72 @@
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import org.neo4j.index.internal.gbptree.ValueMerger; import org.neo4j.index.internal.gbptree.ValueMerger;
import org.neo4j.index.internal.gbptree.ValueMergers;
import org.neo4j.index.internal.gbptree.Writer; import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.ValueTuple;


/** /**
* {@link ValueMerger} which will merely detect conflict, not change any value if conflict, i.e. if the * {@link ValueMerger} which will merely detect conflict, not change any value if conflict, i.e. if the
* key already exists. After this merge has been used in a call to {@link Writer#merge(Object, Object, ValueMerger)} * key already exists. After this merge has been used in a call to {@link Writer#merge(Object, Object, ValueMerger)}
* the {@link #wasConflict()} accessor can be called to check whether or not that call conflicted with * {@link #checkConflict(Value[])} can be called to check whether or not that call conflicted with
* an existing key. A call to {@link #wasConflict()} will also clear the conflict flag. * an existing key. A call to {@link #checkConflict(Value[])} will also clear the conflict flag.
* *
* @param <VALUE> type of values being merged. * @param <VALUE> type of values being merged.
*/ */
class ConflictDetectingValueMerger<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> implements ValueMerger<KEY,VALUE> abstract class ConflictDetectingValueMerger<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> implements ValueMerger<KEY,VALUE>
{ {
private boolean conflict; /**
private long existingNodeId; * @throw IndexEntryConflictException if merge conflicted with an existing key. This call also clears the conflict flag.
private long addedNodeId; */
abstract void checkConflict( Value[] values ) throws IndexEntryConflictException;


@Override private static ConflictDetectingValueMerger DONT_CHECK = new ConflictDetectingValueMerger()
public VALUE merge( KEY existingKey, KEY newKey, VALUE existingValue, VALUE newValue )
{ {
if ( existingKey.entityId != newKey.entityId ) @Override
public Object merge( Object existingKey, Object newKey, Object existingValue, Object newValue )
{ {
conflict = true; return null;
existingNodeId = existingKey.entityId;
addedNodeId = newKey.entityId;
} }
return null;
}


/** @Override
* @return whether or not merge conflicted with an existing key. This call also clears the conflict flag. void checkConflict( Value[] values )
*/
boolean wasConflict()
{
boolean result = conflict;
if ( conflict )
{ {
conflict = false; // do nothing
} }
return result; };
}


long existingNodeId() static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> ConflictDetectingValueMerger<KEY, VALUE> dontCheck()
{ {
return existingNodeId; return DONT_CHECK;
} }


long addedNodeId() static class Check<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> extends ConflictDetectingValueMerger<KEY, VALUE>
{ {
return addedNodeId; private boolean conflict;
private long existingNodeId;
private long addedNodeId;

@Override
public VALUE merge( KEY existingKey, KEY newKey, VALUE existingValue, VALUE newValue )
{
if ( existingKey.entityId != newKey.entityId )
{
conflict = true;
existingNodeId = existingKey.entityId;
addedNodeId = newKey.entityId;
}
return null;
}

void checkConflict( Value[] values ) throws IndexEntryConflictException
{
if ( conflict )
{
conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
}
}
} }
} }
Expand Up @@ -74,7 +74,7 @@ public abstract class NativeSchemaNumberIndexPopulator<KEY extends SchemaNumberK
super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId ); super( pageCache, fs, storeFile, layout, monitor, descriptor, indexId );
this.treeKey = layout.newKey(); this.treeKey = layout.newKey();
this.treeValue = layout.newValue(); this.treeValue = layout.newValue();
this.conflictDetectingValueMerger = new ConflictDetectingValueMerger<>(); this.conflictDetectingValueMerger = new ConflictDetectingValueMerger.Check<>();
} }


@Override @Override
Expand Down
Expand Up @@ -27,12 +27,14 @@
import org.neo4j.kernel.api.index.IndexUpdater; import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.values.storable.ValueTuple; import org.neo4j.values.storable.ValueTuple;


import static org.neo4j.kernel.impl.index.schema.ConflictDetectingValueMerger.dontCheck;

class NativeSchemaNumberIndexUpdater<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> class NativeSchemaNumberIndexUpdater<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue>
implements IndexUpdater implements IndexUpdater
{ {
private final KEY treeKey; private final KEY treeKey;
private final VALUE treeValue; private final VALUE treeValue;
private final ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger; private final ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger = dontCheck();
private Writer<KEY,VALUE> writer; private Writer<KEY,VALUE> writer;


private boolean closed = true; private boolean closed = true;
Expand All @@ -42,7 +44,6 @@ class NativeSchemaNumberIndexUpdater<KEY extends SchemaNumberKey, VALUE extends
{ {
this.treeKey = treeKey; this.treeKey = treeKey;
this.treeValue = treeValue; this.treeValue = treeValue;
this.conflictDetectingValueMerger = new ConflictDetectingValueMerger<>();
} }


NativeSchemaNumberIndexUpdater<KEY,VALUE> initialize( Writer<KEY,VALUE> writer, boolean manageClosingOfWriter ) NativeSchemaNumberIndexUpdater<KEY,VALUE> initialize( Writer<KEY,VALUE> writer, boolean manageClosingOfWriter )
Expand Down Expand Up @@ -124,7 +125,7 @@ private static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> vo
treeKey.from( update.getEntityId(), update.values() ); treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.values() ); treeValue.from( update.values() );
writer.merge( treeKey, treeValue, conflictDetectingValueMerger ); writer.merge( treeKey, treeValue, conflictDetectingValueMerger );
assertNoConflict( update, conflictDetectingValueMerger ); conflictDetectingValueMerger.checkConflict( update.values() );
} }


static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> void processAdd( KEY treeKey, VALUE treeValue, static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> void processAdd( KEY treeKey, VALUE treeValue,
Expand All @@ -135,17 +136,6 @@ static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> void proce
treeKey.from( update.getEntityId(), update.values() ); treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.values() ); treeValue.from( update.values() );
writer.merge( treeKey, treeValue, conflictDetectingValueMerger ); writer.merge( treeKey, treeValue, conflictDetectingValueMerger );
assertNoConflict( update, conflictDetectingValueMerger ); conflictDetectingValueMerger.checkConflict( update.values() );
}

private static <KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> void assertNoConflict( IndexEntryUpdate<?> update,
ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger ) throws IndexEntryConflictException
{
if ( conflictDetectingValueMerger.wasConflict() )
{
long existingNodeId = conflictDetectingValueMerger.existingNodeId();
long addedNodeId = conflictDetectingValueMerger.addedNodeId();
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( update.values() ) );
}
} }
} }
Expand Up @@ -21,23 +21,28 @@


import org.junit.Test; import org.junit.Test;


import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values; import org.neo4j.values.storable.Values;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.values.storable.Values.stringValue;


public class ConflictDetectingValueMergerTest public class ConflictDetectingValueMergerTest
{ {
private final ConflictDetectingValueMerger<SchemaNumberKey,SchemaNumberValue> detector = new ConflictDetectingValueMerger<>(); private final ConflictDetectingValueMerger<SchemaNumberKey,SchemaNumberValue> detector = new ConflictDetectingValueMerger.Check<>();


@Test @Test
public void shouldReportConflictOnSameValueAndDifferentEntityIds() throws Exception public void shouldReportConflictOnSameValueAndDifferentEntityIds() throws Exception
{ {
// given // given
Value value = Values.of( 123); Value value = Values.of( 123 );
long entityId1 = 10; long entityId1 = 10;
long entityId2 = 20; long entityId2 = 20;


Expand All @@ -50,9 +55,17 @@ public void shouldReportConflictOnSameValueAndDifferentEntityIds() throws Except


// then // then
assertNull( merged ); assertNull( merged );
assertTrue( detector.wasConflict() ); try
assertEquals( entityId1, detector.existingNodeId() ); {
assertEquals( entityId2, detector.addedNodeId() ); detector.checkConflict( array( value ) );
fail( "Should've detected conflict" );
}
catch ( IndexEntryConflictException e )
{
assertEquals( entityId1, e.getExistingNodeId() );
assertEquals( entityId2, e.getAddedNodeId() );
assertEquals( value, e.getSinglePropertyValue() );
}
} }


@Test @Test
Expand All @@ -71,7 +84,7 @@ public void shouldNotReportConflictOnSameValueSameEntityId() throws Exception


// then // then
assertNull( merged ); assertNull( merged );
assertFalse( detector.wasConflict() ); detector.checkConflict( array() ); // <-- should not throw conflict exception
} }


private static SchemaNumberKey key( long entityId, Value... value ) private static SchemaNumberKey key( long entityId, Value... value )
Expand Down
Expand Up @@ -29,7 +29,6 @@


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexPopulator;
Expand Down Expand Up @@ -124,7 +123,7 @@ public void getPopulatorMustCreateNonUniquePopulatorForTypeGeneral() throws Exce
/* getOnlineAccessor */ /* getOnlineAccessor */


@Test @Test
public void getOnlineAccessorMustCreateUniqueAccessorForTypeUnique() throws Exception public void shouldNotCheckConflictsWhenApplyingUpdatesInOnlineAccessor() throws Exception
{ {
// given // given
provider = newProvider(); provider = newProvider();
Expand All @@ -137,33 +136,6 @@ public void getOnlineAccessorMustCreateUniqueAccessorForTypeUnique() throws Exce
Value value = Values.intValue( 1 ); Value value = Values.intValue( 1 );
indexUpdater.process( IndexEntryUpdate.add( 1, descriptor.schema(), value ) ); indexUpdater.process( IndexEntryUpdate.add( 1, descriptor.schema(), value ) );


// then
try
{
indexUpdater.process( IndexEntryUpdate.add( 2, descriptor.schema(), value ) );
fail( "Should have failed" );
}
catch ( IndexEntryConflictException e )
{
// good
}
}
}

@Test
public void getOnlineAccessorMustCreateNonUniqueAccessorForTypeGeneral() throws Exception
{
// given
provider = newProvider();

// when
IndexDescriptor descriptor = descriptor();
try ( IndexAccessor accessor = provider.getOnlineAccessor( indexId, descriptor, samplingConfig() );
IndexUpdater indexUpdater = accessor.newUpdater( IndexUpdateMode.ONLINE ) )
{
Value value = Values.intValue( 1 );
indexUpdater.process( IndexEntryUpdate.add( 1, descriptor.schema(), value ) );

// then // then
// ... expect no failure on duplicate value // ... expect no failure on duplicate value
indexUpdater.process( IndexEntryUpdate.add( 2, descriptor.schema(), value ) ); indexUpdater.process( IndexEntryUpdate.add( 2, descriptor.schema(), value ) );
Expand Down
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.impl.index;

import org.junit.Rule;
import org.junit.Test;

import java.util.Map;

import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Transaction;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.ImpermanentDatabaseRule;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class AccidentalUniquenessConstraintViolationIT
{
private static final Label Foo = Label.label( "Foo" );
private static final String BAR = "bar";

@Rule
public final DatabaseRule db = new ImpermanentDatabaseRule();

@Test
public void shouldApplyChangesWithIntermediateConstraintViolations() throws Exception
{
// given
try ( Transaction tx = db.beginTx() )
{
db.schema().constraintFor( Foo ).assertPropertyIsUnique( BAR ).create();
tx.success();
}
Node fourtyTwo;
Node fourtyOne;
try ( Transaction tx = db.beginTx() )
{
fourtyTwo = db.createNode( Foo );
fourtyTwo.setProperty( BAR, 42 );
fourtyOne = db.createNode( Foo );
fourtyOne.setProperty( BAR, 41 );
tx.success();
}

// when
try ( Transaction tx = db.beginTx() )
{
Map<String,Object> props = fourtyOne.getAllProperties();
fourtyOne.delete();
fourtyTwo.setProperty( BAR, props.get( BAR ) );
tx.success();
}

// then
try ( Transaction tx = db.beginTx() )
{
assertEquals( 41, fourtyTwo.getProperty( BAR ) );
try
{
fourtyOne.getProperty( BAR );
fail( "Should be deleted" );
}
catch ( NotFoundException e )
{
// good
}
tx.success();
}
}
}

0 comments on commit b9b302d

Please sign in to comment.