Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

custom implementation of input stream chunker

  • Loading branch information...
commit 0736d44cf39b57397f998d8c7fbd8197d1b0bd48 1 parent fbaafad
@jexp authored
View
3  generate.sh
@@ -0,0 +1,3 @@
# Pull in whatever ./settings.sh defines before invoking Maven
# (its contents are not shown here).
source ./settings.sh

# Rebuild, compile the test sources and run the TestDataGenerator main class
# from the test classpath, passing it the single argument "sorted".
mvn clean test-compile exec:java \
    -Dexec.mainClass=org.neo4j.batchimport.TestDataGenerator \
    -Dexec.classpathScope=test \
    -Dexec.args=sorted
View
3  import_csv.sh
@@ -0,0 +1,3 @@
# Pull in whatever ./settings.sh defines before invoking Maven
# (its contents are not shown here).
source ./settings.sh

# Space-separated argument list handed to ParallelImporter as one exec.args value.
# NOTE(review): the positional values presumably follow the ParallelImporter
# constructor order (store dir, nodes file, rels file, node count, props per node,
# rels per node, max rels per node, props per rel, rel types) - confirm against its main().
IMPORT_ARGS="/mnt/parallel.db nodes.csv rels.csv 100000000 4 50 100 2 ONE,TWO,THREE,FOUR,FIVE,SIX,SEVEN,EIGHT,NINE,TEN"

# Rebuild, compile the test sources and run the importer from the test classpath.
mvn clean test-compile exec:java \
    -Dexec.mainClass=org.neo4j.batchimport.ParallelImporter \
    -Dexec.classpathScope=test \
    -Dexec.args="$IMPORT_ARGS"
