diff --git a/examples/apache-spark/pom.xml b/examples/apache-spark/pom.xml
new file mode 100644
index 00000000..60503b6f
--- /dev/null
+++ b/examples/apache-spark/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.neo4j.importer</groupId>
+        <artifactId>import-spec-examples-parent</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>apache-spark-example</artifactId>
+    <name>apache-spark-example</name>
+
+    <properties>
+        <scala.version>2.13</scala.version>
+        <surefire.jvm.args>-XX:+IgnoreUnrecognizedVMOptions
+            --add-opens=java.base/java.lang=ALL-UNNAMED
+            --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+            --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/java.io=ALL-UNNAMED
+            --add-opens=java.base/java.net=ALL-UNNAMED
+            --add-opens=java.base/java.nio=ALL-UNNAMED
+            --add-opens=java.base/java.util=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+            --add-opens=java.base/sun.security.action=ALL-UNNAMED
+            --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+            --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
+            -Djdk.reflect.useDirectMethodHandle=false</surefire.jvm.args>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.cloud.bigdataoss</groupId>
+            <artifactId>gcs-connector</artifactId>
+            <version>hadoop3-2.2.11</version>
+            <classifier>shaded</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.version}</artifactId>
+            <version>3.5.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-connector-apache-spark_${scala.version}</artifactId>
+            <version>5.3.10_for_spark_3</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>import-spec</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-nio</artifactId>
+            <version>0.128.7</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-nop</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>neo4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <argLine>${surefire.jvm.args}</argLine>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>run-example</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/apache-spark/src/test/java/org/neo4j/importer/CompletableFutures.java b/examples/apache-spark/src/test/java/org/neo4j/importer/CompletableFutures.java
new file mode 100644
index 00000000..fe7ff7c1
--- /dev/null
+++ b/examples/apache-spark/src/test/java/org/neo4j/importer/CompletableFutures.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.importer;
+
+import java.util.concurrent.CompletableFuture;
+
+class CompletableFutures {
+
+    public static CompletableFuture<Void> initial() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public static CompletableFuture<Void> mergeSequential(
+            CompletableFuture<Void> chain1, CompletableFuture<Void> chain2) {
+        return chain1.thenCompose(v -> chain2);
+    }
+
+    public static CompletableFuture<Void> mergeParallel(
+            CompletableFuture<Void> chain1, CompletableFuture<Void> chain2) {
+        return CompletableFuture.allOf(chain1, chain2);
+    }
+}
diff --git a/examples/apache-spark/src/test/java/org/neo4j/importer/SparkExampleIT.java b/examples/apache-spark/src/test/java/org/neo4j/importer/SparkExampleIT.java
new file mode 100644
index 00000000..a06177de
--- /dev/null
+++ b/examples/apache-spark/src/test/java/org/neo4j/importer/SparkExampleIT.java
@@ -0,0 +1,559 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.importer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.neo4j.cypherdsl.core.internal.SchemaNames;
+import org.neo4j.driver.*;
+import org.neo4j.driver.types.MapAccessor;
+import org.neo4j.importer.v1.ImportSpecificationDeserializer;
+import org.neo4j.importer.v1.actions.plugin.CypherAction;
+import org.neo4j.importer.v1.pipeline.*;
+import org.neo4j.importer.v1.pipeline.ImportExecutionPlan.ImportStepGroup;
+import org.neo4j.importer.v1.pipeline.ImportExecutionPlan.ImportStepStage;
+import org.neo4j.importer.v1.sources.Source;
+import org.neo4j.importer.v1.sources.SourceProvider;
+import org.neo4j.importer.v1.targets.NodeMatchMode;
+import org.neo4j.importer.v1.targets.PropertyMapping;
+import org.neo4j.importer.v1.targets.PropertyType;
+import org.neo4j.importer.v1.targets.WriteMode;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class SparkExampleIT {
+
+    private static final String NEO4J_DATASOURCE = "org.neo4j.spark.DataSource";
+
+    private static SparkSession spark = null;
+
+    @ClassRule
+    public static Neo4jContainer<?> NEO4J = new Neo4jContainer<>(DockerImageName.parse("neo4j:5-enterprise"))
+            .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
+            .withAdminPassword("letmein!");
+
+    @BeforeClass
+    public static void beforeClass() {
+        Assume.assumeTrue(
+                "Please run `gcloud auth application-default login` and define the environment variable GOOGLE_APPLICATION_CREDENTIALS with the resulting path",
+                System.getenv("GOOGLE_APPLICATION_CREDENTIALS") != null);
+
+        spark = SparkSession.builder()
+                .master("local[*]")
+                .appName("SparkExample")
+                .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
+                .config("spark.hadoop.fs.gs.auth.type", "APPLICATION_DEFAULT")
+                .config("neo4j.url", NEO4J.getBoltUrl())
+                .config("neo4j.authentication.basic.username", "neo4j")
+                .config("neo4j.authentication.basic.password", NEO4J.getAdminPassword())
+                .getOrCreate();
+    }
+
+    @Test
+    public void imports_dvd_rental_data_set() throws Exception {
+        final String url = NEO4J.getBoltUrl();
+        final AuthToken token = AuthTokens.basic("neo4j", NEO4J.getAdminPassword());
+
+        try (var driver = GraphDatabase.driver(url, token)) {
+            driver.verifyConnectivity();
+
+            try (InputStream stream = this.getClass().getResourceAsStream("/specs/dvd_rental.yaml")) {
+                assertThat(stream).isNotNull();
+
+                try (var reader = new InputStreamReader(stream)) {
+                    var importPipeline = ImportPipeline.of(ImportSpecificationDeserializer.deserialize(reader));
+                    runSparkImport(driver, importPipeline.executionPlan());
+                }
+            }
+
+            assertSchema(driver);
+            assertNodeCount(driver, "Actor", 200L);
+            assertNodeCount(driver, "Category", 16L);
+            assertNodeCount(driver, "Customer", 599L);
+            assertNodeCount(driver, "Movie", 1000L);
+            assertNodeCount(driver, "Inventory", 0);
+            assertRelationshipCount(driver, "Actor", "ACTED_IN", "Movie", 5462L);
+            assertRelationshipCount(driver, "Movie", "IN_CATEGORY", "Category", 1000L);
+            assertRelationshipCount(driver, "Customer", "HAS_RENTED", "Movie", 16044L);
+        }
+    }
+
+    private void runSparkImport(Driver driver, ImportExecutionPlan plan) {
+        plan.getGroups().stream()
+                .reduce(
+                        CompletableFutures.initial(),
+                        (previous, currentGroup) -> CompletableFuture.allOf(previous, runGroup(driver, currentGroup)),
+                        CompletableFutures::mergeParallel)
+                .join();
+    }
+
+    private CompletableFuture<Void> runGroup(Driver driver, ImportStepGroup group) {
+        var sourceDataFrames = new HashMap<String, Dataset<Row>>();
+        try {
+            return group.getStages().stream()
+                    .reduce(
+                            CompletableFutures.initial(),
+                            (chain, currentStage) ->
+                                    chain.thenCompose((ignored) -> runStage(driver, currentStage, sourceDataFrames)),
+                            CompletableFutures::mergeSequential);
+        } finally {
+            sourceDataFrames.values().forEach(Dataset::unpersist);
+        }
+    }
+
+    private CompletableFuture<Void> runStage(
+            Driver driver, ImportStepStage currentStage, Map<String, Dataset<Row>> sourceDataFrames) {
+        return CompletableFuture.runAsync(() -> {
+            var stepFutures = new ArrayList<CompletableFuture<Void>>();
+            for (var step : currentStage.getSteps()) {
+                switch (step) {
+                    case SourceStep sourceStep -> indexSource(sourceDataFrames, sourceStep);
+                    case NodeTargetStep node -> {
+                        var df = sourceDataFrames.get(node.sourceName());
+                        stepFutures.add(runNodeImport(node, df));
+                    }
+                    case RelationshipTargetStep relationship -> {
+                        var df = sourceDataFrames.get(relationship.sourceName());
+                        stepFutures.add(runRelationshipImport(relationship, df));
+                    }
+                    case ActionStep action -> stepFutures.add(runAction(driver, action));
+                    default -> Assertions.fail("Unexpected step: %s".formatted(step));
+                }
+            }
+            CompletableFuture.allOf(stepFutures.toArray(new CompletableFuture[0]))
+                    .join();
+        });
+    }
+
+    private static void indexSource(Map<String, Dataset<Row>> sourceDataFrames, SourceStep sourceStep) {
+        var source = sourceStep.source();
+        assertThat(source).isInstanceOf(ParquetSource.class);
+        var parquetSource = (ParquetSource) source;
+        var df = spark.read().parquet(parquetSource.uri()).cache();
+        sourceDataFrames.put(parquetSource.name(), df);
+    }
+
+    private CompletableFuture<Void> runNodeImport(NodeTargetStep node, Dataset<Row> df) {
+        return CompletableFuture.runAsync(() -> df.select(sourceColumns(node))
+                .withColumnsRenamed(columnRenames(node))
+                .write()
+                .format(NEO4J_DATASOURCE)
+                .mode(saveMode(node.writeMode()))
+                .option("script", String.join(";\n", schemaStatements(node)))
+                .option("labels", labels(node))
+                .option("node.keys", keys(node))
+                .save());
+    }
+
+    private CompletableFuture<Void> runRelationshipImport(RelationshipTargetStep relationship, Dataset<Row> df) {
+        var nodeSaveMode = nodeSaveMode(relationship.nodeMatchMode());
+        var startNode = relationship.startNode();
+        var startNodeKeys = keyProperties(startNode);
+        var endNode = relationship.endNode();
+        var endNodeKeys = keyProperties(endNode);
+
+        return CompletableFuture.runAsync(() -> df.select(sourceColumns(relationship))
+                .write()
+                .format(NEO4J_DATASOURCE)
+                .mode(saveMode(relationship.writeMode()))
+                .option("script", String.join(";\n", schemaStatements(relationship)))
+                .option("relationship", relationship.type())
+                .option("relationship.save.strategy", "keys")
+                .option("relationship.properties", properties(relationship))
+                .option("relationship.source.save.mode", nodeSaveMode)
+                .option("relationship.source.labels", labels(startNode))
+                .option("relationship.source.node.keys", startNodeKeys)
+                .option("relationship.source.node.properties", startNodeKeys)
+                .option("relationship.target.save.mode", nodeSaveMode)
+                .option("relationship.target.labels", labels(endNode))
+                .option("relationship.target.node.keys", endNodeKeys)
+                .option("relationship.target.node.properties", endNodeKeys)
+                .save());
+    }
+
+    private static CompletableFuture<Void> runAction(Driver driver, ActionStep action) {
+        assertThat(action.action()).isInstanceOf(CypherAction.class);
+        return CompletableFuture.runAsync(() -> runAction((CypherAction) action.action(), driver));
+    }
+
+    private static Column[] sourceColumns(NodeTargetStep target) {
+        return allProperties(target).map(SparkExampleIT::toSourceColumn).toArray(Column[]::new);
+    }
+
+    private static Column[] sourceColumns(RelationshipTargetStep target) {
+        return Streams.concatAll(
+                        allProperties(target),
+                        target.startNode().keyProperties().stream(),
+                        target.endNode().keyProperties().stream())
+                .map(SparkExampleIT::toSourceColumn)
+                .toArray(Column[]::new);
+    }
+
+    private Map<String, String> columnRenames(NodeTargetStep target) {
+        return allProperties(target)
+                .collect(Collectors.toMap(PropertyMapping::getSourceField, PropertyMapping::getTargetProperty));
+    }
+
+    private String properties(EntityTargetStep target) {
+        return allProperties(target)
+                .map(mapping -> "%s:%s".formatted(mapping.getSourceField(), mapping.getTargetProperty()))
+                .collect(Collectors.joining(","));
+    }
+
+    private String keyProperties(EntityTargetStep target) {
+        return target.keyProperties().stream()
+                .map(mapping -> "%s:%s".formatted(mapping.getSourceField(), mapping.getTargetProperty()))
+                .collect(Collectors.joining(","));
+    }
+
+    private SaveMode saveMode(WriteMode writeMode) {
+        switch (writeMode) {
+            case CREATE -> {
+                return SaveMode.Append;
+            }
+            case MERGE -> {
+                return SaveMode.Overwrite;
+            }
+        }
+        throw new IllegalStateException("unexpected write mode: %s".formatted(writeMode));
+    }
+
+    private String nodeSaveMode(NodeMatchMode nodeMatchMode) {
+        switch (nodeMatchMode) {
+            case MATCH -> {
+                return "Match";
+            }
+            case MERGE -> {
+                return "Overwrite";
+            }
+        }
+        throw new IllegalStateException("unexpected node match mode: %s".formatted(nodeMatchMode));
+    }
+
+    private String keys(EntityTargetStep target) {
+        return target.keyProperties().stream()
+                .map(PropertyMapping::getTargetProperty)
+                .collect(Collectors.joining(","));
+    }
+
+    private String labels(NodeTargetStep target) {
+        return target.labels().stream()
+                .map(label -> {
+                    Optional<String> result = SchemaNames.sanitize(label);
+                    assertThat(result)
+                            .overridingErrorMessage("Label '%s' could not be sanitized", label)
+                            .isPresent();
+                    return result.get();
+                })
+                .collect(Collectors.joining(":", ":", ""));
+    }
+
+    private static Stream<PropertyMapping> allProperties(EntityTargetStep target) {
+        return Stream.concat(target.keyProperties().stream(), target.nonKeyProperties().stream());
+    }
+
+    private static Column toSourceColumn(PropertyMapping mapping) {
+        return new Column(mapping.getSourceField());
+    }
+
+    private List<String> schemaStatements(NodeTargetStep step) {
+        var schema = step.schema();
+        if (schema == null) {
+            return Collections.emptyList();
+        }
+        var statements = new ArrayList<String>();
+        statements.addAll(schema.getKeyConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE (%s) IS NODE KEY"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(constraint.getLabel()),
+                                constraint.getProperties().stream()
+                                        .map(SparkExampleIT::sanitize)
+                                        .map(prop -> "%s.%s".formatted("n", prop))
+                                        .collect(Collectors.joining(","))))
+                .toList());
+        statements.addAll(schema.getUniqueConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE (%s) IS UNIQUE"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(constraint.getLabel()),
+                                constraint.getProperties().stream()
+                                        .map(SparkExampleIT::sanitize)
+                                        .map(prop -> "%s.%s".formatted("n", prop))
+                                        .collect(Collectors.joining(","))))
+                .toList());
+        Map<String, PropertyType> propertyTypes = step.propertyTypes();
+        statements.addAll(schema.getTypeConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR (n:%s) REQUIRE n.%s IS :: %s"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(constraint.getLabel()),
+                                sanitize(constraint.getProperty()),
+                                propertyType(propertyTypes.get(constraint.getProperty()))))
+                .toList());
+        return statements;
+    }
+
+    private List<String> schemaStatements(RelationshipTargetStep step) {
+        var schema = step.schema();
+        if (schema == null) {
+            return Collections.emptyList();
+        }
+        var statements = new ArrayList<String>();
+        statements.addAll(schema.getKeyConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE (%s) IS RELATIONSHIP KEY"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(step.type()),
+                                constraint.getProperties().stream()
+                                        .map(SparkExampleIT::sanitize)
+                                        .map(prop -> "%s.%s".formatted("r", prop))
+                                        .collect(Collectors.joining(","))))
+                .toList());
+        statements.addAll(schema.getUniqueConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE (%s) IS UNIQUE"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(step.type()),
+                                constraint.getProperties().stream()
+                                        .map(SparkExampleIT::sanitize)
+                                        .map(prop -> "%s.%s".formatted("r", prop))
+                                        .collect(Collectors.joining(","))))
+                .toList());
+        Map<String, PropertyType> propertyTypes = step.propertyTypes();
+        statements.addAll(schema.getTypeConstraints().stream()
+                .map(constraint -> "CREATE CONSTRAINT %s FOR ()-[r:%s]-() REQUIRE r.%s IS :: %s"
+                        .formatted(
+                                constraint.getName(),
+                                sanitize(step.type()),
+                                sanitize(constraint.getProperty()),
+                                propertyType(propertyTypes.get(constraint.getProperty()))))
+                .toList());
+        return statements;
+    }
+
+    private static String sanitize(String element) {
+        var result = SchemaNames.sanitize(element);
+        assertThat(result).isPresent();
+        return result.get();
+    }
+
+    private static String propertyType(PropertyType propertyType) {
+        return switch (propertyType) {
+            case BOOLEAN -> "BOOLEAN";
+            case BOOLEAN_ARRAY -> "LIST<BOOLEAN NOT NULL>";
+            case DATE -> "DATE";
+            case DATE_ARRAY -> "LIST<DATE NOT NULL>";
+            case DURATION -> "DURATION";
+            case DURATION_ARRAY -> "LIST<DURATION NOT NULL>";
+            case FLOAT -> "FLOAT";
+            case FLOAT_ARRAY -> "LIST<FLOAT NOT NULL>";
+            case INTEGER -> "INTEGER";
+            case INTEGER_ARRAY -> "LIST<INTEGER NOT NULL>";
+            case LOCAL_DATETIME -> "LOCAL DATETIME";
+            case LOCAL_DATETIME_ARRAY -> "LIST<LOCAL DATETIME NOT NULL>";
+            case LOCAL_TIME -> "LOCAL TIME";
+            case LOCAL_TIME_ARRAY -> "LIST<LOCAL TIME NOT NULL>";
+            case POINT -> "POINT";
+            case POINT_ARRAY -> "LIST<POINT NOT NULL>";
+            case STRING -> "STRING";
+            case STRING_ARRAY -> "LIST<STRING NOT NULL>";
+            case ZONED_DATETIME -> "ZONED DATETIME";
+            case ZONED_DATETIME_ARRAY -> "LIST<ZONED DATETIME NOT NULL>";
+            case ZONED_TIME -> "ZONED TIME";
+            case ZONED_TIME_ARRAY -> "LIST<ZONED TIME NOT NULL>";
+            default -> throw new IllegalArgumentException(String.format("Unsupported property type: %s", propertyType));
+        };
+    }
+
+    private static void runAction(CypherAction cypherAction, Driver driver) {
+        try (var session = driver.session()) {
+            var query = cypherAction.getQuery();
+            switch (cypherAction.getExecutionMode()) {
+                case TRANSACTION ->
+                    session.writeTransaction((tx) -> tx.run(query).consume());
+                case AUTOCOMMIT -> session.run(query).consume();
+            }
+        }
+    }
+
+    private static void assertSchema(Driver driver) {
+        assertNodeConstraint(driver, "NODE_KEY", "Actor", "id");
+        assertNodeTypeConstraint(driver, "Actor", "first_name", "STRING");
+        assertNodeTypeConstraint(driver, "Actor", "id", "INTEGER");
+        assertNodeTypeConstraint(driver, "Actor", "last_name", "STRING");
+        assertNodeConstraint(driver, "NODE_KEY", "Category", "id");
+        assertNodeTypeConstraint(driver, "Category", "id", "INTEGER");
+        assertNodeTypeConstraint(driver, "Category", "name", "STRING");
+        assertNodeConstraint(driver, "UNIQUENESS", "Category", "name");
+        assertNodeConstraint(driver, "NODE_KEY", "Customer", "id");
+        assertNodeTypeConstraint(driver, "Customer", "creation_date", "DATE");
+        assertNodeTypeConstraint(driver, "Customer", "email", "STRING");
+        assertNodeTypeConstraint(driver, "Customer", "first_name", "STRING");
+        assertNodeTypeConstraint(driver, "Customer", "id", "INTEGER");
+        assertNodeTypeConstraint(driver, "Customer", "last_name", "STRING");
+        assertNodeConstraint(driver, "UNIQUENESS", "Customer", "email");
+        assertNodeConstraint(driver, "NODE_KEY", "Inventory", "id");
+        assertNodeTypeConstraint(driver, "Inventory", "id", "INTEGER");
+        assertNodeTypeConstraint(driver, "Inventory", "movie_id", "INTEGER");
+        assertNodeConstraint(driver, "NODE_KEY", "Movie", "id");
+        assertNodeTypeConstraint(driver, "Movie", "description", "STRING");
+        assertNodeTypeConstraint(driver, "Movie", "id", "INTEGER");
+        assertNodeTypeConstraint(driver, "Movie", "title", "STRING");
+        assertRelationshipConstraint(driver, "RELATIONSHIP_KEY", "HAS_RENTED", "id");
+        assertRelationshipTypeConstraint(driver, "HAS_RENTED", "id", "INTEGER");
+    }
+
+    private static void assertNodeConstraint(Driver driver, String constraintType, String label, String property) {
+        try (Session session = driver.session()) {
+            var result = session.run(
+                    """
+                    SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
+                    WHERE type = $constraintType AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] \
+                    RETURN count(*) = 1 AS result""",
+                    Map.of("constraintType", constraintType, "label", label, "property", property));
+            var records = result.list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((boolean) records.getFirst().get("result")).isTrue();
+        }
+    }
+
+    private static void assertNodeTypeConstraint(Driver driver, String label, String property, String propertyType) {
+        try (Session session = driver.session()) {
+            var result = session.run(
+                    """
+                    SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
+                    WHERE type = 'NODE_PROPERTY_TYPE' AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] AND propertyType = $propertyType \
+                    RETURN count(*) = 1 AS result""",
+                    Map.of("label", label, "property", property, "propertyType", propertyType));
+            var records = result.list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((boolean) records.getFirst().get("result")).isTrue();
+        }
+    }
+
+    private static void assertRelationshipConstraint(
+            Driver driver, String constraintType, String relType, String property) {
+        try (Session session = driver.session()) {
+            var result = session.run(
+                    """
+                    SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
+                    WHERE type = $constraintType AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] \
+                    RETURN count(*) = 1 AS result""",
+                    Map.of("constraintType", constraintType, "type", relType, "property", property));
+
+            var records = result.list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((boolean) records.getFirst().get("result")).isTrue();
+        }
+    }
+
+    private static void assertRelationshipTypeConstraint(
+            Driver driver, String relType, String property, String propertyType) {
+        try (Session session = driver.session()) {
+            var result = session.run(
+                    """
+                    SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
+                    WHERE type = 'RELATIONSHIP_PROPERTY_TYPE' AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] AND propertyType = $propertyType \
+                    RETURN count(*) = 1 AS result""",
+                    Map.of("type", relType, "property", property, "propertyType", propertyType));
+
+            var records = result.list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((boolean) records.getFirst().get("result")).isTrue();
+        }
+    }
+
+    private static void assertNodeCount(Driver driver, String label, long expectedCount) {
+        try (Session session = driver.session()) {
+            var query = String.format("MATCH (n:`%s`) RETURN count(n) AS count", label);
+            var records = session.run(query).list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((long) records.getFirst().get("count")).isEqualTo(expectedCount);
+        }
+    }
+
+    private static void assertRelationshipCount(
+            Driver driver, String startLabel, String type, String endLabel, long expectedCount) {
+        try (Session session = driver.session()) {
+            var query = String.format(
+                    "MATCH (:`%s`)-[r:`%s`]->(:`%s`) RETURN count(r) AS count", startLabel, type, endLabel);
+            var records = session.run(query).list(MapAccessor::asMap);
+            assertThat(records).hasSize(1);
+            assertThat((long) records.getFirst().get("count")).isEqualTo(expectedCount);
+        }
+    }
+
+    public static class ParquetSourceProvider implements SourceProvider<ParquetSource> {
+
+        @Override
+        public String supportedType() {
+            return "parquet";
+        }
+
+        @SuppressWarnings({"removal"})
+        @Override
+        public ParquetSource provide(ObjectNode objectNode) {
+            return new ParquetSource(
+                    objectNode.get("name").textValue(), objectNode.get("uri").textValue());
+        }
+    }
+
+    public record ParquetSource(String name, String uri) implements Source {
+
+        @Override
+        public String getType() {
+            return "parquet";
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+    }
+
+    private static class Streams {
+
+        @SafeVarargs
+        public static <T> Stream<T> concatAll(Stream<T>... streams) {
+            return Arrays.stream(streams).reduce(Stream::concat).orElse(Stream.empty());
+        }
+    }
+}
diff --git a/examples/apache-spark/src/test/resources/META-INF/services/org.neo4j.importer.v1.sources.SourceProvider b/examples/apache-spark/src/test/resources/META-INF/services/org.neo4j.importer.v1.sources.SourceProvider
new file mode 100644
index 00000000..cdc13868
--- /dev/null
+++ b/examples/apache-spark/src/test/resources/META-INF/services/org.neo4j.importer.v1.sources.SourceProvider
@@ -0,0 +1 @@
+org.neo4j.importer.SparkExampleIT$ParquetSourceProvider
diff --git a/examples/apache-spark/src/test/resources/specs/dvd_rental.yaml b/examples/apache-spark/src/test/resources/specs/dvd_rental.yaml
new file mode 100644
index 00000000..7098aff4
--- /dev/null
+++ b/examples/apache-spark/src/test/resources/specs/dvd_rental.yaml
@@ -0,0 +1,246 @@
+version: "1"
+sources:
+  - name: actor
+    type: parquet
+    uri: gs://connectors-public/dvdrental/actor.parquet
+  - name: category
+    type: parquet
+    uri: gs://connectors-public/dvdrental/category.parquet
+  - name: customer
+    type: parquet
+    uri: gs://connectors-public/dvdrental/customer.parquet
+  - name: film_actor
+    type: parquet
+    uri: gs://connectors-public/dvdrental/film_actor.parquet
+  - name: film_category
+    type: parquet
+    uri: gs://connectors-public/dvdrental/film_category.parquet
+  - name: film
+    type: parquet
+    uri: gs://connectors-public/dvdrental/film.parquet
+  - name: inventory
+    type: parquet
+    uri: gs://connectors-public/dvdrental/inventory.parquet
+  - name: rental
+    type: parquet
+    uri: gs://connectors-public/dvdrental/rental.parquet
+targets:
+  nodes:
+    - source: actor
+      name: actor_nodes
+      write_mode: merge
+      labels: [ Actor ]
+      properties:
+        - source_field: actor_id
+          target_property: id
+          target_property_type: integer
+        - source_field: first_name
+          target_property: first_name
+          target_property_type: string
+        - source_field: last_name
+          target_property: last_name
+          target_property_type: string
+      schema:
+        key_constraints:
+          - name: actor_id_key
+            label: Actor
+            properties: [ id ]
+        type_constraints:
+          - name: actor_id_type
+            label: Actor
+            property: id
+          - name: actor_first_name_type
+            label: Actor
+            property: first_name
+          - name: actor_last_name_type
+            label: Actor
+            property: last_name
+
+    - source: category
+      name: category_nodes
+      write_mode: merge
+      labels: [ Category ]
+      properties:
+        - source_field: category_id
+          target_property: id
+          target_property_type: integer
+        - source_field: name
+          target_property: name
+          target_property_type: string
+      schema:
+        key_constraints:
+          - name: category_id_key
+            label: Category
+            properties: [ id ]
+        unique_constraints:
+          - name: category_name_key
+            label: Category
+            properties: [ name ]
+        type_constraints:
+          - name: category_id_type
+            label: Category
+            property: id
+          - name: category_name_type
+            label: Category
+            property: name
+
+    - source: customer
+      name: customer_nodes
+      write_mode: merge
+      labels: [ Customer ]
+      properties:
+        - source_field: customer_id
+          target_property: id
+          target_property_type: integer
+        - source_field: first_name
+          target_property: first_name
+          target_property_type: string
+        - source_field: last_name
+          target_property: last_name
+          target_property_type: string
+        - source_field: email
+          target_property: email
+          target_property_type: string
+        - source_field: create_date
+          target_property: creation_date
+          target_property_type: date
+      schema:
+        key_constraints:
+          - name: customer_id_key
+            label: Customer
+            properties: [ id ]
+        unique_constraints:
+          - name: customer_email_unique
+            label: Customer
+            properties: [ email ]
+        type_constraints:
+          - name: customer_id_type
+            label: Customer
+            property: id
+          - name: customer_first_name_type
+            label: Customer
+            property: first_name
+          - name: customer_last_name_type
+            label: Customer
+            property: last_name
+          - name: customer_email_type
+            label: Customer
+            property: email
+          - name: customer_creation_date_type
+            label: Customer
+            property: creation_date
+
+    - source: film
+      name: movie_nodes
+      write_mode: merge
+      labels: [ Movie ]
+      properties:
+        - source_field: film_id
+          target_property: id
+          target_property_type: integer
+        - source_field: title
+          target_property: title
+          target_property_type: string
+        - source_field: description
+          target_property: description
+          target_property_type: string
+        - source_field: release_year
+          target_property: release_year
+      schema:
+        key_constraints:
+          - name: movie_id_key
+            label: Movie
+            properties: [ id ]
+        type_constraints:
+          - name: movie_id_type
+            label: Movie
+            property: id
+          - name: movie_title_type
+            label: Movie
+            property: title
+          - name: movie_description_type
+            label: Movie
+            property: description
+
+    - source: inventory
+      name: inventory_nodes
+      write_mode: merge
+      labels: [ Inventory ]
+      properties:
+        - source_field: inventory_id
+          target_property: id
+          target_property_type: integer
+        - source_field: film_id
+          target_property: movie_id
+          target_property_type: integer
+      schema:
+        key_constraints:
+          - name: inventory_id_key
+            label: Inventory
+            properties: [ id ]
+        type_constraints:
+          - name: inventory_id_type
+            label: Inventory
+            property: id
+          - name: inventory_film_id_type
+            label: Inventory
+            property: movie_id
+
+  relationships:
+    - source: film_actor
+      name: movie_actor_relationships
+      write_mode: merge
+      node_match_mode: match
+      type: ACTED_IN
+      start_node_reference: actor_nodes
+      end_node_reference: movie_nodes
+
+    - source: film_category
+      name: movie_category_relationships
+      write_mode: merge
+      node_match_mode: match
+      type: IN_CATEGORY
+      start_node_reference: movie_nodes
+      end_node_reference: category_nodes
+
+    - source: rental
+      name: rental_relationships
+      write_mode: merge
+      node_match_mode: merge
+      type: HAS_RENTED
+      start_node_reference: customer_nodes
+      end_node_reference: inventory_nodes
+      properties:
+        - source_field: rental_id
+          target_property: id
+          target_property_type: integer
+        - source_field: rental_date
+          target_property: date
+          target_property_type: date
+      schema:
+        key_constraints:
+          - name: rental_id_key
+            properties: [ id ]
+        type_constraints:
+          - name: rental_id_type
+            property: id
+
+actions:
+  - name: remove inventory indirection
+    type: cypher
+    stage: end
+    execution_mode: autocommit
+    query: |
+      MATCH (inventory:Inventory)
+      CALL (inventory) {
+        MATCH (inventory)<-[old_rental:HAS_RENTED]-(customer:Customer)
+        WITH inventory, old_rental, old_rental.id AS rental_id, old_rental.date AS rental_date, customer
+        DELETE old_rental
+        WITH inventory, rental_id, rental_date, customer
+        MATCH (movie:Movie WHERE movie.id = inventory.movie_id)
+        MERGE (customer)-[new_rental:HAS_RENTED {id: rental_id}]->(movie)
+        ON CREATE SET new_rental.date = rental_date
+      } IN TRANSACTIONS
+      CALL (inventory) {
+        DELETE inventory
+      } IN TRANSACTIONS
diff --git a/examples/pom.xml b/examples/pom.xml
index 3756b800..73218d6b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -12,6 +12,7 @@ import-spec-examples-parent
     <modules>
         <module>apache-beam</module>
+        <module>apache-spark</module>
         <module>neo4j-admin</module>
     </modules>