Skip to content

Commit

Permalink
Moves some printouts to a monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 10, 2018
1 parent 4e5d5c1 commit 752180f
Show file tree
Hide file tree
Showing 15 changed files with 124 additions and 47 deletions.
Expand Up @@ -564,7 +564,8 @@ public static void doImport( PrintStream out, PrintStream err, InputStream in, F
ExecutionMonitors.defaultVisible( in ),
EMPTY,
dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logService.getInternalLogProvider() ) );
RecordFormatSelector.selectForConfig( dbConfig, logService.getInternalLogProvider() ),
new PrintingImportLogicMonitor( out, err ) );
printOverview( storeDir, nodesFiles, relationshipsFiles, configuration, out );
success = false;
try
Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.tooling;

import java.io.PrintStream;

import org.neo4j.unsafe.impl.batchimport.ImportLogic;

class PrintingImportLogicMonitor implements ImportLogic.Monitor
{
private final PrintStream out;
private final PrintStream err;

PrintingImportLogicMonitor( PrintStream out, PrintStream err )
{
this.out = out;
this.err = err;
}

@Override
public void doubleRelationshipRecordUnitsEnabled()
{
out.println( "Will use double record units for all relationships" );
}

@Override
public void mayExceedNodeIdCapacity( long capacity, long estimatedCount )
{
err.printf( "WARNING: estimated number of relationships %d may exceed capacity %d of selected record format%n",
estimatedCount, capacity );
}

@Override
public void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount )
{
err.printf( "WARNING: estimated number of nodes %d may exceed capacity %d of selected record format%n",
estimatedCount, capacity );
}
}
Expand Up @@ -50,6 +50,7 @@
import static java.lang.System.currentTimeMillis;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors.defaultVisible;

/**
Expand Down Expand Up @@ -161,7 +162,7 @@ public long maxMemoryUsage()
{
consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig,
new SimpleLogService( logging, logging ), defaultVisible(), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logging ) );
RecordFormatSelector.selectForConfig( dbConfig, logging ), NO_MONITOR );
ImportTool.printOverview( dir, Collections.emptyList(), Collections.emptyList(), importConfig, System.out );
}
consumer.doImport( input );
Expand Down
Expand Up @@ -114,6 +114,7 @@
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_VERSION;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.UNKNOWN_TX_CHECKSUM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.UNKNOWN_TX_COMMIT_TIMESTAMP;
import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.InputIterable.replayable;
import static org.neo4j.unsafe.impl.batchimport.input.Inputs.knownEstimates;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.withDynamicProcessorAssignment;
Expand Down Expand Up @@ -373,7 +374,7 @@ public boolean parallelRecordReadsWhenWriting()
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem, pageCache,
importConfig, logService,
withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressReporter,
importConfig ), importConfig ), additionalInitialIds, config, newFormat );
importConfig ), importConfig ), additionalInitialIds, config, newFormat, NO_MONITOR );
InputIterable nodes = replayable( () -> legacyNodesAsInput( legacyStore, requiresPropertyMigration, nodeInputCursors ) );
InputIterable relationships = replayable( () ->
legacyRelationshipsAsInput( legacyStore, requiresPropertyMigration, relationshipInputCursors ) );
Expand Down
Expand Up @@ -54,7 +54,6 @@
import org.neo4j.unsafe.impl.batchimport.input.CachedInput;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.EstimationSanityChecker;
import org.neo4j.unsafe.impl.batchimport.input.EstimationSanityChecker.Monitor;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.Input.Estimates;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
Expand Down Expand Up @@ -85,6 +84,33 @@
*/
public class ImportLogic implements Closeable
{
public interface Monitor
{
void doubleRelationshipRecordUnitsEnabled();

void mayExceedNodeIdCapacity( long capacity, long estimatedCount );

void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount );
}

public static final Monitor NO_MONITOR = new Monitor()
{
@Override
public void mayExceedRelationshipIdCapacity( long capacity, long estimatedCount )
{ // no-op
}

@Override
public void mayExceedNodeIdCapacity( long capacity, long estimatedCount )
{ // no-op
}

@Override
public void doubleRelationshipRecordUnitsEnabled()
{ // no-op
}
};

private final File storeDir;
private final FileSystemAbstraction fileSystem;
private final BatchingNeoStores neoStore;
Expand All @@ -95,6 +121,7 @@ public class ImportLogic implements Closeable
private final DataImporter.Monitor storeUpdateMonitor = new DataImporter.Monitor();
private final long maxMemory;
private final Dependencies dependencies = new Dependencies();
private final Monitor monitor;
private Input input;

