Skip to content

Commit

Permalink
fixes #1276: Add streaming support to export JSON and GraphML (#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
conker84 authored and sarmbruster committed Nov 19, 2019
1 parent a05d95a commit 3c62248
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 84 deletions.
4 changes: 4 additions & 0 deletions 40_notes.adoc
Expand Up @@ -55,3 +55,7 @@ NOTE: the following list is incomplete
* on API level a `IndexDefinition` could have multiple labels. Apoc's `Schema` should reflect that - to be discussed.
* nodes/rels acquired in one tx need to be rebound when used in another tx `tx.getNodeById(node.getId())`
## todos

* make tests (esp. for export functions) independent of internal node IDs.
2 changes: 1 addition & 1 deletion docs/asciidoc/data/exportJSON/MapPath.json
@@ -1,2 +1,2 @@
{"map":{"key":{"length":1,"rels":[{"id":"0","type":"relationship","label":"KNOWS","properties":{"since":1993},"start":{"id":"0","labels":["User"]},"end":{"id":"1","labels":["User"]}}],"nodes":[{"type":"node","id":"0","labels":["User"],"properties":{"born":"2015-07-04T19:32:24","name":"Adam","place":{"crs":"wgs-84","latitude":33.46789,"longitude":13.1,"height":null},"age":42,"male":true,"kids":["Sam","Anna","Grace"]}},{"type":"node","id":"1","labels":["User"],"properties":{"name":"Jim","age":42}}]}},"name":"Kate"}
{"map":{"key":{"length":1,"rels":[{"id":"20","type":"relationship","label":"KNOWS","properties":{"since":1850},"start":{"id":"20","labels":["User"]},"end":{"id":"21","labels":["User"]}}],"nodes":[{"type":"node","id":"20","labels":["User"],"properties":{"name":"Mike","age":78,"male":true}},{"type":"node","id":"21","labels":["User"],"properties":{"name":"John","age":18}}]}},"name":"Kate"}
{"map":{"key":{"length":1,"rels":[{"id":"1","type":"relationship","label":"KNOWS","properties":{"since":1850},"start":{"id":"3","labels":["User"]},"end":{"id":"4","labels":["User"]}}],"nodes":[{"type":"node","id":"3","labels":["User"],"properties":{"name":"Mike","age":78,"male":true}},{"type":"node","id":"4","labels":["User"],"properties":{"name":"John","age":18}}]}},"name":"Kate"}
2 changes: 1 addition & 1 deletion docs/asciidoc/data/exportJSON/query_node_labels.json
@@ -1 +1 @@
{"u":{"type":"node","id":"20","labels":["User","User0","User1","User12"],"properties":{"name":"Alan"}}}
{"u":{"type":"node","id":"3","labels":["User","User0","User1","User12"],"properties":{"name":"Alan"}}}
5 changes: 4 additions & 1 deletion docs/asciidoc/export/exportJson.adoc
Expand Up @@ -24,8 +24,11 @@ include::../../../build/generated-documentation/apoc.export.json.csv[]

.Config

[opts=header]
|===
| writeNodeProperties | true/false, if true export properties too.
| name | type | default | description
| writeNodeProperties | boolean | false | if true, also export node properties.
| stream | boolean | false | if true, stream the JSON directly to the client into the `data` field
|===

