Skip to content

Commit

Permalink
Avoid accumulation of index updates during recovery
Browse files Browse the repository at this point in the history
Do not collect index updates during recovery since that can cause OOM.
Use iterators instead to iterate over required updates.
  • Loading branch information
MishaDemianenko committed May 15, 2017
1 parent a794834 commit 0d31a08
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 65 deletions.
Expand Up @@ -59,9 +59,9 @@ <FAILURE extends Exception> StoreScan<FAILURE> visitNodes(
* and puts those updates into {@code target}. * and puts those updates into {@code target}.
* *
* @param nodeId id of node to load. * @param nodeId id of node to load.
* @param target {@link Collection} to add updates into. * @return node updates container
*/ */
void nodeAsUpdates( long nodeId, Collection<NodeUpdates> target ); NodeUpdates nodeAsUpdates( long nodeId );


DoubleLongRegister indexUpdatesAndSize( long indexId, DoubleLongRegister output ); DoubleLongRegister indexUpdatesAndSize( long indexId, DoubleLongRegister output );


Expand Down Expand Up @@ -129,8 +129,9 @@ public void replaceIndexCounts( long indexId, long uniqueElements, long maxUniqu
} }


@Override @Override
public void nodeAsUpdates( long nodeId, Collection<NodeUpdates> target ) public NodeUpdates nodeAsUpdates( long nodeId )
{ {
return null;
} }


@Override @Override
Expand Down
Expand Up @@ -26,17 +26,17 @@
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;


import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.collection.primitive.PrimitiveLongVisitor;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.TokenNameLookup; import org.neo4j.kernel.api.TokenNameLookup;
import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexActivationFailedKernelException;
Expand Down Expand Up @@ -115,7 +115,9 @@ public interface Monitor
{ {
void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds ); void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds );


void appliedRecoveredData( Iterable<NodeUpdates> updates ); void applyingRecoveredUpdate( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate );

void recoveredUpdatesApplied();


void populationCompleteOn( IndexDescriptor descriptor ); void populationCompleteOn( IndexDescriptor descriptor );


Expand All @@ -127,7 +129,7 @@ public interface Monitor
public static class MonitorAdapter implements Monitor public static class MonitorAdapter implements Monitor
{ {
@Override @Override
public void appliedRecoveredData( Iterable<NodeUpdates> updates ) public void recoveredUpdatesApplied()
{ // Do nothing { // Do nothing
} }


Expand All @@ -136,6 +138,12 @@ public void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds )
{ // Do nothing { // Do nothing
} }


@Override
public void applyingRecoveredUpdate( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate )
{
// empty
}

@Override @Override
public void populationCompleteOn( IndexDescriptor descriptor ) public void populationCompleteOn( IndexDescriptor descriptor )
{ // Do nothing { // Do nothing
Expand Down Expand Up @@ -440,12 +448,6 @@ private void apply( Iterable<IndexEntryUpdate<LabelSchemaDescriptor>> updates, I
} }
} }


public void convertToIndexUpdatesAndApply( Iterable<NodeUpdates> updates, IndexUpdateMode updateMode )
throws IOException, IndexEntryConflictException
{
apply( Iterables.flatMap( this::convertToIndexUpdates, updates ), updateMode );
}

@Override @Override
public Iterable<IndexEntryUpdate<LabelSchemaDescriptor>> convertToIndexUpdates( NodeUpdates nodeUpdates ) public Iterable<IndexEntryUpdate<LabelSchemaDescriptor>> convertToIndexUpdates( NodeUpdates nodeUpdates )
{ {
Expand Down Expand Up @@ -534,28 +536,29 @@ private void applyRecoveredUpdates() throws IOException, IndexEntryConflictExcep
updater.remove( recoveredNodeIds ); updater.remove( recoveredNodeIds );
} }


