Skip to content

Commit

Permalink
Close all index readers opened during lockingNodeUniqueIndexSeek
Browse files Browse the repository at this point in the history
Previously one of the readers was leaked and that forcing
lucene index to keep snapshot forever till next restart.
After X calls that where cause uncontrollable index size and
the number of files grows that were causing all sort of problems: OOD,
OOM, inability to calculate store size over JMX, etc.
  • Loading branch information
MishaDemianenko committed Jan 7, 2019
1 parent dd38b10 commit 84cb493
Show file tree
Hide file tree
Showing 9 changed files with 614 additions and 35 deletions.
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.graphdb.Resource;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.NodeCursor;
import org.neo4j.internal.kernel.api.NodeValueIndexCursor;
Expand All @@ -47,7 +46,6 @@ final class DefaultNodeValueIndexCursor extends IndexCursor<IndexProgressor>
implements NodeValueIndexCursor, NodeValueClient
{
private Read read;
private Resource resource;
private long node;
private IndexQuery[] query;
private Value[] values;
Expand Down Expand Up @@ -151,10 +149,9 @@ public boolean next()
}
}

public void setRead( Read read, Resource resource )
public void setRead( Read read )
{
this.read = read;
this.resource = resource;
}

@Override
Expand Down Expand Up @@ -206,18 +203,7 @@ public void close()
this.added = emptyIterator();
this.removed = PrimitiveLongCollections.emptySet();

try
{
if ( resource != null )
{
resource.close();
resource = null;
}
}
finally
{
pool.accept( this );
}
pool.accept( this );
}
}

Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.newapi;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import org.neo4j.internal.kernel.api.IndexReference;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.storageengine.api.schema.IndexReader;

import static org.neo4j.io.IOUtils.closeAllSilently;

class IndexReaders implements Closeable
{
private final List<IndexReader> indexReaders = new ArrayList<>();
private final IndexReference indexReference;
private final Read read;

IndexReaders( IndexReference indexReference, Read read )
{
this.indexReference = indexReference;
this.read = read;
}

IndexReader createReader() throws IndexNotFoundKernelException
{
IndexReader indexReader = read.indexReader( indexReference, true );
indexReaders.add( indexReader );
return indexReader;
}

@Override
public void close()
{
closeAllSilently( indexReaders );
}
}
Expand Up @@ -27,7 +27,6 @@
import java.util.Map;

