From 5d99359f66c278737ccade0a9391d849b0210926 Mon Sep 17 00:00:00 2001
From: Andrea Santurbano
Date: Wed, 8 Apr 2020 18:05:29 +0200
Subject: [PATCH] fixes #1257: apoc.load.json file roundtrip from apoc.export.json.all (#1409)

---
 docs/asciidoc/import/loadjson.adoc            |  25 ++
 .../java/apoc/export/json/ImportJson.java     |  45 +++
 .../apoc/export/json/ImportJsonConfig.java    |  52 +++
 .../java/apoc/export/json/JsonImporter.java   | 331 ++++++++++++++++++
 .../java/apoc/export/json/ImportJsonTest.java | 137 ++++++++
 5 files changed, 590 insertions(+)
 create mode 100644 src/main/java/apoc/export/json/ImportJson.java
 create mode 100644 src/main/java/apoc/export/json/ImportJsonConfig.java
 create mode 100644 src/main/java/apoc/export/json/JsonImporter.java
 create mode 100644 src/test/java/apoc/export/json/ImportJsonTest.java

diff --git a/docs/asciidoc/import/loadjson.adoc b/docs/asciidoc/import/loadjson.adoc
index 5e068ba410..9b7dce64f5 100644
--- a/docs/asciidoc/import/loadjson.adoc
+++ b/docs/asciidoc/import/loadjson.adoc
@@ -195,3 +195,28 @@ You will receive the following response:
     "weight": 0.02548018842935562
   }
 ----
+
+== Import JSON
+
+If you exported the graph with `apoc.export.json.all`, you can re-import that data with `apoc.import.json`.
+
+[source,cypher]
+----
+CALL apoc.import.json($file, $config)
+----
+
+The `$config` parameter is a map with the following keys:
+
+[opts=header,cols="m,m,a"]
+|===
+| name | default | description
+| unwindBatchSize | 5000 | the number of rows grouped into each `UNWIND` batch
+| txBatchSize | 5000 | the number of rows imported per transaction
+| importIdName | neo4jImportId | the name of the property that stores the `id` field of each exported entity; relationships are matched on this property during the import
+| nodePropertyMappings | {} | the mapping label/property name/property type for custom Neo4j types (point, date, ...), e.g. `{ User: { born: 'LocalDateTime', place: 'Point' } }`
+| relPropertyMappings | {} | the mapping relationship type/property name/property type for custom Neo4j types (point, date, ...), e.g. `{ KNOWS: { since: 'LocalDateTime' } }`
+|===
+
+N.b. the Neo4j types supported by these mappings are:
+
+Point, LocalDate, LocalTime, LocalDateTime, Duration, OffsetTime and ZonedDateTime (matched case-insensitively).
\ No newline at end of file
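A concrete call, for illustration only: the file name and mapping values mirror the ones used in the test added below, and reading from the file system assumes `apoc.import.file.enabled=true` in the Neo4j/APOC configuration.

[source,cypher]
----
CALL apoc.import.json('all.json', {
  unwindBatchSize: 5000,
  txBatchSize: 5000,
  importIdName: 'neo4jImportId',
  nodePropertyMappings: { User: { born: 'LocalDateTime', place: 'Point' } },
  relPropertyMappings:  { KNOWS: { bffSince: 'Duration' } }
})
----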
diff --git a/src/main/java/apoc/export/json/ImportJson.java b/src/main/java/apoc/export/json/ImportJson.java
new file mode 100644
index 0000000000..addfd92521
--- /dev/null
+++ b/src/main/java/apoc/export/json/ImportJson.java
@@ -0,0 +1,45 @@
+package apoc.export.json;
+
+import apoc.export.util.CountingReader;
+import apoc.export.util.ProgressReporter;
+import apoc.result.ProgressInfo;
+import apoc.util.FileUtils;
+import apoc.util.JsonUtil;
+import apoc.util.Util;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.procedure.Context;
+import org.neo4j.procedure.Description;
+import org.neo4j.procedure.Mode;
+import org.neo4j.procedure.Name;
+import org.neo4j.procedure.Procedure;
+
+import java.util.Map;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+public class ImportJson {
+    @Context
+    public GraphDatabaseService db;
+
+    @Procedure(value = "apoc.import.json", mode = Mode.WRITE)
+    @Description("apoc.import.json(file,config) - imports the json list from the provided file")
+    public Stream<ProgressInfo> all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
+        ProgressInfo result =
+                Util.inThread(() -> {
+                    ImportJsonConfig importJsonConfig = new ImportJsonConfig(config);
+                    ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, "file", "json"));
+
+                    try (final CountingReader reader = FileUtils.readerFor(fileName);
+                         final Scanner scanner = new Scanner(reader).useDelimiter("\n|\r");
+                         JsonImporter jsonImporter = new JsonImporter(importJsonConfig, db, reporter)) {
+                        while (scanner.hasNext()) {
+                            Map<String, Object> row = JsonUtil.OBJECT_MAPPER.readValue(scanner.nextLine(), Map.class);
+                            jsonImporter.importRow(row);
+                        }
+                    }
+
+                    return reporter.getTotal();
+                });
+        return Stream.of(result);
+    }
+}
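`ImportJson` consumes the file one line at a time and hands each parsed map to `JsonImporter`, so the input is the JSON-lines output of `apoc.export.json.all`: one object per line carrying the `type`, `id`, `labels`/`label`, `start`, `end` and `properties` fields the importer reads. A rough, hand-written two-row sample (field values invented):

[source,json]
----
{"type":"node","id":"0","labels":["User"],"properties":{"name":"Adam"}}
{"type":"relationship","id":"10","label":"KNOWS","start":{"id":"0","labels":["User"]},"end":{"id":"1","labels":["User"]},"properties":{"since":2011}}
----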
diff --git a/src/main/java/apoc/export/json/ImportJsonConfig.java b/src/main/java/apoc/export/json/ImportJsonConfig.java
new file mode 100644
index 0000000000..386c3a9ba7
--- /dev/null
+++ b/src/main/java/apoc/export/json/ImportJsonConfig.java
@@ -0,0 +1,52 @@
+package apoc.export.json;
+
+import apoc.util.Util;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public class ImportJsonConfig {
+
+    private final Map<String, Map<String, String>> nodePropertyMappings;
+    private final Map<String, Map<String, String>> relPropertyMappings;
+
+    private final int unwindBatchSize;
+    private final int txBatchSize;
+
+    private final String importIdName;
+
+    public ImportJsonConfig(Map<String, Object> config) {
+        config = config == null ? Collections.emptyMap() : config;
+        this.nodePropertyMappings = (Map<String, Map<String, String>>) config.getOrDefault("nodePropertyMappings", Collections.emptyMap());
+        this.relPropertyMappings = (Map<String, Map<String, String>>) config.getOrDefault("relPropertyMappings", Collections.emptyMap());
+        this.unwindBatchSize = Util.toInteger(config.getOrDefault("unwindBatchSize", 5000));
+        this.txBatchSize = Util.toInteger(config.getOrDefault("txBatchSize", 5000));
+        this.importIdName = (String) config.getOrDefault("importIdName", "neo4jImportId");
+    }
+
+    public String typeForNode(Collection<String> labels, String property) {
+        return labels.stream()
+                .map(label -> nodePropertyMappings.getOrDefault(label, Collections.emptyMap()).get(property))
+                .filter(StringUtils::isNotBlank)
+                .findFirst()
+                .orElse(null);
+    }
+
+    public String typeForRel(String type, String property) {
+        return relPropertyMappings.getOrDefault(type, Collections.emptyMap()).get(property);
+    }
+
+    public int getUnwindBatchSize() {
+        return unwindBatchSize;
+    }
+
+    public int getTxBatchSize() {
+        return txBatchSize;
+    }
+
+    public String getImportIdName() {
+        return importIdName;
+    }
+}
diff --git a/src/main/java/apoc/export/json/JsonImporter.java b/src/main/java/apoc/export/json/JsonImporter.java
new file mode 100644
index 0000000000..b2985f0061
--- /dev/null
+++ b/src/main/java/apoc/export/json/JsonImporter.java
@@ -0,0 +1,331 @@
+package apoc.export.json;
+
+import apoc.export.util.Reporter;
+import apoc.util.Util;
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.values.storable.CoordinateReferenceSystem;
+import org.neo4j.values.storable.DurationValue;
+import org.neo4j.values.storable.PointValue;
+import org.neo4j.values.storable.Values;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.ZonedDateTime;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JsonImporter implements Closeable {
+    private static final String CREATE_NODE = "UNWIND $rows AS row " +
+            "CREATE (n:%s {%s: row.id}) SET n += row.properties";
+    private static final String CREATE_RELS = "UNWIND $rows AS row " +
+            "MATCH (s:%s {%s: row.start.id}) " +
+            "MATCH (e:%s {%2$s: row.end.id}) " +
+            "CREATE (s)-[r:%s]->(e) SET r += row.properties";
+
+    private final List<Map<String, Object>> paramList;
+    private final int unwindBatchSize;
+    private final int txBatchSize;
+    private final GraphDatabaseService db;
+    private final Reporter reporter;
+
+    private String lastType;
+    private List<String> lastLabels;
+    private Map<String, Object> lastRelTypes;
+
+    private final ImportJsonConfig importJsonConfig;
+
+    public JsonImporter(ImportJsonConfig importJsonConfig,
+                        GraphDatabaseService db,
+                        Reporter reporter) {
+        this.paramList = new ArrayList<>(importJsonConfig.getUnwindBatchSize());
+        this.db = db;
+        this.txBatchSize = importJsonConfig.getTxBatchSize();
+        this.unwindBatchSize = Math.min(importJsonConfig.getUnwindBatchSize(), txBatchSize);
+        this.reporter = reporter;
+        this.importJsonConfig = importJsonConfig;
+    }
+
+    public void importRow(Map<String, Object> param) {
+        final String type = (String) param.get("type");
+
+        manageEntityType(type);
+
+        switch (type) {
+            case "node":
+                manageNode(param);
+                break;
+            case "relationship":
+                manageRelationship(param);
+                break;
+            default:
+                throw new IllegalArgumentException("Current type not supported: " + type);
+        }
+
+        final Map<String, Object> properties = (Map<String, Object>) param.getOrDefault("properties", Collections.emptyMap());
+        updateReporter(type, properties);
+        param.put("properties", convertProperties(type, properties, null));
+
+        paramList.add(param);
+        if (paramList.size() % txBatchSize == 0) {
+            final Collection<List<Map<String, Object>>> results = chunkData();
+            paramList.clear();
+            // write full unwind batches; incomplete chunks go back into paramList
+            writeUnwindBatch(results);
+        }
+    }
+
+    private void writeUnwindBatch(Collection<List<Map<String, Object>>> results) {
+        try (final Transaction tx = db.beginTx()) {
+            results.forEach(resultList -> {
+                if (resultList.size() == unwindBatchSize) {
+                    write(tx, resultList);
+                } else {
+                    paramList.addAll(resultList);
+                }
+            });
+            tx.success();
+        }
+    }
+
+    private void manageEntityType(String type) {
+        if (lastType == null) {
+            lastType = type;
+        }
+        if (!type.equals(lastType)) {
+            flush();
+            lastType = type;
+        }
+    }
+
+    private void manageRelationship(Map<String, Object> param) {
+        Map<String, Object> relType = Util.map(
+                "start", getLabels((Map<String, Object>) param.get("start")),
+                "end", getLabels((Map<String, Object>) param.get("end")),
+                "label", getType(param));
+        if (lastRelTypes == null) {
+            lastRelTypes = relType;
+        }
+        if (!relType.equals(lastRelTypes)) {
+            flush();
+            lastRelTypes = relType;
+        }
+    }
+
+    private void manageNode(Map<String, Object> param) {
+        List<String> labels = getLabels(param);
+        if (lastLabels == null) {
+            lastLabels = labels;
+        }
+        if (!labels.equals(lastLabels)) {
+            flush();
+            lastLabels = labels;
+        }
+    }
+
+    private void updateReporter(String type, Map<String, Object> properties) {
+        final int size = properties.size() + 1; // +1 is for the "neo4jImportId"
+        switch (type) {
+            case "node":
+                reporter.update(1, 0, size);
+                break;
+            case "relationship":
+                reporter.update(0, 1, size);
+                break;
+            default:
+                throw new IllegalArgumentException("Current type not supported: " + type);
+        }
+    }
+
+    private Stream<Map.Entry<String, Object>> flatMap(Map<String, Object> map, String key) {
+        final String prefix = key != null ? key : "";
+        return map.entrySet().stream()
+                .flatMap(e -> {
+                    if (e.getValue() instanceof Map) {
+                        return flatMap((Map<String, Object>) e.getValue(), prefix + "." + e.getKey());
+                    } else {
+                        return Stream.of(new AbstractMap.SimpleEntry<>(prefix + "." + e.getKey(), e.getValue()));
+                    }
+                });
+    }
+
+    private List<Object> convertList(Collection<Object> coll, String classType) {
+        return coll.stream()
+                .map(c -> {
+                    if (c instanceof Collection) {
+                        return convertList((Collection<Object>) c, classType);
+                    }
+                    return convertMappedValue(c, classType);
+                })
+                .collect(Collectors.toList());
+    }
+
+    private Map<String, Object> convertProperties(String type, Map<String, Object> properties, String keyPrefix) {
+        return properties.entrySet().stream()
+                .flatMap(e -> {
+                    if (e.getValue() instanceof Map) {
+                        Map<String, Object> map = (Map<String, Object>) e.getValue();
+                        String classType = getClassType(type, e.getKey());
+                        if (classType != null && "POINT".equals(classType.toUpperCase())) {
+                            return Stream.of(e);
+                        }
+                        return flatMap(map, e.getKey());
+                    } else {
+                        return Stream.of(e);
+                    }
+                })
+                .map(e -> {
+                    String key = e.getKey();
+                    final String classType = getClassType(type, key);
+                    if (e.getValue() instanceof Collection) {
+                        final List<Object> coll = convertList((Collection<Object>) e.getValue(), classType);
+                        return new AbstractMap.SimpleEntry<>(e.getKey(), coll);
+                    } else {
+                        return new AbstractMap.SimpleEntry<>(e.getKey(),
+                                convertMappedValue(e.getValue(), classType));
+                    }
+                })
+                .filter(e -> e.getValue() != null)
+                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+    }
+
+    private String getClassType(String type, String key) {
+        final String classType;
+        switch (type) {
+            case "node":
+                classType = importJsonConfig.typeForNode(lastLabels, key);
+                break;
+            case "relationship":
+                classType = importJsonConfig.typeForRel((String) lastRelTypes.get("label"), key);
+                break;
+            default:
+                classType = null;
+                break;
+        }
+        return classType;
+    }
+
+    private Object convertMappedValue(Object value, String classType) {
+        if (classType == null) {
+            return value;
+        }
+        switch (classType.toUpperCase()) {
+            case "POINT":
+                value = toPoint((Map<String, Object>) value);
+                break;
+            case "LOCALDATE":
+                value = LocalDate.parse((String) value);
+                break;
+            case "LOCALTIME":
+                value = LocalTime.parse((String) value);
+                break;
+            case "LOCALDATETIME":
+                value = LocalDateTime.parse((String) value);
+                break;
+            case "DURATION":
+                value = DurationValue.parse((String) value);
+                break;
+            case "OFFSETTIME":
+                value = OffsetTime.parse((String) value);
+                break;
+            case "ZONEDDATETIME":
+                value = ZonedDateTime.parse((String) value);
+                break;
+            default:
+                break;
+        }
+        return value;
+    }
+
+    private PointValue toPoint(Map<String, Object> pointMap) {
+        double x;
+        double y;
+        Double z = null;
+
+        final CoordinateReferenceSystem crs = CoordinateReferenceSystem.byName((String) pointMap.get("crs"));
+        if (crs.getName().startsWith("wgs-84")) {
+            x = (double) pointMap.get("latitude");
+            y = (double) pointMap.get("longitude");
+            if (crs.getName().endsWith("-3d")) {
+                z = (double) pointMap.get("height");
+            }
+        } else {
+            x = (double) pointMap.get("x");
+            y = (double) pointMap.get("y");
+            if (crs.getName().endsWith("-3d")) {
+                z = (double) pointMap.get("z");
+            }
+        }
+
+        return z != null ? Values.pointValue(crs, x, y, z) : Values.pointValue(crs, x, y);
+    }
+
+    private String getType(Map<String, Object> param) {
+        return Util.quote((String) param.get("label"));
+    }
+
+    private List<String> getLabels(Map<String, Object> param) {
+        return ((List<String>) param.getOrDefault("labels", Collections.emptyList())).stream()
+                .map(Util::quote)
+                .collect(Collectors.toList());
+    }
+
+    private void write(Transaction tx, List<Map<String, Object>> resultList) {
+        if (resultList.isEmpty()) return;
+        final String type = (String) resultList.get(0).get("type");
+        String query = null;
+        switch (type) {
+            case "node":
+                query = String.format(CREATE_NODE, StringUtils.join(lastLabels, ":"), importJsonConfig.getImportIdName());
+                break;
+            case "relationship":
+                String startLabels = StringUtils.join((List<String>) lastRelTypes.get("start"), ":");
+                String endLabels = StringUtils.join((List<String>) lastRelTypes.get("end"), ":");
+                String rel = (String) lastRelTypes.get("label");
+                query = String.format(CREATE_RELS, startLabels, importJsonConfig.getImportIdName(), endLabels, rel);
+                break;
+            default:
+                throw new IllegalArgumentException("Current type not supported: " + type);
+        }
+        if (StringUtils.isNotBlank(query)) {
+            db.execute(query, Collections.singletonMap("rows", resultList));
+        }
+    }
+
+    private Collection<List<Map<String, Object>>> chunkData() {
+        AtomicInteger chunkCounter = new AtomicInteger(0);
+        return paramList.stream()
+                .collect(Collectors.groupingBy(it -> chunkCounter.getAndIncrement() / unwindBatchSize))
+                .values();
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        reporter.done();
+    }
+
+    private void flush() {
+        if (!paramList.isEmpty()) {
+            final Collection<List<Map<String, Object>>> results = chunkData();
+            try (final Transaction tx = db.beginTx()) {
+                results.forEach(resultList -> write(tx, resultList));
+                tx.success();
+            }
+            paramList.clear();
+        }
+    }
+}
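For reference, with a `User` label, a `KNOWS` relationship type and the default `neo4jImportId` id property, the `CREATE_NODE` and `CREATE_RELS` templates above expand to statements along these lines (a rendering of the generated Cypher for illustration, not part of the patch):

[source,cypher]
----
UNWIND $rows AS row
CREATE (n:User {neo4jImportId: row.id}) SET n += row.properties;

UNWIND $rows AS row
MATCH (s:User {neo4jImportId: row.start.id})
MATCH (e:User {neo4jImportId: row.end.id})
CREATE (s)-[r:KNOWS]->(e) SET r += row.properties;
----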
diff --git a/src/test/java/apoc/export/json/ImportJsonTest.java b/src/test/java/apoc/export/json/ImportJsonTest.java
new file mode 100644
index 0000000000..f367aaaae8
--- /dev/null
+++ b/src/test/java/apoc/export/json/ImportJsonTest.java
@@ -0,0 +1,137 @@
+package apoc.export.json;
+
+import apoc.util.TestUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Relationship;
+import org.neo4j.graphdb.factory.GraphDatabaseSettings;
+import org.neo4j.graphdb.spatial.Point;
+import org.neo4j.test.TestGraphDatabaseFactory;
+import org.neo4j.values.storable.CoordinateReferenceSystem;
+import org.neo4j.values.storable.DurationValue;
+import org.neo4j.values.storable.PointValue;
+import org.neo4j.values.storable.Values;
+
+import java.io.File;
+import java.time.LocalDateTime;
+import java.util.Map;
+
+import static apoc.util.MapUtil.map;
+
+public class ImportJsonTest {
+
+    private static GraphDatabaseService db;
+    private static File directory = new File("docs/asciidoc/data/exportJSON");
+
+    @Before
+    public void setUp() throws Exception {
+        db = new TestGraphDatabaseFactory()
+                .newImpermanentDatabaseBuilder()
+                .setConfig(GraphDatabaseSettings.load_csv_file_url_root, directory.getAbsolutePath())
+                .setConfig("apoc.import.file.enabled", "true")
+                .newGraphDatabase();
+        TestUtil.registerProcedure(db, ImportJson.class);
+    }
+
+    @After
+    public void tearDown() {
+        db.shutdown();
+    }
+
+    @Test
+    public void shouldImportAllJson() throws Exception {
+        // given
+        String filename = "all.json";
+
+        // when
+        TestUtil.testCall(db, "CALL apoc.import.json($file, null)",
+                map("file", filename),
+                (r) -> {
+                    // then
+                    Assert.assertEquals("all.json", r.get("file"));
+                    Assert.assertEquals("file", r.get("source"));
+                    Assert.assertEquals("json", r.get("format"));
+                    Assert.assertEquals(3L, r.get("nodes"));
+                    Assert.assertEquals(1L, r.get("relationships"));
+                    Assert.assertEquals(15L, r.get("properties"));
+                    Assert.assertEquals(4L, r.get("rows"));
+                    Assert.assertEquals(true, r.get("done"));
+
+                    final long countNodes = db.execute("MATCH (n:User) RETURN count(n) AS count")
+                            .<Long>columnAs("count")
+                            .next();
+                    Assert.assertEquals(3L, countNodes);
+
+                    final long countRels = db.execute("MATCH ()-[r:KNOWS]->() RETURN count(r) AS count")
+                            .<Long>columnAs("count")
+                            .next();
+                    Assert.assertEquals(1L, countRels);
+
+                    final Map<String, Object> props = db.execute("MATCH (n:User {name: 'Adam'}) RETURN n")
+                            .<Node>columnAs("n")
+                            .next()
+                            .getAllProperties();
+                    Assert.assertEquals(9, props.size());
+                    Assert.assertEquals("wgs-84", props.get("place.crs"));
+                    Assert.assertEquals(33.46789D, props.get("place.latitude"));
+                    Assert.assertEquals(13.1D, props.get("place.longitude"));
+                    Assert.assertFalse(props.containsKey("place"));
+                }
+        );
+    }
+
+    @Test
+    public void shouldImportAllJsonWithPropertyMappings() throws Exception {
+        // given
+        String filename = "all.json";
+
+        // when
+        TestUtil.testCall(db, "CALL apoc.import.json($file, $config)",
+                map("file", filename, "config",
+                        map("nodePropertyMappings", map("User", map("place", "Point", "born", "LocalDateTime")),
+                                "relPropertyMappings", map("KNOWS", map("bffSince", "Duration")),
+                                "unwindBatchSize", 1, "txBatchSize", 1)),
+                (r) -> {
+                    // then
+                    Assert.assertEquals("all.json", r.get("file"));
+                    Assert.assertEquals("file", r.get("source"));
+                    Assert.assertEquals("json", r.get("format"));
+                    Assert.assertEquals(3L, r.get("nodes"));
+                    Assert.assertEquals(1L, r.get("relationships"));
+                    Assert.assertEquals(15L, r.get("properties"));
+                    Assert.assertEquals(4L, r.get("rows"));
+                    Assert.assertEquals(true, r.get("done"));
+
+                    final long countNodes = db.execute("MATCH (n:User) RETURN count(n) AS count")
+                            .<Long>columnAs("count")
+                            .next();
+                    Assert.assertEquals(3L, countNodes);
+
+                    final long countRels = db.execute("MATCH ()-[r:KNOWS]->() RETURN count(r) AS count")
+                            .<Long>columnAs("count")
+                            .next();
+                    Assert.assertEquals(1L, countRels);
+
+                    final Map<String, Object> props = db.execute("MATCH (n:User {name: 'Adam'}) RETURN n")
+                            .<Node>columnAs("n")
+                            .next()
+                            .getAllProperties();
+                    Assert.assertEquals(7, props.size());
+                    Assert.assertTrue(props.get("place") instanceof PointValue);
+                    PointValue point = (PointValue) props.get("place");
+                    final PointValue pointValue = Values.pointValue(CoordinateReferenceSystem.WGS84, 33.46789D, 13.1D);
+                    Assert.assertTrue(point.equals((Point) pointValue));
+                    Assert.assertTrue(props.get("born") instanceof LocalDateTime);
+
+                    Relationship rel = db.execute("MATCH ()-[r:KNOWS]->() RETURN r")
+                            .<Relationship>columnAs("r")
+                            .next();
+                    Assert.assertTrue(rel.getProperty("bffSince") instanceof DurationValue);
+                    Assert.assertEquals("P5M1DT12H", rel.getProperty("bffSince").toString());
+                }
+        );
+    }
+}
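The full roundtrip named in the subject line can be exercised with the existing export procedure followed by the new import, along these lines (a minimal sketch; it assumes `apoc.export.file.enabled` and `apoc.import.file.enabled` are both set to `true`):

[source,cypher]
----
CALL apoc.export.json.all('all.json', {});
CALL apoc.import.json('all.json', {});
----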