In [6]:
import os
from io import BytesIO
from zipfile import ZipFile

import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

from neo4j_parallel_spark_loader import ingest_spark_dataframe
from neo4j_parallel_spark_loader.bipartite import group_and_batch_spark_dataframe


StatementMeta(medium, 19, 7, Finished, Available, Finished)

## Create Spark session

In [2]:
# Neo4j connection settings.
# Each value is read from the environment variable of the same name so that real
# credentials never need to be saved in the notebook. When a variable is unset,
# the original placeholder string is used unchanged.
username = os.environ.get("NEO4J_USER", "NEO4J_USER")
password = os.environ.get("NEO4J_PASSWORD", "NEO4J_PASSWORD")
url = os.environ.get("NEO4J_URL", "NEO4J_URL")
dbname = os.environ.get("NEO4J_DATABASE", "NEO4J_DATABASE")

# Passed as `num_groups` when the relationship rows are grouped and batched.
spark_executor_count = 5

# Build (or reuse) the Spark session and register the Neo4j connection settings
# on it, so the individual writes below do not have to repeat them.
# NOTE(review): both "neo4j.url" and "url" are set to the same value — confirm
# whether the unprefixed key is still required.
spark = (
    SparkSession.builder
    .appName("AmazonRatings")
    .config("neo4j.url", url)
    .config("url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", password)
    .config("neo4j.database", dbname)
    .getOrCreate()
)

StatementMeta(medium, 19, 3, Finished, Available, Finished)

## Download data

In [7]:
# Schema applied to the header-less edge file: four columns, all nullable.
schema = StructType([
    StructField("source_id", IntegerType(), True),
    StructField("target_id", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True)
])

# Download the ZIP archive into memory.
# The timeout keeps a stalled connection from hanging the cell forever, and
# raise_for_status() turns an HTTP error into a clear exception instead of a
# confusing "not a zip file" failure further down.
response = requests.get(
    "https://nrvis.com/download/data/dynamic/rec-amazon-ratings.zip",
    timeout=60,
)
response.raise_for_status()

# Read the edge file straight out of the in-memory archive; both the archive
# and the member are closed as soon as the text has been extracted.
with ZipFile(BytesIO(response.content)) as zip_file:
    with zip_file.open("rec-amazon-ratings.edges") as file:
        content = file.read().decode('utf-8')

# Distribute the lines as an RDD of strings and parse them as CSV with the
# explicit schema (the file has no header row).
rdd = spark.sparkContext.parallelize(content.splitlines())
rating_df = spark.read.csv(rdd, schema=schema, header=False)

# rating_df is now the Spark DataFrame with named, typed columns.
# Show a sample of rows and the schema to verify the load.
rating_df.show()
rating_df.printSchema()

StatementMeta(medium, 19, 8, Finished, Available, Finished)

+---------+---------+------+----------+
|source_id|target_id|rating| timestamp|
+---------+---------+------+----------+
|        1|        1|   5.0|1117404000|
|        1|        2|   1.0|1105916400|
|        1|        3|   5.0|1105916400|
|        1|        4|   1.0|1105570800|
|        1|        5|   1.0|1104966000|
|        1|        6|   5.0|1103497200|
|        1|        7|   4.0|1081461600|
|        1|        8|   5.0|1074985200|
|        1|        9|   5.0|1071961200|
|        1|       10|   1.0|1071788400|
|        1|       11|   4.0|1071702000|
|        1|       12|   5.0|1070492400|
|        1|       13|   5.0|1070319600|
|        1|       14|   5.0|1066514400|
|        1|       15|   4.0|1066341600|
|        1|       16|   5.0|1066341600|
|        1|       17|   5.0|1066168800|
|        1|       18|   2.0|1065996000|
|        1|       19|   5.0|1065909600|
|        1|       20|   5.0|1065650400|
+---------+---------+------+----------+
only showing top 20 rows

root
 |-- sour

In [8]:
# Total number of rows loaded into rating_df.
rating_df.count()

StatementMeta(medium, 19, 9, Finished, Available, Finished)

5838041

## Load nodes

In [11]:
# Write one :Source node per distinct source_id; the "source_id" column is
# mapped to the node property "id" and used as the node key.
(
    rating_df.select("source_id").distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(**{
        "labels": ":Source",
        "node.keys": "source_id:id",
        "schema.optimization.node.keys": "KEY",
    })
    .save()
)

StatementMeta(medium, 19, 12, Finished, Available, Finished)