[NOTE]
Expand Down
1 change: 1 addition & 0 deletions docs/asciidoc/export/graphml.adoc
Expand Up @@ -63,6 +63,7 @@ The output of `labels()` function is not sorted, use it in combination with `apo
| cypherFormat | create | In export to cypher script, define the cypher format (for example use `MERGE` instead of `CREATE`). Possible values are: "create", "updateAll", "addStructure", "updateStructure".
| bulkImport | true | In export it creates files for Neo4j Admin import
| separateHeader | false | In export it creates two files, one for the header and one for the data
| stream | false | if true, stream the XML directly to the client into the `data` field
|===

Values for the `quotes` configuration:
Expand Down
25 changes: 8 additions & 17 deletions src/main/java/apoc/export/csv/ExportCSV.java
Expand Up @@ -5,11 +5,12 @@
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
Expand All @@ -23,9 +24,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* @author mh
Expand Down Expand Up @@ -93,27 +92,19 @@ private void preventBulkImport(ExportConfig config) {
}

private Stream<ProgressInfo> exportCsv(@Name("file") String fileName, String source, Object data, ExportConfig exportConfig) throws Exception {
apocConfig.checkWriteAllowed(exportConfig);
ProgressInfo progressInfo = new ProgressInfo(fileName, source, "csv");
if (StringUtils.isNotBlank(fileName)) apocConfig.checkWriteAllowed(exportConfig);
final String format = "csv";
ProgressInfo progressInfo = new ProgressInfo(fileName, source, format);
progressInfo.batchSize = exportConfig.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
CsvFormat exporter = new CsvFormat(db);

ExportFileManager cypherFileManager = FileManagerFactory
.createFileManager(fileName, exportConfig.isBulkImport(), exportConfig.streamStatements());
.createFileManager(fileName, exportConfig.isBulkImport());

if (exportConfig.streamStatements()) {
long timeout = exportConfig.getTimeoutSeconds();
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(cypherFileManager.getStringWriter("csv")), timeout)
);
Util.inTxFuture(pools.getDefaultExecutorService(), db, txInThread -> {
dump(data, exportConfig, reporterWithConsumer, cypherFileManager, exporter);
return true;
});
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
return ExportUtils.getProgressInfoStream(db, pools.getDefaultExecutorService(), terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> dump(data, exportConfig, reporterWithConsumer, cypherFileManager, exporter));
} else {
dump(data, exportConfig, reporter, cypherFileManager, exporter);
return reporter.stream();
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/apoc/export/cypher/ExportCypher.java
Expand Up @@ -8,6 +8,7 @@
import apoc.result.ProgressInfo;
import apoc.util.QueueBasedSpliterator;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand Down Expand Up @@ -105,13 +106,13 @@ public Stream<DataProgressInfo> schema(@Name(value = "file",defaultValue = "") S
}

private Stream<DataProgressInfo> exportCypher(@Name("file") String fileName, String source, SubGraph graph, ExportConfig c, boolean onlySchema) throws IOException {
if (fileName != null) apocConfig.checkWriteAllowed(c);
if (StringUtils.isNotBlank(fileName)) apocConfig.checkWriteAllowed(c);

ProgressInfo progressInfo = new ProgressInfo(fileName, source, "cypher");
progressInfo.batchSize = c.getBatchSize();
ProgressReporter reporter = new ProgressReporter(null, null, progressInfo);
boolean separatedFiles = !onlySchema && c.separateFiles();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, separatedFiles, c.streamStatements());
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, separatedFiles);

if (c.streamStatements()) {
long timeout = c.getTimeoutSeconds();
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/apoc/export/cypher/FileManagerFactory.java
Expand Up @@ -12,7 +12,7 @@
* @since 06.12.17
*/
public class FileManagerFactory {
public static ExportFileManager createFileManager(String fileName, boolean separatedFiles, boolean b) {
public static ExportFileManager createFileManager(String fileName, boolean separatedFiles) {
if (fileName == null) {
return new StringExportCypherFileManager(separatedFiles);
}
Expand Down Expand Up @@ -81,15 +81,23 @@ public StringExportCypherFileManager(boolean separatedFiles) {
@Override
public PrintWriter getPrintWriter(String type) {
if (this.separatedFiles) {
return new PrintWriter(writers.compute(type, (key, writer) -> writer == null ? new StringWriter() : writer));
return new PrintWriter(getStringWriter(type));
} else {
return new PrintWriter(writers.compute(type.equals("csv") ? type : "cypher", (key, writer) -> writer == null ? new StringWriter() : writer));
switch (type) {
case "csv":
case "json":
case "graphml":
break;
default:
type = "cypher";
}
return new PrintWriter(getStringWriter(type));
}
}

@Override
public StringWriter getStringWriter(String type) {
return writers.get(type);
return writers.computeIfAbsent(type, (key) -> new StringWriter());
}

@Override
Expand Down
47 changes: 37 additions & 10 deletions src/main/java/apoc/export/graphml/ExportGraphML.java
Expand Up @@ -2,12 +2,16 @@

import apoc.ApocConfig;
import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.CypherResultSubGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
Expand All @@ -21,8 +25,6 @@
import java.util.Map;
import java.util.stream.Stream;

import static apoc.util.FileUtils.getPrintWriter;

/**
* @author mh
* @since 22.05.16
Expand All @@ -40,6 +42,9 @@ public class ExportGraphML {
@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

@Procedure(name = "apoc.import.graphml",mode = Mode.WRITE)
@Description("apoc.import.graphml(file,config) - imports graphml file")
public Stream<ProgressInfo> file(@Name("file") String fileName, @Name("config") Map<String, Object> config) throws Exception {
Expand Down Expand Up @@ -96,14 +101,36 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportGraphML(fileName, source, graph, c);
}

private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig config) throws Exception {
if (fileName != null) apocConfig.checkWriteAllowed(config);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "graphml"));
PrintWriter printWriter = getPrintWriter(fileName, null);
private Stream<ProgressInfo> exportGraphML(@Name("file") String fileName, String source, SubGraph graph, ExportConfig exportConfig) throws Exception {
if (StringUtils.isNotBlank(fileName)) apocConfig.checkWriteAllowed(exportConfig);
final String format = "graphml";
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, format));
XmlGraphMLWriter exporter = new XmlGraphMLWriter();
exporter.write(graph, printWriter, reporter, config);
printWriter.flush();
printWriter.close();
return reporter.stream();
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false);
final PrintWriter graphMl = cypherFileManager.getPrintWriter(format);
if (exportConfig.streamStatements()) {
return ExportUtils.getProgressInfoStream(db, pools.getDefaultExecutorService() ,terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> {
try {
exporter.write(graph, graphMl, reporterWithConsumer, exportConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
exporter.write(graph, graphMl, reporter, exportConfig);
closeWriter(graphMl);
return reporter.stream();
}
}

private void closeWriter(PrintWriter writer) {
writer.flush();
try {
writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
1 change: 1 addition & 0 deletions src/main/java/apoc/export/graphml/XmlGraphMLWriter.java
Expand Up @@ -36,6 +36,7 @@ public void write(SubGraph graph, Writer writer, Reporter reporter, ExportConfig
reporter.update(0, 1, props);
}
writeFooter(xmlWriter);
reporter.done();
}

private void writeKey(XMLStreamWriter writer, SubGraph ops, ExportConfig config) throws Exception {
Expand Down
38 changes: 28 additions & 10 deletions src/main/java/apoc/export/json/ExportJson.java
@@ -1,22 +1,25 @@
package apoc.export.json;

import apoc.ApocConfig;
import apoc.Pools;
import apoc.export.cypher.ExportFileManager;
import apoc.export.cypher.FileManagerFactory;
import apoc.export.util.ExportConfig;
import apoc.export.util.ExportUtils;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -33,6 +36,11 @@ public class ExportJson {
@Context
public ApocConfig apocConfig;

@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

public ExportJson(GraphDatabaseService db) {
this.db = db;
Expand Down Expand Up @@ -75,20 +83,30 @@ public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") Str
return exportJson(fileName, source,result,config);
}

private Stream<ProgressInfo> exportJson(@Name("file") String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig c = new ExportConfig(config);
apocConfig.checkWriteAllowed(c);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json"));
private Stream<ProgressInfo> exportJson(String fileName, String source, Object data, Map<String,Object> config) throws Exception {
ExportConfig exportConfig = new ExportConfig(config);
if (StringUtils.isNotBlank(fileName)) apocConfig.checkWriteAllowed(exportConfig);
final String format = "json";
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, format));
JsonFormat exporter = new JsonFormat(db);
ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false);
if (exportConfig.streamStatements()) {
return ExportUtils.getProgressInfoStream(db, pools.getDefaultExecutorService() ,terminationGuard, format, exportConfig, reporter, cypherFileManager,
(reporterWithConsumer) -> dump(data, exportConfig, reporterWithConsumer, exporter, cypherFileManager));
} else {
dump(data, exportConfig, reporter, exporter, cypherFileManager);
return reporter.stream();
}
}

ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, false, c.streamStatements());

try (PrintWriter printWriter = cypherFileManager.getPrintWriter("json")) {
private void dump(Object data, ExportConfig c, ProgressReporter reporter, JsonFormat exporter, ExportFileManager cypherFileManager) {
try {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),cypherFileManager,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),printWriter,reporter,c);
exporter.dump(((Result)data),cypherFileManager,reporter,c);
} catch (Exception e) {
throw new RuntimeException(e);
}
return reporter.stream();
}
}
29 changes: 18 additions & 11 deletions src/main/java/apoc/export/json/JsonFormat.java
Expand Up @@ -11,7 +11,12 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;

import java.io.IOException;
import java.io.Reader;
Expand All @@ -34,12 +39,15 @@ public ProgressInfo load(Reader reader, Reporter reporter, ExportConfig config)
}

private ProgressInfo dump(Writer writer, Reporter reporter, Consumer<JsonGenerator> consumer) throws Exception {
try (Transaction tx = db.beginTx(); JsonGenerator jsonGenerator = getJsonGenerator(writer);) {

try (Transaction tx = db.beginTx();
JsonGenerator jsonGenerator = getJsonGenerator(writer)) {
consumer.accept(jsonGenerator);

jsonGenerator.flush();
tx.commit();
reporter.done();
return reporter.getTotal();
} finally {
writer.close();
}
}

Expand All @@ -56,7 +64,7 @@ public ProgressInfo dump(SubGraph graph, ExportFileManager writer, Reporter repo
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

public ProgressInfo dump(Result result, Writer writer, Reporter reporter, ExportConfig config) throws Exception {
public ProgressInfo dump(Result result, ExportFileManager writer, Reporter reporter, ExportConfig config) throws Exception {
Consumer<JsonGenerator> consumer = (jsonGenerator) -> {
try {
String[] header = result.columns().toArray(new String[result.columns().size()]);
Expand All @@ -69,14 +77,14 @@ public ProgressInfo dump(Result result, Writer writer, Reporter reporter, Export
throw new RuntimeException(e);
}
};
return dump(writer, reporter, consumer);
return dump(writer.getPrintWriter("json"), reporter, consumer);
}

private JsonGenerator getJsonGenerator(Writer writer) throws IOException {
JsonFactory jsonF = new JsonFactory();
JsonGenerator jsonGenerator = jsonF.createGenerator(writer);
jsonGenerator.setCodec(JsonUtil.OBJECT_MAPPER);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
JsonGenerator jsonGenerator = new JsonFactory()
.createGenerator(writer)
.setCodec(JsonUtil.OBJECT_MAPPER)
.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
return jsonGenerator;
}

Expand Down Expand Up @@ -158,7 +166,6 @@ private void write(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig
JsonFormatSerializer.DEFAULT.serializeProperty(jsonGenerator, keyName, value, writeKey);
reporter.update(0, 0, 1);
break;

}
}

Expand Down

0 comments on commit 3c62248

Please sign in to comment.