Skip to content
This repository has been archived by the owner on Jun 9, 2020. It is now read-only.

Commit

Permalink
added support for optinally more relationships per node, reduce memor…
Browse files Browse the repository at this point in the history
…y usage by reusing property-blocks
  • Loading branch information
jexp committed Oct 31, 2012
1 parent 1fa94cc commit 5ea1c79
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 27 deletions.
15 changes: 15 additions & 0 deletions readme.md
Expand Up @@ -29,3 +29,18 @@ current limitations, constraints:
* only up to 2bn relationships (due to an int based multi-map)
* have to know max # of rels per node, properties per node and relationship
* relationships have to be pre-sorted by min(start,end)


future improvements:

* implement batch-importer CSV "API" on top of this
* stripe writes across store-files (i.e. strip the relationship-record file over 10 handlers, according to CPUs)
* parallelize writing to dynamic string and arraystore too
* change relationship-record updates for backwards pointers to run in a separate handler that is
RandomAccessFile-based (or nio2) and just writes the 2 int values directly at file-pos
* add a csv analyser / sorter that
* add support & parallelize index addition
* good support for index based lookup for relationship construction (kv-store, better in-memory structure, e.g. a collection of long[])
* use id-compression internally to save memory in structs (write a CompressedLongArray)
* reuse PropertyBlock, PropertyRecords, RelationshipRecords, NodeRecords, probably subclass them and override getId() etc. or copy the code
from the Store's to work with interfaces
Expand Up @@ -69,7 +69,7 @@ private void createHandlers(NeoStore neoStore, NodeStructFactory nodeStructFacto
propertyMappingHandlers = PropertyEncodingHandler.createHandlers(inserter);

propertyRecordCreatorHandler = new PropertyRecordCreatorHandler();
relationshipIdHandler = new RelationshipIdHandler(nodeStructFactory.getRelsPerNode());
relationshipIdHandler = new RelationshipIdHandler(nodeStructFactory.getMaxRelsPerNode());

//nodeWriter = new NodeFileWriteHandler(new File(nodeStore.getStorageFileName()));
nodeWriter = new NodeWriteRecordHandler(neoStore.getNodeStore());
Expand All @@ -87,7 +87,10 @@ void run() {

nodeStructFactory.fillStruct(nodeId,nodeStruct);

if (nodeId % (nodesToCreate / 10) == 0) log.info(nodeId + " " + (System.currentTimeMillis()-time)+" ms.");
if (nodeId % (nodesToCreate / 100) == 0) {
log.info(nodeId + " " + (System.currentTimeMillis()-time)+" ms.");
time = System.currentTimeMillis();
}
ringBuffer.publish(sequence);
}
}
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/org/neo4j/batchimport/DisruptorTest.java
Expand Up @@ -7,7 +7,6 @@
import org.neo4j.unsafe.batchinsert.BatchInserterImpl;