import org.neo4j.helpers.collection.CastingIterator;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.internal.kernel.api.CapableIndexReference;
import org.neo4j.internal.kernel.api.CursorFactory;
import org.neo4j.internal.kernel.api.ExplicitIndexRead;
Expand Down Expand Up @@ -98,7 +97,6 @@
import static java.lang.Math.min;
import static org.neo4j.internal.kernel.api.exceptions.schema.ConstraintValidationException.Phase.VALIDATION;
import static org.neo4j.internal.kernel.api.exceptions.schema.SchemaKernelException.OperationContext.CONSTRAINT_CREATION;
import static org.neo4j.internal.kernel.api.schema.SchemaDescriptorPredicates.hasProperty;
import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_NODE;
import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_PROPERTY_KEY;
import static org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor.Type.UNIQUE;
Expand Down Expand Up @@ -418,9 +416,11 @@ private void validateNoExistingNodeWithExactValues( IndexBackedConstraintDescrip
IndexQuery.ExactPredicate[] propertyValues, long modifiedNode
) throws UniquePropertyValueValidationException, UnableToValidateConstraintException
{
try ( DefaultNodeValueIndexCursor valueCursor = cursors.allocateNodeValueIndexCursor() )
SchemaIndexDescriptor schemaIndexDescriptor = constraint.ownedIndexDescriptor();
CapableIndexReference indexReference = allStoreHolder.indexGetCapability( schemaIndexDescriptor );
try ( DefaultNodeValueIndexCursor valueCursor = cursors.allocateNodeValueIndexCursor();
IndexReaders indexReaders = new IndexReaders( indexReference, allStoreHolder ) )
{
SchemaIndexDescriptor schemaIndexDescriptor = constraint.ownedIndexDescriptor();
assertIndexOnline( schemaIndexDescriptor );
int labelId = schemaIndexDescriptor.schema().keyId();

Expand All @@ -430,8 +430,7 @@ private void validateNoExistingNodeWithExactValues( IndexBackedConstraintDescrip
indexEntryResourceId( labelId, propertyValues )
);

allStoreHolder.nodeIndexSeekWithFreshIndexReader(
allStoreHolder.indexGetCapability( schemaIndexDescriptor ), valueCursor, propertyValues );
allStoreHolder.nodeIndexSeekWithFreshIndexReader( valueCursor, indexReaders.createReader(), propertyValues );
if ( valueCursor.next() && valueCursor.nodeReference() != modifiedNode )
{
throw new UniquePropertyValueValidationException( constraint, VALIDATION,
Expand Down
Expand Up @@ -111,7 +111,7 @@ public final void nodeIndexSeek(

DefaultNodeValueIndexCursor cursorImpl = (DefaultNodeValueIndexCursor) cursor;
IndexReader reader = indexReader( index, false );
cursorImpl.setRead( this, null );
cursorImpl.setRead( this );
IndexProgressor.NodeValueClient target = withFullValuePrecision( cursorImpl, query, reader );
reader.query( target, indexOrder, query );
}
Expand All @@ -122,7 +122,7 @@ public void nodeIndexDistinctValues( IndexReference index, NodeValueIndexCursor
ktx.assertOpen();
DefaultNodeValueIndexCursor cursorImpl = (DefaultNodeValueIndexCursor) cursor;
IndexReader reader = indexReader( index, true );
cursorImpl.setRead( this, null );
cursorImpl.setRead( this );
try ( CursorPropertyAccessor accessor = new CursorPropertyAccessor( cursors.allocateNodeCursor(), cursors.allocatePropertyCursor(), this ) )
{
reader.distinctValues( cursorImpl, accessor );
Expand Down Expand Up @@ -189,14 +189,15 @@ public final long lockingNodeUniqueIndexSeek(
//First try to find node under a shared lock
//if not found upgrade to exclusive and try again
locks.acquireShared( lockTracer, INDEX_ENTRY, indexEntryId );
try ( DefaultNodeValueIndexCursor cursor = cursors.allocateNodeValueIndexCursor() )
try ( DefaultNodeValueIndexCursor cursor = cursors.allocateNodeValueIndexCursor();
IndexReaders readers = new IndexReaders( index, this ) )
{
nodeIndexSeekWithFreshIndexReader( index, cursor, predicates );
nodeIndexSeekWithFreshIndexReader( cursor, readers.createReader(), predicates );
if ( !cursor.next() )
{
locks.releaseShared( INDEX_ENTRY, indexEntryId );
locks.acquireExclusive( lockTracer, INDEX_ENTRY, indexEntryId );
nodeIndexSeekWithFreshIndexReader( index, cursor, predicates );
nodeIndexSeekWithFreshIndexReader( cursor, readers.createReader(), predicates );
if ( cursor.next() ) // we found it under the exclusive lock
{
// downgrade to a shared lock
Expand All @@ -210,14 +211,13 @@ public final long lockingNodeUniqueIndexSeek(
}

void nodeIndexSeekWithFreshIndexReader(
IndexReference index,
DefaultNodeValueIndexCursor cursor,
IndexQuery.ExactPredicate... query ) throws IndexNotFoundKernelException, IndexNotApplicableKernelException
IndexReader indexReader,
IndexQuery.ExactPredicate... query ) throws IndexNotApplicableKernelException
{
IndexReader reader = indexReader( index, true );
cursor.setRead( this, reader );
IndexProgressor.NodeValueClient target = withFullValuePrecision( cursor, query, reader );
reader.query( target, IndexOrder.NONE, query );
cursor.setRead( this );
IndexProgressor.NodeValueClient target = withFullValuePrecision( cursor, query, indexReader );
indexReader.query( target, IndexOrder.NONE, query );
}

@Override
Expand All @@ -235,7 +235,7 @@ public final void nodeIndexScan(

// for a scan, we simply query for existence of the first property, which covers all entries in an index
int firstProperty = index.properties()[0];
((DefaultNodeValueIndexCursor) cursor).setRead( this, null );
((DefaultNodeValueIndexCursor) cursor).setRead( this );
indexReader( index, false ).query( (DefaultNodeValueIndexCursor) cursor, indexOrder, IndexQuery.exists( firstProperty ) );
}

Expand Down Expand Up @@ -804,5 +804,4 @@ private void assertPredicatesMatchSchema( IndexReference index, IndexQuery.Exact
}
}
}

}
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.kernel.api.CapableIndexReference;
import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.Read;
import org.neo4j.internal.kernel.api.TokenRead;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.index.schema.tracking.TrackingIndexExtensionFactory;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.kernel.impl.index.schema.tracking.TrackingReadersIndexAccessor.numberOfClosedReaders;
import static org.neo4j.kernel.impl.index.schema.tracking.TrackingReadersIndexAccessor.numberOfOpenReaders;

public class UniqueIndexSeekIT
{
@Rule
public final DefaultFileSystemRule fs = new DefaultFileSystemRule();
@Rule
public final TestDirectory directory = TestDirectory.testDirectory( fs );

@Test
public void uniqueIndexSeekDoNotLeakIndexReaders() throws KernelException
{
TrackingIndexExtensionFactory indexExtensionFactory = new TrackingIndexExtensionFactory();
GraphDatabaseAPI database = createDatabase( indexExtensionFactory );
try
{

Label label = label( "spaceship" );
String nameProperty = "name";
createUniqueConstraint( database, label, nameProperty );

generateRandomData( database, label, nameProperty );

assertNotNull( indexExtensionFactory.getIndexProvider() );
assertThat( numberOfClosedReaders(), greaterThan( 0L ) );
assertThat( numberOfOpenReaders(), greaterThan( 0L ) );
assertEquals( numberOfClosedReaders(), numberOfOpenReaders() );

lockNodeUsingUniqueIndexSeek( database, label, nameProperty );

assertEquals( numberOfClosedReaders(), numberOfOpenReaders() );
}
finally
{
database.shutdown();
}
}

private GraphDatabaseAPI createDatabase( TrackingIndexExtensionFactory indexExtensionFactory )
{
return (GraphDatabaseAPI) new TestGraphDatabaseFactory()
.setKernelExtensions( singletonList( indexExtensionFactory ) ).newEmbeddedDatabaseBuilder( directory.graphDbDir() ).newGraphDatabase();
}

private void lockNodeUsingUniqueIndexSeek( GraphDatabaseAPI database, Label label, String nameProperty ) throws KernelException
{
try ( Transaction transaction = database.beginTx() )
{
ThreadToStatementContextBridge contextBridge = database.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class );
KernelTransaction kernelTransaction = contextBridge.getKernelTransactionBoundToThisThread( true );
TokenRead tokenRead = kernelTransaction.tokenRead();
Read dataRead = kernelTransaction.dataRead();

int labelId = tokenRead.nodeLabel( label.name() );
int propertyId = tokenRead.propertyKey( nameProperty );
CapableIndexReference indexReference = kernelTransaction.schemaRead().index( labelId, propertyId );
dataRead.lockingNodeUniqueIndexSeek( indexReference, IndexQuery.ExactPredicate.exact( propertyId, "value" ) );
transaction.success();
}
}

private void generateRandomData( GraphDatabaseAPI database, Label label, String nameProperty )
{
for ( int i = 0; i < 1000; i++ )
{
try ( Transaction transaction = database.beginTx() )
{
Node node = database.createNode( label );
node.setProperty( nameProperty, "PlanetExpress" + i );
transaction.success();
}
}
}

private void createUniqueConstraint( GraphDatabaseAPI database, Label label, String nameProperty )
{
try ( Transaction transaction = database.beginTx() )
{
database.schema().constraintFor( label ).assertPropertyIsUnique( nameProperty ).create();
transaction.success();
}
try ( Transaction transaction = database.beginTx() )
{
database.schema().awaitIndexesOnline( 1, TimeUnit.MINUTES );
transaction.success();
}
}
}

0 comments on commit 84cb493

Please sign in to comment.