In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

In [2]:
import os

# Bolt endpoint of the Neo4j instance used by every read/write below.
# It can be overridden through the NEO4J_URI environment variable so the
# notebook can target another deployment without editing this cell; when the
# variable is unset the original hard-coded value is used.
# NOTE(review): the default embeds a username and password — prefer supplying
# the URI (and the credentials passed as connector options) from the environment.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j:password@neo4j:7687")

In [3]:
import pyspark
# Print the PySpark version installed next to this notebook so it can be
# compared by eye with the version running in the Spark container.
print(pyspark.__version__) # VERSION MUST MATCH THE SPARK CONTAINER VERSION

3.5.3


In [4]:
# Coordinates of the Neo4j connector, handed to Spark via `spark.jars.packages`.
_NEO4J_CONNECTOR = "neo4j-contrib:neo4j-spark-connector:5.3.1-s_2.12"

# Configure the session step by step, then create it (or reuse an existing one).
_session_builder = SparkSession.builder.appName("JsonToNeo4jInjection")
_session_builder = _session_builder.master("spark://spark:7077")
_session_builder = _session_builder.config("spark.jars.packages", _NEO4J_CONNECTOR)
spark = _session_builder.getOrCreate()

# Display the session summary as the cell output.
spark

In [5]:
# Authors/editors and keywords are both described by a struct with a single
# string field called "name".
author_struct = StructType(fields=[StructField(name="name", dataType=StringType())])
keyword_struct = StructType(fields=[StructField(name="name", dataType=StringType())])

In [6]:
# Struct of one entry of the "papers" array: two nested arrays followed by
# plain string attributes.
_paper_struct = StructType(
    [
        StructField("authors", ArrayType(author_struct)),
        StructField("keywords", ArrayType(keyword_struct)),
    ]
    + [StructField(name, StringType()) for name in ("url", "title", "pages", "abstract")]
)

# Volume-level string attributes, in column order.
_volume_string_fields = (
    "title",
    "volnr",
    "pubyear",
    "volacronym",
    "voltitle",
    "fulltitle",
    "loctime",
)

# Full schema of a volume document: string attributes, then the editors and
# the papers arrays.
schema = StructType(
    [StructField(name, StringType()) for name in _volume_string_fields]
    + [
        StructField("voleditors", ArrayType(author_struct)),
        StructField("papers", ArrayType(_paper_struct)),
    ]
)

In [7]:
# Path to directory containing multiple JSON files
from pathlib import Path

json_dir = str(Path("../data/Volumes"))

# Maximum number of rows to load
n = 10

# Read all JSON files in the directory using the explicit schema.
df = (
    spark.read
    .schema(schema)
    .option("multiline", "true")
    .json(json_dir)
    .limit(n)
    .repartition(1)
    # limit() has no ordering and this DataFrame feeds several separate
    # actions/writes below; caching lets them reuse the materialised sample
    # instead of re-reading the files (and possibly picking different rows).
    .cache()
)

# Show the DataFrame
df.show()

