In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, explode, lit, size, xxhash64
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

In [2]:
# Start (or reuse) the Spark session for this notebook, pulling in the Neo4j
# connector package so DataFrames can be written with the
# "org.neo4j.spark.DataSource" format.
spark_builder = SparkSession.builder.appName("bk-imp").config(
    "spark.jars.packages",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.0.1_for_spark_3",
)
spark = spark_builder.getOrCreate()

23/05/09 16:41:47 WARN Utils: Your hostname, workspace resolves to a loopback address: 127.0.1.1; using 11.11.1.73 instead (on interface eth0)
23/05/09 16:41:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/terrabot/bk-imp/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/terrabot/.ivy2/cache
The jars for the packages stored in: /home/terrabot/.ivy2/jars
org.neo4j#neo4j-connector-apache-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1e78b784-b4a5-4879-8031-f9c475eebfd4;1.0
	confs: [default]
	found org.neo4j#neo4j-connector-apache-spark_2.12;5.0.1_for_spark_3 in central
	found org.neo4j#neo4j-connector-apache-spark_2.12_common;5.0.1 in central
	found org.neo4j.driver#neo4j-java-driver;4.4.11 in central
	found org.reactivestreams#reactive-streams;1.0.4 in central
	found org.apache.xbean#xbean-asm6-shaded;4.10 in central
	found org.neo4j#neo4j-cypher-dsl;2020.1.4 in central
	found org.apiguardian#apiguardian-api;1.1.0 in central
:: resolution report :: resolve 239ms :: artifacts dl 11ms
	:: modules in use:
	org.apache.xbean#xbean-asm6-shaded;4.10 from central in [default]
	org.apiguardian#apiguardian-api;1.1.0 from central in [default]
	org.neo4j#neo4j-connector-apache-spar

## Sampling Data

In [3]:
# Load (product, reviewer, rating) review triples and product metadata, then
# draw a reproducible sample: products with enough reviews, the reviewers who
# reviewed several of them, and all reviews written by those reviewers.
review_schema = StructType(
    [
        StructField("asin", StringType(), True),
        StructField("reviewerID", StringType(), True),
        StructField("overall", FloatType(), True),
    ]
)
review_df = (
    spark.read.schema(review_schema)
    .json("../data/Automotive.json")
    .select(
        col("asin").alias("product_id"),
        col("reviewerID").alias("reviewer_id"),
        col("overall").alias("rating"),
    )
    .dropDuplicates()
    .repartition(8)
)

metadata_df = (
    spark.read.json("../data/meta_Automotive.json")
    .select(
        [
            col("asin").alias("product_id"),
            "rank",
            "category",
            "description",
        ]
    )
    .dropDuplicates()
    .repartition(8)
)

# Sampling parameters.
MIN_PRODUCT_REVIEWS = 5  # a product needs at least this many reviews
MAX_PRODUCTS = 1000  # upper bound on the number of sampled products
MIN_REVIEWER_REVIEWS = 2  # a reviewer needs this many reviews among sampled products

# Every review joined to its product's metadata (inner join, so reviews of
# products without a metadata record are dropped).
joined_df = review_df.join(metadata_df, ["product_id"])

# Products with enough reviews. `limit` on an unordered result is
# nondeterministic, and this plan is re-evaluated by every later action (count,
# show, parquet write, each Neo4j write). Ordering by a hash of the id first
# gives the same well-mixed sample every time; product_id breaks hash ties.
sampled_products_df = (
    joined_df.groupBy("product_id")
    .agg(count("*").alias("reviewer_count"))
    .filter(col("reviewer_count") >= MIN_PRODUCT_REVIEWS)
    .orderBy(xxhash64("product_id"), "product_id")
    .limit(MAX_PRODUCTS)
)

# Reviewers with enough reviews among the sampled products.
active_reviewers_df = (
    sampled_products_df.join(joined_df, ["product_id"])
    .groupBy("reviewer_id")
    .agg(count("*").alias("review_count"))
    .filter(col("review_count") >= MIN_REVIEWER_REVIEWS)
)

