Skip to content
This repository has been archived by the owner on Jun 9, 2020. It is now read-only.

Commit

Permalink
extracted forward relationship-handling into a separate handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jexp committed Nov 13, 2012
1 parent b3e9337 commit dc1e0dd
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 51 deletions.
4 changes: 2 additions & 2 deletions batch.properties
Expand Up @@ -4,6 +4,6 @@ use_memory_mapped_buffers=true
neostore.propertystore.db.index.keys.mapped_memory=5M
neostore.propertystore.db.index.mapped_memory=5M
neostore.nodestore.db.mapped_memory=200M
neostore.relationshipstore.db.mapped_memory=1000M
neostore.propertystore.db.mapped_memory=1000M
neostore.relationshipstore.db.mapped_memory=500M
neostore.propertystore.db.mapped_memory=200M
neostore.propertystore.db.strings.mapped_memory=200M
11 changes: 8 additions & 3 deletions src/main/java/org/neo4j/batchimport/DisruptorBatchInserter.java
Expand Up @@ -44,6 +44,7 @@ public class DisruptorBatchInserter {
private final NodeStructFactory nodeStructFactory;
private volatile boolean stop;
private CleanupMemoryHandler cleanupMemoryHandler;
private ForwardRelationshipUpdateHandler forwardRelationshipUpdateHandler;

public DisruptorBatchInserter(String storeDir, final Map<String, String> config, long nodesToCreate, final NodeStructFactory nodeStructFactory) {
this.storeDir = storeDir;
Expand All @@ -70,8 +71,9 @@ void init() {
disruptor.
handleEventsWith(propertyMappingHandlers).
then(propertyRecordCreatorHandler, relationshipIdHandler).
then(relationshipWriter, propertyWriter).
then(nodeWriter,cleanupMemoryHandler);
then(forwardRelationshipUpdateHandler, propertyWriter).
then(relationshipWriter, nodeWriter).
then(cleanupMemoryHandler);
}

private void createHandlers(NeoStore neoStore, NodeStructFactory nodeStructFactory) {
Expand All @@ -83,7 +85,9 @@ private void createHandlers(NeoStore neoStore, NodeStructFactory nodeStructFacto
//nodeWriter = new NodeFileWriteHandler(new File(nodeStore.getStorageFileName()));
nodeWriter = new NodeWriteRecordHandler(neoStore.getNodeStore());
propertyWriter = new PropertyWriteRecordHandler(neoStore.getPropertyStore());
relationshipWriter = new RelationshipWriteHandler(new RelationshipRecordWriter(neoStore.getRelationshipStore()), nodeStructFactory.getTotalNrOfRels());
final RelationshipRecordWriter relationshipRecordWriter = new RelationshipRecordWriter(neoStore.getRelationshipStore());
relationshipWriter = new RelationshipWriteHandler(relationshipRecordWriter, nodeStructFactory.getTotalNrOfRels());
forwardRelationshipUpdateHandler = new ForwardRelationshipUpdateHandler(relationshipRecordWriter, nodeStructFactory.getTotalNrOfRels());
cleanupMemoryHandler = new CleanupMemoryHandler();
//relationshipWriter = new RelationshipWriteHandler(new RelationshipFileWriter(new File(neoStore.getRelationshipStore().getStorageFileName())));
}
Expand Down Expand Up @@ -111,6 +115,7 @@ void shutdown() {

nodeWriter.close();
propertyWriter.close();
forwardRelationshipUpdateHandler.close();
relationshipWriter.close();

inserter.shutdown();
Expand Down
@@ -0,0 +1,74 @@
package org.neo4j.batchimport.handlers;

import com.lmax.disruptor.EventHandler;
import org.neo4j.batchimport.structs.NodeStruct;
import org.neo4j.batchimport.structs.Relationship;
import org.neo4j.kernel.impl.nioneo.store.Record;

import java.io.IOException;

/**
* @author mh
* @since 27.10.12
*/
public class ForwardRelationshipUpdateHandler implements EventHandler<NodeStruct> {
private long counter;

private final ForwardRelationshipUpdateManager futureNodeRelInfo;
private final RelationshipUpdateCache cache;

public ForwardRelationshipUpdateHandler(RelationshipWriter relationshipWriter, final long totalNrOfRels) {
cache = new RelationshipUpdateCache(relationshipWriter, totalNrOfRels);
futureNodeRelInfo = new ForwardRelationshipUpdateManager(cache);
}

@Override
public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {
event.firstRel = firstRelationshipId(event,futureNodeRelInfo.getFirstRelId(nodeId));

if (Record.NO_NEXT_RELATIONSHIP.is(event.firstRel)) return;

event.prevId = futureNodeRelInfo.done(nodeId, getFirstOwnRelationshipId(event));

int count = event.relationshipCount;

for (int i = 0; i < count; i++) {
Relationship relationship = event.getRelationship(i);
storeFutureRelId(nodeId, relationship);

counter++;
}
}

private long firstRelationshipId(NodeStruct event, Long firstFutureId) {
if (firstFutureId!=null) return firstFutureId;
return getFirstOwnRelationshipId(event);
}

private long getFirstOwnRelationshipId(NodeStruct event) {
if (event.relationshipCount == 0) return Record.NO_PREV_RELATIONSHIP.intValue();
return event.getRelationship(0).id;
}


@Override
public String toString() {
return "rel-update-handler " + counter + " "+cache;
}

public void close() {
try {
cache.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void storeFutureRelId(long nodeId, Relationship relationship) throws IOException {
long other = relationship.other();
if (other <= nodeId) return;
final boolean otherDirection = !relationship.outgoing();
if (relationship.id==4842) System.out.println(nodeId+" relationship = " + relationship);
futureNodeRelInfo.add(other,relationship.id,otherDirection);
}
}
Expand Up @@ -17,7 +17,7 @@ class RelationshipUpdateInfo { // todo replace with long or bytebuffer/array
volatile boolean outgoing;
private final long firstId;

public RelationshipUpdateInfo(long firstId) { // todo replace by prevId
public RelationshipUpdateInfo(long firstId) { // todo replace by prevId??
this.firstId = firstId;
}

Expand Down
Expand Up @@ -103,7 +103,7 @@ private void flushBuffer(boolean force) throws IOException {
* only works for prevId & nextId <= MAXINT
*/
@Override
public void update(long id, boolean outgoing, long prevId, long nextId) throws IOException {
public boolean update(long id, boolean outgoing, long prevId, long nextId) throws IOException {
flushBuffer(true);
long position = id * RelationshipStore.RECORD_SIZE + 1 + 4 + 4 + 4; // inUse, firstNode, secondNode, relType

Expand All @@ -120,6 +120,7 @@ public void update(long id, boolean outgoing, long prevId, long nextId) throws I

updated += channel.write(updateBuffer);
channel.position(oldPos);
return true;
}

@Override
Expand Down
Expand Up @@ -27,8 +27,9 @@ public void create(long nodeId, NodeStruct event, Relationship relationship, lon
}

@Override
public void update(long relId, boolean outgoing, long prevId, long nextId) {
RelationshipRecord record = relationshipStore.getRecord(relId);
public boolean update(long relId, boolean outgoing, long prevId, long nextId) {
RelationshipRecord record = relationshipStore.getLightRel(relId);
if (record==null) return false;
if (outgoing) {
record.setFirstPrevRel(prevId);
record.setFirstNextRel(nextId);
Expand All @@ -38,6 +39,7 @@ public void update(long relId, boolean outgoing, long prevId, long nextId) {
}
updateRecord(record);
updated++;
return true;
}

private void updateRecord(RelationshipRecord record) {
Expand Down
@@ -1,6 +1,10 @@
package org.neo4j.batchimport.handlers;

import edu.ucla.sspace.util.primitive.IntSet;
import edu.ucla.sspace.util.primitive.TroveIntSet;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;

Expand Down Expand Up @@ -40,7 +44,7 @@ void written(long count) {

@Override
public String toString() {
return String.format("buffer %d min %d max %d added %d written %d%n",idx,min==Long.MAX_VALUE?-1:min,max==Long.MIN_VALUE?-1:max,added,written);
return String.format("buffer %d min %d max %d added %d written %d %n",idx,min==Long.MAX_VALUE?-1:min,max==Long.MIN_VALUE?-1:max,added,written);
}
}
public RelationshipUpdateCache(RelationshipWriter relationshipUpdater, long total) {
Expand All @@ -66,32 +70,45 @@ private ByteBuffer[] createBuffers(final int buckets, final int capacity) {
return buffers;
}

public void update(long relId, boolean outgoing, long prevId, long nextId) throws IOException {
public boolean update(long relId, boolean outgoing, long prevId, long nextId) throws IOException {
ByteBuffer buffer = selectBuffer(relId);

// final int position = buffer.position();
// final int limit = buffer.limit();
// System.out.printf("rel %d buffer pos %d buffer limit %d pos==limit %s %n",relId,buffer.position(),buffer.limit(),buffer.position()==buffer.limit());
flushBuffer(buffer, false);
// final int posAfterFlush = buffer.position();
addToBuffer(buffer, relId, outgoing, prevId, nextId);
flushBuffer(buffer,false);
// final int posAfterAdd = buffer.position();
return true;
}

private void addToBuffer(ByteBuffer buffer, long relId, boolean outgoing, long prevId, long nextId) {
// final int position = buffer.position();
int relIdMod = (int)((relId & 0x700000000L) >> 31); //0..2
int prevIdMod = prevId <= 0 ? 0 : (int)((prevId & 0x700000000L) >> 28); //3..5
int nextIdMod = nextId <= 0 ? 0 : (int)((nextId & 0x700000000L) >> 25); //6..8
final int outgoingMod = (outgoing ? 1 : 0) << 9;
// x x|xx xxx xxx
short header = (short) (outgoingMod | nextIdMod | prevIdMod |relIdMod);
buffer.putShort(header).putInt((int)relId).putInt((int) prevId).putInt((int) nextId);
try {
buffer.putShort(header);
buffer.putInt((int)relId);
buffer.putInt((int) prevId);
buffer.putInt((int) nextId);
} catch (BufferOverflowException e) {
throw e;
}
}

private void updateFromBuffer(ByteBuffer buffer) throws IOException {
private boolean updateFromBuffer(ByteBuffer buffer) throws IOException {
// x x|xx xxx xxx
short header = buffer.getShort();
long relId = readIntAsLong(buffer,header & 0x07);
long prevId = readIntAsLong(buffer, header>>3 & 0x07);
long nextId = readIntAsLong(buffer, header>>6 & 0x07);
boolean outgoing = (header & 0x0200 /*0010.0000*/) != 0;

relationshipUpdater.update(relId, outgoing, prevId, nextId);
return relationshipUpdater.update(relId, outgoing, prevId, nextId);
}

private ByteBuffer selectBuffer(long relId) {
Expand All @@ -106,12 +123,36 @@ private void flushBuffer(ByteBuffer buffer, boolean force) throws IOException {
if (force || buffer.position()==buffer.limit()) {
buffer.limit(buffer.position());
buffer.position(0);
// long time=System.currentTimeMillis();
while (buffer.position()!=buffer.limit()) updateFromBuffer(buffer);
stats[idx(buffer)].written(buffer.position()/RECORD_SIZE);
// System.out.println("Flushed buffer "+idx(buffer)+" in "+(System.currentTimeMillis()-time)+" ms.");
buffer.clear().limit(CAPACITY);
IntSet failedPositions = new TroveIntSet(100);
while (buffer.position() != buffer.limit()) {
final int position = buffer.position();
if (!updateFromBuffer(buffer)) failedPositions.add(position);
}
System.out.println("failedPositions.size() = " + failedPositions.size());
stats[idx(buffer)].written(buffer.position()/RECORD_SIZE - failedPositions.size());
buffer.limit(CAPACITY);
int initialPos=failedPositions.isEmpty() ? 0 : copyFailedPositions(buffer, failedPositions);
buffer.position(initialPos);
}
}

private int copyFailedPositions(ByteBuffer buffer, IntSet failedPositions) {
byte[] tmp=new byte[RECORD_SIZE];
int writePos=0;
for (Integer pos : failedPositions) {
buffer.position(pos);
buffer.get(tmp);
buffer.position(writePos);
buffer.put(tmp);
writePos=buffer.position();
}
try {
// give the relationship-writer time to write out the relationships
if (writePos==buffer.limit()) Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return writePos;
}

private int idx(ByteBuffer buffer) {
Expand Down
Expand Up @@ -7,5 +7,5 @@
* @since 11.11.12
*/
public interface RelationshipUpdater {
void update(long relId, boolean outgoing, long prevId, long nextId) throws IOException;
boolean update(long relId, boolean outgoing, long prevId, long nextId) throws IOException;
}
Expand Up @@ -15,44 +15,39 @@ public class RelationshipWriteHandler implements EventHandler<NodeStruct> {
private long counter;
private final RelationshipWriter relationshipWriter;

// store reverse node-id to rel-id for future updates of relationship-records
// todo reuse and pool the CompactLongRecords, so we can skip IntArray creation
// final ReverseRelationshipMap futureModeRelIdQueue = new ConcurrentLongReverseRelationshipMap();
ForwardRelationshipUpdateManager futureNodeRelInfo;
private final RelationshipUpdateCache cache;
// ForwardRelationshipUpdateManager futureNodeRelInfo;
// private final RelationshipUpdateCache cache;

public RelationshipWriteHandler(RelationshipWriter relationshipWriter, final long totalNrOfRels) {
this.relationshipWriter = relationshipWriter;
cache = new RelationshipUpdateCache(relationshipWriter, totalNrOfRels);
futureNodeRelInfo = new ForwardRelationshipUpdateManager(cache);
// cache = new RelationshipUpdateCache(relationshipWriter, totalNrOfRels);
// futureNodeRelInfo = new ForwardRelationshipUpdateManager(cache);
}

@Override
public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {

// CompactLongRecord relationshipsToUpdate = futureModeRelIdQueue.retrieve(nodeId);

event.firstRel = firstRelationshipId(event,futureNodeRelInfo.getFirstRelId(nodeId));
// event.firstRel = firstRelationshipId(event,futureNodeRelInfo.getFirstRelId(nodeId));

if (Record.NO_NEXT_RELATIONSHIP.is(event.firstRel)) return;

long maxRelationshipId = maxRelationshipId(event);

relationshipWriter.start(maxRelationshipId);


int count = event.relationshipCount;

long followingNextRelationshipId = Record.NO_NEXT_RELATIONSHIP.intValue();

long prevId = futureNodeRelInfo.done(nodeId, getFirstOwnRelationshipId(event));
// long prevId = futureNodeRelInfo.done(nodeId, getFirstOwnRelationshipId(event));
long prevId = event.prevId;

for (int i = 0; i < count; i++) {
long nextId = i+1 < count ? event.getRelationship(i+1).id : followingNextRelationshipId;
Relationship relationship = event.getRelationship(i);
relationshipWriter.create(nodeId,event, relationship, prevId, nextId);
prevId = relationship.id;
storeFutureRelId(nodeId, relationship,prevId);
// storeFutureRelId(nodeId, relationship,prevId);

counter++;
}
Expand All @@ -62,18 +57,18 @@ public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Ex

@Override
public String toString() {
return "rel-record-writer " + counter + " \n"+relationshipWriter+" "+cache;
return "rel-record-writer " + counter + " \n"+relationshipWriter; // +" "+cache;
}
public void close() {
try {
cache.close();
// cache.close();
relationshipWriter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}


/*
private void storeFutureRelId(long nodeId, Relationship relationship, long relId) throws IOException {
long other = relationship.other();
if (other < nodeId) return;
Expand All @@ -90,6 +85,7 @@ private long getFirstOwnRelationshipId(NodeStruct event) {
if (event.relationshipCount == 0) return Record.NO_PREV_RELATIONSHIP.intValue();
return event.getRelationship(0).id;
}
*/


private long maxRelationshipId(NodeStruct event) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/neo4j/batchimport/structs/NodeStruct.java
Expand Up @@ -10,9 +10,9 @@
* @since 27.10.12
*/
public class NodeStruct extends PropertyHolder {
//long p1,p2,p3,p4,p5,p6,p7;
public volatile long firstRel = Record.NO_NEXT_RELATIONSHIP.intValue();
//long o1,o2,o3,o4,o5,o6,o7;

public volatile long prevId;

private final Relationship[] relationships;
public final List<Relationship> moreRelationships = new ArrayList<Relationship>();
Expand All @@ -39,6 +39,7 @@ public NodeStruct(int propertyCount) {
public NodeStruct init() {
super.init();
firstRel = Record.NO_NEXT_RELATIONSHIP.intValue();
prevId = Record.NO_PREV_RELATIONSHIP.intValue();
clearRelationshipInfo();
return this;
}
Expand Down

0 comments on commit dc1e0dd

Please sign in to comment.