import java.io.*;
import java.util.Arrays;
import java.util.Map;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -77,9 +76,10 @@ private static Map<String, String> config() {

public static class TestNodeStructFactory implements NodeStructFactory {
public static final int RELS_PER_NODE = 10;
public static final int MAX_RELS_PER_NODE = TestNodeStructFactory.RELS_PER_NODE;
public static final int REL_PROPERTY_COUNT = 1;
public static final int NODE_PROPERTY_COUNT = 2;
public static final int[] REL_OFFSETS = new int[RELS_PER_NODE];
public static final int[] REL_OFFSETS = new int[MAX_RELS_PER_NODE];

// constant values, to avoid boxing every time
private final static Float WEIGHT = 10F;
Expand All @@ -92,12 +92,12 @@ public static class TestNodeStructFactory implements NodeStructFactory {
private int type;

static {
for (int i = 0; i < TestNodeStructFactory.RELS_PER_NODE; i++) TestNodeStructFactory.REL_OFFSETS[i] = 1 << 2 * i;
for (int i = 0; i < MAX_RELS_PER_NODE; i++) TestNodeStructFactory.REL_OFFSETS[i] = 1 << 2 * i;
}

@Override
public NodeStruct newInstance() {
return new NodeStruct(NODE_PROPERTY_COUNT, RELS_PER_NODE, REL_PROPERTY_COUNT);
return new NodeStruct(NODE_PROPERTY_COUNT);
}

@Override
Expand All @@ -109,6 +109,7 @@ public void init(BatchInserterImpl inserter) {
age = inserter.getPropertyKeyId("age");
weight = inserter.getPropertyKeyId("weight");
type = inserter.getRelTypeId("CONNECTS");
NodeStruct.classInit(RELS_PER_NODE,REL_PROPERTY_COUNT);
}

@Override
Expand All @@ -118,7 +119,7 @@ public void fillStruct(long nodeId, NodeStruct nodeStruct) {
nodeStruct.addProperty(blocked, Boolean.TRUE);
nodeStruct.addProperty(age, VALUE);
// now only "local" relationships close to the original node-id
for (int r = 0; r < TestNodeStructFactory.RELS_PER_NODE; r++) {
for (int r = 0; r < MAX_RELS_PER_NODE; r++) {
long target = nodeId + TestNodeStructFactory.REL_OFFSETS[r];
// only target nodes beyond the current one
if (target >= NODES_TO_CREATE) continue;
Expand All @@ -131,5 +132,10 @@ public void fillStruct(long nodeId, NodeStruct nodeStruct) {
public int getRelsPerNode() {
return RELS_PER_NODE;
}

@Override
public int getMaxRelsPerNode() {
return MAX_RELS_PER_NODE;
}
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/neo4j/batchimport/NodeStructFactory.java
Expand Up @@ -16,4 +16,6 @@ public interface NodeStructFactory extends EventFactory<NodeStruct> {
void fillStruct(long nodeId, NodeStruct nodeStruct);

int getRelsPerNode();

int getMaxRelsPerNode();
}
@@ -1,5 +1,7 @@
package org.neo4j.batchimport.collections;

import edu.ucla.sspace.util.primitive.IntSet;

import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;

Expand Down
Expand Up @@ -34,7 +34,7 @@ public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws
if ((sequence & MASK) != pos) return;
encodeProperties(event);
for (int i = 0; i < event.relationshipCount; i++) {
encodeProperties(event.relationships[i]);
encodeProperties(event.getRelationship(i));
}
}

Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.lmax.disruptor.EventHandler;
import org.neo4j.batchimport.structs.NodeStruct;
import org.neo4j.batchimport.structs.Property;
import org.neo4j.batchimport.structs.PropertyHolder;
import org.neo4j.kernel.impl.nioneo.store.PropertyBlock;
import org.neo4j.kernel.impl.nioneo.store.PropertyRecord;
Expand All @@ -19,7 +20,7 @@ public class PropertyRecordCreatorHandler implements EventHandler<NodeStruct> {
public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
createPropertyRecords(event);
for (int i = 0; i < event.relationshipCount; i++) {
createPropertyRecords(event.relationships[i]);
createPropertyRecords(event.getRelationship(i));
}
event.lastPropertyId = propertyId;
}
Expand All @@ -32,7 +33,8 @@ private void createPropertyRecords(PropertyHolder holder) {
int index=0;
holder.propertyRecords[index++] = currentRecord;
for (int i = 0; i < holder.propertyCount; i++) {
PropertyBlock block = holder.properties[i].block;
Property property = holder.properties[i];
PropertyBlock block = property.block;
if (currentRecord.size() + block.getSize() > PAYLOAD_SIZE){
currentRecord.setNextProp(propertyId);
currentRecord = createRecord(propertyId);
Expand All @@ -41,6 +43,7 @@ private void createPropertyRecords(PropertyHolder holder) {
holder.propertyRecords[index++] = currentRecord;
}
currentRecord.addPropertyBlock(block);
property.clean();
}
if (index<holder.propertyRecords.length) holder.propertyRecords[index]=null;
}
Expand Down
Expand Up @@ -23,7 +23,7 @@ public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws
if (propStore.getHighId() <= event.lastPropertyId) propStore.setHighId(event.lastPropertyId);
writePropertyRecords(event);
for (int i = 0; i < event.relationshipCount; i++) {
writePropertyRecords(event.relationships[i]);
writePropertyRecords(event.getRelationship(i));
}
if (endOfBatch) propStore.flushAll();
}
Expand Down
Expand Up @@ -31,7 +31,7 @@ public RelationshipIdHandler(int relsPerNode) {

public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {
for (int i = 0; i < event.relationshipCount; i++) {
Relationship relationship = event.relationships[i];
Relationship relationship = event.getRelationship(i);
long relId = this.relId++;
relationship.id = relId;
storeFutureRelId(nodeId, relationship,relId);
Expand Down Expand Up @@ -60,7 +60,7 @@ private long[] futureRelIds(long nodeId, ReverseRelationshipMap futureRelIds) {
}

private long firstRelationshipId(NodeStruct event) {
if (event.relationshipCount>0) return event.relationships[0].id;
if (event.relationshipCount>0) return event.getRelationship(0).id;
if (event.outgoingRelationshipsToUpdate!=null) return event.outgoingRelationshipsToUpdate[0];
if (event.incomingRelationshipsToUpdate!=null) return event.incomingRelationshipsToUpdate[0];
return Record.NO_PREV_RELATIONSHIP.intValue();
Expand All @@ -71,7 +71,7 @@ private long maxRelationshipId(NodeStruct event) {

if (event.incomingRelationshipsToUpdate!=null) result=Math.max(event.incomingRelationshipsToUpdate[Utils.size(event.incomingRelationshipsToUpdate)-1],result);
if (event.outgoingRelationshipsToUpdate!=null) result=Math.max(event.outgoingRelationshipsToUpdate[Utils.size(event.outgoingRelationshipsToUpdate)-1],result);
if (event.relationshipCount>0) result=Math.max(event.relationships[event.relationshipCount-1].id,result);
if (event.relationshipCount>0) result=Math.max(event.getRelationship(event.relationshipCount-1).id,result);
return result;
}

Expand Down
Expand Up @@ -33,8 +33,8 @@ public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Ex

long prevId = Record.NO_PREV_RELATIONSHIP.intValue();
for (int i = 0; i < count; i++) {
long nextId = i+1 < count ? event.relationships[i + 1].id : followingNextRelationshipId;
Relationship relationship = event.relationships[i];
long nextId = i+1 < count ? event.getRelationship(i+1).id : followingNextRelationshipId;
Relationship relationship = event.getRelationship(i);
relationshipWriter.create(nodeId,event, relationship, prevId, nextId);
prevId = relationship.id;
counter++;
Expand Down
38 changes: 32 additions & 6 deletions src/main/java/org/neo4j/batchimport/structs/NodeStruct.java
Expand Up @@ -2,6 +2,9 @@

import org.neo4j.kernel.impl.nioneo.store.Record;

import java.util.ArrayList;
import java.util.List;

/**
* @author mh
* @since 27.10.12
Expand All @@ -11,29 +14,52 @@ public class NodeStruct extends PropertyHolder {
public volatile long nextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
//long o1,o2,o3,o4,o5,o6,o7;

public final Relationship[] relationships;
private final Relationship[] relationships;
public final List<Relationship> moreRelationships = new ArrayList<Relationship>();
public volatile int relationshipCount;

public volatile long lastPropertyId;
public volatile long maxRelationshipId;
public volatile long[] outgoingRelationshipsToUpdate;
public volatile long[] incomingRelationshipsToUpdate;
private static int avgRelCount;
private static int relPropertyCount;

public static void classInit(int avgRelCount, int relPropertyCount) {
NodeStruct.avgRelCount = avgRelCount;
NodeStruct.relPropertyCount = relPropertyCount;
}

public NodeStruct(int propertyCount, int relCount, int relPropertyCount) {
public NodeStruct(int propertyCount) {
super(propertyCount);
this.relationships=new Relationship[relCount];
for (int i = 0; i < relCount; i++) {
relationships[i]=new Relationship(relPropertyCount);
this.relationships=new Relationship[NodeStruct.avgRelCount];
for (int i = 0; isRelInArray(i); i++) {
relationships[i]=new Relationship(NodeStruct.relPropertyCount);
}
}

public NodeStruct init() {
super.init();
if (!isRelInArray(relationshipCount)) moreRelationships.clear();
relationshipCount=0;
nextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
return this;
}
public Relationship addRel(long other, boolean outgoing, int type) {
return relationships[relationshipCount++].init(other,outgoing,type);
if (isRelInArray(relationshipCount++)) {
return relationships[relationshipCount-1].init(other,outgoing,type);
}
Relationship rel = new Relationship(relPropertyCount).init(other, outgoing, type);
moreRelationships.add(rel);
return rel;
}

public Relationship getRelationship(int i) {
if (isRelInArray(i)) return relationships[i];
return moreRelationships.get(i-avgRelCount);
}

private boolean isRelInArray(int i) {
return i<avgRelCount;
}
}
12 changes: 7 additions & 5 deletions src/main/java/org/neo4j/batchimport/structs/Property.java
Expand Up @@ -10,16 +10,18 @@
public class Property {
public volatile int nameIndex;
public volatile Object value;
public volatile PropertyBlock block;
public final PropertyBlock block = new PropertyBlock();

void init(int index, Object value) {
this.nameIndex =index;
this.nameIndex = index;
this.value = value;
this.block = null;
this.block.clean();
}
public void encode(PropertyStore propStore) {
PropertyBlock block = new PropertyBlock();
propStore.encodeValue(block, nameIndex, value);
this.block = block;
}

public void clean() {
this.value = null;
}
}
Expand Up @@ -28,6 +28,7 @@
public class PropertyBlock
{
private static final int MAX_ARRAY_TOSTRING_SIZE = 4;
public static final long[] EMPTY_LONG_ARRAY = new long[0];
private final List<DynamicRecord> valueRecords = new ArrayList<DynamicRecord>(8);
private long[] valueBlocks;
// private boolean inUse;
Expand Down Expand Up @@ -63,6 +64,12 @@ public void setSingleBlock( long value )
light = true;
}

public void clean() {
valueBlocks = EMPTY_LONG_ARRAY;
light = true;
valueRecords.clear();
}

public void addValueRecord( DynamicRecord record )
{
valueRecords.add( record );
Expand Down

0 comments on commit 5ea1c79

Please sign in to comment.