Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

merged with batch-performance repository

  • Loading branch information...
commit 4964fb56a868b21ee0ba3cdebe61e746e4860f08 2 parents 06b9a53 + 17ca9d5
@jexp authored
Showing with 4,950 additions and 259 deletions.
  1. +3 −1 .gitignore
  2. +2 −2 batch.properties
  3. +27 −6 pom.xml
  4. +163 −0 readme.md
  5. +116 −0 src/main/java/org/neo4j/batchimport/DisruptorBatchInserter.java
  6. +28 −246 src/main/java/org/neo4j/batchimport/Importer.java
  7. +21 −0 src/main/java/org/neo4j/batchimport/NodeStructFactory.java
  8. +39 −0 src/main/java/org/neo4j/batchimport/StdOutReport.java
  9. +85 −0 src/main/java/org/neo4j/batchimport/Utils.java
  10. +38 −0 src/main/java/org/neo4j/batchimport/collections/ConcurrentIntReverseRelationshipMap.java
  11. +40 −0 src/main/java/org/neo4j/batchimport/collections/ConcurrentLongReverseRelationshipMap.java
  12. +23 −0 src/main/java/org/neo4j/batchimport/collections/PrimitiveIntReverseRelationshipMap.java
  13. +10 −0 src/main/java/org/neo4j/batchimport/collections/ReverseRelationshipMap.java
  14. +86 −0 src/main/java/org/neo4j/batchimport/handlers/NodeWriteFileHandler.java
  15. +44 −0 src/main/java/org/neo4j/batchimport/handlers/NodeWriteRecordHandler.java
  16. +55 −0 src/main/java/org/neo4j/batchimport/handlers/PropertyEncodingHandler.java
  17. +62 −0 src/main/java/org/neo4j/batchimport/handlers/PropertyRecordCreatorHandler.java
  18. +21 −0 src/main/java/org/neo4j/batchimport/handlers/PropertyRecordHighIdHandler.java
  19. +52 −0 src/main/java/org/neo4j/batchimport/handlers/PropertyWriteRecordHandler.java
  20. +146 −0 src/main/java/org/neo4j/batchimport/handlers/RelationshipFileWriter.java
  21. +82 −0 src/main/java/org/neo4j/batchimport/handlers/RelationshipIdHandler.java
  22. +75 −0 src/main/java/org/neo4j/batchimport/handlers/RelationshipRecordWriter.java
  23. +79 −0 src/main/java/org/neo4j/batchimport/handlers/RelationshipWriteHandler.java
  24. +20 −0 src/main/java/org/neo4j/batchimport/handlers/RelationshipWriter.java
  25. +16 −0 src/main/java/org/neo4j/batchimport/importer/RelType.java
  26. +85 −0 src/main/java/org/neo4j/batchimport/importer/RowData.java
  27. +69 −0 src/main/java/org/neo4j/batchimport/importer/Type.java
  28. +65 −0 src/main/java/org/neo4j/batchimport/structs/NodeStruct.java
  29. +27 −0 src/main/java/org/neo4j/batchimport/structs/Property.java
  30. +34 −0 src/main/java/org/neo4j/batchimport/structs/PropertyHolder.java
  31. +34 −0 src/main/java/org/neo4j/batchimport/structs/Relationship.java
  32. +735 −0 src/main/java/org/neo4j/kernel/impl/nioneo/store/CommonAbstractStore.java
  33. +223 −0 src/main/java/org/neo4j/kernel/impl/nioneo/store/PropertyBlock.java
  34. +201 −0 src/main/java/org/neo4j/kernel/impl/nioneo/store/PropertyRecord.java
  35. +741 −0 src/main/java/org/neo4j/kernel/impl/nioneo/store/PropertyStore.java
  36. +1,013 −0 src/main/java/org/neo4j/unsafe/batchinsert/BatchInserterImpl.java
  37. +28 −0 src/main/resources/log4j.properties
  38. +2 −2 src/test/java/DataTest.java
  39. +1 −1  src/test/java/TestDataGenerator.java
  40. +141 −0 src/test/java/org/neo4j/batchimport/DisruptorTest.java
  41. +1 −1  src/test/java/org/neo4j/batchimport/ImporterTest.java
  42. +48 −0 src/test/java/org/neo4j/batchimport/RelTest.java
  43. +169 −0 src/test/java/org/neo4j/batchimport/TestImporter.java
