Skip to content

Commit

Permalink
Each BatchingPagedFile has one read, one write cursor
Browse files Browse the repository at this point in the history
to be able to paralellize some more moving forward.
  • Loading branch information
tinwelint committed Mar 3, 2015
1 parent b098e20 commit f0a9109
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 129 deletions.
Expand Up @@ -75,6 +75,7 @@


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

import static org.neo4j.function.Functions.constant; import static org.neo4j.function.Functions.constant;
import static org.neo4j.helpers.collection.IteratorUtil.count; import static org.neo4j.helpers.collection.IteratorUtil.count;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
Expand Down Expand Up @@ -314,7 +315,6 @@ public void write( ByteBuffer data, long position, Pool<ByteBuffer> pool )
LockSupport.parkNanos( random.nextInt( 500 ) * 1_000_000 ); // slowness comes from here LockSupport.parkNanos( random.nextInt( 500 ) * 1_000_000 ); // slowness comes from here
} }
delegate.write( data, position, pool ); delegate.write( data, position, pool );

} }
}; };
} }
Expand Down
Expand Up @@ -52,15 +52,13 @@ public boolean process( NodeRecord node )
{ {
long nodeId = node.getId(); long nodeId = node.getId();
long firstRel = nodeRelationshipLink.getFirstRel( nodeId, this ); long firstRel = nodeRelationshipLink.getFirstRel( nodeId, this );
if ( firstRel == -1 ) if ( firstRel != -1 )
{ {
return false; node.setNextRel( firstRel );
} if ( nodeRelationshipLink.isDense( nodeId ) )

{
node.setNextRel( firstRel ); node.setDense( true );
if ( nodeRelationshipLink.isDense( nodeId ) ) }
{
node.setDense( true );
} }
return true; return true;
} }
Expand Down
Expand Up @@ -170,7 +170,7 @@ public void doImport( Input input ) throws IOException


// Switch to reverse updating mode and release references that are no longer used so they can be collected // Switch to reverse updating mode and release references that are no longer used so they can be collected
writerFactory.awaitEverythingWritten(); writerFactory.awaitEverythingWritten();
neoStore.switchToUpdateMode(); neoStore.flush();


// Remaining node processors // Remaining node processors
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
Expand Down
Expand Up @@ -51,8 +51,6 @@
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.impl.store.StoreFactory.configForStoreDir; import static org.neo4j.kernel.impl.store.StoreFactory.configForStoreDir;
import static org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache.Mode.APPEND_ONLY;
import static org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache.Mode.UPDATE;


/** /**
* Creator and accessor of {@link NeoStore} with some logic to provide very batch friendly services to the * Creator and accessor of {@link NeoStore} with some logic to provide very batch friendly services to the
Expand All @@ -67,7 +65,7 @@ public class BatchingNeoStore implements AutoCloseable
private final BatchingRelationshipTypeTokenRepository relationshipTypeRepository; private final BatchingRelationshipTypeTokenRepository relationshipTypeRepository;
private final StringLogger logger; private final StringLogger logger;
private final Config neo4jConfig; private final Config neo4jConfig;
private final BatchingPageCache pageCacheFactory; private final BatchingPageCache pageCache;
private final NeoStore neoStore; private final NeoStore neoStore;
private final WriterFactory writerFactory; private final WriterFactory writerFactory;


Expand All @@ -84,9 +82,9 @@ public BatchingNeoStore( FileSystemAbstraction fileSystem, String storeDir,
GraphDatabaseSettings.class ), GraphDatabaseSettings.class ),
new File( storeDir ) ); new File( storeDir ) );


this.pageCacheFactory = new BatchingPageCache( fileSystem, config.fileChannelBufferSize(), this.pageCache = new BatchingPageCache( fileSystem, config.fileChannelBufferSize(),
config.bigFileChannelBufferSizeMultiplier(), writerFactory, writeMonitor, APPEND_ONLY ); config.bigFileChannelBufferSizeMultiplier(), writerFactory, writeMonitor );
this.neoStore = newNeoStore( pageCacheFactory ); this.neoStore = newNeoStore( pageCache );
flushNeoStoreAndAwaitEverythingWritten(); flushNeoStoreAndAwaitEverythingWritten();
if ( alreadyContainsData( neoStore ) ) if ( alreadyContainsData( neoStore ) )
{ {
Expand Down Expand Up @@ -126,7 +124,7 @@ public static void createStore( FileSystemAbstraction fileSystem, String storeDi
{ {
PageCache pageCache = new BatchingPageCache( fileSystem, Configuration.DEFAULT.fileChannelBufferSize(), PageCache pageCache = new BatchingPageCache( fileSystem, Configuration.DEFAULT.fileChannelBufferSize(),
Configuration.DEFAULT.bigFileChannelBufferSizeMultiplier(), Configuration.DEFAULT.bigFileChannelBufferSizeMultiplier(),
BatchingPageCache.SYNCHRONOUS, Monitor.NO_MONITOR, APPEND_ONLY ); BatchingPageCache.SYNCHRONOUS, Monitor.NO_MONITOR );
StoreFactory storeFactory = new StoreFactory( StoreFactory storeFactory = new StoreFactory(
fileSystem, new File( storeDir ), pageCache, StringLogger.DEV_NULL, new Monitors() ); fileSystem, new File( storeDir ), pageCache, StringLogger.DEV_NULL, new Monitors() );
storeFactory.createNeoStore().close(); storeFactory.createNeoStore().close();
Expand Down Expand Up @@ -180,11 +178,6 @@ public CountsTracker getCountsStore()
return neoStore.getCounts(); return neoStore.getCounts();
} }


public void switchToUpdateMode()
{
pageCacheFactory.setMode( UPDATE );
}

@Override @Override
public void close() public void close()
{ {
Expand All @@ -205,4 +198,9 @@ private void flushNeoStoreAndAwaitEverythingWritten()
// That's why we have to wait for any pending I/O jobs to be written after the flush. // That's why we have to wait for any pending I/O jobs to be written after the flush.
writerFactory.awaitEverythingWritten(); writerFactory.awaitEverythingWritten();
} }

public void flush() throws IOException
{
pageCache.flush();
}
} }

0 comments on commit f0a9109

Please sign in to comment.