Skip to content

Commit

Permalink
Merge pull request #8942 from tinwelint/3.2-build-lss-during-import
Browse files Browse the repository at this point in the history
Revert "Doesn't build label scan store as part of import"
  • Loading branch information
tinwelint committed Mar 6, 2017
2 parents 41ae80e + 6ee6140 commit d84abc9
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 5 deletions.
Expand Up @@ -38,6 +38,7 @@ public DeleteDuplicateNodesStage( Configuration config, PrimitiveLongIterator du
BatchingNeoStores neoStore )
{
super( "DEDUP", config );
add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds, neoStore.getNodeStore() ) );
add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds,
neoStore.getNodeStore(), neoStore.getLabelScanStore() ) );
}
}
Expand Up @@ -22,23 +22,30 @@
import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.LonelyProcessingStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.EMPTY_LONG_ARRAY;
import static org.neo4j.kernel.api.labelscan.NodeLabelUpdate.labelChanges;

public class DeleteDuplicateNodesStep extends LonelyProcessingStep
{
private final NodeStore nodeStore;
private final PrimitiveLongIterator nodeIds;
private final LabelScanWriter labelScanWriter;

public DeleteDuplicateNodesStep( StageControl control, Configuration config, PrimitiveLongIterator nodeIds,
NodeStore nodeStore )
NodeStore nodeStore, LabelScanStore labelScanStore )
{
super( control, "DEDUP", config );
this.nodeStore = nodeStore;
this.nodeIds = nodeIds;
this.labelScanWriter = labelScanStore.newWriter();
}

@Override
Expand All @@ -50,6 +57,14 @@ protected void process() throws IOException
long duplicateNodeId = nodeIds.next();
record.setId( duplicateNodeId );
nodeStore.updateRecord( record );
labelScanWriter.write( labelChanges( duplicateNodeId, EMPTY_LONG_ARRAY, EMPTY_LONG_ARRAY ) );
}
}

@Override
public void close() throws Exception
{
labelScanWriter.close();
super.close();
}
}
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.EMPTY_LONG_ARRAY;
import static org.neo4j.kernel.api.labelscan.NodeLabelUpdate.labelChanges;

/**
* Populates a {@link LabelScanWriter} with all node labels from {@link Batch batches} passing by.
*/
public class LabelScanStorePopulationStep extends ProcessorStep<Batch<InputNode,NodeRecord>>
{
private final LabelScanWriter writer;

public LabelScanStorePopulationStep( StageControl control, Configuration config, LabelScanStore labelScanStore )
{
super( control, "LABEL SCAN", config, 1 );
this.writer = labelScanStore.newWriter();
}

@Override
protected void process( Batch<InputNode,NodeRecord> batch, BatchSender sender ) throws Throwable
{
int length = batch.labels.length;
for ( int i = 0; i < length; i++ )
{
long[] labels = batch.labels[i];
NodeRecord node = batch.records[i];
if ( labels != null && node.inUse() )
{
writer.write( labelChanges( node.getId(), EMPTY_LONG_ARRAY, labels ) );
}
}
sender.send( batch );
}

@Override
public void close() throws Exception
{
super.close();
writer.close();
}
}
Expand Up @@ -65,7 +65,7 @@ public class NodeStage extends Stage

public NodeStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputNode> nodes, IdMapper idMapper, IdGenerator idGenerator,
BatchingNeoStores neoStore, InputCache inputCache,
BatchingNeoStores neoStore, InputCache inputCache, LabelScanStore labelScanStore,
EntityStoreUpdaterStep.Monitor storeUpdateMonitor,
NodeRelationshipCache cache,
StatsProvider memoryUsage ) throws IOException
Expand All @@ -83,6 +83,7 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new NodeEncoderStep( control(), config, idMapper, idGenerator,
neoStore.getLabelRepository(), nodeStore, memoryUsage ) );
add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor,
storeUpdateMonitor ) );
}
Expand Down
Expand Up @@ -180,7 +180,7 @@ public void doImport( Input input ) throws IOException

// Stage 1 -- nodes, properties, labels
NodeStage nodeStage = new NodeStage( config, writeMonitor,
nodes, idMapper, idGenerator, neoStore, inputCache,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats );
executeStages( nodeStage );
if ( idMapper.needsPreparation() )
Expand Down
Expand Up @@ -30,11 +30,14 @@
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.cursor.DefaultPageCursorTracerSupplier;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.extension.KernelExtensions;
import org.neo4j.kernel.extension.UnsatisfiedDependencyStrategies;
import org.neo4j.kernel.extension.dependency.NamedLabelScanStoreSelectionStrategy;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.pagecache.ConfiguringPageCacheFactory;
Expand Down Expand Up @@ -62,6 +65,7 @@

import static java.lang.String.valueOf;
import static java.nio.file.StandardOpenOption.DELETE_ON_CLOSE;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand All @@ -85,13 +89,14 @@ public class BatchingNeoStores implements AutoCloseable
private final PageCache pageCache;
private final NeoStores neoStores;
private final LifeSupport life = new LifeSupport();
private final LabelScanStore labelScanStore;
private final IoTracer ioTracer;
private final RecordFormats recordFormats;

// Some stores are considered temporary during the import and will be reordered/restructured
// into the main store. These temporary stores will live here
private final NeoStores temporaryNeoStores;
private boolean externalPageCache;
private final boolean externalPageCache;

private BatchingNeoStores( FileSystemAbstraction fileSystem, PageCache pageCache, File storeDir,
RecordFormats recordFormats, Config neo4jConfig, LogService logService, AdditionalInitialIds initialIds,
Expand Down Expand Up @@ -155,12 +160,15 @@ private BatchingNeoStores( FileSystemAbstraction fileSystem, PageCache pageCache
dependencies.satisfyDependency( this );
dependencies.satisfyDependency( logService );
dependencies.satisfyDependency( IndexStoreView.EMPTY );
dependencies.satisfyDependency( pageCache );
KernelContext kernelContext = new SimpleKernelContext( storeDir, DatabaseInfo.UNKNOWN, dependencies );
@SuppressWarnings( { "unchecked", "rawtypes" } )
KernelExtensions extensions = life.add( new KernelExtensions(
kernelContext, (Iterable) Service.load( KernelExtensionFactory.class ),
dependencies, UnsatisfiedDependencyStrategies.ignore() ) );
life.start();
labelScanStore = life.add( extensions.resolveDependency( LabelScanStoreProvider.class,
new NamedLabelScanStoreSelectionStrategy( neo4jConfig ) ).getLabelScanStore() );
}

public static BatchingNeoStores batchingNeoStores( FileSystemAbstraction fileSystem, File storeDir,
Expand Down Expand Up @@ -293,6 +301,11 @@ public long getLastCommittedTransactionId()
return neoStores.getMetaDataStore().getLastCommittedTransactionId();
}

public LabelScanStore getLabelScanStore()
{
return labelScanStore;
}

public NeoStores getNeoStores()
{
return neoStores;
Expand Down

0 comments on commit d84abc9

Please sign in to comment.