View
4 .gitignore
@@ -1,9 +1,11 @@
.project
.shell_history
-.idea
*.ipr
*.iws
*.iml
+.idea
target
*.csv
+.DS_Store
+.settings
View
4 batch.properties
@@ -1,4 +1,4 @@
-dump_configuration=true
+dump_configuration=false
cache_type=none
use_memory_mapped_buffers=true
neostore.propertystore.db.index.keys.mapped_memory=5M
@@ -6,4 +6,4 @@ neostore.propertystore.db.index.mapped_memory=5M
neostore.nodestore.db.mapped_memory=200M
neostore.relationshipstore.db.mapped_memory=1000M
neostore.propertystore.db.mapped_memory=1000M
-neostore.propertystore.db.strings.mapped_memory=200M
+neostore.propertystore.db.strings.mapped_memory=200M
View
33 pom.xml
@@ -2,9 +2,11 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.neo4j</groupId>
<artifactId>batch-import</artifactId>
- <version>0.1-SNAPSHOT</version>
- <name>Simple Batch Importer</name>
-
+ <version>1.9-SNAPSHOT</version>
+ <name>Neo4j Batch Importer</name>
+ <properties>
+ <neo4j.version>1.9-SNAPSHOT</neo4j.version>
+ </properties>
<repositories>
<repository>
<id>Neo4j Snapshots</id>
@@ -14,12 +16,22 @@
<dependencies>
<dependency>
+ <groupId>edu.ucla.sspace</groupId>
+ <artifactId>sspace</artifactId>
+ <version>2.0.3</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.8.5</version>
@@ -28,14 +40,23 @@
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
- <version>1.8-SNAPSHOT</version>
+ <version>${neo4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j</groupId>
+ <artifactId>neo4j-enterprise</artifactId>
+ <version>${neo4j.version}</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-lucene-index</artifactId>
- <version>1.8-SNAPSHOT</version>
+ <version>${neo4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.disruptor</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>2.10.3</version>
</dependency>
-
</dependencies>
<build>
<plugins>
View
163 readme.md
@@ -0,0 +1,163 @@
+# Neo4j (CSV) Batch Importer
+
+You provide one tab-separated CSV file for nodes and one for relationships (optionally more for indexes).
+
+Example data for the files is a small social network
+
+## File format
+
+* Property names in first row.
+* The row number corresponds to the node-id (node 0 is the reference node)
+* Property values not listed will not be set on the nodes or relationships.
+* Optionally property fields can have a type (defaults to String) indicated with name:type where type is one of (int, long, float, double, boolean, byte, short, char, string). The string value is then converted to that type. Conversion failure will result in abort of the import operation.
+
+## Examples
+
+### nodes.csv
+
+ name age works_on
+ Michael 37 neo4j
+ Selina 14
+ Rana 6
+ Selma 4
+
+### rels.csv
+
+ start end type since counter:int
+ 1 2 FATHER_OF 1998-07-10 1
+ 1 3 FATHER_OF 2007-09-15 2
+ 1 4 FATHER_OF 2008-05-03 3
+ 3 4 SISTER_OF 2008-05-03 5
+ 2 3 SISTER_OF 2007-09-15 7
+
+
+## Execution
+
+ java -server -Xmx4G -jar ../batch-import/target/batch-import-jar-with-dependencies.jar neo4j/data/graph.db nodes.csv rels.csv
+
+
+ ynagzet:batchimport mh$ rm -rf target/db
+ ynagzet:batchimport mh$ mvn clean compile assembly:single
+ [INFO] Scanning for projects...
+ [INFO] ------------------------------------------------------------------------
+ [INFO] Building Simple Batch Importer
+ [INFO] task-segment: [clean, compile, assembly:single]
+ [INFO] ------------------------------------------------------------------------
+ ...
+ [INFO] Building jar: /Users/mh/java/neo/batchimport/target/batch-import-jar-with-dependencies.jar
+ [INFO] ------------------------------------------------------------------------
+ [INFO] BUILD SUCCESSFUL
+ [INFO] ------------------------------------------------------------------------
+ ynagzet:batchimport mh$ java -server -Xmx4G -jar target/batch-import-jar-with-dependencies.jar target/db nodes.csv rels.csv
+ Physical mem: 16384MB, Heap size: 3640MB
+ use_memory_mapped_buffers=false
+ neostore.propertystore.db.index.keys.mapped_memory=5M
+ neostore.propertystore.db.strings.mapped_memory=100M
+ neostore.propertystore.db.arrays.mapped_memory=215M
+ neo_store=/Users/mh/java/neo/batchimport/target/db/neostore
+ neostore.relationshipstore.db.mapped_memory=1000M
+ neostore.propertystore.db.index.mapped_memory=5M
+ neostore.propertystore.db.mapped_memory=1000M
+ dump_configuration=true
+ cache_type=none
+ neostore.nodestore.db.mapped_memory=200M
+ ...........................................................................
+ Importing 7500000 Nodes took 17 seconds
+ ....................................................................................................35818 ms
+ ....................................................................................................39343 ms
+ ....................................................................................................41788 ms
+ ....................................................................................................48897 ms
+ ............
+ Importing 41246740 Relationships took 170 seconds
+ 212 seconds
+ ynagzet:batchimport mh$ du -sh target/db/
+ 3,2G target/db/
+
+
+## Indexing
+
+Optionally you can add nodes and relationships to indexes.
+
+Add four arguments per index to the command line:
+
+To create a full text node index called users using nodes_index.csv:
+
+ node_index users fulltext nodes_index.csv
+
+To create an exact relationship index called worked using rels_index.csv:
+
+ rel_index worked exact rels_index.csv
+
+Example command line:
+
+ java -server -Xmx4G -jar ../batch-import/target/batch-import-jar-with-dependencies.jar neo4j/data/graph.db nodes.csv rels.csv node_index users fulltext nodes_index.csv rel_index worked exact rels_index.csv
+
+## Examples
+
+### nodes_index.csv
+
+ id name language
+ 1 Victor Richards West Frisian
+ 2 Virginia Shaw Korean
+ 3 Lois Simpson Belarusian
+ 4 Randy Bishop Hiri Motu
+ 5 Lori Mendoza Tok Pisin
+
+### rels_index.csv
+
+ id property1 property2
+ 0 cwqbnxrv rpyqdwhk
+ 1 qthnrret tzjmmhta
+ 2 dtztaqpy pbmcdqyc
+
+
+
+# Parallel Batch inserter with Neo4j
+
+Uses the [LMAX Disruptor](http://lmax-exchange.github.com/disruptor/) to parallelize operations during batch-insertion.
+
+## The 6 operations are:
+
+1. property encoding
+2. property-record creation
+3. relationship-id creation and forward handling of reverse relationship chains
+4. writing node-records
+5. writing relationship-records
+6. writing property-records
+
+## Dependencies:
+
+ (1)<--(2)<--(6)
+ (2)<--(5)-->(3)
+ (2)<--(4)-->(3)
+
+It uses the above dependency setup of disruptor handlers to execute the different concerns in parallel. A ringbuffer of about 2^18 elements is used and a heap size of 5-20G, MMIO configuration within the heap limits.
+
+## Execution:
+
+ MAVEN_OPTS="-Xmx5G -Xms5G -server -d64 -XX:NewRatio=5" mvn clean test-compile exec:java -Dexec.mainClass=org.neo4j.batchimport.DisruptorTest -Dexec.classpathScope=test
+
+## current limitations, constraints:
+
+* only up to 2bn relationships (due to an int based multi-map)
+* have to know max # of rels per node, properties per node and relationship
+* relationships have to be pre-sorted by min(start,end)
+
+## measurements
+
+We successfully imported 2bn nodes (2 properties) and 20bn relationships (1 property) in 11 hours on an EC2 high-IO instance,
+with 35 ECU, 60GB RAM, 2TB SSD writing up to 200MB/s, resulting in a store of 1.4 TB. That makes around 500k elements per second.
+
+## future improvements:
+
+* implement batch-importer CSV "API" on top of this
+* stripe writes across store-files (i.e. stripe the relationship-record file over 10 handlers, according to CPUs)
+* parallelize writing to dynamic string and arraystore too
+* change relationship-record updates for backwards pointers to run in a separate handler that is
+ RandomAccessFile-based (or nio2) and just writes the 2 int values directly at file-pos
+* add a csv analyser / sorter that
+* add support & parallelize index addition
+* good support for index based lookup for relationship construction (kv-store, better in-memory structure, e.g. a collection of long[])
+* use id-compression internally to save memory in structs (write a CompressedLongArray)
+* reuse PropertyBlock, PropertyRecords, RelationshipRecords, NodeRecords, probably subclass them and override getId() etc. or copy the code
+ from the Store's to work with interfaces
View
116 src/main/java/org/neo4j/batchimport/DisruptorBatchInserter.java
@@ -0,0 +1,116 @@
+package org.neo4j.batchimport;
+
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.SingleThreadedClaimStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import org.apache.log4j.Logger;
+import org.neo4j.batchimport.handlers.*;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.kernel.impl.nioneo.store.NeoStore;
+import org.neo4j.unsafe.batchinsert.BatchInserterImpl;
+import org.neo4j.unsafe.batchinsert.BatchInserters;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Parallel batch inserter that pushes {@link NodeStruct} events through an LMAX Disruptor ring
+ * buffer. The events pass three handler stages: property encoding, then property-record creation
+ * and relationship-id handling, then the node, relationship and property writers.
+ * <p>
+ * Expected call order: {@link #init()}, {@link #run()}, {@link #shutdown()}; {@link #report()}
+ * logs the handlers' own summaries.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class DisruptorBatchInserter {
+
+    private static final Logger log = Logger.getLogger(DisruptorBatchInserter.class);
+
+    /** Number of slots in the disruptor ring buffer (2^18). */
+    private static final int RING_SIZE = 1 << 18;
+
+    /** Number of progress messages {@link #run()} aims to log over a complete run. */
+    private static final int PROGRESS_STEPS = 100;
+
+    /** Handlers wired in addition to the property-encoding handlers: 2 in the second stage plus 3 writers. */
+    private static final int FIXED_HANDLER_COUNT = 5;
+
+    private Disruptor<NodeStruct> incomingEventDisruptor;
+    private final String storeDir;
+    private BatchInserterImpl inserter;
+    private ExecutorService executor;
+    private PropertyEncodingHandler[] propertyMappingHandlers;
+    private RelationshipIdHandler relationshipIdHandler;
+    private NodeWriteRecordHandler nodeWriter;
+    private PropertyWriteRecordHandler propertyWriter;
+    private RelationshipWriteHandler relationshipWriter;
+    private PropertyRecordCreatorHandler propertyRecordCreatorHandler;
+    private final Map<String,String> config;
+    private final long nodesToCreate;
+    private final NodeStructFactory nodeStructFactory;
+
+    /**
+     * @param storeDir          directory handed to the batch inserter
+     * @param config            configuration handed to the batch inserter
+     * @param nodesToCreate     number of node ids (0 .. nodesToCreate-1) that {@link #run()} publishes
+     * @param nodeStructFactory creates the ring buffer entries and fills them per node
+     */
+    public DisruptorBatchInserter(String storeDir, final Map<String, String> config, int nodesToCreate, final NodeStructFactory nodeStructFactory) {
+        this.storeDir = storeDir;
+        this.config = config;
+        this.nodesToCreate = nodesToCreate;
+        this.nodeStructFactory = nodeStructFactory;
+    }
+
+    /**
+     * Opens the batch inserter, sets the node store's high id, creates the handlers and wires them
+     * onto a new disruptor. Has to run before {@link #run()}, which uses the disruptor created here.
+     */
+    void init() {
+        inserter = (BatchInserterImpl) BatchInserters.inserter(storeDir, config);
+        nodeStructFactory.init(inserter);
+        NeoStore neoStore = inserter.getNeoStore();
+        neoStore.getNodeStore().setHighId(nodesToCreate + 1);
+
+        createHandlers(neoStore,nodeStructFactory);
+
+        // Each handler is run by the disruptor as a long-lived task on the executor. A pool with
+        // fewer threads than handlers would leave some handlers queued forever and stall the
+        // pipeline, so the pool is never smaller than the number of handlers.
+        int handlerCount = propertyMappingHandlers.length + FIXED_HANDLER_COUNT;
+        executor = Executors.newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), handlerCount));
+        //final ExecutorService executor = Executors.newCachedThreadPool();
+
+        incomingEventDisruptor = new Disruptor<NodeStruct>(nodeStructFactory, executor, new SingleThreadedClaimStrategy(RING_SIZE), new YieldingWaitStrategy());
+
+        incomingEventDisruptor.
+                handleEventsWith(propertyMappingHandlers).
+                then(propertyRecordCreatorHandler, relationshipIdHandler).
+                then(nodeWriter, relationshipWriter, propertyWriter); //
+    }
+
+    /** Instantiates all handlers of the three pipeline stages. */
+    private void createHandlers(NeoStore neoStore, NodeStructFactory nodeStructFactory) {
+        propertyMappingHandlers = PropertyEncodingHandler.createHandlers(inserter);
+
+        propertyRecordCreatorHandler = new PropertyRecordCreatorHandler();
+        relationshipIdHandler = new RelationshipIdHandler(nodeStructFactory.getMaxRelsPerNode());
+
+        //nodeWriter = new NodeFileWriteHandler(new File(nodeStore.getStorageFileName()));
+        nodeWriter = new NodeWriteRecordHandler(neoStore.getNodeStore());
+        propertyWriter = new PropertyWriteRecordHandler(neoStore.getPropertyStore());
+        relationshipWriter = new RelationshipWriteHandler(new RelationshipRecordWriter(neoStore.getRelationshipStore()));
+        //relationshipWriter = new RelationshipWriteHandler(new RelationshipFileWriter(new File(neoStore.getRelationshipStore().getStorageFileName())));
+    }
+
+    /**
+     * Starts the disruptor and publishes one filled {@link NodeStruct} per node id, logging the
+     * elapsed time roughly every 1% of the nodes.
+     */
+    void run() {
+        RingBuffer<NodeStruct> ringBuffer = incomingEventDisruptor.start();
+        // Clamp to 1: for fewer than PROGRESS_STEPS nodes the division yields 0 and the
+        // modulo below would throw an ArithmeticException.
+        final long logInterval = Math.max(1, nodesToCreate / PROGRESS_STEPS);
+        long time = System.currentTimeMillis();
+        for (long nodeId = 0; nodeId < nodesToCreate; nodeId++) {
+            long sequence = ringBuffer.next();
+            NodeStruct nodeStruct = ringBuffer.get(sequence).init();
+
+            nodeStructFactory.fillStruct(nodeId,nodeStruct);
+
+            if (nodeId % logInterval == 0) {
+                log.info(nodeId + " " + (System.currentTimeMillis()-time)+" ms.");
+                time = System.currentTimeMillis();
+            }
+            ringBuffer.publish(sequence);
+        }
+    }
+
+    /**
+     * Shuts down the disruptor and the executor, closes the three writers and finally shuts
+     * down the batch inserter.
+     */
+    void shutdown() {
+        incomingEventDisruptor.shutdown();
+        executor.shutdown();
+
+        nodeWriter.close();
+        propertyWriter.close();
+        relationshipWriter.close();
+
+        inserter.shutdown();
+    }
+
+    /** Logs the string representation of every handler. */
+    void report() {
+        log.info("mapped " + Arrays.deepToString(propertyMappingHandlers));
+
+        log.info("relIds " + relationshipIdHandler);
+
+        log.info("wrote nodes " + nodeWriter);
+        log.info("wrote rels " + relationshipWriter);
+        log.info("wrote props " + propertyWriter);
+    }
+}
View
274 src/main/java/org/neo4j/batchimport/Importer.java
@@ -1,20 +1,17 @@
package org.neo4j.batchimport;
-import org.neo4j.graphdb.RelationshipType;
+import org.neo4j.batchimport.importer.RelType;
+import org.neo4j.batchimport.importer.RowData;
+import org.neo4j.index.lucene.unsafe.batchinsert.LuceneBatchInserterIndexProvider;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;
import org.neo4j.unsafe.batchinsert.BatchInserterIndexProvider;
import org.neo4j.unsafe.batchinsert.BatchInserterIndex;
-import org.neo4j.unsafe.batchinsert.LuceneBatchInserterIndexProvider;
import java.io.*;
import java.util.*;
-import org.neo4j.helpers.collection.MapUtil;
-
-import static org.neo4j.helpers.collection.MapUtil.map;
-import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.index.impl.lucene.LuceneIndexImplementation.EXACT_CONFIG;
import static org.neo4j.index.impl.lucene.LuceneIndexImplementation.FULLTEXT_CONFIG;
@@ -24,30 +21,7 @@
private BatchInserterIndexProvider lucene;
public Importer(File graphDb) {
- Map<String, String> config = new HashMap<String, String>();
- try {
- if (new File("batch.properties").exists()) {
- System.out.println("Using Existing Configuration File");
- } else {
- System.out.println("Writing Configuration File to batch.properties");
- FileWriter fw = new FileWriter( "batch.properties" );
- fw.append( "use_memory_mapped_buffers=true\n"
- + "neostore.nodestore.db.mapped_memory=100M\n"
- + "neostore.relationshipstore.db.mapped_memory=500M\n"
- + "neostore.propertystore.db.mapped_memory=1G\n"
- + "neostore.propertystore.db.strings.mapped_memory=200M\n"
- + "neostore.propertystore.db.arrays.mapped_memory=0M\n"
- + "neostore.propertystore.db.index.keys.mapped_memory=15M\n"
- + "neostore.propertystore.db.index.mapped_memory=15M" );
- fw.close();
- }
-
- config = MapUtil.load( new File(
- "batch.properties" ) );
-
- } catch (Exception e) {
- System.out.println(e.getMessage());
- }
+ Map<String, String> config = Utils.config();
db = createBatchInserter(graphDb, config);
lucene = createIndexProvider();
@@ -73,27 +47,25 @@ public static void main(String[] args) throws IOException {
File graphDb = new File(args[0]);
File nodesFile = new File(args[1]);
File relationshipsFile = new File(args[2]);
- File indexFile;
- String indexName;
- String indexType;
-
+
if (graphDb.exists()) {
FileUtils.deleteRecursively(graphDb);
}
- Importer importBatch = new Importer(graphDb);
+ Importer importer = new Importer(graphDb);
try {
- if (nodesFile.exists()) importBatch.importNodes(new FileReader(nodesFile));
- if (relationshipsFile.exists()) importBatch.importRelationships(new FileReader(relationshipsFile));
- for (int i = 3; i < args.length; i = i + 4) {
- indexFile = new File(args[i + 3]);
- if (!indexFile.exists()) continue;
- indexName = args[i+1];
- indexType = args[i+2];
- BatchInserterIndex index = args[i].equals("node_index") ? importBatch.nodeIndexFor(indexName, indexType) : importBatch.relationshipIndexFor(indexName, indexType);
- importBatch.importIndex(indexName, index, new FileReader(indexFile));
- }
+ if (nodesFile.exists()) importer.importNodes(new FileReader(nodesFile));
+
+ if (relationshipsFile.exists()) importer.importRelationships(new FileReader(relationshipsFile));
+
+ for (int i = 3; i < args.length; i = i + 4) {
+ String elementType = args[i];
+ String indexName = args[i + 1];
+ String indexType = args[i + 2];
+ String indexFileName = args[i + 3];
+ importer.importIndex(elementType, indexName, indexType, indexFileName);
+ }
} finally {
- importBatch.finish();
+ importer.finish();
}
}
@@ -103,125 +75,9 @@ void finish() {
report.finish();
}
- public static class Data {
- private Object[] data;
- private final int offset;
- private final String delim;
- private final String[] fields;
- private final String[] lineData;
- private final Type types[];
- private final int lineSize;
- private int dataSize;
-
- public Data(String header, String delim, int offset) {
- this.offset = offset;
- this.delim = delim;
- fields = header.split(delim);
- lineSize = fields.length;
- types = parseTypes(fields);
- lineData = new String[lineSize];
- createMapData(lineSize, offset);
- }
-
- private Object[] createMapData(int lineSize, int offset) {
- dataSize = lineSize - offset;
- data = new Object[dataSize*2];
- for (int i = 0; i < dataSize; i++) {
- data[i * 2] = fields[i + offset];
- }
- return data;
- }
-
- private Type[] parseTypes(String[] fields) {
- Type[] types = new Type[lineSize];
- Arrays.fill(types, Type.STRING);
- for (int i = 0; i < lineSize; i++) {
- String field = fields[i];
- int idx = field.indexOf(':');
- if (idx!=-1) {
- fields[i]=field.substring(0,idx);
- types[i]= Type.fromString(field.substring(idx + 1));
- }
- }
- return types;
- }
-
- private int split(String line) {
- final StringTokenizer st = new StringTokenizer(line, delim,true);
- int count=0;
- for (int i = 0; i < lineSize; i++) {
- String value = st.nextToken();
- if (value.equals(delim)) {
- lineData[i] = null;
- } else {
- lineData[i] = value.trim().isEmpty() ? null : value;
- if (i< lineSize -1) st.nextToken();
- }
- if (i >= offset && lineData[i]!=null) {
- data[count++]=fields[i];
- data[count++]=types[i].convert(lineData[i]);
- }
- }
- return count;
- }
-
- public Map<String,Object> update(String line, Object... header) {
- int nonNullCount = split(line);
- if (header.length > 0) {
- System.arraycopy(lineData, 0, header, 0, header.length);
- }
-
- if (nonNullCount == dataSize*2) {
- return map(data);
- }
- Object[] newData=new Object[nonNullCount];
- System.arraycopy(data,0,newData,0,nonNullCount);
- return map(newData);
- }
-
- }
-
- static class StdOutReport implements Report {
- private final long batch;
- private final long dots;
- private long count;
- private long total = System.currentTimeMillis(), time, batchTime;
-
- public StdOutReport(long batch, int dots) {
- this.batch = batch;
- this.dots = batch / dots;
- }
-
- @Override
- public void reset() {
- count = 0;
- batchTime = time = System.currentTimeMillis();
- }
-
- @Override
- public void finish() {
- System.out.println("\nTotal import time: "+ (System.currentTimeMillis() - total) / 1000 + " seconds ");
- }
-
- @Override
- public void dots() {
- if ((++count % dots) != 0) return;
- System.out.print(".");
- if ((count % batch) != 0) return;
- long now = System.currentTimeMillis();
- System.out.println(" "+ (now - batchTime) + " ms for "+batch);
- batchTime = now;
- }
-
- @Override
- public void finishImport(String type) {
- System.out.println("\nImporting " + count + " " + type + " took " + (System.currentTimeMillis() - time) / 1000 + " seconds ");
- }
- }
-
void importNodes(Reader reader) throws IOException {
BufferedReader bf = new BufferedReader(reader);
- final Data data = new Data(bf.readLine(), "\t", 0);
+ final RowData data = new RowData(bf.readLine(), "\t", 0);
String line;
report.reset();
while ((line = bf.readLine()) != null) {
@@ -233,7 +89,7 @@ void importNodes(Reader reader) throws IOException {
void importRelationships(Reader reader) throws IOException {
BufferedReader bf = new BufferedReader(reader);
- final Data data = new Data(bf.readLine(), "\t", 3);
+ final RowData data = new RowData(bf.readLine(), "\t", 3);
Object[] rel = new Object[3];
final RelType relType = new RelType();
String line;
@@ -250,7 +106,7 @@ void importIndex(String indexName, BatchInserterIndex index, Reader reader) thro
BufferedReader bf = new BufferedReader(reader);
- final Data data = new Data(bf.readLine(), "\t", 1);
+ final RowData data = new RowData(bf.readLine(), "\t", 1);
Object[] node = new Object[1];
String line;
report.reset();
@@ -275,88 +131,14 @@ private BatchInserterIndex relationshipIndexFor(String indexName, String indexTy
return indexType.equals("fulltext") ? FULLTEXT_CONFIG : EXACT_CONFIG;
}
- static class RelType implements RelationshipType {
- String name;
-
- public RelType update(Object value) {
- this.name = value.toString();
- return this;
- }
-
- public String name() {
- return name;
- }
- }
-
- public enum Type {
- BOOLEAN {
- @Override
- public Object convert(String value) {
- return Boolean.valueOf(value);
- }
- },
- INT {
- @Override
- public Object convert(String value) {
- return Integer.valueOf(value);
- }
- },
- LONG {
- @Override
- public Object convert(String value) {
- return Long.valueOf(value);
- }
- },
- DOUBLE {
- @Override
- public Object convert(String value) {
- return Double.valueOf(value);
- }
- },
- FLOAT {
- @Override
- public Object convert(String value) {
- return Float.valueOf(value);
- }
- },
- BYTE {
- @Override
- public Object convert(String value) {
- return Byte.valueOf(value);
- }
- },
- SHORT {
- @Override
- public Object convert(String value) {
- return Short.valueOf(value);
- }
- },
- CHAR {
- @Override
- public Object convert(String value) {
- return value.charAt(0);
- }
- },
- STRING {
- @Override
- public Object convert(String value) {
- return value;
- }
- };
-
- private static Type fromString(String typeString) {
- if (typeString==null || typeString.isEmpty()) return Type.STRING;
- try {
- return valueOf(typeString.toUpperCase());
- } catch (Exception e) {
- throw new IllegalArgumentException("Unknown Type "+typeString);
- }
- }
-
- public abstract Object convert(String value);
- }
-
private long id(Object id) {
return Long.parseLong(id.toString());
}
+
+ /**
+  * Imports a single index described by the four command line arguments of an index definition.
+  * Does nothing when the index file does not exist. Any element type other than
+  * {@code node_index} is treated as a relationship index. The reader opened here is handed to
+  * the reader-based overload; this method does not close it itself.
+  */
+ private void importIndex(String elementType, String indexName, String indexType, String indexFileName) throws IOException {
+     final File indexFile = new File(indexFileName);
+     if (indexFile.exists()) {
+         final BatchInserterIndex index;
+         if (elementType.equals("node_index")) {
+             index = nodeIndexFor(indexName, indexType);
+         } else {
+             index = relationshipIndexFor(indexName, indexType);
+         }
+         importIndex(indexName, index, new FileReader(indexFile));
+     }
+ }
}
View
21 src/main/java/org/neo4j/batchimport/NodeStructFactory.java
@@ -0,0 +1,21 @@
+package org.neo4j.batchimport;
+
+import com.lmax.disruptor.EventFactory;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.unsafe.batchinsert.BatchInserterImpl;
+
+/**
+ * Disruptor event factory for {@link NodeStruct} that additionally supplies the data of each
+ * node to import. DisruptorBatchInserter uses it both to populate the ring buffer and to fill
+ * every claimed struct before publishing it.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public interface NodeStructFactory extends EventFactory<NodeStruct> {
+ /** Creates a new struct instance; redeclares the method inherited from {@link EventFactory}. */
+ NodeStruct newInstance();
+
+ /** Hands the batch inserter to the factory; DisruptorBatchInserter calls this from its init() before any struct is filled. */
+ void init(BatchInserterImpl inserter);
+
+ /** Fills the given struct with the data for the node with the given id; called once per node id before the struct is published. */
+ void fillStruct(long nodeId, NodeStruct nodeStruct);
+
+ /** NOTE(review): not called in the visible code; presumably the typical number of relationships per node - confirm. */
+ int getRelsPerNode();
+
+ /** Maximum number of relationships per node; DisruptorBatchInserter passes it to the RelationshipIdHandler constructor. */
+ int getMaxRelsPerNode();
+}
View
39 src/main/java/org/neo4j/batchimport/StdOutReport.java
@@ -0,0 +1,39 @@
+package org.neo4j.batchimport;
+
+/**
+ * {@link Report} implementation that prints import progress and timings to standard out:
+ * a dot per group of processed elements and the elapsed milliseconds after every full batch.
+ */
+public class StdOutReport implements Report {
+    /** Number of elements after which the batch time is printed. */
+    private final long batch;
+    /** Number of elements represented by one printed dot. */
+    private final long dots;
+    private long count;
+    /** {@code total} is taken at construction; {@code time} and {@code batchTime} are set by {@link #reset()}. */
+    private long total = System.currentTimeMillis(), time, batchTime;
+
+    /**
+     * @param batch number of elements per batch
+     * @param dots  number of dots to print per batch
+     */
+    public StdOutReport(long batch, int dots) {
+        this.batch = batch;
+        long elementsPerDot = batch / dots;
+        // batch < dots makes the division yield 0, which would make dots() fail with a
+        // division by zero; print a dot per element in that case.
+        this.dots = elementsPerDot == 0 ? 1 : elementsPerDot;
+    }
+
+    /** Resets the element counter and restarts the import and batch timers. */
+    @Override
+    public void reset() {
+        count = 0;
+        batchTime = time = System.currentTimeMillis();
+    }
+
+    /** Prints the total time in seconds since this report was created. */
+    @Override
+    public void finish() {
+        System.out.println("\nTotal import time: "+ (System.currentTimeMillis() - total) / 1000 + " seconds ");
+    }
+
+    /**
+     * Counts one processed element. Prints a dot whenever the count reaches a multiple of the
+     * dot interval and, if the count is then also a multiple of the batch size, the
+     * milliseconds spent since the previous batch line.
+     */
+    @Override
+    public void dots() {
+        if ((++count % dots) != 0) return;
+        System.out.print(".");
+        if ((count % batch) != 0) return;
+        long now = System.currentTimeMillis();
+        System.out.println(" "+ (now - batchTime) + " ms for "+batch);
+        batchTime = now;
+    }
+
+    /**
+     * Prints the number of elements counted since the last {@link #reset()} and the seconds
+     * elapsed since then.
+     *
+     * @param type label for the imported elements, used verbatim in the message
+     */
+    @Override
+    public void finishImport(String type) {
+        System.out.println("\nImporting " + count + " " + type + " took " + (System.currentTimeMillis() - time) / 1000 + " seconds ");
+    }
+}
View
85 src/main/java/org/neo4j/batchimport/Utils.java
@@ -0,0 +1,85 @@
+package org.neo4j.batchimport;
+
+import org.apache.log4j.Logger;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.helpers.collection.MapUtil;
+import org.neo4j.kernel.impl.nioneo.store.NodeRecord;
+import org.neo4j.kernel.impl.nioneo.store.RelationshipRecord;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers for the batch importers: sizing of {@code -1}-padded id arrays, debug
+ * formatting of records and loading of the {@code batch.properties} configuration.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class Utils {
+    private static final Logger log = Logger.getLogger(Utils.class);
+
+    /** Value marking an unused slot of an id array. */
+    private static final int UNUSED = -1;
+
+    /**
+     * Returns the number of leading slots in use, i.e. the index of the last slot that is not
+     * {@code -1} plus one.
+     *
+     * @param ids id array padded with {@code -1}, may be {@code null}
+     * @return used length; 0 for {@code null}, an empty array or an array holding only {@code -1}
+     */
+    public static int size(int[] ids) {
+        if (ids==null) return 0;
+        for (int i=ids.length-1;i>=0;i--) {
+            if (ids[i]!=UNUSED) return i+1;
+        }
+        // no used slot found: the array is logically empty (previously reported as full)
+        return 0;
+    }
+
+    /**
+     * Returns the number of leading slots in use, i.e. the index of the last slot that is not
+     * {@code -1} plus one.
+     *
+     * @param ids id array padded with {@code -1}, may be {@code null}
+     * @return used length; 0 for {@code null}, an empty array or an array holding only {@code -1}
+     */
+    public static int size(long[] ids) {
+        if (ids==null) return 0;
+        for (int i=ids.length-1;i>=0;i--) {
+            if (ids[i]!=UNUSED) return i+1;
+        }
+        // no used slot found: the array is logically empty (previously reported as full)
+        return 0;
+    }
+
+    /** Logs the formatted relationship record at debug level. */
+    private static void printRelationship(RelationshipRecord record) {
+        if (log.isDebugEnabled()) log.debug(formatRecord(record));
+    }
+
+    /** Formats id, end nodes, type, created flag and the four chain pointers of a relationship record. */
+    private static String formatRecord(RelationshipRecord record) {
+        return String.format("Rel[%d] %s-[%d]->%s created %s chain start: %d->%d target %d->%d", record.getId(), record.getFirstNode(), record.getType(), record.getSecondNode(), record.isCreated(), record.getFirstPrevRel(), record.getFirstNextRel(), record.getSecondPrevRel(), record.getSecondNextRel());
+    }
+
+    /** Logs the formatted node struct at debug level. */
+    private static void printNode(NodeStruct record) {
+        if (log.isDebugEnabled()) log.debug(formatNode(record));
+    }
+
+    /** Formats id, next relationship and first property id of a node struct. */
+    private static String formatNode(NodeStruct record) {
+        return String.format("Node[%d] -> %d, .%d", record.id, record.nextRel, record.firstPropertyId);
+    }
+
+    /** Formats id, next relationship and next property of a node record. */
+    private static String formatNode(NodeRecord record) {
+        return String.format("Node[%d] -> %d, .%d", record.getId(), record.getNextRel(), record.getNextProp());
+    }
+
+    /**
+     * Loads the importer configuration from {@code batch.properties} in the working directory,
+     * writing a file with default settings first if none exists. Failures are reported on
+     * standard out only; in that case the map built so far (possibly empty) is returned.
+     *
+     * @return the loaded configuration, or an empty map if writing or loading failed
+     */
+    static Map<String, String> config() {
+        Map<String, String> config = new HashMap<String, String>();
+        try {
+            File configFile = new File("batch.properties");
+            if (configFile.exists()) {
+                System.out.println("Using Existing Configuration File");
+            } else {
+                System.out.println("Writing Configuration File to batch.properties");
+                FileWriter fw = new FileWriter(configFile);
+                try {
+                    fw.append("use_memory_mapped_buffers=true\n"
+                            + "neostore.nodestore.db.mapped_memory=100M\n"
+                            + "neostore.relationshipstore.db.mapped_memory=500M\n"
+                            + "neostore.propertystore.db.mapped_memory=1G\n"
+                            + "neostore.propertystore.db.strings.mapped_memory=200M\n"
+                            + "neostore.propertystore.db.arrays.mapped_memory=0M\n"
+                            + "neostore.propertystore.db.index.keys.mapped_memory=15M\n"
+                            + "neostore.propertystore.db.index.mapped_memory=15M");
+                } finally {
+                    // release the file handle even if writing the defaults fails
+                    fw.close();
+                }
+            }
+
+            config = MapUtil.load(configFile);
+
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+        }
+        return config;
+    }
+}
View
38 src/main/java/org/neo4j/batchimport/collections/ConcurrentIntReverseRelationshipMap.java
@@ -0,0 +1,38 @@
+package org.neo4j.batchimport.collections;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Multi-map from a node id to a fixed-size array of relationship ids, backed by a
+ * {@link ConcurrentHashMap}. Keys and values are narrowed to {@code int}; unused array slots
+ * hold {@code -1}.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class ConcurrentIntReverseRelationshipMap { // implements ReverseRelationshipMap {
+    private final ConcurrentHashMap<Integer,int[]> relIdsByNode = new ConcurrentHashMap<Integer,int[]>();
+    private final int capacity;
+
+    /**
+     * @param arraySize number of values that can be stored per key
+     */
+    ConcurrentIntReverseRelationshipMap(int arraySize) {
+        this.capacity = arraySize;
+    }
+
+    /**
+     * Stores the value in the first free slot of the key's array, creating the array on first use.
+     *
+     * @throws ArrayIndexOutOfBoundsException if the key's array has no free slot left
+     */
+    public void add(long key, long value) {
+        final Integer nodeId = (int) key;
+        int[] relIds = relIdsByNode.get(nodeId);
+        if (relIds == null) {
+            relIds = new int[capacity];
+            Arrays.fill(relIds, -1);
+            relIdsByNode.put(nodeId, relIds);
+        }
+        // advance to the first slot still holding the -1 marker
+        int slot = 0;
+        while (slot < capacity && relIds[slot] != -1) {
+            slot++;
+        }
+        if (slot == capacity) {
+            throw new ArrayIndexOutOfBoundsException("Already "+capacity+" values in array "+Arrays.toString(relIds));
+        }
+        relIds[slot] = (int) value;
+    }
+
+    /**
+     * Removes the key and returns its array ({@code -1} in unused slots), or {@code null} if
+     * nothing was stored for the key.
+     */
+    public int[] remove(long key) {
+        final Integer nodeId = (int) key;
+        return relIdsByNode.remove(nodeId);
+    }
+
+}
View
40 src/main/java/org/neo4j/batchimport/collections/ConcurrentLongReverseRelationshipMap.java
@@ -0,0 +1,40 @@
+package org.neo4j.batchimport.collections;
+
+import edu.ucla.sspace.util.primitive.IntSet;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link ReverseRelationshipMap} that keeps a fixed-size {@code long[]} of relationship ids per node id.
+ * <p>
+ * Free slots are marked with {@code -1}, so {@code -1} itself cannot be stored as a value.
+ * Registering the per-key array is atomic; filling a slot is not synchronized, so concurrent
+ * {@link #add(long, long)} calls for the same key must be serialized by the caller.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class ConcurrentLongReverseRelationshipMap implements ReverseRelationshipMap {
+    private final ConcurrentHashMap<Long,long[]> inner=new ConcurrentHashMap<Long,long[]>();
+    private final int arraySize;
+
+    /**
+     * @param arraySize maximum number of values that can be stored per key
+     */
+    public ConcurrentLongReverseRelationshipMap(int arraySize) {
+        this.arraySize = arraySize;
+    }
+
+    /**
+     * Stores {@code value} in the first free slot of the array belonging to {@code key}.
+     *
+     * @throws ArrayIndexOutOfBoundsException if all {@code arraySize} slots of the key are taken
+     */
+    public void add(long key, long value) {
+        final Long boxedKey = key;
+        long[] ids = inner.get(boxedKey);
+        if (ids == null) {
+            long[] fresh = new long[arraySize];
+            Arrays.fill(fresh, -1);
+            // putIfAbsent instead of put: never replace an array that was registered
+            // for this key between the get() above and now
+            long[] existing = inner.putIfAbsent(boxedKey, fresh);
+            ids = existing != null ? existing : fresh;
+        }
+        for (int i = 0; i < arraySize; i++) {
+            if (ids[i] == -1) {
+                ids[i] = value;
+                return;
+            }
+        }
+        throw new ArrayIndexOutOfBoundsException("Already "+arraySize+" values in array "+Arrays.toString(ids));
+    }
+
+    /**
+     * Removes the entry for {@code key}.
+     *
+     * @return the array stored for the key (unused slots hold {@code -1}), or {@code null} if there is none
+     */
+    public long[] remove(long key) {
+        return inner.remove(key);
+    }
+
+}
View
23 src/main/java/org/neo4j/batchimport/collections/PrimitiveIntReverseRelationshipMap.java
@@ -0,0 +1,23 @@
+package org.neo4j.batchimport.collections;
+
+import edu.ucla.sspace.util.primitive.IntIntHashMultiMap;
+import edu.ucla.sspace.util.primitive.IntIntMultiMap;
+import edu.ucla.sspace.util.primitive.IntSet;
+
+/**
+ * Node-id to relationship-ids multimap backed by a primitive {@code int}/{@code int} multimap.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class PrimitiveIntReverseRelationshipMap { // implements ReverseRelationshipMap {
+    private final IntIntMultiMap inner=new IntIntHashMultiMap();
+
+    /** Associates {@code value} with {@code key}. */
+    public void add(int key, int value) {
+        inner.put(key,value);
+    }
+
+    /**
+     * Removes every value stored for {@code key}.
+     *
+     * @return the removed values as an array, or {@code null} when the underlying map returned no set
+     */
+    public int[] remove(int key) {
+        final IntSet removed = inner.remove(key);
+        return removed == null ? null : removed.toPrimitiveArray();
+    }
+}
View
10 src/main/java/org/neo4j/batchimport/collections/ReverseRelationshipMap.java
@@ -0,0 +1,10 @@
+package org.neo4j.batchimport.collections;
+
+/**
+* Collects relationship ids per node id so they can later be fetched and dropped in one step.
+*
+* @author mh
+* @since 27.10.12
+*/
+public interface ReverseRelationshipMap {
+ /** Records {@code relId} under {@code nodeId}. */
+ void add(long nodeId, long relId);
+ /**
+ * Removes and returns the relationship ids recorded for {@code nodeId}.
+ * NOTE(review): the long-based implementation in this change returns {@code null} for an unknown
+ * node and pads unused slots with {@code -1} - confirm that other implementations agree.
+ */
+ long[] remove(long nodeId);
+}
View
86 src/main/java/org/neo4j/batchimport/handlers/NodeWriteFileHandler.java
@@ -0,0 +1,86 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.kernel.impl.nioneo.store.Record;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Disruptor event handler that serializes each {@link NodeStruct} into a 9-byte node record and
+ * appends it to a file through a buffered {@link FileChannel}.
+ * <p>
+ * Records are collected in a direct buffer; the buffer is written out when it is full and at the
+ * end of every batch, where the channel is also forced to disk.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class NodeWriteFileHandler implements EventHandler<NodeStruct> {
+    /**
+     * Size of the write buffer in bytes (1 MiB).
+     * The previous expression {@code 1024 ^ 2} was an XOR and evaluated to 1026.
+     */
+    public static final int CAPACITY = 1024 * 1024;
+    /** Bytes per node record: 1 in-use/high-bits byte + 4 bytes next-rel + 4 bytes next-prop. */
+    private static final int RECORD_SIZE = 9;
+    FileOutputStream os;
+    int eob=0;
+    private final FileChannel channel;
+    private final ByteBuffer buffer;
+    private int limit;
+    private long written;
+
+    /**
+     * @param file target file; it is opened for writing from position 0
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public NodeWriteFileHandler(File file) throws IOException {
+        os = new FileOutputStream(file);
+        channel = os.getChannel();
+        channel.position(0);
+        buffer = ByteBuffer.allocateDirect(CAPACITY);
+        // largest multiple of the record size that fits, so a record never straddles two flushes
+        limit = (CAPACITY / RECORD_SIZE) * RECORD_SIZE;
+        buffer.limit(limit);
+    }
+
+    /** Buffers the record of {@code event}; flushes and forces the channel at the end of a batch. */
+    @Override
+    public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
+        writeRecord(event);
+        if (endOfBatch) {
+            flush();
+        }
+    }
+
+    /** Writes out whatever is buffered, forces the channel and counts the batch. */
+    private void flush() throws IOException {
+        flushBuffer(true);
+        channel.force(true);
+        eob++;
+    }
+
+    @Override
+    public String toString() {
+        return "batches "+eob+" written "+written;
+    }
+
+    /** Encodes one node record into the buffer and writes the buffer out if it became full. */
+    private void writeRecord(NodeStruct record) throws IOException {
+        //printNode(record);
+        long nextRel = record.nextRel;
+        long nextProp = record.firstPropertyId;
+
+        short relModifier = Record.NO_NEXT_RELATIONSHIP.is(nextRel) ? 0 : (short) ((nextRel & 0x700000000L) >> 31);
+        short propModifier = Record.NO_NEXT_PROPERTY.is(nextProp) ? 0 : (short) ((nextProp & 0xF00000000L) >> 28);
+
+        // [    ,   x] in use bit
+        // [    ,xxx ] higher bits for rel id
+        // [xxxx,    ] higher bits for prop id
+        short inUseUnsignedByte = Record.IN_USE.byteValue();
+        inUseUnsignedByte = (short) (inUseUnsignedByte | relModifier | propModifier);
+        buffer.put((byte)inUseUnsignedByte).putInt((int)nextRel).putInt((int) nextProp);
+        flushBuffer(false);
+    }
+
+    /**
+     * Writes the buffered bytes to the channel.
+     *
+     * @param force write even if the buffer is not full yet
+     */
+    private void flushBuffer(boolean force) throws IOException {
+        if (buffer.position()==0) return;
+        if (force || buffer.position()==buffer.limit()) {
+            buffer.flip();
+            // a single write() is not guaranteed to drain the buffer
+            while (buffer.hasRemaining()) {
+                written += channel.write(buffer);
+            }
+            buffer.clear().limit(limit);
+        }
+    }
+
+    /** Flushes pending records and closes the channel and the underlying stream. */
+    public void close() throws IOException {
+        flush();
+        channel.close();
+        os.close();
+    }
+}
View
44 src/main/java/org/neo4j/batchimport/handlers/NodeWriteRecordHandler.java
@@ -0,0 +1,44 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.kernel.impl.nioneo.store.NodeRecord;
+import org.neo4j.kernel.impl.nioneo.store.NodeStore;
+
+/**
+ * Disruptor event handler that persists every node event as a {@link NodeRecord} in the {@link NodeStore}.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class NodeWriteRecordHandler implements EventHandler<NodeStruct> {
+
+    long counter = 0;
+    private final NodeStore nodeStore;
+
+    public NodeWriteRecordHandler(NodeStore nodeStore) {
+        this.nodeStore = nodeStore;
+    }
+
+    /**
+     * Writes the node record for {@code nodeId}, raising the store's high id first when needed,
+     * and flushes the store at the end of a batch.
+     */
+    public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {
+        counter++;
+        final boolean beyondHighId = nodeStore.getHighId() <= nodeId;
+        if (beyondHighId) {
+            nodeStore.setHighId(nodeId+1);
+        }
+        //printNode(event);
+        final NodeRecord nodeRecord = createRecord(event, nodeId);
+        nodeStore.updateRecord(nodeRecord);
+        if (endOfBatch) {
+            nodeStore.flushAll();
+        }
+    }
+
+    /** Builds an in-use, newly created record pointing at the event's first relationship and property. */
+    private NodeRecord createRecord(NodeStruct event, long id) {
+        final NodeRecord created = new NodeRecord(id, event.nextRel, event.firstPropertyId);
+        created.setInUse(true);
+        created.setCreated();
+        return created;
+    }
+
+    @Override
+    public String toString() {
+        return "WritingEventHandler " + counter;
+    }
+
+    /** Flushes the node store. */
+    public void close() {
+        nodeStore.flushAll();
+    }
+}
View
55 src/main/java/org/neo4j/batchimport/handlers/PropertyEncodingHandler.java
@@ -0,0 +1,55 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Property;
+import org.neo4j.batchimport.structs.PropertyHolder;
+import org.neo4j.kernel.impl.nioneo.store.PropertyStore;
+import org.neo4j.unsafe.batchinsert.BatchInserterImpl;
+
+/**
+ * Disruptor event handler that encodes the property values of a node and of its relationships.
+ * <p>
+ * {@link #createHandlers} builds {@code MASK + 1} handlers; each one only processes the events
+ * whose sequence number, masked with {@link #MASK}, equals its own position.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class PropertyEncodingHandler implements EventHandler<NodeStruct> {
+    private long count;
+    private final int pos;
+    private final PropertyStore propStore;
+    public static final int MASK = 1;
+
+    /**
+     * @param inserter source of the property store used for encoding
+     * @param pos      value of {@code sequence & MASK} this handler is responsible for
+     */
+    public PropertyEncodingHandler(BatchInserterImpl inserter, int pos) {
+        this.pos = pos;
+        propStore = inserter.getPropertyStore();
+    }
+
+    /** Creates one handler per possible value of {@code sequence & MASK}. */
+    public static PropertyEncodingHandler[] createHandlers(final BatchInserterImpl inserter) {
+        final PropertyEncodingHandler[] handlers = new PropertyEncodingHandler[MASK + 1];
+        int slot = 0;
+        while (slot < handlers.length) {
+            handlers[slot] = new PropertyEncodingHandler(inserter, slot);
+            slot++;
+        }
+        return handlers;
+    }
+
+    /** Encodes the properties of the node and of all its relationships if the sequence belongs to this handler. */
+    public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
+        final boolean responsible = (sequence & MASK) == pos;
+        if (!responsible) {
+            return;
+        }
+        encodeProperties(event);
+        for (int rel = 0; rel < event.relationshipCount; rel++) {
+            encodeProperties(event.getRelationship(rel));
+        }
+    }
+
+    /** Encodes every used property of {@code holder} and counts it. */
+    private void encodeProperties(PropertyHolder holder) {
+        // todo cache encoded blocks in an LRU cache
+        for (int idx = 0; idx < holder.propertyCount; idx++) {
+            holder.properties[idx].encode(propStore);
+            count++;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "encoded "+count+" properties";
+    }
+}
View
62 src/main/java/org/neo4j/batchimport/handlers/PropertyRecordCreatorHandler.java
@@ -0,0 +1,62 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Property;
+import org.neo4j.batchimport.structs.PropertyHolder;
+import org.neo4j.kernel.impl.nioneo.store.PropertyBlock;
+import org.neo4j.kernel.impl.nioneo.store.PropertyRecord;
+import org.neo4j.kernel.impl.nioneo.store.PropertyType;
+
+/**
+ * Disruptor event handler that assigns property record ids and packs the already encoded
+ * property blocks of a node and of its relationships into chained {@link PropertyRecord}s.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class PropertyRecordCreatorHandler implements EventHandler<NodeStruct> {
+    public static final int PAYLOAD_SIZE = PropertyType.getPayloadSize();
+    private volatile long propertyId=0;
+
+    /** Creates the property records for the node, then for each relationship, and stores the next free id on the event. */
+    @Override
+    public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
+        createPropertyRecords(event);
+        for (int rel = 0; rel < event.relationshipCount; rel++) {
+            createPropertyRecords(event.getRelationship(rel));
+        }
+        event.lastPropertyId = propertyId;
+    }
+
+    /**
+     * Distributes the property blocks of {@code holder} over as many records as needed.
+     * A new record is started whenever the next block would exceed {@link #PAYLOAD_SIZE};
+     * consecutive records are linked through their next/prev pointers.
+     */
+    private void createPropertyRecords(PropertyHolder holder) {
+        if (holder.propertyCount==0) {
+            return;
+        }
+        final PropertyRecord[] records = holder.propertyRecords;
+        holder.firstPropertyId = propertyId;
+        PropertyRecord current = createRecord(propertyId);
+        propertyId++;
+        int used = 0;
+        records[used++] = current;
+        for (int p = 0; p < holder.propertyCount; p++) {
+            final Property property = holder.properties[p];
+            final PropertyBlock block = property.block;
+            final boolean fits = current.size() + block.getSize() <= PAYLOAD_SIZE;
+            if (!fits) {
+                final long followUpId = propertyId;
+                current.setNextProp(followUpId);
+                current = createRecord(followUpId);
+                current.setPrevProp(followUpId-1);
+                propertyId++;
+                records[used++] = current;
+            }
+            current.addPropertyBlock(block);
+            property.clean();
+        }
+        // terminate the used part of the array when it is not completely filled
+        if (used < records.length) {
+            records[used] = null;
+        }
+    }
+
+    /** Creates an in-use, newly created property record with the given id. */
+    private PropertyRecord createRecord(long id) {
+        final PropertyRecord fresh = new PropertyRecord(id);
+        fresh.setInUse( true );
+        fresh.setCreated();
+        return fresh;
+    }
+
+    @Override
+    public String toString() {
+        return "MaxPropertyId "+propertyId;
+    }
+}
View
21 src/main/java/org/neo4j/batchimport/handlers/PropertyRecordHighIdHandler.java
@@ -0,0 +1,21 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.kernel.impl.nioneo.store.PropertyStore;
+
+/**
+ * Disruptor event handler that keeps the high id of the {@link PropertyStore} at least as large
+ * as the last property id recorded on each event.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class PropertyRecordHighIdHandler implements EventHandler<NodeStruct> {
+    private final PropertyStore propStore;
+
+    public PropertyRecordHighIdHandler(PropertyStore propStore) {
+        this.propStore = propStore;
+    }
+
+    /** Raises the store's high id to {@code event.lastPropertyId} when the store is behind. */
+    public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
+        final boolean storeIsBehind = propStore.getHighId() < event.lastPropertyId;
+        if (storeIsBehind) {
+            propStore.setHighId(event.lastPropertyId);
+        }
+    }
+}
View
52 src/main/java/org/neo4j/batchimport/handlers/PropertyWriteRecordHandler.java
@@ -0,0 +1,52 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.PropertyHolder;
+import org.neo4j.kernel.impl.nioneo.store.PropertyRecord;
+import org.neo4j.kernel.impl.nioneo.store.PropertyStore;
+
+/**
+* @author mh
+* @since 27.10.12
+*/
+public class PropertyWriteRecordHandler implements EventHandler<NodeStruct> {
+
+ long counter = 0;
+ private final PropertyStore propStore;
+
+ public PropertyWriteRecordHandler(PropertyStore propStore) {
+ this.propStore = propStore;
+ }
+
+ public void onEvent(NodeStruct event, long sequence, boolean endOfBatch) throws Exception {
+ if (propStore.getHighId() <= event.lastPropertyId) propStore.setHighId(event.lastPropertyId);
+ writePropertyRecords(event);
+ for (int i = 0; i < event.relationshipCount; i++) {
+ writePropertyRecords(event.getRelationship(i));
+ }
+ if (endOfBatch) propStore.flushAll();
+ }
+
+ private boolean writePropertyRecords(PropertyHolder holder) {
+ if (holder.propertyCount==0) return true;
+
+ for (int i=0;i<holder.propertyCount;i++) {
+ PropertyRecord record = holder.propertyRecords[i];
+ if (record == null) return true;
+ propStore.updateRecord(record);
+ counter++;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "PropertyWritingEventHandler " + counter;
+ }
+
+ public void close() {
+ propStore.flushAll();
+ }
+
+}
View
146 src/main/java/org/neo4j/batchimport/handlers/RelationshipFileWriter.java
@@ -0,0 +1,146 @@
+package org.neo4j.batchimport.handlers;
+
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Relationship;
+import org.neo4j.kernel.impl.nioneo.store.Record;
+import org.neo4j.kernel.impl.nioneo.store.RelationshipStore;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * {@link RelationshipWriter} that serializes relationship records straight into a file.
+ * <p>
+ * New records are appended through a direct buffer; pointer updates of already written records
+ * are done by seeking to the record, overwriting the prev/next pair and seeking back.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class RelationshipFileWriter implements RelationshipWriter {
+    /**
+     * Size of the write buffer in bytes (1 MiB).
+     * The previous expression {@code 1024 ^ 2} was an XOR and evaluated to 1026.
+     */
+    public static final int CAPACITY = 1024 * 1024;
+    FileOutputStream os;
+    int eob=0;
+    private final FileChannel channel;
+    private final ByteBuffer buffer;
+    private int limit;
+    private long written;
+    private ByteBuffer updateBuffer;
+    private long updated;
+
+    /**
+     * @param file target file; it is opened for writing from position 0
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public RelationshipFileWriter(File file) throws IOException {
+        os = new FileOutputStream(file);
+        channel = os.getChannel();
+        channel.position(0);
+        buffer = ByteBuffer.allocateDirect(CAPACITY);
+        updateBuffer = ByteBuffer.allocateDirect(8); // 2x prev/next pointer
+        // largest multiple of the record size that fits, so a record never straddles two flushes
+        limit = (CAPACITY / RelationshipStore.RECORD_SIZE) * RelationshipStore.RECORD_SIZE;
+        buffer.limit(limit);
+    }
+
+    /**
+     * Encodes a new relationship record into the buffer.
+     *
+     * @param nodeId       node the relationship was declared on
+     * @param event        the node event (not used by this writer)
+     * @param relationship relationship to write; its direction decides which side {@code nodeId} is on
+     * @param prevId       previous relationship in the chain of {@code nodeId}
+     * @param nextId       next relationship in the chain of {@code nodeId}
+     */
+    @Override
+    public void create(long nodeId, NodeStruct event, Relationship relationship, long prevId, long nextId) throws IOException {
+        long firstNode, secondNode, firstNextRel, firstPrevRel, secondNextRel, secondPrevRel;
+
+        // the chain pointers of the other node are not known yet; they are filled in later via update()
+        if (relationship.outgoing()) {
+            firstNode = nodeId;
+            secondNode = relationship.other();
+            firstPrevRel = prevId;
+            firstNextRel = nextId;
+            secondPrevRel = Record.NO_PREV_RELATIONSHIP.intValue();
+            secondNextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
+        } else {
+            firstNode = relationship.other();
+            secondNode = nodeId;
+            firstPrevRel = Record.NO_PREV_RELATIONSHIP.intValue();
+            firstNextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
+            secondPrevRel = prevId;
+            secondNextRel = nextId;
+        }
+
+        short firstNodeMod = (short)((firstNode & 0x700000000L) >> 31);
+        long secondNodeMod = (secondNode & 0x700000000L) >> 4;
+        long firstPrevRelMod = firstPrevRel == Record.NO_NEXT_RELATIONSHIP.intValue() ? 0 : (firstPrevRel & 0x700000000L) >> 7;
+        long firstNextRelMod = firstNextRel == Record.NO_NEXT_RELATIONSHIP.intValue() ? 0 : (firstNextRel & 0x700000000L) >> 10;
+        long secondPrevRelMod = secondPrevRel == Record.NO_NEXT_RELATIONSHIP.intValue() ? 0 : (secondPrevRel & 0x700000000L) >> 13;
+        long secondNextRelMod = secondNextRel == Record.NO_NEXT_RELATIONSHIP.intValue() ? 0 : (secondNextRel & 0x700000000L) >> 16;
+
+        long nextProp = relationship.firstPropertyId;
+        long nextPropMod = nextProp == Record.NO_NEXT_PROPERTY.intValue() ? 0 : (nextProp & 0xF00000000L) >> 28;
+
+        // [    ,   x] in use flag
+        // [    ,xxx ] first node high order bits
+        // [xxxx,    ] next prop high order bits
+        short inUseUnsignedByte = (short)(Record.IN_USE.byteValue() | firstNodeMod | nextPropMod);
+
+        // [ xxx,    ][    ,    ][    ,    ][    ,    ] second node high order bits,     0x70000000
+        // [    ,xxx ][    ,    ][    ,    ][    ,    ] first prev rel high order bits,  0xE000000
+        // [    ,   x][xx  ,    ][    ,    ][    ,    ] first next rel high order bits,  0x1C00000
+        // [    ,    ][  xx,x   ][    ,    ][    ,    ] second prev rel high order bits, 0x380000
+        // [    ,    ][    , xxx][    ,    ][    ,    ] second next rel high order bits, 0x70000
+        // [    ,    ][    ,    ][xxxx,xxxx][xxxx,xxxx] type
+        int typeInt = (int)(relationship.type | secondNodeMod | firstPrevRelMod | firstNextRelMod | secondPrevRelMod | secondNextRelMod);
+
+        buffer.put( (byte)inUseUnsignedByte ).putInt( (int) firstNode ).putInt((int) secondNode)
+              .putInt(typeInt).putInt( (int) firstPrevRel ).putInt( (int) firstNextRel )
+              .putInt((int) secondPrevRel).putInt( (int) secondNextRel ).putInt( (int) nextProp );
+
+        flushBuffer(false);
+    }
+
+    /**
+     * Writes the buffered bytes to the channel.
+     *
+     * @param force write even if the buffer is not full yet
+     */
+    private void flushBuffer(boolean force) throws IOException {
+        if (buffer.position()==0) return;
+        if (force || buffer.position()==buffer.limit()) {
+            buffer.flip();
+            // a single write() is not guaranteed to drain the buffer
+            while (buffer.hasRemaining()) {
+                written += channel.write(buffer);
+            }
+            buffer.clear().limit(limit);
+        }
+    }
+
+    /**
+     * Overwrites the prev/next pointer pair of an already written record in place.
+     * only works for prevId & nextId <= MAXINT
+     *
+     * @param id       relationship whose record is patched
+     * @param outgoing {@code true} patches the first-node pointer pair, {@code false} the second-node pair
+     */
+    @Override
+    public void update(long id, boolean outgoing, long prevId, long nextId) throws IOException {
+        // pending records must be on disk before the channel is repositioned
+        flushBuffer(true);
+        long position = id * RelationshipStore.RECORD_SIZE + 1 + 4 + 4 + 4; // inUse, firstNode, secondNode, relType
+
+        if (!outgoing) {
+            position += 4 + 4;
+        }
+        long oldPos = channel.position();
+        if (oldPos != position) {
+            channel.position(position);
+        }
+
+        updateBuffer.position(0);
+        updateBuffer.putInt((int) prevId).putInt( (int) nextId ).position(0);
+
+        // a single write() is not guaranteed to drain the buffer
+        while (updateBuffer.hasRemaining()) {
+            updated += channel.write(updateBuffer);
+        }
+        channel.position(oldPos);
+    }
+
+    /** Flushes pending records and closes the channel and the underlying stream. */
+    @Override
+    public void close() throws IOException {
+        flush();
+        channel.close();
+        os.close();
+    }
+
+    /** Writes out whatever is buffered, counts the batch and forces the channel. */
+    @Override
+    public void flush() throws IOException {
+        flushBuffer(true);
+        eob++;
+        channel.force(true);
+    }
+
+    /** Nothing to prepare for a plain file. */
+    @Override
+    public void start(long maxRelationshipId) {
+    }
+
+    @Override
+    public String toString() {
+        return "RelationshipFileWriter: batches "+eob+" written "+written+" updated "+updated;
+    }
+}
View
82 src/main/java/org/neo4j/batchimport/handlers/RelationshipIdHandler.java
@@ -0,0 +1,82 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.NodeStructFactory;
+import org.neo4j.batchimport.collections.ConcurrentLongReverseRelationshipMap;
+import org.neo4j.batchimport.collections.PrimitiveIntReverseRelationshipMap;
+import org.neo4j.batchimport.Utils;
+import org.neo4j.batchimport.collections.ReverseRelationshipMap;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Relationship;
+import org.neo4j.kernel.impl.nioneo.store.Record;
+
+/**
+* @author mh
+* @since 27.10.12
+*/
+public class RelationshipIdHandler implements EventHandler<NodeStruct> {
+ volatile long relId = 0;
+ // store reverse node-id to rel-id for future updates of relationship-records
+ final ReverseRelationshipMap futureModeRelIdQueueOutgoing;
+ final ReverseRelationshipMap futureModeRelIdQueueIncoming;
+
+ public RelationshipIdHandler(int relsPerNode) {
+ futureModeRelIdQueueOutgoing = new ConcurrentLongReverseRelationshipMap(relsPerNode);
+ futureModeRelIdQueueIncoming = new ConcurrentLongReverseRelationshipMap(relsPerNode);
+ }
+// final ReverseRelationshipMap futureModeRelIdQueueOutgoing = new PrimitiveIntReverseRelationshipMap();
+// final ReverseRelationshipMap futureModeRelIdQueueIncoming = new PrimitiveIntReverseRelationshipMap();
+ //final ReverseRelationshipMap futureModeRelIdQueueOutgoing = new ConcurrentReverseRelationshipMap(RELS_PER_NODE);
+ //final ReverseRelationshipMap futureModeRelIdQueueIncoming = new ConcurrentReverseRelationshipMap(RELS_PER_NODE);
+
+ public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {
+ for (int i = 0; i < event.relationshipCount; i++) {
+ Relationship relationship = event.getRelationship(i);
+ long relId = this.relId++;
+ relationship.id = relId;
+ storeFutureRelId(nodeId, relationship,relId);
+ }
+
+ event.outgoingRelationshipsToUpdate = futureRelIds(nodeId, futureModeRelIdQueueOutgoing);
+ event.incomingRelationshipsToUpdate = futureRelIds(nodeId, futureModeRelIdQueueIncoming);
+ event.nextRel = firstRelationshipId(event);
+ event.maxRelationshipId = maxRelationshipId(event);
+ }
+
+ private void storeFutureRelId(long nodeId, Relationship relationship, long relId) {
+ long other = relationship.other();
+ if (other <= nodeId) return;
+ if (relationship.outgoing()) {
+ futureModeRelIdQueueIncoming.add(other, relId);
+ } else {
+ futureModeRelIdQueueOutgoing.add(other, relId);
+ }
+ }
+
+ private long[] futureRelIds(long nodeId, ReverseRelationshipMap futureRelIds) {
+ long[] relIds = futureRelIds.remove(nodeId);
+ if (relIds == null) return null;
+ return relIds;
+ }
+
+ private long firstRelationshipId(NodeStruct event) {
+ if (event.relationshipCount>0) return event.getRelationship(0).id;
+ if (event.outgoingRelationshipsToUpdate!=null) return event.outgoingRelationshipsToUpdate[0];
+ if (event.incomingRelationshipsToUpdate!=null) return event.incomingRelationshipsToUpdate[0];
+ return Record.NO_PREV_RELATIONSHIP.intValue();
+ }
+
+ private long maxRelationshipId(NodeStruct event) {
+ long result=Record.NO_NEXT_RELATIONSHIP.intValue();
+
+ if (event.incomingRelationshipsToUpdate!=null) result=Math.max(event.incomingRelationshipsToUpdate[Utils.size(event.incomingRelationshipsToUpdate)-1],result);
+ if (event.outgoingRelationshipsToUpdate!=null) result=Math.max(event.outgoingRelationshipsToUpdate[Utils.size(event.outgoingRelationshipsToUpdate)-1],result);
+ if (event.relationshipCount>0) result=Math.max(event.getRelationship(event.relationshipCount-1).id,result);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "relId: " + relId;
+ }
+}
View
75 src/main/java/org/neo4j/batchimport/handlers/RelationshipRecordWriter.java
@@ -0,0 +1,75 @@
+package org.neo4j.batchimport.handlers;
+
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Relationship;
+import org.neo4j.kernel.impl.nioneo.store.RelationshipRecord;
+import org.neo4j.kernel.impl.nioneo.store.RelationshipStore;
+
+import java.io.IOException;
+
+/**
+ * {@link RelationshipWriter} that goes through the {@link RelationshipStore} record API.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class RelationshipRecordWriter implements RelationshipWriter {
+    private final RelationshipStore relationshipStore;
+
+    public RelationshipRecordWriter(RelationshipStore relationshipStore) {
+        this.relationshipStore = relationshipStore;
+    }
+
+    /** Builds a record for {@code relationship} and stores it. */
+    @Override
+    public void create(long nodeId, NodeStruct event, Relationship relationship, long prevId, long nextId) {
+        final RelationshipRecord fresh = createRecord(nodeId, relationship, prevId, nextId);
+        updateRecord(fresh);
+    }
+
+    /** Loads the record of {@code relId}, replaces the pointer pair of the selected side and stores it again. */
+    @Override
+    public void update(long relId, boolean outgoing, long prevId, long nextId) {
+        final RelationshipRecord existing = relationshipStore.getRecord(relId);
+        linkInto(existing, outgoing, prevId, nextId);
+        updateRecord(existing);
+    }
+
+    private void updateRecord(RelationshipRecord record) {
+        relationshipStore.updateRecord(record);
+    }
+
+    @Override
+    public void flush() {
+        relationshipStore.flushAll();
+    }
+
+    /** Makes sure the store's high id is above {@code maxRelationshipId}. */
+    @Override
+    public void start(long maxRelationshipId) {
+        final boolean storeIsBehind = relationshipStore.getHighId() <= maxRelationshipId;
+        if (storeIsBehind) {
+            relationshipStore.setHighId(maxRelationshipId +1);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+    }
+
+    /** Creates an in-use record; {@code from} is the first node for outgoing relationships, otherwise the second. */
+    private RelationshipRecord createRecord(long from, Relationship relationship, long prevId, long nextId) {
+        final long relId = relationship.id;
+        final boolean outgoing = relationship.outgoing();
+        final RelationshipRecord created;
+        if (outgoing) {
+            created = new RelationshipRecord( relId, from, relationship.other(), relationship.type );
+        } else {
+            created = new RelationshipRecord( relId, relationship.other(), from, relationship.type );
+        }
+        created.setInUse(true);
+        created.setCreated();
+        linkInto(created, outgoing, prevId, nextId);
+        created.setNextProp(relationship.firstPropertyId);
+        return created;
+    }
+
+    /** Sets the first-node pointer pair for outgoing, the second-node pair otherwise. */
+    private void linkInto(RelationshipRecord record, boolean outgoing, long prevId, long nextId) {
+        if (outgoing) {
+            record.setFirstPrevRel(prevId);
+            record.setFirstNextRel(nextId);
+        } else {
+            record.setSecondPrevRel(prevId);
+            record.setSecondNextRel(nextId);
+        }
+    }
+}
View
79 src/main/java/org/neo4j/batchimport/handlers/RelationshipWriteHandler.java
@@ -0,0 +1,79 @@
+package org.neo4j.batchimport.handlers;
+
+import com.lmax.disruptor.EventHandler;
+import org.neo4j.batchimport.Utils;
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Relationship;
+import org.neo4j.kernel.impl.nioneo.store.Record;
+
+import java.io.IOException;
+
+/**
+* @author mh
+* @since 27.10.12
+*/
+public class RelationshipWriteHandler implements EventHandler<NodeStruct> {
+ private long counter;
+ private final RelationshipWriter relationshipWriter;
+
+ public RelationshipWriteHandler(RelationshipWriter relationshipWriter) {
+ this.relationshipWriter = relationshipWriter;
+ }
+
+ @Override
+ public void onEvent(NodeStruct event, long nodeId, boolean endOfBatch) throws Exception {
+ if (Record.NO_NEXT_RELATIONSHIP.is(event.nextRel)) return;
+ relationshipWriter.start(event.maxRelationshipId);
+
+ int count = event.relationshipCount;
+ long followingNextRelationshipId =
+ event.outgoingRelationshipsToUpdate!=null ? event.outgoingRelationshipsToUpdate[0] :
+ event.incomingRelationshipsToUpdate!=null ? event.incomingRelationshipsToUpdate[0] :
+ Record.NO_NEXT_RELATIONSHIP.intValue();
+
+ long prevId = Record.NO_PREV_RELATIONSHIP.intValue();
+ for (int i = 0; i < count; i++) {
+ long nextId = i+1 < count ? event.getRelationship(i+1).id : followingNextRelationshipId;
+ Relationship relationship = event.getRelationship(i);
+ relationshipWriter.create(nodeId,event, relationship, prevId, nextId);
+ prevId = relationship.id;
+ counter++;
+ }
+
+ followingNextRelationshipId =
+ event.incomingRelationshipsToUpdate!=null ? event.incomingRelationshipsToUpdate[0] :
+ Record.NO_NEXT_RELATIONSHIP.intValue();
+
+ prevId = createUpdateRecords(event.outgoingRelationshipsToUpdate, prevId, followingNextRelationshipId,true);
+
+ followingNextRelationshipId = Record.NO_NEXT_RELATIONSHIP.intValue();
+
+ createUpdateRecords(event.incomingRelationshipsToUpdate, prevId, followingNextRelationshipId, false);
+
+ if (endOfBatch) relationshipWriter.flush();
+ }
+
+ private long createUpdateRecords(long[] relIds, long prevId, long followingNextRelationshipId, boolean outgoing) throws IOException {
+ if (relIds==null) return prevId;
+ int count = Utils.size(relIds);
+ for (int i = 0; i < count; i++) {
+ long nextId = i+1 < count ? relIds[i + 1] : followingNextRelationshipId;
+ relationshipWriter.update(relIds[i], outgoing, prevId, nextId);
+ prevId = relIds[i];
+ counter++;
+ }
+ return prevId;
+ }
+
+ @Override
+ public String toString() {
+ return "rel-record-writer " + counter + " "+relationshipWriter;
+ }
+ public void close() {
+ try {
+ relationshipWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
View
20 src/main/java/org/neo4j/batchimport/handlers/RelationshipWriter.java
@@ -0,0 +1,20 @@
+package org.neo4j.batchimport.handlers;
+
+import org.neo4j.batchimport.structs.NodeStruct;
+import org.neo4j.batchimport.structs.Relationship;
+
+import java.io.IOException;
+
+/**
+* Sink for relationship records; used by the relationship write handler to persist the
+* relationship chain of one node at a time.
+*
+* @author mh
+* @since 27.10.12
+*/
+public interface RelationshipWriter {
+ /**
+ * Writes a new record for {@code relationship}, which was declared on node {@code nodeId}.
+ * {@code prevId} and {@code nextId} are the neighbouring relationship ids in that node's chain.
+ */
+ void create(long nodeId, NodeStruct event, Relationship relationship, long prevId, long nextId) throws IOException;
+ /**
+ * Replaces one prev/next pointer pair of the already written relationship {@code relId}.
+ * In the implementations of this change {@code outgoing == true} selects the first-node pair,
+ * {@code false} the second-node pair.
+ */
+ void update(long relId, boolean outgoing, long prevId, long nextId) throws IOException;
+ /** Pushes everything written so far to the underlying storage. */
+ void flush() throws IOException;
+
+ /**
+ * Called before the relationships of a node are written, with the highest relationship id
+ * that node's event refers to.
+ */
+ void start(long maxRelationshipId);
+
+ /** Flushes and releases the writer. */
+ void close() throws IOException;
+}
View
16 src/main/java/org/neo4j/batchimport/importer/RelType.java
@@ -0,0 +1,16 @@
+package org.neo4j.batchimport.importer;
+
+import org.neo4j.graphdb.RelationshipType;
+
+/**
+ * Mutable {@link RelationshipType} whose name can be replaced, so that one instance can be reused.
+ */
+public class RelType implements RelationshipType {
+    String name;
+
+    /**
+     * Replaces the name with the string form of {@code value}.
+     *
+     * @return this instance
+     */
+    public RelType update(Object value) {
+        final String newName = value.toString();
+        this.name = newName;
+        return this;
+    }
+
+    /** @return the name set by the last {@link #update(Object)} call, {@code null} before the first one */
+    public String name() {
+        return this.name;
+    }
+}
View
85 src/main/java/org/neo4j/batchimport/importer/RowData.java
@@ -0,0 +1,85 @@
+package org.neo4j.batchimport.importer;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import static org.neo4j.helpers.collection.MapUtil.map;
+
+/**
+ * Reusable parser for delimiter separated lines.
+ * <p>
+ * The header line defines the field names, optionally followed by {@code :type} (see {@link Type}).
+ * The first {@code offset} fields are positional values, the remaining ones are turned into a
+ * property map by {@link #update(String, Object...)}. Internal arrays are reused between calls.
+ */
+public class RowData {
+    private Object[] data;
+    private final int offset;
+    private final String delim;
+    private final String[] fields;
+    private final String[] lineData;
+    private final Type types[];
+    private final int lineSize;
+    private int dataSize;
+
+    /**
+     * @param header header line with the field names, each optionally suffixed with {@code :type}
+     * @param delim  delimiter between fields
+     * @param offset number of leading fields that are not part of the property map
+     */
+    public RowData(String header, String delim, int offset) {
+        this.offset = offset;
+        this.delim = delim;
+        fields = header.split(delim);
+        lineSize = fields.length;
+        types = parseTypes(fields);
+        lineData = new String[lineSize];
+        createMapData(lineSize, offset);
+    }
+
+    /** Allocates the key/value array and pre-fills the key slots with the property field names. */
+    private Object[] createMapData(int lineSize, int offset) {
+        dataSize = lineSize - offset;
+        data = new Object[dataSize*2];
+        for (int i = 0; i < dataSize; i++) {
+            data[i * 2] = fields[i + offset];
+        }
+        return data;
+    }
+
+    /** Splits {@code name:type} header entries; the name stays in {@code fields}, untyped fields are strings. */
+    private Type[] parseTypes(String[] fields) {
+        Type[] types = new Type[lineSize];
+        Arrays.fill(types, Type.STRING);
+        for (int i = 0; i < lineSize; i++) {
+            String field = fields[i];
+            int idx = field.indexOf(':');
+            if (idx!=-1) {
+                fields[i]=field.substring(0,idx);
+                types[i]= Type.fromString(field.substring(idx + 1));
+            }
+        }
+        return types;
+    }
+
+    /**
+     * Tokenizes {@code line} into {@code lineData} and packs the non-empty property fields as
+     * consecutive key/value pairs at the start of {@code data}.
+     * Blank values and fields missing at the end of a short line are treated as {@code null}.
+     *
+     * @return number of array slots used in {@code data} (twice the number of non-null properties)
+     */
+    private int split(String line) {
+        // delimiters are returned as tokens so that empty fields can be detected
+        final StringTokenizer st = new StringTokenizer(line, delim,true);
+        int count=0;
+        for (int i = 0; i < lineSize; i++) {
+            if (!st.hasMoreTokens()) {
+                // line is shorter than the header: the remaining fields are absent
+                lineData[i] = null;
+                continue;
+            }
+            String value = st.nextToken();
+            if (value.equals(delim)) {
+                lineData[i] = null;
+            } else {
+                lineData[i] = value.trim().isEmpty() ? null : value;
+                // skip the delimiter that follows the value, if the line has one
+                if (i< lineSize -1 && st.hasMoreTokens()) st.nextToken();
+            }
+            if (i >= offset && lineData[i]!=null) {
+                data[count++]=fields[i];
+                data[count++]=types[i].convert(lineData[i]);
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Parses one line.
+     *
+     * @param line   the line to parse
+     * @param header receives the raw string values of the first {@code header.length} fields
+     * @return map of the non-empty property fields to their converted values
+     */
+    public Map<String,Object> update(String line, Object... header) {
+        int nonNullCount = split(line);
+        if (header.length > 0) {
+            System.arraycopy(lineData, 0, header, 0, header.length);
+        }
+
+        if (nonNullCount == dataSize*2) {
+            return map(data);
+        }
+        // only the used prefix of the shared array belongs to this line
+        Object[] newData=new Object[nonNullCount];
+        System.arraycopy(data,0,newData,0,nonNullCount);
+        return map(newData);
+    }
+
+}
View
69 src/main/java/org/neo4j/batchimport/importer/Type.java
@@ -0,0 +1,69 @@
+package org.neo4j.batchimport.importer;
+
+/**
+ * Value types that can be declared in a header ({@code name:type}); each constant converts the
+ * string form of a value into the matching Java object.
+ */
+public enum Type {
+    BOOLEAN {
+        @Override
+        public Object convert(String value) {
+            return Boolean.valueOf(value);
+        }
+    },
+    INT {
+        @Override
+        public Object convert(String value) {
+            return Integer.valueOf(value);
+        }
+    },
+    LONG {
+        @Override
+        public Object convert(String value) {
+            return Long.valueOf(value);
+        }
+    },
+    DOUBLE {
+        @Override
+        public Object convert(String value) {
+            return Double.valueOf(value);
+        }
+    },
+    FLOAT {
+        @Override
+        public Object convert(String value) {
+            return Float.valueOf(value);
+        }
+    },
+    BYTE {
+        @Override
+        public Object convert(String value) {
+            return Byte.valueOf(value);
+        }
+    },
+    SHORT {
+        @Override
+        public Object convert(String value) {
+            return Short.valueOf(value);
+        }
+    },
+    CHAR {
+        @Override
+        public Object convert(String value) {
+            return value.charAt(0);
+        }
+    },
+    STRING {
+        @Override
+        public Object convert(String value) {
+            return value;
+        }
+    };
+
+    /**
+     * Looks up a type by its case-insensitive name.
+     *
+     * @param typeString name of the type; {@code null} or empty selects {@link #STRING}
+     * @return the matching type
+     * @throws IllegalArgumentException if no type with that name exists
+     */
+    public static Type fromString(String typeString) {
+        if (typeString==null || typeString.isEmpty()) return Type.STRING;
+        try {
+            // Locale.ROOT: with the default locale "int" would become "İNT" on a Turkish system
+            return valueOf(typeString.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Unknown Type "+typeString, e);
+        }
+    }
+
+    /**
+     * Converts the string form of a value into an object of this type.
+     *
+     * @param value string to convert
+     * @return the converted value
+     */
+    public abstract Object convert(String value);
+}
View
65 src/main/java/org/neo4j/batchimport/structs/NodeStruct.java
@@ -0,0 +1,65 @@
+package org.neo4j.batchimport.structs;
+
+import org.neo4j.kernel.impl.nioneo.store.Record;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+* @author mh
+* @since 27.10.12
+*/
+public class NodeStruct extends PropertyHolder {
+ //long p1,p2,p3,p4,p5,p6,p7;
+ public volatile long nextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
+ //long o1,o2,o3,o4,o5,o6,o7;
+
+ private final Relationship[] relationships;
+ public final List<Relationship> moreRelationships = new ArrayList<Relationship>();
+ public volatile int relationshipCount;
+
+ public volatile long lastPropertyId;
+ public volatile long maxRelationshipId;
+ public volatile long[] outgoingRelationshipsToUpdate;
+ public volatile long[] incomingRelationshipsToUpdate;
+ private static int avgRelCount;
+ private static int relPropertyCount;
+
+ public static void classInit(int avgRelCount, int relPropertyCount) {
+ NodeStruct.avgRelCount = avgRelCount;
+ NodeStruct.relPropertyCount = relPropertyCount;
+ }
+
+ public NodeStruct(int propertyCount) {
+ super(propertyCount);
+ this.relationships=new Relationship[NodeStruct.avgRelCount];
+ for (int i = 0; isRelInArray(i); i++) {
+ relationships[i]=new Relationship(NodeStruct.relPropertyCount);
+ }
+ }
+
+ public NodeStruct init() {
+ super.init();
+ if (!isRelInArray(relationshipCount)) moreRelationships.clear();
+ relationshipCount=0;
+ nextRel = Record.NO_NEXT_RELATIONSHIP.intValue();
+ return this;
+ }
+ public Relationship addRel(long other, boolean outgoing, int type) {
+ if (isRelInArray(relationshipCount++)) {
+ return relationships[relationshipCount-1].init(other,outgoing,type);
+ }
+ Relationship rel = new Relationship(relPropertyCount).init(other, outgoing, type);
+ moreRelationships.add(rel);
+ return rel;
+ }
+
+ public Relationship getRelationship(int i) {
+ if (isRelInArray(i)) return relationships[i];
+ return moreRelationships.get(i-avgRelCount);
+ }
+
+ private boolean isRelInArray(int i) {
+ return i<avgRelCount;
+ }
+}
View
27 src/main/java/org/neo4j/batchimport/structs/Property.java
@@ -0,0 +1,27 @@
+package org.neo4j.batchimport.structs;
+
+import org.neo4j.kernel.impl.nioneo.store.PropertyBlock;
+import org.neo4j.kernel.impl.nioneo.store.PropertyStore;
+
+/**
+ * Reusable slot for a single property: the index of its name, its raw value
+ * and the {@link PropertyBlock} the value gets encoded into.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class Property {
+    public volatile int nameIndex;
+    public volatile Object value;
+    public final PropertyBlock block = new PropertyBlock();
+
+    /**
+     * Re-initialises this slot for a new property and cleans the block.
+     *
+     * @param index index of the property name
+     * @param value raw property value
+     */
+    void init(int index, Object value) {
+        nameIndex = index;
+        this.value = value;
+        block.clean();
+    }
+
+    /**
+     * Encodes the current name index and value into {@link #block}.
+     *
+     * @param propStore store used to perform the encoding
+     */
+    public void encode(PropertyStore propStore) {
+        propStore.encodeValue(block, nameIndex, value);
+    }
+
+    /** Drops the reference to the raw value; the block is left untouched. */
+    public void clean() {
+        value = null;
+    }
+}
View
34 src/main/java/org/neo4j/batchimport/structs/PropertyHolder.java
@@ -0,0 +1,34 @@
+package org.neo4j.batchimport.structs;
+
+import org.neo4j.kernel.impl.nioneo.store.PropertyRecord;
+import org.neo4j.kernel.impl.nioneo.store.Record;
+
+/**
+ * Base for reusable structs that carry an id and a fixed-capacity set of
+ * pre-allocated {@link Property} slots.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class PropertyHolder {
+    public volatile long id;
+    public volatile long firstPropertyId = Record.NO_NEXT_PROPERTY.intValue();
+
+    /** Number of slots in {@link #properties} filled since the last {@link #init()}. */
+    public volatile int propertyCount;
+    /** Pre-allocated property slots; the length is the capacity given to the constructor. */
+    public final Property[] properties;
+    /** Same length as {@link #properties}; the entries are not populated by this class. */
+    public final PropertyRecord[] propertyRecords;
+
+    /**
+     * @param propertyCount maximum number of properties this holder can take
+     */
+    public PropertyHolder(int propertyCount) {
+        this.properties = new Property[propertyCount];
+        for (int i = 0; i < properties.length; i++) {
+            properties[i] = new Property();
+        }
+        this.propertyRecords = new PropertyRecord[propertyCount];
+    }
+
+    /**
+     * Resets id, first property id and property count for reuse. The property
+     * slots themselves are not cleaned here.
+     *
+     * @return always {@code null} in this base class; the return type is kept
+     *         so existing overrides and callers stay source compatible
+     */
+    public NodeStruct init() {
+        id = 0;
+        firstPropertyId = Record.NO_NEXT_PROPERTY.intValue();
+        propertyCount = 0;
+        return null;
+    }
+
+    /**
+     * Stores a property in the next free slot.
+     *
+     * @param id    index of the property name
+     * @param value raw property value
+     * @throws ArrayIndexOutOfBoundsException if all slots are already in use;
+     *         {@link #propertyCount} is left unchanged in that case
+     */
+    public void addProperty(int id, Object value) {
+        final int index = propertyCount;
+        if (index >= properties.length) {
+            throw new ArrayIndexOutOfBoundsException(
+                "Cannot add property " + id + ": all " + properties.length + " property slots are in use");
+        }
+        properties[index].init(id, value);
+        // Advance the counter only after the slot has been filled successfully.
+        propertyCount = index + 1;
+    }
+}
View
34 src/main/java/org/neo4j/batchimport/structs/Relationship.java
@@ -0,0 +1,34 @@
+package org.neo4j.batchimport.structs;
+
+/**
+ * Reusable struct describing one relationship as seen from the node that owns
+ * it: the node at the other end, the direction and the type id.
+ *
+ * @author mh
+ * @since 27.10.12
+ */
+public class Relationship extends PropertyHolder {
+    // Direction is folded into the sign: outgoing keeps the id as is (>= 0),
+    // incoming stores the bitwise complement ~other (< 0).
+    public volatile long other;
+    public volatile int type;
+
+    /**
+     * @param propertyCount maximum number of properties on this relationship
+     */
+    public Relationship(int propertyCount) {
+        super(propertyCount);
+    }
+
+    /**
+     * Resets the inherited state and stores the new end node, direction and type.
+     *
+     * @param other    id of the node at the other end
+     * @param outgoing {@code true} if the relationship points away from the owning node
+     * @param type     relationship type id
+     * @return this instance
+     */
+    public Relationship init(long other, boolean outgoing, int type) {
+        super.init();
+        if (outgoing) {
+            this.other = other;
+        } else {
+            this.other = ~other;
+        }
+        this.type = type;
+        return this;
+    }
+
+    /** @return {@code true} if the stored value is non-negative, i.e. outgoing */
+    public boolean outgoing() {
+        return !(other < 0);
+    }
+
+    /** @return the id of the other node with the direction encoding removed */
+    public long other() {
+        if (other < 0) {
+            return ~other;
+        }
+        return other;
+    }
+
+    @Override
+    public String toString() {
+        // The owning node is not known here, so its side is printed as "?".
+        final Object start = outgoing() ? "?" : Long.valueOf(other());
+        final Object end = outgoing() ? Long.valueOf(other()) : "?";
+        return String.format("Rel[%d] %s-[%d]->%s %s", id, start, type, end, outgoing());
+    }
+}
View
735 src/main/java/org/neo4j/kernel/impl/nioneo/store/CommonAbstractStore.java
@@ -0,0 +1,735 @@
+/**
+ * Copyright (c) 2002-2012 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package org.neo4j.kernel.impl.nioneo.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.neo4j.graphdb.factory.GraphDatabaseSetting;
+import org.neo4j.graphdb.factory.GraphDatabaseSettings;
+import org.neo4j.helpers.UTF8;
+import org.neo4j.kernel.IdGeneratorFactory;
+import org.neo4j.kernel.IdType;
+import org.neo4j.kernel.InternalAbstractGraphDatabase;
+import org.neo4j.kernel.configuration.Config;
+import org.neo4j.kernel.impl.core.ReadOnlyDbException;
+import org.neo4j.kernel.impl.nioneo.store.windowpool.WindowPool;
+import org.neo4j.kernel.impl.nioneo.store.windowpool.WindowPoolFactory;
+import org.neo4j.kernel.impl.util.StringLogger;
+
+import static org.neo4j.helpers.Exceptions.launderedException;
+
+/**
+ * Contains common implementation for {@link AbstractStore} and
+ * {@link AbstractDynamicStore}.
+ */
+public abstract class CommonAbstractStore
+{
+ /**
+ * Local aliases for settings declared on
+ * {@link InternalAbstractGraphDatabase.Configuration} and
+ * {@link GraphDatabaseSettings}.
+ */
+ public static abstract class Configuration
+ {
+ public static final GraphDatabaseSetting.StringSetting store_dir = InternalAbstractGraphDatabase.Configuration.store_dir;
+ public static final GraphDatabaseSetting.StringSetting neo_store = InternalAbstractGraphDatabase.Configuration.neo_store;
+ 
+ // grab_file_lock is read in the constructor; read_only and backup_slave in checkStorage().
+ public static final GraphDatabaseSetting.BooleanSetting grab_file_lock = GraphDatabaseSettings.grab_file_lock;
+ public static final GraphDatabaseSetting.BooleanSetting read_only = GraphDatabaseSettings.read_only;
+ public static final GraphDatabaseSetting.BooleanSetting backup_slave = GraphDatabaseSettings.backup_slave;
+ public static final GraphDatabaseSetting.BooleanSetting use_memory_mapped_buffers = GraphDatabaseSettings.use_memory_mapped_buffers;
+ }
+
+ // Version suffix appended to a type descriptor by buildTypeDescriptorAndVersion().
+ public static final String ALL_STORES_VERSION = "v0.A.0";
+ // NOTE(review): the literal is spelled "Uknown"; it is a runtime string, so check
+ // for persisted or compared uses before correcting the spelling.
+ public static final String UNKNOWN_VERSION = "Uknown";
+
+ protected static final Logger logger = Logger
+ .getLogger( CommonAbstractStore.class.getName() );
+
+ // Collaborators handed in through the constructor.
+ protected Config configuration;
+ private final IdGeneratorFactory idGeneratorFactory;
+ private final WindowPoolFactory windowPoolFactory;
+ protected FileSystemAbstraction fileSystemAbstraction;
+
+ protected final String storageFileName;
+ protected final IdType idType;
+ protected StringLogger stringLogger;
+ private IdGenerator idGenerator = null;
+ private FileChannel fileChannel = null;
+ private WindowPool windowPool;
+ private boolean storeOk = true;
+ private Throwable causeOfStoreNotOk;
+ private FileLock fileLock;
+ // Overwritten from Configuration.grab_file_lock in the constructor.
+ private boolean grabFileLock = true;
+
+ // Both flags are read from the configuration in checkStorage().
+ private boolean readOnly = false;
+ private boolean backupSlave = false;
+ private long highestUpdateRecordId = -1;
+
+ /**
+ * Opens and validates the store contained in <CODE>fileName</CODE>,
+ * loading any configuration defined in <CODE>configuration</CODE>. The
+ * constructor runs <CODE>checkStorage()</CODE>, <CODE>checkVersion()</CODE>
+ * and <CODE>loadStorage()</CODE>, in that order.
+ * <p>
+ * If the store had a clean shutdown it will be marked as <CODE>ok</CODE>
+ * and the {@link #getStoreOk()} method will return true.
+ * If a problem was found when opening the store the {@link #makeStoreOk()}
+ * must be invoked.
+ * <p>
+ * Any exception raised by one of the three steps is rethrown through
+ * <CODE>launderedException</CODE>, after the file channel has been closed
+ * if one was already open.
+ *
+ * @param fileName
+ * The name of the file backing this store
+ * @param configuration
+ * The configuration the store settings are read from
+ * @param idType
+ * The Id used to index into this store
+ * @param idGeneratorFactory
+ * Kept in a field for later use
+ * @param windowPoolFactory
+ * Kept in a field for later use
+ * @param fileSystemAbstraction
+ * Used to access the store file, e.g. for the existence check
+ * @param stringLogger
+ * Kept in a field for later use
+ */
+ public CommonAbstractStore( String fileName, Config configuration, IdType idType,
+ IdGeneratorFactory idGeneratorFactory, WindowPoolFactory windowPoolFactory,
+ FileSystemAbstraction fileSystemAbstraction, StringLogger stringLogger )
+ {
+ this.storageFileName = fileName;
+ this.configuration = configuration;
+ this.idGeneratorFactory = idGeneratorFactory;
+ this.windowPoolFactory = windowPoolFactory;
+ this.fileSystemAbstraction = fileSystemAbstraction;
+ this.idType = idType;
+ this.stringLogger = stringLogger;
+ grabFileLock = configuration.get( Configuration.grab_file_lock );
+ 
+ try
+ {
+ checkStorage();
+ checkVersion(); // Overridden in NeoStore
+ loadStorage();
+ }
+ catch ( Exception e )
+ {
+ // Do not leave a channel open when opening the store failed part-way.
+ if ( fileChannel != null )
+ closeChannel();
+ throw launderedException( e );
+ }
+ }
+
+ /**
+  * Returns this store's type descriptor combined with the common store version.
+  *
+  * @return the result of {@link #buildTypeDescriptorAndVersion(String)} for
+  *         {@link #getTypeDescriptor()}
+  */
+ public String getTypeAndVersionDescriptor()
+ {
+     final String descriptor = getTypeDescriptor();
+     return buildTypeDescriptorAndVersion( descriptor );
+ }
+
+ /**
+  * Joins a type descriptor and {@link #ALL_STORES_VERSION} with a single space.
+  *
+  * @param typeDescriptor the store type descriptor
+  * @return the descriptor, a space, and the common store version
+  */
+ public static String buildTypeDescriptorAndVersion( String typeDescriptor )
+ {
+     return new StringBuilder()
+             .append( typeDescriptor )
+             .append( " " )
+             .append( ALL_STORES_VERSION )
+             .toString();
+ }
+
+ /**
+  * Combines a base value with a modifier using bitwise OR. The one exception
+  * is a zero modifier together with a base equal to
+  * {@code IdGeneratorImpl.INTEGER_MINUS_ONE}, which yields {@code -1}.
+  *
+  * @param base     the base value
+  * @param modifier the bits to OR onto the base
+  * @return {@code -1} for the special case above, otherwise {@code base | modifier}
+  */
+ protected long longFromIntAndMod( long base, long modifier )
+ {
+     if ( modifier == 0 && base == IdGeneratorImpl.INTEGER_MINUS_ONE )
+     {
+         return -1;
+     }
+     return base | modifier;
+ }
+
+ /**
+ * Returns the type descriptor that identifies this store implementation.
+ * {@link #getTypeAndVersionDescriptor()} passes it to
+ * {@link #buildTypeDescriptorAndVersion(String)}, which appends the common
+ * store version.
+ *
+ * @return This store's implementation type identifier
+ */
+ public abstract String getTypeDescriptor();
+
+ protected void checkStorage()
+ {
+ readOnly = configuration.get( Configuration.read_only );
+ backupSlave = configuration.get( Configuration.backup_slave );
+ if ( !fileSystemAbstraction.fileExists( storageFileName ) )
+ {
+ throw new IllegalStateException( "No such store[" + storageFileName
+ + "]" );