Iterable<NodeUpdates> updates = readNodeUpdatesFromRecoveredStore(); Iterator<NodeUpdates> updates = recoveredNodeUpdatesIterator();
convertToIndexUpdatesAndApply( updates, IndexUpdateMode.RECOVERY ); Iterator<IndexEntryUpdate<LabelSchemaDescriptor>> indexEntryUpdates =
monitor.appliedRecoveredData( updates ); Iterators.flatMap( nodeUpdates -> convertToIndexUpdates( nodeUpdates ).iterator(), updates );
while ( indexEntryUpdates.hasNext() )
{
IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate = indexEntryUpdates.next();
monitor.applyingRecoveredUpdate( indexUpdate );
IndexUpdater updater = updaterMap.getUpdater( indexUpdate.indexKey().schema() );
if ( updater != null )
{
updater.process( indexUpdate );
}
}
monitor.recoveredUpdatesApplied();
} }
} }
recoveredNodeIds.clear(); recoveredNodeIds.clear();
} }


private Iterable<NodeUpdates> readNodeUpdatesFromRecoveredStore() private Iterator<NodeUpdates> recoveredNodeUpdatesIterator()
{ {
final List<NodeUpdates> nodeUpdates = new ArrayList<>(); PrimitiveLongIterator nodeIdIterator = recoveredNodeIds.iterator();
recoveredNodeIds.visitKeys( new PrimitiveLongVisitor<RuntimeException>() return new NodeUpdatesIterator( storeView, nodeIdIterator );
{
@Override
public boolean visited( long nodeId )
{
storeView.nodeAsUpdates( nodeId, nodeUpdates );
return false;
}
} );

return nodeUpdates;
} }


public void dropIndex( IndexRule rule ) public void dropIndex( IndexRule rule )
Expand Down
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.index;

import java.util.Iterator;

import org.neo4j.collection.primitive.PrimitiveLongIterator;

class NodeUpdatesIterator implements Iterator<NodeUpdates>
{

private final IndexStoreView storeView;
private final PrimitiveLongIterator nodeIdIterator;
private NodeUpdates nextUpdate;

NodeUpdatesIterator( IndexStoreView storeView, PrimitiveLongIterator nodeIdIterator )
{
this.storeView = storeView;
this.nodeIdIterator = nodeIdIterator;
}

@Override
public boolean hasNext()
{
if ( nextUpdate == null )
{
while ( nodeIdIterator.hasNext() )
{
long nodeId = nodeIdIterator.next();
NodeUpdates updates = storeView.nodeAsUpdates( nodeId );
if ( updates != null )
{
nextUpdate = updates;
return true;
}
}
return false;
}
return true;
}

@Override
public NodeUpdates next()
{
NodeUpdates update = null;
if ( hasNext() )
{
update = this.nextUpdate;
this.nextUpdate = null;
}
return update;
}
}
Expand Up @@ -19,17 +19,16 @@
*/ */
package org.neo4j.kernel.impl.transaction.state.storeview; package org.neo4j.kernel.impl.transaction.state.storeview;


import java.util.Collection;
import java.util.function.IntPredicate; import java.util.function.IntPredicate;


import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException; import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.impl.api.index.NodeUpdates;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.api.properties.Property; import org.neo4j.kernel.api.properties.Property;
import org.neo4j.kernel.impl.api.CountsAccessor; import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.api.index.IndexStoreView; import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.NodeUpdates;
import org.neo4j.kernel.impl.api.index.StoreScan; import org.neo4j.kernel.impl.api.index.StoreScan;
import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.store.NeoStores;
Expand Down Expand Up @@ -107,22 +106,22 @@ public <FAILURE extends Exception> StoreScan<FAILURE> visitNodes(
} }


