In [None]:
# Environment setup. The /content paths used later in this notebook suggest a
# Debian/Ubuntu (Colab-style) runtime where apt-get is already available.
# The former `brew install apt-get` step was dropped: Homebrew has no such formula.
apt-get update # Refresh the apt package index.
apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java 8 (the `install` subcommand was missing).
wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark 3.1.1.
tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unpack the tgz archive.
pip install -q findspark # Install findspark. Adds PySpark to the system path during runtime.
pip install -q pyspark==3.1.1 # Keep the Python bindings on the same version as the Spark distribution downloaded above.
pip install mysql-connector-python
pip install -q pymysql # Imported by the final cell that merges the staging table.
apt-get -y install mysql-server

In [None]:
# Set environment variables and make Spark plus the project config importable.
import os
import sys

import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Put the Spark distribution named by SPARK_HOME on sys.path so the PySpark
# that gets imported matches the JVM side that will be launched.
findspark.init()

# Same connector version as the "spark.jars.packages" setting used when the
# SparkSession is built, so only one MySQL driver version is requested.
findspark.add_packages('mysql:mysql-connector-java:8.0.17')

# Append the directory containing the config module to the Python path.
# Notebooks have no __file__, so resolve it relative to the working directory.
config_dir = os.path.abspath(os.path.join(os.getcwd(), '..', 'config'))
if config_dir not in sys.path:
    sys.path.append(config_dir)


In [None]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, explode, size, array, sort_array, struct

# Append the directory containing the config module to the Python path.
# Notebooks have no __file__, so resolve it relative to the working directory;
# the membership check keeps re-running this cell from adding duplicates.
config_dir = os.path.abspath(os.path.join(os.getcwd(), '..', 'config'))
if config_dir not in sys.path:
    sys.path.append(config_dir)


# Build (or reuse) the session; the MySQL JDBC driver is requested through
# spark.jars.packages so it is available for the JDBC reads and writes.
spark = (
    SparkSession.builder
    .appName("Friend Recommendations")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.17")
    .getOrCreate()
)


In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, explode, size, array, sort_array, struct, row_number
from pyspark.sql.window import Window

url = "jdbc:mysql://instalitedb.c1jqnrtmzqmb.us-east-1.rds.amazonaws.com:3306/instalitedb"
# Credentials can be supplied through the environment; the literals are kept
# only as fallbacks so the notebook behaves as before when the variables are unset.
properties = {
    "user": os.environ.get("RDS_USER", "admin"),
    "password": os.environ.get("RDS_PASSWORD", "rds-password"),
    # Connector/J 8.x driver class; "com.mysql.jdbc.Driver" is the deprecated legacy name.
    "driver": "com.mysql.cj.jdbc.Driver",
}

table_name = "user_friends"
df = spark.read.jdbc(url=url, table=table_name, properties=properties).selectExpr("cast(user_id as int)", "cast(friend_id as int)")

# Create symmetric pairs (bi-directional relationships). distinct() removes the
# duplicate edges that appear when the table already stores both directions;
# without it every mutual friend would be counted more than once below.
friends = df.union(
    df.select(col("friend_id").alias("user_id"), col("user_id").alias("friend_id"))
).distinct()

# Join on user_id to find friends of friends (excluding the user themself)
connections = friends.alias("f1").join(friends.alias("f2"), col("f1.friend_id") == col("f2.user_id")) \
    .select(col("f1.user_id"), col("f2.friend_id").alias("fof_id")) \
    .where(col("f1.user_id") != col("f2.friend_id"))

# Drop candidates the user is already friends with; recommending an existing
# friend is not useful.
existing_friends = friends.select(col("user_id"), col("friend_id").alias("fof_id"))
new_connections = connections.join(existing_friends, on=["user_id", "fof_id"], how="left_anti")

# One row per (user, candidate); each remaining connection row is one shared friend
mutual_friends = new_connections.groupBy("user_id", "fof_id").count()

# Rank each user's candidates by mutual-friend count, highest first.
# fof_id is a tie-breaker so the ranks are reproducible between runs.
window_spec = Window.partitionBy("user_id").orderBy(col("count").desc(), col("fof_id").asc())
ranked_recommendations = mutual_friends.select(
    "user_id",
    col("fof_id").alias("friend_rec_id"),
    row_number().over(window_spec).alias("rank"),
)

# Show result
ranked_recommendations.show()

In [None]:
# Name of a MySQL table; only referenced by the disabled append shown below
table_name = "friend_recommendations"

# Direct append of the DataFrame to MySQL (currently disabled):
# ranked_recommendations.write.jdbc(url=url, table=table_name, mode="append", properties=properties)

# Replace the contents of the staging table with the ranked recommendations
staging_table_name = "friends_recommendation_staging"
staging_writer = ranked_recommendations.write.mode("overwrite")
staging_writer.jdbc(url=url, table=staging_table_name, properties=properties)


In [None]:
import os

import pymysql

# Connection details. Credentials can be supplied through the environment; the
# literals are kept only as fallbacks so behavior is unchanged when unset.
connection = pymysql.connect(
    host='instalitedb.c1jqnrtmzqmb.us-east-1.rds.amazonaws.com',
    user=os.environ.get("RDS_USER", "admin"),
    password=os.environ.get("RDS_PASSWORD", 'rds-password'),
    database='instalitedb',
)

# Upsert the staged rows into the target table. `rank` is back-quoted because
# RANK is a reserved word in MySQL 8, and the SELECT list now has the comma
# that was missing between friend_rec_id and rank (without it only two columns
# were selected for a three-column insert).
merge_sql = """
    INSERT INTO friends_recommendation (user_id, friend_rec_id, `rank`)
    SELECT user_id, friend_rec_id, `rank` FROM friends_recommendation_staging
    ON DUPLICATE KEY UPDATE
        `rank` = VALUES(`rank`)
"""
truncate_sql = "TRUNCATE TABLE friends_recommendation_staging"

try:
    with connection.cursor() as cursor:
        # PyMySQL sends one statement per execute() unless multi-statement mode
        # is enabled, so the merge and the truncate are issued separately.
        cursor.execute(merge_sql)
        # Make the merge durable before the staging rows are discarded.
        connection.commit()
        cursor.execute(truncate_sql)
finally:
    connection.close()

# SparkSession has no close() method; stop() shuts down the underlying SparkContext.
spark.stop()