In [12]:
# Write one :Target node per distinct target_id; the "target_id" column is
# mapped to the node property "id" and used as the node key.
(
    rating_df.select("target_id").distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(**{
        "labels": ":Target",
        "node.keys": "target_id:id",
        "schema.optimization.node.keys": "KEY",
    })
    .save()
)

StatementMeta(medium, 19, 13, Finished, Available, Finished)

## Load relationships in parallel

In [9]:
# Add "group" and "batch" columns to every rating row; the number of groups
# follows the configured executor count.
rel_batch_df = group_and_batch_spark_dataframe(
    spark_dataframe=rating_df,
    source_col="source_id",
    target_col="target_id",
    num_groups=spark_executor_count,
)

StatementMeta(medium, 19, 10, Finished, Available, Finished)

In [10]:
# Preview the batched rows, including the added "group" and "batch" columns.
rel_batch_df.show()

StatementMeta(medium, 19, 11, Finished, Available, Finished)

+---------+---------+------+----------+-------+-----+
|source_id|target_id|rating| timestamp|  group|batch|
+---------+---------+------+----------+-------+-----+
|        1|        7|   4.0|1081461600|3 --> 4|    2|
|        1|       19|   5.0|1065909600|3 --> 3|    1|
|    13417|      110|   5.0|1102806000|4 --> 2|    1|
|   398740|     1497|   5.0| 999554400|4 --> 3|    2|
|    55714|     6452|   5.0|1101769200|4 --> 4|    3|
|   758089|     8282|   5.0|1024178400|3 --> 4|    2|
|   758100|     9000|   4.0|1015714800|0 --> 0|    0|
|   563976|    10287|   3.0|1098655200|4 --> 4|    3|
|    28574|    14304|   1.0|1093039200|1 --> 4|    0|
|   968213|    18887|   4.0|1056146400|2 --> 2|    4|
|     6791|    23365|   4.0|1096322400|0 --> 2|    2|
|     6791|    23365|   4.0|1096322400|0 --> 2|    2|
|    94201|    26022|   4.0|1065304800|2 --> 4|    1|
|    55711|    31486|   1.0|1082844000|2 --> 3|    0|
|    55711|    31486|   1.0|1082844000|2 --> 3|    0|
|   758084|    33837|   5.0|

In [18]:
# Cypher applied to each row (referenced as `event`): look up the two endpoint
# nodes by id, merge a RELATES_TO relationship identified by its timestamp, and
# store the rating on it.
query = """
    MATCH(source:Source {id: event.source_id})
    MATCH(target:Target {id: event.target_id})
    MERGE(source)-[r:RELATES_TO {timestamp:event.timestamp}]->(target)
    SET r.rating = event.rating
    """

# Hand the grouped-and-batched rows to the parallel loader.
ingest_spark_dataframe(
    options=dict(query=query),
    save_mode="Overwrite",
    spark_dataframe=rel_batch_df,
)

StatementMeta(medium, 19, 19, Finished, Available, Finished)

## Delete relationships

In [25]:
# Work out how many delete passes are needed when relationships are removed
# 10000 at a time; the row count of rating_df is used as the upper bound.
rel_count = rating_df.count()
batch_count = 1 + rel_count // 10000
print(rel_count, batch_count)

StatementMeta(medium, 19, 26, Finished, Available, Finished)

5838041 584


In [26]:
from pyspark.sql.functions import lit

# One row per delete pass. The delete query never reads the row, so each row
# just carries the constant id 1 — only the number of rows matters.
del_df = spark.range(batch_count).select(lit(1).alias("id"))
print(del_df.count())

StatementMeta(medium, 19, 27, Finished, Available, Finished)

584


In [27]:
# Each execution of this query deletes at most 10000 RELATES_TO relationships.
del_query = """
    MATCH ()-[r:RELATES_TO]->()
    WITH r LIMIT 10000
    DELETE r"""

# Run the delete query once per row of del_df. coalesce(1) keeps all rows in a
# single partition so the passes are issued from one writer.
# NOTE(review): batch.size=1 presumably makes every row its own transaction —
# confirm against the connector documentation.
(
    del_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(**{"query": del_query, "batch.size": 1})
    .save()
)

StatementMeta(medium, 19, 28, Finished, Available, Finished)

## Load relationships serially

In [29]:
# Serial variant for comparison: the same relationship query as the parallel
# load, written from a single partition.
(
    rating_df.repartition(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(query=query)
    .save()
)

StatementMeta(medium, 19, 30, Finished, Available, Finished)