View
2  src/main/java/org/neo4j/batchimport/DisruptorBatchInserter.java
@@ -48,7 +48,7 @@
public DisruptorBatchInserter(String storeDir, final Map<String, String> config, long nodesToCreate, final NodeStructFactory nodeStructFactory) {
this.storeDir = storeDir;
- final int minBufferBits = 18; // (int) (Math.log(nodesToCreate / 100) / Math.log(2));
+ final int minBufferBits = 10; // (int) (Math.log(nodesToCreate / 100) / Math.log(2));
RING_SIZE = 1 << minBufferBits; //Math.min(minBufferBits,18);
System.out.println("Ring size "+RING_SIZE);
this.config = config;
View
65 src/main/java/org/neo4j/batchimport/ParallelImporter.java
@@ -1,10 +1,10 @@
package org.neo4j.batchimport;
import org.apache.log4j.Logger;
-import org.neo4j.batchimport.importer.RowData;
import org.neo4j.batchimport.structs.NodeStruct;
import org.neo4j.batchimport.structs.PropertyHolder;
import org.neo4j.batchimport.structs.Relationship;
+import org.neo4j.batchimport.utils.Chunker;
import org.neo4j.batchimport.utils.Params;
import org.neo4j.consistency.ConsistencyCheckTool;
import org.neo4j.helpers.collection.MapUtil;
@@ -13,6 +13,7 @@
import java.io.*;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -65,16 +66,18 @@
private final int propsPerRel;
private Report report;
private BufferedReader nodesReader;
- private RowData nodesData;
private BufferedReader relsReader;
- private RowData relsData;
private String[] relTypes;
private int[] nodePropIds;
private int[] relPropIds;
- private Object[] relHeader;
private int[] relTypeIds;
private final int relTypesCount;
- private Object[] relRowData;
+ private Chunker nodeChunker;
+ private Chunker relChunker;
+ private int nodePropCount;
+ private int relPropCount;
+ private long from = -1;
+ private long to = -1;
public ParallelImporter(File graphDb, File nodesFile, File relationshipsFile,
long nodesToCreate, int propsPerNode, int relsPerNode, int maxRelsPerNode, int propsPerRel, String[] relTypes) {
@@ -189,17 +192,19 @@ private void initRelTypes(BatchInserterImpl inserter) {
private void initReader() throws IOException {
nodesReader = new BufferedReader(new FileReader(nodesFile), MEGABYTE);
- nodesData = new RowData(nodesReader.readLine(), "\t", 0);
+ nodeChunker = new Chunker(nodesReader, '\t');
- relHeader = new Object[3];
relsReader = new BufferedReader(new FileReader(relationshipsFile), MEGABYTE);
- relsData = new RowData(relsReader.readLine(), "\t", 3);
+ relChunker = new Chunker(relsReader, '\t');
}
- private void initProperties(BatchInserterImpl inserter) {
- final String[] nodesFields = nodesData.getFields();
- final String[] relFields = relsData.getFields();
+ private void initProperties(BatchInserterImpl inserter) throws IOException {
+ final String[] nodesFields = nodesReader.readLine().split("\t");
+ nodePropCount = nodesFields.length;
+ String[] relFields = relsReader.readLine().split("\t");
+ relFields = Arrays.copyOfRange(relFields, 3, relFields.length);
+ relPropCount = relFields.length;
List<String> propertyNames = new ArrayList<String>(asList(nodesFields));
propertyNames.addAll(asList(relFields));
@@ -215,11 +220,9 @@ private void initProperties(BatchInserterImpl inserter) {
@Override
public void fillStruct(long nodeId, NodeStruct nodeStruct) {
try {
- String nodesLine = nodesReader.readLine();
- if (nodesLine == null) throw new IllegalStateException("Less Node rows than indicated at id " + nodeId);
- final Object[] rowData = nodesData.updateArray(nodesLine,(Object[])null);
- addProperties(nodeStruct, rowData, nodesData.getCount(), nodePropIds);
+ if (nodeId>=nodesToCreate) throw new IllegalStateException("Already at "+nodeId+" but only configured to import "+nodesToCreate+" nodes");
+ addProperties(nodeStruct,nodeChunker, nodePropIds,nodePropCount);
addRelationships(nodeId, nodeStruct);
@@ -231,26 +234,24 @@ public void fillStruct(long nodeId, NodeStruct nodeStruct) {
private void addRelationships(long nodeId, NodeStruct nodeStruct) throws IOException {
while (true) {
- // todo real record-class for relationship-row data
- if (relRowData==null) {
- String line = relsReader.readLine();
- if (line==null) break; // reached end
- relRowData = relsData.updateArray(line, relHeader);
+ if (from == -1) {
+ final String token = relChunker.nextWord();
+ if (token==null) return;
+ from = Long.parseLong(token);
}
-
- long from = Long.parseLong((String)relHeader[0]);
- long to = Long.parseLong((String)relHeader[1]);
+ if (to == -1) to = Long.parseLong(relChunker.nextWord());
long min = Math.min(from, to);
if (min < nodeId)
throw new IllegalStateException(String.format("relationship-rows not pre-sorted found id %d less than node-id %d", min, nodeId));
- if (min > nodeId) break; // keep row data
+ if (min > nodeId) break; // keep parsed data
long target = Math.max(from, to);
final boolean outgoing = from == min;
- final Relationship rel = nodeStruct.addRel(target, outgoing, type(relHeader[2]));
+ final Relationship rel = nodeStruct.addRel(target, outgoing, type(relChunker.nextWord()));
- addProperties(rel, relRowData, relsData.getCount(), relPropIds);
- relRowData = null;
+ addProperties(rel, relChunker, relPropIds,relPropCount);
+ from = -1;
+ to = -1;
}
}
@@ -260,10 +261,12 @@ private int type(Object relType) {
throw new IllegalStateException("Unknown Relationship-Type "+relType);
}
- private void addProperties(PropertyHolder propertyHolder, Object[] rowData, int count, final int[] propIds) {
- for (int i=count-1;i>=0;i--) {
- if (rowData[i]==null) continue;
- propertyHolder.addProperty(propIds[i], rowData[i]);
+ private void addProperties(PropertyHolder propertyHolder, Chunker nodeChunker, final int[] propIds, int count) throws IOException {
+ for (int i = 0; i < count; i++) {
+ final String value = nodeChunker.nextWord();
+ if (value==null) return; // EOF
+ if (value.isEmpty()) continue;
+ propertyHolder.addProperty(propIds[i], value);
}
}
View
2  src/main/java/org/neo4j/batchimport/handlers/RelationshipUpdateCache.java
@@ -14,7 +14,7 @@
*/
public class RelationshipUpdateCache implements RelationshipUpdater {
private static final int BUCKETS = 16;
- public static final int RELS_PER_BUFFER = 20 * (1024 * 1024);
+ public static final int RELS_PER_BUFFER = 1 * (1024 * 1024);
private static final int RECORD_SIZE = (Short.SIZE + 3 * Integer.SIZE) / 8;
private static final int CAPACITY = RELS_PER_BUFFER * RECORD_SIZE;
View
31 src/main/java/org/neo4j/batchimport/utils/Chunker.java
@@ -0,0 +1,31 @@
+package org.neo4j.batchimport.utils;
+
+import java.io.IOException;
+import java.io.Reader;
+
/**
 * Splits a character stream into tokens, handing out one token per call.
 * <p>
 * A token ends at the configured delimiter, at a line feed, or at end of stream.
 * The terminating character is consumed and never part of the returned token.
 * Instances reuse a single internal buffer between calls and perform no
 * synchronization of their own.
 *
 * @author mh
 * @since 13.11.12
 */
public class Chunker {
    /** Initial capacity of the token buffer; the buffer grows on demand for longer tokens. */
    private static final int INITIAL_CAPACITY = 100;

    private final Reader reader;
    private final char delim;
    // Growable buffer, so tokens longer than the initial capacity no longer overflow.
    private final StringBuilder buffer = new StringBuilder(INITIAL_CAPACITY);

    /**
     * @param reader source of characters; it is read one character at a time,
     *               so callers should pass a buffered reader
     * @param delim  character that separates tokens within a line
     */
    public Chunker(Reader reader, char delim) {
        this.reader = reader;
        this.delim = delim;
    }

    /**
     * Reads the next token from the underlying reader.
     *
     * @return the token text; an empty string when two terminators follow each
     *         other directly (an empty field); {@code null} when the end of the
     *         stream is reached before any character of a token was read
     * @throws IOException if the underlying reader fails
     */
    public String nextWord() throws IOException {
        buffer.setLength(0);
        int ch;
        while ((ch = reader.read()) != delim && ch != '\n' && ch != -1) {
            buffer.append((char) ch);
        }
        // Treat "\r\n" as one line terminator, matching BufferedReader.readLine(),
        // so the last field of a row in a CRLF file does not keep a trailing '\r'.
        final int length = buffer.length();
        if (ch == '\n' && length > 0 && buffer.charAt(length - 1) == '\r') {
            buffer.setLength(length - 1);
        }
        if (buffer.length() == 0) {
            // Nothing collected: either the stream is exhausted or the field is empty.
            return ch == -1 ? null : "";
        }
        return buffer.toString();
    }
}
View
43 src/test/java/org/neo4j/batchimport/StreamTokenizerTest.java
@@ -0,0 +1,43 @@
+package org.neo4j.batchimport;
+
+import org.junit.Test;
+import org.neo4j.batchimport.utils.Chunker;
+
+import java.io.*;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author mh
+ * @since 13.11.12
+ */
+public class StreamTokenizerTest {
+
+ String file = "FROM\tTO\tTYPE\tNAME\tAGE:INT\n"
+ +"1\t2\tKNOWS\tFOO\t42\n"
+ +"1\t2\tKNOWS\t\t42"
+ ;
+
+ @Test
+ public void testReadHeader() throws Exception {
+ final BufferedReader reader = new BufferedReader(new StringReader(file));
+ final String[] header = reader.readLine().split("\t");
+ final Chunker chunker = new Chunker(reader, '\t');
+ readLine(header, chunker, "FOO", "42");
+ readLine(header, chunker, "", "42");
+ assertEquals(null,chunker.nextWord());
+ }
+
+ private void readLine(String[] header, Chunker st, Object...values) throws IOException {
+ long from = Long.parseLong(st.nextWord());
+ assertEquals(1,from);
+ long to = Long.parseLong(st.nextWord());
+ assertEquals(2,to);
+ String type = st.nextWord();
+ assertEquals("KNOWS", type);
+
+ for (int i = 3; i < header.length; i++) {
+ assertEquals(header[i], values[i - 3], st.nextWord());
+ }
+ }
+}
View
2  src/test/java/org/neo4j/batchimport/TestDataGenerator.java
@@ -14,7 +14,7 @@
@Ignore
public class TestDataGenerator {
- private static int NODES = 1000 * 1000; // * 1000;
+ private static int NODES = 1 * 1000; // * 1000;
private static final int RELS_PER_NODE = 50;
private static final String[] TYPES = {"ONE","TWO","THREE","FOUR","FIVE","SIX","SEVEN","EIGHT","NINE","TEN"};
public static final int NUM_TYPES = 10;
Please sign in to comment.
Something went wrong with that request. Please try again.