Skip to content

Commit

Permalink
Introduce NativeSchemaNumberIndex and separate updater
Browse files Browse the repository at this point in the history
NativeSchemaNumberIndex will act as common parent for
Populator and Accessor.
Updater will be used by both.
Also, clean up test hierarchy.
  • Loading branch information
burqen authored and tinwelint committed Jun 26, 2017
1 parent 4f8f8f4 commit fcb5cd0
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 181 deletions.
Expand Up @@ -45,6 +45,11 @@ public interface RecoveryCleanupWorkCollector extends Lifecycle
*/
RecoveryCleanupWorkCollector IMMEDIATE = new ImmediateRecoveryCleanupWorkCollector();

/**
* Ignore all clean jobs.
*/
RecoveryCleanupWorkCollector NULL = new NullRecoveryCleanupWorkCollector();

/**
* {@link RecoveryCleanupWorkCollector} which runs added {@link CleanupJob} as part of the {@link #add(CleanupJob)}
* call in the caller thread.
Expand All @@ -57,4 +62,12 @@ public void add( CleanupJob job )
job.run();
}
}

class NullRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector
{
@Override
public void add( CleanupJob job )
{ // no-op
}
}
}
Expand Up @@ -44,7 +44,7 @@ class NativeNonUniqueSchemaNumberIndexPopulator<KEY extends NumberKey, VALUE ext
NativeNonUniqueSchemaNumberIndexPopulator( PageCache pageCache, File storeFile, Layout<KEY,VALUE> layout,
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector, IndexSamplingConfig samplingConfig )
{
super( pageCache, storeFile, layout, recoveryCleanupWorkCollector );
super( pageCache, storeFile, layout );
this.samplingConfig = samplingConfig;
}

Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.io.pagecache.PageCache;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;

class NativeSchemaNumberIndex<KEY extends NumberKey, VALUE extends NumberValue>
{
final PageCache pageCache;
final File storeFile;
final Layout<KEY,VALUE> layout;

GBPTree<KEY,VALUE> tree;

NativeSchemaNumberIndex( PageCache pageCache, File storeFile, Layout<KEY,VALUE> layout )
{
this.pageCache = pageCache;
this.storeFile = storeFile;
this.layout = layout;
}

void instantiateTree( RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) throws IOException
{
tree = new GBPTree<>( pageCache, storeFile, layout, 0, NO_MONITOR, NO_HEADER_READER, NO_HEADER_WRITER,
recoveryCleanupWorkCollector );
}

void closeTree() throws IOException
{
tree = closeIfPresent( tree );
}

<T extends Closeable> T closeIfPresent( T closeable ) throws IOException
{
if ( closeable != null )
{
closeable.close();
}
return null;
}
}
Expand Up @@ -19,13 +19,11 @@
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
Expand All @@ -38,11 +36,6 @@
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.impl.index.GBPTreeUtil;
import org.neo4j.values.ValueTuple;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;

/**
* {@link IndexPopulator} backed by a {@link GBPTree}.
Expand All @@ -51,55 +44,41 @@
* @param <VALUE> type of {@link NumberValue}.
*/
public abstract class NativeSchemaNumberIndexPopulator<KEY extends NumberKey, VALUE extends NumberValue>
implements IndexPopulator
extends NativeSchemaNumberIndex<KEY,VALUE> implements IndexPopulator
{
static final byte BYTE_ONLINE = 1;
static final byte BYTE_FAILED = 0;

private final PageCache pageCache;
private final File storeFile;
private final KEY treeKey;
private final VALUE treeValue;
private final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector;
private final ConflictDetectingValueMerger<VALUE> conflictDetectingValueMerger;
protected final Layout<KEY,VALUE> layout;
private final NativeSchemaNumberIndexUpdater<KEY,VALUE> singleUpdater;

private Writer<KEY,VALUE> singleWriter;
private Writer<KEY,VALUE> singleTreeWriter;
private byte[] failureBytes;
private boolean dropped;

GBPTree<KEY,VALUE> tree;

NativeSchemaNumberIndexPopulator( PageCache pageCache, File storeFile, Layout<KEY,VALUE> layout,
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector )
NativeSchemaNumberIndexPopulator( PageCache pageCache, File storeFile, Layout<KEY,VALUE> layout )
{
this.pageCache = pageCache;
this.storeFile = storeFile;
this.layout = layout;
super( pageCache, storeFile, layout );
this.treeKey = layout.newKey();
this.treeValue = layout.newValue();
this.recoveryCleanupWorkCollector = recoveryCleanupWorkCollector;
this.conflictDetectingValueMerger = new ConflictDetectingValueMerger<>();
singleUpdater = new NativeSchemaNumberIndexUpdater<>( layout.newKey(), layout.newValue() );
}

@Override
public synchronized void create() throws IOException
{
GBPTreeUtil.deleteIfPresent( pageCache, storeFile );
instantiateTree();
instantiateTree( RecoveryCleanupWorkCollector.IMMEDIATE );
instantiateWriter();
}

private void instantiateTree() throws IOException
{
tree = new GBPTree<>( pageCache, storeFile, layout, 0, NO_MONITOR, NO_HEADER_READER,
NO_HEADER_WRITER, recoveryCleanupWorkCollector );
}

void instantiateWriter() throws IOException
{
assert singleWriter == null;
singleWriter = tree.writer();
assert singleTreeWriter == null;
singleTreeWriter = tree.writer();
}

@Override
Expand Down Expand Up @@ -129,15 +108,8 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws Inde
@Override
public void add( IndexEntryUpdate<?> update ) throws IndexEntryConflictException, IOException
{
treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.getEntityId(), update.values() );
singleWriter.merge( treeKey, treeValue, conflictDetectingValueMerger );
if ( conflictDetectingValueMerger.wasConflict() )
{
long existingNodeId = conflictDetectingValueMerger.existingNodeId();
long addedNodeId = conflictDetectingValueMerger.addedNodeId();
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( update.values() ) );
}
NativeSchemaNumberIndexUpdater
.processAdd( treeKey, treeValue, update, singleTreeWriter, conflictDetectingValueMerger );
}

