Skip to content

Commit

Permalink
Plug in PhaseTracker to index population
Browse files Browse the repository at this point in the history
- Extract PhaseTracker interface, implemented by LoggingPhaseTracker
- StoreScan can be given a PhaseTracker instance before scan start
- Stop PhaseTracker in IndexPopulationJob finally clause to make sure we always log
  • Loading branch information
burqen committed Feb 7, 2019
1 parent e0879a3 commit e68de94
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 236 deletions.
Expand Up @@ -39,7 +39,6 @@
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Iterators; import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.impl.api.index.IndexPopulationJob; import org.neo4j.kernel.impl.api.index.IndexPopulationJob;
Expand All @@ -48,6 +47,8 @@
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;
import org.neo4j.values.storable.RandomValues; import org.neo4j.values.storable.RandomValues;


import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;


public class IndexPopulationIT public class IndexPopulationIT
Expand All @@ -58,11 +59,15 @@ public class IndexPopulationIT
private static final int TEST_TIMEOUT = 120_000; private static final int TEST_TIMEOUT = 120_000;
private static GraphDatabaseService database; private static GraphDatabaseService database;
private static ExecutorService executorService; private static ExecutorService executorService;
private static AssertableLogProvider logProvider;


@BeforeClass @BeforeClass
public static void setUp() public static void setUp()
{ {
database = new GraphDatabaseFactory().newEmbeddedDatabase( directory.storeDir() ); TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory();
logProvider = new AssertableLogProvider( true );
factory.setInternalLogProvider( logProvider );
database = factory.newEmbeddedDatabase( directory.storeDir() );
executorService = Executors.newCachedThreadPool(); executorService = Executors.newCachedThreadPool();
} }


Expand Down Expand Up @@ -157,6 +162,44 @@ public void shutdownDatabaseDuringIndexPopulations()
assertableLogProvider.assertNone( AssertableLogProvider.inLog( IndexPopulationJob.class ).anyError() ); assertableLogProvider.assertNone( AssertableLogProvider.inLog( IndexPopulationJob.class ).anyError() );
} }


@Test
public void mustLogPhaseTracker()
{
Label nodeLabel = Label.label( "testLabel5" );
try ( Transaction transaction = database.beginTx() )
{
database.createNode( nodeLabel ).setProperty( "key", "hej" );
transaction.success();
}

// when
try ( Transaction tx = database.beginTx() )
{
database.schema().indexFor( nodeLabel ).on( "key" ).create();
tx.success();
}
try ( Transaction tx = database.beginTx() )
{
database.schema().awaitIndexesOnline( 1, TimeUnit.MINUTES );
tx.success();
}

// then
//noinspection unchecked
logProvider.assertContainsMessageMatching( allOf(
containsString( "TIME/PHASE" ),
containsString( "Final: " ),
containsString( "SCAN" ),
containsString( "WRITE" ),
containsString( "FLIP" ),
containsString( "totalTime=" ),
containsString( "avgTime=" ),
containsString( "minTime=" ),
containsString( "maxTime=" ),
containsString( "nbrOfReports=" )
) );
}