@Override @Override
public void nodeAsUpdates( long nodeId, Collection<NodeUpdates> target ) public NodeUpdates nodeAsUpdates( long nodeId )
{ {
NodeRecord node = nodeStore.getRecord( nodeId, nodeStore.newRecord(), FORCE ); NodeRecord node = nodeStore.getRecord( nodeId, nodeStore.newRecord(), FORCE );
if ( !node.inUse() ) if ( !node.inUse() )
{ {
return; return null;
} }
long firstPropertyId = node.getNextProp(); long firstPropertyId = node.getNextProp();
if ( firstPropertyId == Record.NO_NEXT_PROPERTY.intValue() ) if ( firstPropertyId == Record.NO_NEXT_PROPERTY.intValue() )
{ {
return; // no properties => no updates (it's not going to be in any index) return null; // no properties => no updates (it's not going to be in any index)
} }
long[] labels = parseLabelsField( node ).get( nodeStore ); long[] labels = parseLabelsField( node ).get( nodeStore );
if ( labels.length == 0 ) if ( labels.length == 0 )
{ {
return; // no labels => no updates (it's not going to be in any index) return null; // no labels => no updates (it's not going to be in any index)
} }
NodeUpdates.Builder update = NodeUpdates.forNode( nodeId, labels ); NodeUpdates.Builder update = NodeUpdates.forNode( nodeId, labels );
for ( PropertyRecord propertyRecord : propertyStore.getPropertyRecordChain( firstPropertyId ) ) for ( PropertyRecord propertyRecord : propertyStore.getPropertyRecordChain( firstPropertyId ) )
Expand All @@ -133,7 +132,7 @@ public void nodeAsUpdates( long nodeId, Collection<NodeUpdates> target )
update.added( property.getKeyIndexId(), value ); update.added( property.getKeyIndexId(), value );
} }
} }
target.add( update.build() ); return update.build();
} }


@Override @Override
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
Expand All @@ -52,7 +53,6 @@
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.BoundedIterable; import org.neo4j.helpers.collection.BoundedIterable;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -684,17 +684,19 @@ public void recoveredUpdatesShouldBeApplied() throws Exception


final NodeUpdates nodeUpdate1 = addNodeUpdate( nodeId1, "foo" ); final NodeUpdates nodeUpdate1 = addNodeUpdate( nodeId1, "foo" );
final NodeUpdates nodeUpdate2 = addNodeUpdate( nodeId2, "bar" ); final NodeUpdates nodeUpdate2 = addNodeUpdate( nodeId2, "bar" );
final Set<NodeUpdates> nodeUpdates = asSet( nodeUpdate1, nodeUpdate2 );


final AtomicBoolean applyingRecoveredDataCalled = new AtomicBoolean(); final AtomicBoolean applyingRecoveredDataCalled = new AtomicBoolean();
final AtomicBoolean appliedRecoveredDataCalled = new AtomicBoolean(); final AtomicBoolean recoveredUpdatesAppliedCalled = new AtomicBoolean();


// Mockito not used here because it does not work well with mutable objects (set of recovered node ids in // Mockito not used here because it does not work well with mutable objects (set of recovered node ids in
// this case, which is cleared at the end of recovery). // this case, which is cleared at the end of recovery).
// See https://code.google.com/p/mockito/issues/detail?id=126 and // See https://code.google.com/p/mockito/issues/detail?id=126 and
// https://groups.google.com/forum/#!topic/mockito/_A4BpsEAY9s // https://groups.google.com/forum/#!topic/mockito/_A4BpsEAY9s
IndexingService.Monitor monitor = new IndexingService.MonitorAdapter() IndexingService.Monitor monitor = new IndexingService.MonitorAdapter()
{ {

private Set<IndexEntryUpdate<LabelSchemaDescriptor>> updates = new HashSet<>();

@Override @Override
public void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds ) public void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds )
{ {
Expand All @@ -703,19 +705,26 @@ public void applyingRecoveredData( PrimitiveLongSet recoveredNodeIds )
} }


@Override @Override
public void appliedRecoveredData( Iterable<NodeUpdates> updates ) public void applyingRecoveredUpdate( IndexEntryUpdate<LabelSchemaDescriptor> indexUpdate )
{ {
assertEquals( nodeUpdates, Iterables.asSet( updates ) ); updates.add( indexUpdate );
appliedRecoveredDataCalled.set( true ); }

@Override
public void recoveredUpdatesApplied()
{
recoveredUpdatesAppliedCalled.set( true );
assertEquals( 2, updates.size() );
} }
}; };