@Override
Expand All @@ -150,7 +122,7 @@ public void verifyDeferredConstraints( PropertyAccessor propertyAccessor )
@Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor accessor ) throws IOException
{
return new NativeSchemaIndexUpdater();
return singleUpdater.initialize( singleTreeWriter, false );
}

@Override
Expand Down Expand Up @@ -199,7 +171,7 @@ private void ensureTreeInstantiated() throws IOException
{
if ( tree == null )
{
instantiateTree();
instantiateTree( RecoveryCleanupWorkCollector.NULL );
}
}

Expand All @@ -225,49 +197,8 @@ private void markTreeAsOnline() throws IOException
tree.checkpoint( IOLimiter.unlimited(), ( pc ) -> pc.putByte( BYTE_ONLINE ) );
}

private <T extends Closeable> T closeIfPresent( T closeable ) throws IOException
{
if ( closeable != null )
{
closeable.close();
}
return null;
}

void closeWriter() throws IOException
{
singleWriter = closeIfPresent( singleWriter );
}

private void closeTree() throws IOException
{
tree = closeIfPresent( tree );
}

private class NativeSchemaIndexUpdater implements IndexUpdater
{
private boolean closed;

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
{
if ( closed )
{
throw new IllegalStateException( "Index updater has been closed." );
}
add( update );
}

@Override
public void remove( PrimitiveLongSet nodeIds ) throws IOException
{
throw new UnsupportedOperationException( "Implement me" );
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
closed = true;
}
singleTreeWriter = closeIfPresent( singleTreeWriter );
}
}
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.values.ValueTuple;

class NativeSchemaNumberIndexUpdater<KEY extends NumberKey, VALUE extends NumberValue>
implements IndexUpdater
{
private final KEY treeKey;
private final VALUE treeValue;
private final ConflictDetectingValueMerger<VALUE> conflictDetectingValueMerger;
private Writer<KEY,VALUE> writer;

private boolean closed = true;
private boolean manageClosingOfWriter;

NativeSchemaNumberIndexUpdater( KEY treeKey, VALUE treeValue )
{
this.treeKey = treeKey;
this.treeValue = treeValue;
this.conflictDetectingValueMerger = new ConflictDetectingValueMerger<VALUE>();
}

NativeSchemaNumberIndexUpdater<KEY,VALUE> initialize( Writer<KEY,VALUE> writer, boolean manageClosingOfWriter )
{
if ( !closed )
{
throw new IllegalStateException( "Updater still open" );
}

this.manageClosingOfWriter = manageClosingOfWriter;
this.writer = writer;
closed = false;
return this;
}

@Override
public void process( IndexEntryUpdate update ) throws IOException, IndexEntryConflictException
{
assertOpen();
processAdd( treeKey, treeValue, update, writer, conflictDetectingValueMerger );
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
if ( manageClosingOfWriter )
{
writer.close();
}
closed = true;
}

@Override
public void remove( PrimitiveLongSet nodeIds ) throws IOException
{
assertOpen();
}

private void assertOpen()
{
if ( closed )
{
throw new IllegalStateException( "Updater has been closed" );
}
}

static <KEY extends NumberKey, VALUE extends NumberValue> void processAdd( KEY treeKey, VALUE treeValue,
IndexEntryUpdate update, Writer<KEY,VALUE> singleWriter,
ConflictDetectingValueMerger<VALUE> conflictDetectingValueMerger )
throws IOException, IndexEntryConflictException
{
treeKey.from( update.getEntityId(), update.values() );
treeValue.from( update.getEntityId(), update.values() );
singleWriter.merge( treeKey, treeValue, conflictDetectingValueMerger );
if ( conflictDetectingValueMerger.wasConflict() )
{
long existingNodeId = conflictDetectingValueMerger.existingNodeId();
long addedNodeId = conflictDetectingValueMerger.addedNodeId();
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( update.values() ) );
}
}
}
Expand Up @@ -39,7 +39,7 @@ class NativeUniqueSchemaNumberIndexPopulator<KEY extends NumberKey, VALUE extend
NativeUniqueSchemaNumberIndexPopulator( PageCache pageCache, File storeFile, Layout<KEY,VALUE> layout,
RecoveryCleanupWorkCollector recoveryCleanupWorkCollector )
{
super( pageCache, storeFile, layout, recoveryCleanupWorkCollector );
super( pageCache, storeFile, layout );
this.sampler = new UniqueIndexSampler();
}

Expand Down

0 comments on commit fcb5cd0

Please sign in to comment.