Skip to content

Commit

Permalink
Enable online updating for temporal values
Browse files Browse the repository at this point in the history
  • Loading branch information
OliviaYtterbrink authored and Lojjs committed Mar 5, 2018
1 parent 789832f commit 1e705c8
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 43 deletions.
Expand Up @@ -74,7 +74,7 @@ public void drop() throws IOException
}

@Override
public IndexUpdater newUpdater( IndexUpdateMode mode )
public NativeSchemaIndexUpdater<KEY, VALUE> newUpdater( IndexUpdateMode mode )
{
assertOpen();
try
Expand Down
Expand Up @@ -34,9 +34,8 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
Expand Down Expand Up @@ -76,19 +75,7 @@ public void drop() throws IOException
@Override
public IndexUpdater newUpdater( IndexUpdateMode mode )
{
return new IndexUpdater()
{
@Override
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
throw new UnsupportedOperationException( "Not yet" );
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
}
};
return new TemporalIndexUpdater( this, mode );
}

@Override
Expand Down Expand Up @@ -243,7 +230,9 @@ static class PartFactory implements TemporalIndexCache.Factory<PartAccessor<?>,
@Override
public PartAccessor<?> newDate() throws IOException
{
return new PartAccessor<>( pageCache, fs, temporalIndexFiles.date(),
TemporalIndexFiles.FileLayout<DateSchemaKey> fileLayout = temporalIndexFiles.date();
ensureIndexExists( fileLayout );
return new PartAccessor<>( pageCache, fs, fileLayout,
recoveryCleanupWorkCollector, monitor, descriptor, indexId, samplingConfig );
}

Expand Down Expand Up @@ -276,5 +265,20 @@ public PartAccessor<?> newDuration() throws IOException
{
throw new UnsupportedOperationException( "no comprende" );
}

private <KEY extends NativeSchemaKey> void ensureIndexExists( TemporalIndexFiles.FileLayout<KEY> fileLayout ) throws IOException
{
if ( !fs.fileExists( fileLayout.indexFile ) )
{
createEmptyIndex( fileLayout );
}
}

private <KEY extends NativeSchemaKey> void createEmptyIndex( TemporalIndexFiles.FileLayout<KEY> fileLayout ) throws IOException
{
IndexPopulator populator = new TemporalIndexPopulator.PartPopulator<>( pageCache, fs, fileLayout, monitor, descriptor, indexId );
populator.create();
populator.close( true );
}
}
}
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;

import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.impl.api.index.IndexUpdateMode;
import org.neo4j.kernel.impl.index.schema.fusion.FusionIndexUtils;