+--------------------+--------+-------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               title|   volnr|pubyear|    volacronym|            voltitle|           fulltitle|             loctime|          voleditors|              papers|
+--------------------+--------+-------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|  Ital-IA 2023|Ital-IA 2023 Them...|Proceedings of th...|Pisa, Italy, May ...|[{Fabrizio Falchi...|[{[{Lorenzo De Do...|
|CEUR-WS.org/Vol-3...|Vol-3497|   2023|  CLEF-WN 2023|CLEF 2023 Working...|Working Notes of ...|Thessaloniki, Gre...|[{Mohammad Aliann...|[{[], [], https:/...|
|CEUR-WS.org/Vol-3...|Vol-3180|   2022|     CLEF 2022|CLEF 2022 Working...|Proceedings of th...|                NULL|[{Guglielmo Faggi...|[{[{Guglielmo Fag...|
|CEUR-WS.org/Vol-2...|Vol-2936|   2021| 

In [8]:
# Quick check: print the `volnr` column of up to 10 loaded rows.
df.select("volnr").show(10)

+--------+
|   volnr|
+--------+
|Vol-3486|
|Vol-3497|
|Vol-3180|
|Vol-2936|
|Vol-2940|
|Vol-2019|
|Vol-3293|
|Vol-1174|
|Vol-3159|
|Vol-3171|
+--------+



In [9]:
# Connector options: connection, basic authentication, and the node label to read.
_volume_read_options = {
    "url": NEO4J_URI,
    "authentication.type": "basic",
    "authentication.basic.username": "neo4j",
    "authentication.basic.password": "password",
    "labels": ":Volume",
}

_volume_reader = spark.read.format("org.neo4j.spark.DataSource")
for _key, _value in _volume_read_options.items():
    _volume_reader = _volume_reader.option(_key, _value)

# Load the nodes labelled :Volume and report how many there are.
vol = _volume_reader.load()
vol.count()

0

In [10]:
# Disabled step: when enabled, it would keep only the rows of `df` whose
# `volnr` has no match in `vol` (left anti join), and only attempt the join
# when `vol` is non-empty.
# if vol.count() > 0:
#     df = df.join(vol, df["volnr"] == vol["volnr"], "left_anti")
# Number of rows that will be written.
df.count()

10

## Create volume relationships

In [11]:
# DataFrame columns stored as properties on Volume nodes.
volume_param = "volnr title pubyear volacronym voltitle fulltitle loctime".split()
# DataFrame columns stored as properties on Paper nodes.
paper_param = "url abstract title pages".split()

# Person and Keyword both use the single property "name".
person_param = keyword_param = "name"

### Volume -> Paper (CONTAINS)

In [12]:
# One row per (volume, paper) pair: the volume columns are kept as they are and
# the fields of each exploded paper are flattened next to them. The paper title
# is aliased to `paper_title` because `title` already holds the volume title.
volume_papers = (
    df.withColumn("paper", explode("papers"))
    .select(
        "title",
        "volnr",
        "pubyear",
        "volacronym",
        "voltitle",
        "fulltitle",
        "loctime",
        col("paper.url").alias("url"),
        col("paper.title").alias("paper_title"),
        col("paper.pages").alias("pages"),
        col("paper.abstract").alias("abstract"),
    )
    # `url` is the Paper node key below; a null key cannot be merged into
    # Neo4j, so papers without a URL are skipped instead of failing the write.
    .dropna(subset=["url"])
    .repartition(1)
)

# Column-to-property mapping for Paper nodes: every entry of `paper_param` is
# written under its own name, except that the `title` property is taken from
# the `paper_title` column. Matching on the exact name (rather than a substring
# replace on the joined string) keeps this correct if other names contain "title".
p_param = ",".join(
    "paper_title:title" if name == "title" else name for name in paper_param
)

(
    volume_papers.write
    # Overwrite relationships
    .mode("Overwrite")
    .option("url", NEO4J_URI)
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "CONTAINS")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Volume")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", ",".join(volume_param))
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "volnr")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Paper")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", p_param)
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "url")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "")
    .option("schema.optimization.node.keys", "UNIQUE")
    .save()
)

### Volume -> Person (EDITOR)

In [13]:
# One row per (volume, editor): explode the editors array, discard the nested
# columns, and replace the editor struct by its `name` string.
_editors_exploded = df.withColumn("voleditorname", explode("voleditors"))
_editor_names = _editors_exploded.drop("voleditors", "papers").withColumn(
    "voleditorname", col("voleditorname.name")
)
# Rows without an editor name are discarded before writing.
volume_editor = _editor_names.dropna(subset=["voleditorname"]).repartition(1)

# Connector options for writing (:Volume)-[:EDITOR]->(:Person).
_editor_write_options = {
    # Connection and authentication
    "url": NEO4J_URI,
    "authentication.type": "basic",
    "authentication.basic.username": "neo4j",
    "authentication.basic.password": "password",
    # Relationship type, written with the `keys` strategy
    "relationship": "EDITOR",
    "relationship.save.strategy": "keys",
    # Source nodes: overwritten, labelled :Volume, keyed by `volnr`
    "relationship.source.save.mode": "Overwrite",
    "relationship.source.labels": ":Volume",
    "relationship.source.node.properties": ",".join(volume_param),
    "relationship.source.node.keys": "volnr",
    # Target nodes: overwritten, labelled :Person, keyed by the editor name
    "relationship.target.save.mode": "Overwrite",
    "relationship.target.labels": ":Person",
    "relationship.target.node.properties": "voleditorname:name",
    "relationship.target.node.keys": "voleditorname:name",
    # No relationship properties
    "relationship.properties": "",
    "schema.optimization.node.keys": "UNIQUE",
}

_editor_writer = volume_editor.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
for _key, _value in _editor_write_options.items():
    _editor_writer = _editor_writer.option(_key, _value)
_editor_writer.save()

In [14]:
# Display the column names and types of the editor DataFrame.
volume_editor

DataFrame[title: string, volnr: string, pubyear: string, volacronym: string, voltitle: string, fulltitle: string, loctime: string, voleditorname: string]

In [15]:
# Preview the (volume, editor name) rows.
volume_editor.show()

+--------------------+--------+-------+------------+--------------------+--------------------+--------------------+--------------------+
|               title|   volnr|pubyear|  volacronym|            voltitle|           fulltitle|             loctime|       voleditorname|
+--------------------+--------+-------+------------+--------------------+--------------------+--------------------+--------------------+
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|Ital-IA 2023|Ital-IA 2023 Them...|Proceedings of th...|Pisa, Italy, May ...|     Fabrizio Falchi|
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|Ital-IA 2023|Ital-IA 2023 Them...|Proceedings of th...|Pisa, Italy, May ...|     Fosca Giannotti|
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|Ital-IA 2023|Ital-IA 2023 Them...|Proceedings of th...|Pisa, Italy, May ...|       Anna Monreale|
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|Ital-IA 2023|Ital-IA 2023 Them...|Proceedings of th...|Pisa, Italy, May ...|     Chiara Boldrini|
|CEUR-WS.org/Vol-3...|Vol-3486|   2023|It

## Create paper relationships

In [16]:
# Fields lifted out of each exploded paper struct, in output column order.
_paper_fields = ("authors", "keywords", "url", "title", "pages", "abstract")

# One row per paper; each nested field becomes a top-level column of the same name.
_papers_exploded = df.withColumn("paper", explode("papers"))
papers = _papers_exploded.select(
    *[col(f"paper.{name}").alias(name) for name in _paper_fields]
).repartition(1)
papers

DataFrame[authors: array<struct<name:string>>, keywords: array<struct<name:string>>, url: string, title: string, pages: string, abstract: string]

### Paper -> Person (AUTHOR)

In [17]:
# One row per (paper, author): explode the authors array, discard the nested
# columns, and replace the author struct by its `name` string.
papers_authors = (
    papers.withColumn("authorname", explode("authors"))
    .drop("authors", "keywords")
    .withColumn("authorname", col("authorname.name"))
    # Both node keys must be present: `url` identifies the Paper and
    # `authorname` the Person. A null key cannot be merged into Neo4j, so
    # such rows are skipped instead of failing the write.
    .dropna(subset=["url", "authorname"])
    .repartition(1)
)

(
    papers_authors.write
    # Overwrite relationships
    .mode("Overwrite")
    .option("url", NEO4J_URI)
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "AUTHOR")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Paper")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", ",".join(paper_param))
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "url")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Person")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "authorname:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "authorname:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "")
    .option("schema.optimization.node.keys", "UNIQUE")
    .save()
)

### Paper -> Keyword (KEYWORD)

In [18]:
# One row per (paper, keyword): explode the keywords array, discard the nested
# columns, and replace the keyword struct by its `name` string.
# The result gets its own name so it no longer overwrites the author
# DataFrame (`papers_authors`) built for the AUTHOR relationship.
papers_keywords = (
    papers.withColumn("keyword", explode("keywords"))
    .drop("authors", "keywords")
    .withColumn("keyword", col("keyword.name"))
    # Both node keys must be present: `url` identifies the Paper and
    # `keyword` the Keyword. A null key cannot be merged into Neo4j, so
    # such rows are skipped instead of failing the write.
    .dropna(subset=["url", "keyword"])
    .repartition(1)
)

(
    papers_keywords.write
    # Overwrite relationships
    .mode("Overwrite")
    .option("url", NEO4J_URI)
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "KEYWORD")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Paper")
    # Map the DataFrame columns to node properties
    .option("relationship.source.node.properties", ",".join(paper_param))
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "url")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Keyword")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "keyword:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "keyword:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "")
    .option("schema.optimization.node.keys", "UNIQUE")
    .save()
)