// This map contains additional state that gets populated, created and used throughout the stages.
Expand Down Expand Up @@ -122,16 +149,18 @@ public class ImportLogic implements Closeable
* @param logService {@link LogService} to use.
* @param executionMonitor {@link ExecutionMonitor} to follow progress as the import proceeds.
* @param recordFormats which {@link RecordFormats record format} to use for the created db.
* @param monitor {@link Monitor} for some events.
*/
public ImportLogic( File storeDir, FileSystemAbstraction fileSystem, BatchingNeoStores neoStore,
Configuration config, LogService logService, ExecutionMonitor executionMonitor,
RecordFormats recordFormats )
RecordFormats recordFormats, Monitor monitor )
{
this.storeDir = storeDir;
this.fileSystem = fileSystem;
this.neoStore = neoStore;
this.config = config;
this.recordFormats = recordFormats;
this.monitor = monitor;
this.log = logService.getInternalLogProvider().getLog( getClass() );
this.executionMonitor = ExecutionSupervisors.withDynamicProcessorAssignment( executionMonitor, config );
this.maxMemory = config.maxMemoryUsage();
Expand All @@ -154,30 +183,15 @@ public void initialize( Input input ) throws IOException

if ( neoStore.determineDoubleRelationshipRecordUnits( inputEstimates ) )
{
System.out.println( "Will use double record units for all relationships" );
monitor.doubleRelationshipRecordUnitsEnabled();
}

executionMonitor.initialize( dependencies );
}

private void sanityCheckEstimatesWithRecordFormat( Estimates inputEstimates )
{
new EstimationSanityChecker( recordFormats, new Monitor()
{
@Override
public void nodeCountCapacity( long capacity, long estimatedCount )
{
System.err.printf( "WARNING: estimated number of relationships %d may exceed capacity %d of selected record format%n",
estimatedCount, capacity );
}

@Override
public void relationshipCountCapacity( long capacity, long estimatedCount )
{
System.err.printf( "WARNING: estimated number of nodes %d may exceed capacity %d of selected record format%n",
estimatedCount, capacity );
}
} ).sanityCheck( inputEstimates );
new EstimationSanityChecker( recordFormats, monitor ).sanityCheck( inputEstimates );
}