IndexingService indexing = newIndexingServiceWithMockedDependencies( populator, accessor, withData(), monitor ); IndexingService indexing = newIndexingServiceWithMockedDependencies( populator, accessor, withData(), monitor );
indexing.createIndexes( IndexRule
.indexRule( 1, IndexDescriptorFactory.forLabel( labelId, index.schema().getPropertyId() ),
new SchemaIndexProvider.Descriptor( "quantum-dex", "25.0" ) ) );


doAnswer( nodeUpdatesAnswer( nodeUpdate1 ) ).when( storeView ) when( storeView.nodeAsUpdates( eq( nodeId1 ) ) ).thenReturn( nodeUpdate1 );
.nodeAsUpdates( eq( nodeId1 ), any( Collection.class ) ); when( storeView.nodeAsUpdates( eq( nodeId2 ) ) ).thenReturn( nodeUpdate2 );
doAnswer( nodeUpdatesAnswer( nodeUpdate2 ) ).when( storeView )
.nodeAsUpdates( eq( nodeId2 ), any( Collection.class ) );


// When // When
life.init(); life.init();
Expand All @@ -725,7 +734,7 @@ public void appliedRecoveredData( Iterable<NodeUpdates> updates )


// Then // Then
assertTrue( "applyingRecoveredData was not called", applyingRecoveredDataCalled.get() ); assertTrue( "applyingRecoveredData was not called", applyingRecoveredDataCalled.get() );
assertTrue( "appliedRecoveredData was not called", appliedRecoveredDataCalled.get() ); assertTrue( "recoveredUpdatesApplied was not called", recoveredUpdatesAppliedCalled.get() );
} }


private IndexUpdates nodeIdsAsIndexUpdates( PrimitiveLongSet nodeIds ) private IndexUpdates nodeIdsAsIndexUpdates( PrimitiveLongSet nodeIds )
Expand Down Expand Up @@ -769,8 +778,7 @@ public void shouldNotLoseIndexDescriptorDueToOtherSimilarIndexDuringRecovery() t
// GIVEN // GIVEN
long nodeId = 0, indexId = 1, otherIndexId = 2; long nodeId = 0, indexId = 1, otherIndexId = 2;
NodeUpdates update = addNodeUpdate( nodeId, "value" ); NodeUpdates update = addNodeUpdate( nodeId, "value" );
doAnswer( nodeUpdatesAnswer( update ) ).when( storeView ) when( storeView.nodeAsUpdates( eq( nodeId ) ) ).thenReturn( update );
.nodeAsUpdates( eq( nodeId ), any( Collection.class ) );
DoubleLongRegister register = mock( DoubleLongRegister.class ); DoubleLongRegister register = mock( DoubleLongRegister.class );
when( register.readSecond() ).thenReturn( 42L ); when( register.readSecond() ).thenReturn( 42L );
when( storeView.indexSample( anyLong(), any( DoubleLongRegister.class ) ) ) when( storeView.indexSample( anyLong(), any( DoubleLongRegister.class ) ) )
Expand All @@ -797,9 +805,7 @@ populator, accessor, withData( update ), index
// and WHEN starting, i.e. completing recovery // and WHEN starting, i.e. completing recovery
life.start(); life.start();


// THEN our index should still have been recovered properly verify( accessor ).newUpdater( RECOVERY );
// apparently we create updaters two times during recovery, get over it
verify( accessor, times( 2 ) ).newUpdater( RECOVERY );
} }


@Test @Test
Expand Down Expand Up @@ -1369,17 +1375,6 @@ public ResourceIterator<File> snapshotFiles()
} }
} }


@SuppressWarnings( "unchecked" )
private Answer nodeUpdatesAnswer( NodeUpdates... updates )
{
return invocation ->
{
Collection<NodeUpdates> target = (Collection<NodeUpdates>) invocation.getArguments()[1];
target.addAll( asList( updates ) );
return null;
};
}

private IndexRule indexRule( long ruleId, int labelId, int propertyKeyId, SchemaIndexProvider.Descriptor private IndexRule indexRule( long ruleId, int labelId, int propertyKeyId, SchemaIndexProvider.Descriptor
providerDescriptor ) providerDescriptor )
{ {
Expand Down

0 comments on commit 0d31a08

Please sign in to comment.