private void prePopulateDatabase( GraphDatabaseService database, Label testLabel, String propertyName ) private void prePopulateDatabase( GraphDatabaseService database, Label testLabel, String propertyName )
{ {
final RandomValues randomValues = RandomValues.create(); final RandomValues randomValues = RandomValues.create();
Expand Down
Expand Up @@ -154,11 +154,12 @@ private void awaitCompletion()


/** /**
* Insert the given batch of updates into the index defined by the given {@link IndexPopulation}. * Insert the given batch of updates into the index defined by the given {@link IndexPopulation}.
* Called from {@link MultipleIndexPopulator#flush(IndexPopulation)}.
* *
* @param population the index population. * @param population the index population.
*/ */
@Override @Override
protected void flush( IndexPopulation population ) void doFlush( IndexPopulation population )
{ {
activeTasks.incrementAndGet(); activeTasks.incrementAndGet();
Collection<IndexEntryUpdate<?>> batch = population.takeCurrentBatch(); Collection<IndexEntryUpdate<?>> batch = population.takeCurrentBatch();
Expand Down
@@ -0,0 +1,231 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.index;

import java.util.EnumMap;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;

import org.neo4j.helpers.TimeUtil;
import org.neo4j.logging.Log;
import org.neo4j.util.FeatureToggles;

public class LoggingPhaseTracker implements PhaseTracker
{

private static final int PERIOD_INTERVAL = FeatureToggles.getInteger( LoggingPhaseTracker.class, "period_interval", 600 );
private static final String MESSAGE_PREFIX = "TIME/PHASE ";

private final long periodIntervalInSeconds;
private final Log log;

private EnumMap<Phase,Logger> times = new EnumMap<>( Phase.class );
private Phase currentPhase;
private long timeEnterPhase;
private boolean stopped;
private long lastPeriodReport = -1;

LoggingPhaseTracker( Log log )
{
this( PERIOD_INTERVAL, log );
}

LoggingPhaseTracker( long periodIntervalInSeconds, Log log )
{
this.periodIntervalInSeconds = periodIntervalInSeconds;
this.log = log;
for ( Phase phase : Phase.values() )
{
times.put( phase, new Logger( phase ) );
}
}

@Override
public void enterPhase( Phase phase )
{
if ( stopped )
{
throw new IllegalStateException( "Trying to report a new phase after phase tracker has been stopped." );
}
if ( phase != currentPhase )
{
long now = logCurrentTime();
currentPhase = phase;
timeEnterPhase = now;

if ( lastPeriodReport == -1 )
{
lastPeriodReport = now;
}

long secondsSinceLastPeriodReport = TimeUnit.NANOSECONDS.toSeconds( now - lastPeriodReport );
if ( secondsSinceLastPeriodReport >= periodIntervalInSeconds )
{
// Report period
periodReport( secondsSinceLastPeriodReport );
lastPeriodReport = now;
}
}
}

@Override
public void stop()
{
stopped = true;
logCurrentTime();
currentPhase = null;
finalReport();
}

EnumMap<Phase,Logger> times()
{
return times;
}

private void finalReport()
{
log.debug( MESSAGE_PREFIX + mainReportString( "Final" ) );
}

private void periodReport( long secondsSinceLastPeriodReport )
{
String periodReportString = periodReportString( secondsSinceLastPeriodReport );
String mainReportString = mainReportString( "Total" );
log.debug( MESSAGE_PREFIX + mainReportString + ", " + periodReportString );
}

private String mainReportString( String title )
{
StringJoiner joiner = new StringJoiner( ", ", title + ": ", "" );
times.values().forEach( p -> joiner.add( p.toString() ) );
return joiner.toString();
}

private String periodReportString( long secondsSinceLastPeriodReport )
{
StringJoiner joiner = new StringJoiner( ", ", "Last " + secondsSinceLastPeriodReport + " sec: ", "" );
times.values().forEach( logger ->
{
joiner.add( logger.period().toString() );
logger.period().reset();
} );
return joiner.toString();
}

private long logCurrentTime()
{
long now = System.nanoTime();
if ( currentPhase != null )
{
Logger logger = times.get( currentPhase );
long nanoTime = now - timeEnterPhase;
logger.log( nanoTime );
}
return now;
}

public class Logger extends Counter
{
final Counter periodCounter;

private Logger( Phase phase )
{
super( phase );
periodCounter = new Counter( phase );
periodCounter.reset();
}

void log( long nanoTime )
{
super.log( nanoTime );
periodCounter.log( nanoTime );
}

Counter period()
{
return periodCounter;
}
}

public class Counter
{
private final Phase phase;
long totalTime;
long nbrOfReports;
long maxTime;
long minTime;

Counter( Phase phase )
{
this.phase = phase;
}

void log( long nanoTime )
{
totalTime += nanoTime;
nbrOfReports++;
maxTime = Math.max( maxTime, nanoTime );
minTime = Math.min( minTime, nanoTime );
}

void reset()
{
totalTime = 0;
nbrOfReports = 0;
maxTime = Long.MIN_VALUE;
minTime = Long.MAX_VALUE;
}

@Override
public String toString()
{
StringJoiner joiner = new StringJoiner( ", ", phase.toString() + "[", "]" );
if ( nbrOfReports == 0 )
{
addToString( "nbrOfReports", nbrOfReports, joiner, false );
}
else
{
long avgTime = totalTime / nbrOfReports;
addToString( "totalTime", totalTime, joiner, true );
addToString( "avgTime", avgTime, joiner, true );
addToString( "minTime", minTime, joiner, true );
addToString( "maxTime", maxTime, joiner, true );
addToString( "nbrOfReports", nbrOfReports, joiner, false );
}
return joiner.toString();
}

void addToString( String name, long measurement, StringJoiner joiner, boolean isTime )
{
String measurementString;
if ( isTime )
{
long timeRoundedToMillis = TimeUnit.MILLISECONDS.toNanos( TimeUnit.NANOSECONDS.toMillis( measurement ) );
measurementString = TimeUtil.nanosToString( timeRoundedToMillis );
}
else
{
measurementString = Long.toString( measurement );
}
joiner.add( String.format( "%s=%s", name, measurementString ) );
}
}
}
Expand Up @@ -106,6 +106,7 @@ public class MultipleIndexPopulator implements IndexPopulator
protected final Log log; protected final Log log;
private final EntityType type; private final EntityType type;
private final SchemaState schemaState; private final SchemaState schemaState;
private final PhaseTracker phaseTracker;
private StoreScan<IndexPopulationFailedKernelException> storeScan; private StoreScan<IndexPopulationFailedKernelException> storeScan;


public MultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider, EntityType type, SchemaState schemaState ) public MultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider, EntityType type, SchemaState schemaState )
Expand All @@ -115,6 +116,7 @@ public MultipleIndexPopulator( IndexStoreView storeView, LogProvider logProvider
this.log = logProvider.getLog( IndexPopulationJob.class ); this.log = logProvider.getLog( IndexPopulationJob.class );
this.type = type; this.type = type;
this.schemaState = schemaState; this.schemaState = schemaState;
this.phaseTracker = new LoggingPhaseTracker( logProvider.getLog( IndexPopulationJob.class ) );
} }


IndexPopulation addPopulator( IndexPopulator populator, CapableIndexDescriptor capableIndexDescriptor, FlippableIndexProxy flipper, IndexPopulation addPopulator( IndexPopulator populator, CapableIndexDescriptor capableIndexDescriptor, FlippableIndexProxy flipper,
Expand Down Expand Up @@ -172,6 +174,7 @@ StoreScan<IndexPopulationFailedKernelException> indexAllEntities()
{ {
storeScan = storeView.visitNodes( entityTokenIds, propertyKeyIdFilter, new EntityPopulationVisitor(), null, false ); storeScan = storeView.visitNodes( entityTokenIds, propertyKeyIdFilter, new EntityPopulationVisitor(), null, false );
} }
storeScan.setPhaseTracker( phaseTracker );
return new DelegatingStoreScan<IndexPopulationFailedKernelException>( storeScan ) return new DelegatingStoreScan<IndexPopulationFailedKernelException>( storeScan )
{ {
@Override @Override
Expand Down Expand Up @@ -268,6 +271,7 @@ public MultipleIndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor
@Override @Override
public void close( boolean populationCompletedSuccessfully ) public void close( boolean populationCompletedSuccessfully )
{ {
phaseTracker.stop();
// closing the populators happens in flip, fail or individually when they are completed // closing the populators happens in flip, fail or individually when they are completed
} }


Expand Down Expand Up @@ -355,6 +359,12 @@ void flushAll()
} }


protected void flush( IndexPopulation population ) protected void flush( IndexPopulation population )
{
phaseTracker.enterPhase( LoggingPhaseTracker.Phase.WRITE );
doFlush( population );
}

void doFlush( IndexPopulation population )
{ {
try try
{ {
Expand Down Expand Up @@ -553,6 +563,7 @@ private void onUpdate( IndexEntryUpdate<?> update )


void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException
{ {
phaseTracker.enterPhase( LoggingPhaseTracker.Phase.FLIP );
flipper.flip( () -> flipper.flip( () ->
{ {
populatorLock.lock(); populatorLock.lock();
Expand Down Expand Up @@ -678,5 +689,11 @@ public PopulationProgress getProgress()
{ {
return delegate.getProgress(); return delegate.getProgress();
} }

@Override
public void setPhaseTracker( PhaseTracker phaseTracker )
{
delegate.setPhaseTracker( phaseTracker );
}
} }
} }

0 comments on commit e68de94

Please sign in to comment.