# All reviews by those reviewers.
# NOTE(review): this joins back to the full `joined_df`, so it also keeps their
# reviews of products outside the sampled set — confirm that is intended.
merged_df = (
    active_reviewers_df.join(joined_df, ["reviewer_id"])
    .select(
        "product_id",
        "reviewer_id",
        "rating",
        "rank",
        "category",
        "description",
    )
    # Reused by several actions below; caching avoids recomputing the joins.
    .cache()
)

merged_df.count()

23/05/09 16:42:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/05/09 16:42:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:42:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:42:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:43:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:43:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:43:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:43:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
2

46628

In [4]:
# Peek at a few sampled rows (long values are truncated in the display).
merged_df.show(n=3)

23/05/09 16:44:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:44:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+----------+--------------+------+--------------------+--------------------+--------------------+
|product_id|   reviewer_id|rating|                rank|            category|         description|
+----------+--------------+------+--------------------+--------------------+--------------------+
|B0001CMUV4|A1646HQFB6TKQ3|   5.0|[">#768 in Automo...|[Automotive, Exte...|[The StowAway hit...|
|B00029WVJC|A3F7HEE8XEF335|   3.0|[">#58,233 in Aut...|[Automotive, Repl...|[The Auxiliary Mi...|
|B0002SR4Q8| ADSL9QVBI9GTU|   4.0|[">#2,165 in Auto...|[Automotive, Tool...|[Removes all oil ...|
+----------+--------------+------+--------------------+--------------------+--------------------+
only showing top 3 rows



                                                                                

In [5]:
# Persist the sample as Parquet, replacing any output from a previous run.
merged_df.write.mode("overwrite").parquet("../data/sampled_data")

23/05/09 16:45:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:45:40 WARN RowBasedKeyValueBatch: Calling spill() on

## Reviews: User → Product graph

In [6]:
# Keep only the columns needed for the user/product review graph.
df = merged_df.select("product_id", "reviewer_id", "rating")

In [None]:
# Write nodes to Neo4j: one :Product node per distinct product_id and one :User
# node per distinct reviewer_id, each keyed on an `id` property.
# Both writes go to the same server and therefore both need credentials. The
# password is read from the NEO4J_PASSWORD environment variable instead of being
# hard-coded; set it before running this cell.
neo4j_options = {
    "url": os.environ.get("NEO4J_URL", "bolt://localhost:7687"),
    "authentication.basic.username": os.environ.get("NEO4J_USER", "neo4j"),
    "authentication.basic.password": os.environ["NEO4J_PASSWORD"],
}
for id_column, label in [("product_id", ":Product"), ("reviewer_id", ":User")]:
    (
        df.select(col(id_column).alias("id"))
        .dropDuplicates()
        .repartition(8)
        .write.format("org.neo4j.spark.DataSource")
        .options(**neo4j_options)
        .option("node.keys", "id")
        .option("labels", label)
        .mode("overwrite")
        .save()
    )

23/05/09 16:46:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:46:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:46:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:46:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/05/09 16:47:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 253:>                                                    

In [None]:
# Write (:User)-[:reviews {rating}]->(:Product) relationships to Neo4j.
# The "keys" strategy locates both endpoints by their `id` property, mapped from
# reviewer_id (source) and product_id (target).
# Credentials come from the environment, as for the node writes; set
# NEO4J_PASSWORD before running this cell.
# NOTE(review): writing relationships from 8 partitions in parallel can cause
# lock contention on shared nodes; consider coalesce(1) if writes fail.
neo4j_options = {
    "url": os.environ.get("NEO4J_URL", "bolt://localhost:7687"),
    "authentication.basic.username": os.environ.get("NEO4J_USER", "neo4j"),
    "authentication.basic.password": os.environ["NEO4J_PASSWORD"],
}
(
    df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .options(**neo4j_options)
    .option("relationship.save.strategy", "keys")
    .option("relationship", "reviews")
    .option("relationship.properties", "rating")
    .option("relationship.source.labels", ":User")
    .option("relationship.source.node.keys", "reviewer_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.node.keys", "product_id:id")
    .mode("overwrite")
    .save()
)

## Product metadata: also_buy / also_view links

In [None]:
# also_buy / also_view lists for every product in `merged_df`.
# `merged_df` only carries product_id, reviewer_id, rating, rank, category and
# description, so selecting also_buy / also_view from it fails. Read them from
# the metadata file instead, keeping only products present in `merged_df`
# (left-semi join: filters without duplicating rows).
# Assumes the metadata JSON has `also_buy` / `also_view` array fields — confirm.
df = (
    spark.read.json("../data/meta_Automotive.json")
    .select("also_buy", "also_view", col("asin").alias("product_id"))
    .join(merged_df.select("product_id"), ["product_id"], "left_semi")
    .dropDuplicates()
)

In [None]:
# Turn each product's also_buy / also_view lists into one row per edge:
# (src_product_id, dst_product_id, relationship). `explode` yields no rows for
# a null or empty list.
also_buy_df = df.select(
    col("product_id").alias("src_product_id"),
    explode("also_buy").alias("dst_product_id"),
    lit("same_buyer").alias("relationship"),
)

also_view_df = df.select(
    col("product_id").alias("src_product_id"),
    explode("also_view").alias("dst_product_id"),
    lit("same_viewer").alias("relationship"),
)

# Deduplicate on the whole edge, relationship type included. Deduplicating on
# the product pair alone keeps an arbitrary one of same_buyer / same_viewer for
# pairs in both lists. Because result_df is re-evaluated by each later filtered
# write, that arbitrary choice could differ per write.
result_df = also_buy_df.unionByName(also_view_df).dropDuplicates(
    ["src_product_id", "dst_product_id", "relationship"]
)

In [None]:
# Every product id that appears at either end of an edge in result_df,
# as a single de-duplicated `id` column.
endpoint_id_dfs = [
    result_df.select(col(endpoint).alias("id"))
    for endpoint in ("src_product_id", "dst_product_id")
]
nodes_df = endpoint_id_dfs[0].union(endpoint_id_dfs[1]).distinct()

In [None]:
# Write a :Product node, keyed on `id`, for every product referenced by an
# also_buy / also_view edge.
# Authenticates like the other Neo4j writes; set NEO4J_PASSWORD before running.
neo4j_options = {
    "url": os.environ.get("NEO4J_URL", "bolt://localhost:7687"),
    "authentication.basic.username": os.environ.get("NEO4J_USER", "neo4j"),
    "authentication.basic.password": os.environ["NEO4J_PASSWORD"],
}
(
    nodes_df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .options(**neo4j_options)
    .option("node.keys", "id")
    .option("labels", ":Product")
    .mode("overwrite")
    .save()
)

In [None]:
# Write same_buyer / same_viewer relationships between :Product nodes.
# Each edge is written in both directions (src -> dst and dst -> src),
# presumably so the link reads the same from either product.
# Authenticates like the other Neo4j writes; set NEO4J_PASSWORD before running.
neo4j_options = {
    "url": os.environ.get("NEO4J_URL", "bolt://localhost:7687"),
    "authentication.basic.username": os.environ.get("NEO4J_USER", "neo4j"),
    "authentication.basic.password": os.environ["NEO4J_PASSWORD"],
}
directions = [
    ("src_product_id:id", "dst_product_id:id"),
    ("dst_product_id:id", "src_product_id:id"),
]
for relationship in ["same_buyer", "same_viewer"]:
    relationships_df = result_df.filter(result_df.relationship == relationship)
    for source_keys, target_keys in directions:
        (
            relationships_df.repartition(8)
            .write.format("org.neo4j.spark.DataSource")
            .options(**neo4j_options)
            .option("relationship.save.strategy", "keys")
            .option("relationship", relationship)
            .option("relationship.source.labels", ":Product")
            .option("relationship.source.node.keys", source_keys)
            .option("relationship.target.labels", ":Product")
            .option("relationship.target.node.keys", target_keys)
            .mode("overwrite")
            .save()
        )