In [None]:
from pyspark.sql import SparkSession
import math

# Start (or reuse) the Spark session and request the Neo4j connector package.
spark = (
    SparkSession.builder
    .appName("SQL to Neo4j")
    .config(
        "spark.jars.packages",
        "org.neo4j:neo4j-connector-apache-spark_2.12:4.0.0",
    )
    .getOrCreate()
)

# Source database: JDBC endpoint plus the properties passed to spark.read.jdbc.
# The values below are placeholders; replace them for your environment.
jdbc_url = "jdbc:your_sql_database_url"
connection_properties = dict(
    user="your_username",
    password="your_password",
    driver="com.mysql.jdbc.Driver",  # Change to your SQL database driver
)


# Target graph database: Bolt endpoint and basic-authentication settings.
neo4j_url = "bolt://your_neo4j_url:7687"
neo4j_properties = {
    "url": neo4j_url,
    "authentication.basic.username": "your_neo4j_username",
    "authentication.basic.password": "your_neo4j_password",
}

In [None]:
# Look up the value range of a table's partition column
def get_min_max(table, column):
    """Return the smallest and largest value of ``column`` in ``table``.

    Runs one aggregate query through the JDBC source, using the module-level
    ``spark``, ``jdbc_url`` and ``connection_properties``.

    Args:
        table: Name of the source table.
        column: Name of the column whose range is wanted.

    Returns:
        A ``(min_val, max_val)`` tuple taken from the first result row.
    """
    # The JDBC reader accepts a parenthesised, aliased subquery as "table".
    bounds_subquery = (
        f"(SELECT MIN({column}) as min_val, MAX({column}) as max_val "
        f"FROM {table}) as min_max_table"
    )
    bounds_df = spark.read.jdbc(
        url=jdbc_url,
        table=bounds_subquery,
        properties=connection_properties,
    )
    first_row = bounds_df.collect()[0]
    return first_row["min_val"], first_row["max_val"]

# Function to calculate the number of partitions
def calculate_num_partitions(size, partition_size=100000):
    """Return how many partitions are needed to cover ``size`` key values.

    Args:
        size: Number of key values to cover (e.g. ``max - min + 1``).
        partition_size: Target number of key values per partition.

    Returns:
        ``ceil(size / partition_size)``, but never less than 1.
    """
    # Clamp to 1: zero or a negative number is not a usable partition count
    # (DataFrame.repartition, for example, requires a positive number).
    return max(1, math.ceil(size / partition_size))

In [None]:
# Source tables to migrate. "column" is the column used to split the JDBC
# read; "df" is filled in by the loop below.
tables = {
    "actors": {"column": "id", "df": None},
    "genre": {"column": "id", "df": None},
    "movie": {"column": "id", "df": None},
    "acted_in": {"column": "id", "df": None},
    "awards": {"column": "id", "df": None}
}

for table, info in tables.items():
    min_val, max_val = get_min_max(table, info["column"])

    if min_val is None or max_val is None:
        # MIN/MAX over an empty table (or an all-NULL column) come back as
        # NULL, so there is no range to split on. Read the table in a single
        # unpartitioned pass instead of failing on `None - None`.
        num_partitions = 1
        info["df"] = spark.read.jdbc(
            url=jdbc_url,
            table=table,
            properties=connection_properties,
        )
        continue

    size = max_val - min_val + 1
    num_partitions = calculate_num_partitions(size)

    # Partitioned read: the bounds and partition count control how the key
    # range is split across parallel JDBC queries.
    info["df"] = spark.read.jdbc(
        url=jdbc_url,
        table=table,
        properties=connection_properties,
        column=info["column"],
        lowerBound=min_val,
        upperBound=max_val,
        numPartitions=num_partitions
    )

In [None]:
# Pull each table's DataFrame out of `tables` and rename its key columns
# (id -> <entity>Id, <entity>_id -> <entity>Id) in a single chain per table.
actors_df = tables["actors"]["df"].withColumnRenamed("id", "actorId")
genre_df = tables["genre"]["df"].withColumnRenamed("id", "genreId")
movie_df = (
    tables["movie"]["df"]
    .withColumnRenamed("id", "movieId")
    .withColumnRenamed("genre_id", "genreId")
)
acted_in_df = (
    tables["acted_in"]["df"]
    .withColumnRenamed("actor_id", "actorId")
    .withColumnRenamed("movie_id", "movieId")
)
awards_df = (
    tables["awards"]["df"]
    .withColumnRenamed("actor_id", "actorId")
    .withColumnRenamed("movie_id", "movieId")
)


In [None]:
# Helper: guess a node's key column from its label
def _default_node_key(df, labels):
    """Return ``<label>Id`` (e.g. ``:Actor`` -> ``actorId``) if ``df`` has it, else None."""
    primary_label = labels.strip(":").split(":")[0]
    candidate = primary_label[:1].lower() + primary_label[1:] + "Id"
    return candidate if candidate in df.columns else None


# Function to write data to Neo4j
def write_to_neo4j(df, labels, relationship=None, node_keys=None):
    """Write ``df`` to Neo4j, either as nodes or as relationships.

    Args:
        df: Spark DataFrame to write.
        labels: Node label(s) for a node write, e.g. ``":Actor"``. Not sent
            for a relationship write, because the connector accepts only one
            of ``labels`` / ``relationship`` / ``query`` per write.
        relationship: Relationship type, e.g. ``"ACTED_IN"``. When given, rows
            are written as relationships from ``:Actor`` nodes (matched on
            ``actorId``) to ``:Movie`` nodes (matched on ``movieId``).
        node_keys: Value for the connector's ``node.keys`` option on a node
            write (comma-separated key columns). Defaults to the ``<label>Id``
            column used elsewhere in this notebook, when ``df`` has one.
            NOTE(review): that default is a naming-convention guess; confirm
            it against the real schema.

    Only options that apply to the chosen write kind are set, so no option is
    ever sent with a ``None`` value.
    """
    writer = (
        df.write
        .format("org.neo4j.spark.DataSource")
        .mode("overwrite")
        # Connection settings, including the basic-auth credentials.
        .options(**neo4j_properties)
        .option("url", neo4j_url)
    )

    if relationship:
        writer = writer.options(**{
            "relationship": relationship,
            "relationship.save.strategy": "keys",
            "relationship.source.labels": ":Actor",
            "relationship.source.save.mode": "overwrite",
            "relationship.target.labels": ":Movie",
            "relationship.target.save.mode": "overwrite",
            "relationship.source.node.keys": "actorId:actorId",
            "relationship.target.node.keys": "movieId:movieId",
        })
    else:
        writer = writer.option("labels", labels)
        if node_keys is None:
            node_keys = _default_node_key(df, labels)
        if node_keys:
            # Overwrite mode merges nodes, so the connector needs to know
            # which column(s) identify an existing node.
            writer = writer.option("node.keys", node_keys)

    writer.save()

# Write data to Neo4j.
# The writes are issued on the DataFrames themselves, from the driver.
# foreachPartition would instead hand the callback a plain iterator of Rows
# on the executors, which has no `.write` attribute.
write_to_neo4j(actors_df, ":Actor")
write_to_neo4j(genre_df, ":Genre")
write_to_neo4j(movie_df, ":Movie")

# Relationships are written after the nodes and through a single partition:
# the Neo4j connector documentation recommends this to avoid deadlocks
# between concurrent relationship writes.
write_to_neo4j(acted_in_df.coalesce(1), ":Actor", "ACTED_IN")
write_to_neo4j(awards_df.coalesce(1), ":Actor", "AWARDED")