/**
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.ImportLogic.Monitor;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
Expand Down Expand Up @@ -54,10 +55,11 @@ public class ParallelBatchImporter implements BatchImporter
private final RecordFormats recordFormats;
private final ExecutionMonitor executionMonitor;
private final AdditionalInitialIds additionalInitialIds;
private final Monitor monitor;

public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache,
Configuration config, LogService logService, ExecutionMonitor executionMonitor,
AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats )
AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats, ImportLogic.Monitor monitor )
{
this.externalPageCache = externalPageCache;
this.storeDir = storeDir;
Expand All @@ -68,6 +70,7 @@ public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, P
this.recordFormats = recordFormats;
this.executionMonitor = executionMonitor;
this.additionalInitialIds = additionalInitialIds;
this.monitor = monitor;
}

@Override
Expand All @@ -76,7 +79,7 @@ public void doImport( Input input ) throws IOException
try ( BatchingNeoStores store = instantiateNeoStores( fileSystem, storeDir, externalPageCache, recordFormats,
config, logService, additionalInitialIds, dbConfig );
ImportLogic logic = new ImportLogic( storeDir, fileSystem, store, config, logService,
executionMonitor, recordFormats ) )
executionMonitor, recordFormats, monitor ) )
{
store.createNew();
logic.initialize( input );
Expand Down
Expand Up @@ -22,20 +22,14 @@
import java.util.function.BiConsumer;

import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.ImportLogic;

public class EstimationSanityChecker
{
public interface Monitor
{
void nodeCountCapacity( long capacity, long estimatedCount );

void relationshipCountCapacity( long capacity, long estimatedCount );
}

private final RecordFormats formats;
private final Monitor monitor;
private final ImportLogic.Monitor monitor;

public EstimationSanityChecker( RecordFormats formats, Monitor monitor )
public EstimationSanityChecker( RecordFormats formats, ImportLogic.Monitor monitor )
{
this.formats = formats;
this.monitor = monitor;
Expand All @@ -44,9 +38,9 @@ public EstimationSanityChecker( RecordFormats formats, Monitor monitor )
public void sanityCheck( Input.Estimates estimates )
{
sanityCheckEstimateWithMaxId( estimates.numberOfNodes(), formats.node().getMaxId(),
monitor::nodeCountCapacity );
monitor::mayExceedNodeIdCapacity );
sanityCheckEstimateWithMaxId( estimates.numberOfRelationships(), formats.relationship().getMaxId(),
monitor::relationshipCountCapacity );
monitor::mayExceedRelationshipIdCapacity );
}

private void sanityCheckEstimateWithMaxId( long estimate, long max, BiConsumer<Long,Long> reporter )
Expand Down
Expand Up @@ -52,6 +52,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.data;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void shouldExitAndThrowExceptionOnPanic() throws Exception
// GIVEN
BatchImporter importer = new ParallelBatchImporter( directory.absolutePath(), fs, null, Configuration.DEFAULT,
NullLogService.getInstance(), ExecutionMonitors.invisible(), AdditionalInitialIds.EMPTY,
Config.defaults(), StandardV3_0.RECORD_FORMATS );
Config.defaults(), StandardV3_0.RECORD_FORMATS, NO_MONITOR );
Iterable<DataFactory> nodeData =
datas( data( NO_DECORATOR, fileAsCharReadable( nodeCsvFileWithBrokenEntries() ) ) );
Input brokenCsvInput = new CsvInput(
Expand Down
Expand Up @@ -23,7 +23,7 @@

import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.standard.Standard;
import org.neo4j.unsafe.impl.batchimport.input.EstimationSanityChecker.Monitor;
import org.neo4j.unsafe.impl.batchimport.ImportLogic.Monitor;
import org.neo4j.unsafe.impl.batchimport.input.Input.Estimates;

import static org.mockito.Mockito.mock;
Expand All @@ -45,8 +45,8 @@ public void shouldWarnAboutCountGettingCloseToCapacity() throws Exception
new EstimationSanityChecker( formats, monitor ).sanityCheck( estimates );

// then
verify( monitor ).nodeCountCapacity( formats.node().getMaxId(), estimates.numberOfNodes() );
verify( monitor ).relationshipCountCapacity( formats.relationship().getMaxId(), estimates.numberOfRelationships() );
verify( monitor ).mayExceedNodeIdCapacity( formats.node().getMaxId(), estimates.numberOfNodes() );
verify( monitor ).mayExceedRelationshipIdCapacity( formats.relationship().getMaxId(), estimates.numberOfRelationships() );
}

@Test
Expand All @@ -62,8 +62,8 @@ public void shouldWarnAboutCounthigherThanCapacity() throws Exception
new EstimationSanityChecker( formats, monitor ).sanityCheck( estimates );

// then
verify( monitor ).nodeCountCapacity( formats.node().getMaxId(), estimates.numberOfNodes() );
verify( monitor ).relationshipCountCapacity( formats.relationship().getMaxId(), estimates.numberOfRelationships() );
verify( monitor ).mayExceedNodeIdCapacity( formats.node().getMaxId(), estimates.numberOfNodes() );
verify( monitor ).mayExceedRelationshipIdCapacity( formats.relationship().getMaxId(), estimates.numberOfRelationships() );
}

@Test
Expand Down
Expand Up @@ -79,6 +79,7 @@
import static org.neo4j.kernel.impl.util.AutoCreatingHashMap.nested;
import static org.neo4j.kernel.impl.util.AutoCreatingHashMap.values;
import static org.neo4j.register.Registers.newDoubleLongRegister;
import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void shouldImportDataComingFromCsvFiles() throws Exception
Config dbConfig = Config.defaults();
BatchImporter importer = new ParallelBatchImporter( directory.graphDbDir(), fileSystemRule.get(), null,
smallBatchSizeConfig(), NullLogService.getInstance(), invisible(), AdditionalInitialIds.EMPTY, dbConfig,
RecordFormatSelector.defaultFormat() );
RecordFormatSelector.defaultFormat(), NO_MONITOR );
List<InputEntity> nodeData = randomNodeData();
List<InputEntity> relationshipData = randomRelationshipData( nodeData );

Expand Down
Expand Up @@ -81,6 +81,7 @@
import static org.neo4j.kernel.impl.store.NoStoreHeader.NO_STORE_HEADER;
import static org.neo4j.kernel.impl.store.format.standard.Standard.LATEST_RECORD_FORMATS;
import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK;
import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.input.RandomEntityDataGenerator.convert;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void shouldCalculateCorrectEstimates() throws Exception
FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
new ParallelBatchImporter( storeDir, fs, null, Configuration.DEFAULT,
NullLogService.getInstance(), ExecutionMonitors.invisible(), AdditionalInitialIds.EMPTY, config,
format ).doImport( input );
format, NO_MONITOR ).doImport( input );

// then compare estimates with actual disk sizes
try ( PageCache pageCache = new ConfiguringPageCacheFactory( fs, config, PageCacheTracer.NULL,
Expand Down
Expand Up @@ -44,6 +44,7 @@
import static org.neo4j.kernel.impl.store.format.standard.Standard.LATEST_RECORD_FORMATS;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
import static org.neo4j.unsafe.impl.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput.bareboneNodeHeader;
import static org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput.bareboneRelationshipHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.INTEGER;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void shouldReportProgressOfNodeImport() throws Exception

// when
new ParallelBatchImporter( storage.directory().absolutePath(), storage.fileSystem(), storage.pageCache(), DEFAULT,
NullLogService.getInstance(), monitor, EMPTY, defaults(), LATEST_RECORD_FORMATS ).doImport( input );
NullLogService.getInstance(), monitor, EMPTY, defaults(), LATEST_RECORD_FORMATS, NO_MONITOR ).doImport( input );

// then
progress.assertAllProgressReachedEnd();
Expand Down

0 comments on commit 752180f

Please sign in to comment.