public class TemporalIndexUpdater extends TemporalIndexCache<NativeSchemaIndexUpdater<?, NativeSchemaValue>,IOException> implements IndexUpdater
{
TemporalIndexUpdater( TemporalIndexAccessor accessor, IndexUpdateMode mode )
{
super( new PartFactory( accessor, mode ) );
}

@Override
public void process( IndexEntryUpdate<?> update ) throws IOException, IndexEntryConflictException
{
IndexUpdater to = select( update.values()[0].valueGroup() );
switch ( update.updateMode() )
{
case ADDED:
case REMOVED:
to.process( update );
break;
case CHANGED:
IndexUpdater from = select( update.beforeValues()[0].valueGroup() );
// There are two cases:
// - both before/after go into the same updater --> pass update into that updater
if ( from == to )
{
from.process( update );
}
// - before go into one and after into the other --> REMOVED from one and ADDED into the other
else
{
from.process( IndexEntryUpdate.remove( update.getEntityId(), update.indexKey(), update.beforeValues() ) );
to.process( IndexEntryUpdate.add( update.getEntityId(), update.indexKey(), update.values() ) );
}
break;
default:
throw new IllegalArgumentException( "Unknown update mode" );
}
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
FusionIndexUtils.forAll( NativeSchemaIndexUpdater::close, this );
}

static class PartFactory implements TemporalIndexCache.Factory<NativeSchemaIndexUpdater<?, NativeSchemaValue>,IOException>
{

private final TemporalIndexAccessor accessor;
private final IndexUpdateMode mode;

PartFactory( TemporalIndexAccessor accessor, IndexUpdateMode mode )
{
this.accessor = accessor;
this.mode = mode;
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newDate() throws IOException
{
return accessor.date().newUpdater( mode );
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newDateTime() throws IOException
{
throw new UnsupportedOperationException( "ma-a-da dayo" );
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newDateTimeZoned() throws IOException
{
throw new UnsupportedOperationException( "ma-a-da dayo" );
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newTime() throws IOException
{
throw new UnsupportedOperationException( "ma-a-da dayo" );
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newTimeZoned() throws IOException
{
throw new UnsupportedOperationException( "ma-a-da dayo" );
}

@Override
public NativeSchemaIndexUpdater<?, NativeSchemaValue> newDuration() throws IOException
{
throw new UnsupportedOperationException( "ma-a-da dayo" );
}
}
}
Expand Up @@ -25,6 +25,7 @@
import org.junit.runners.Suite;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -71,9 +72,10 @@ public abstract static class Compatibility

protected File graphDbDir;
protected FileSystemAbstraction fs;
protected final IndexProviderCompatibilityTestSuite testSuite;
protected SchemaIndexProvider indexProvider;
protected IndexDescriptor descriptor;
final IndexProviderCompatibilityTestSuite testSuite;
final List<NodeAndValue> allValues;

@Before
public void setup()
Expand All @@ -88,6 +90,7 @@ public Compatibility( IndexProviderCompatibilityTestSuite testSuite, IndexDescri
{
this.testSuite = testSuite;
this.descriptor = descriptor;
this.allValues = allValues( testSuite.getSupportedValues() );
pageCacheAndDependenciesRule = new PageCacheAndDependenciesRule( DefaultFileSystemRule::new, testSuite.getClass() );
}

Expand Down Expand Up @@ -115,5 +118,28 @@ protected void withPopulator( IndexPopulator populator, ThrowingConsumer<IndexPo
}
}
}

private static List<NodeAndValue> allValues( Iterable<Value> supportedValues )
{
long nodeIds = 0;
List<NodeAndValue> result = new ArrayList<>();
for ( Value value : supportedValues )
{
result.add( new NodeAndValue( nodeIds++, value ) );
}
return result;
}

static class NodeAndValue
{
final long nodeId;
final Value value;

NodeAndValue( long nodeId, Value value )
{
this.nodeId = nodeId;
this.value = value;
}
}
}
}
Expand Up @@ -22,7 +22,9 @@
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
Expand Down Expand Up @@ -121,6 +123,23 @@ public void testIndexSeekByPrefixOnNonStrings() throws Exception
assertThat( query( IndexQuery.stringPrefix( 1, "2" ) ), equalTo( EMPTY_LIST ) );
}

@Test
public void shouldUpdateWithAllValues() throws Exception
{
// GIVEN
List<IndexEntryUpdate<?>> updates = new ArrayList<>();
allValues.forEach( entry -> updates.add( IndexEntryUpdate.add( entry.nodeId, descriptor.schema(), entry.value ) ) );
updateAndCommit( updates );

// then
int propertyKeyId = descriptor.schema().getPropertyId();
for ( NodeAndValue entry : allValues )
{
List<Long> result = query( IndexQuery.exact( propertyKeyId, entry.value ) );
assertThat( result, equalTo( Collections.singletonList( entry.nodeId ) ) );
}
}

// This behaviour is expected by General indexes

@Ignore( "Not a test. This is a compatibility suite" )
Expand Down
Expand Up @@ -159,31 +159,6 @@ public void shouldApplyUpdatesIdempotently() throws Exception
}
}

private List<NodeAndValue> allValues( Iterable<Value> supportedValues )
{
long nodeIds = 0;
List<NodeAndValue> result = new ArrayList<>();
for ( Value value : supportedValues )
{
result.add( new NodeAndValue( nodeIds++, value ) );
}
return result;
}

private List<NodeAndValue> allValues = allValues( testSuite.getSupportedValues() );

static class NodeAndValue
{
final long nodeId;
final Value value;

NodeAndValue( long nodeId, Value value )
{
this.nodeId = nodeId;
this.value = value;
}
}

@Test
public void shouldPopulateWithAllValues() throws Exception
{
Expand Down

0 comments on commit 1e705c8

Please sign in to comment.