In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, length, avg, count, when, expr, regexp_extract
import psycopg2

# 1. Créer une session Spark
print("Création de la session Spark...")
spark = SparkSession.builder \
    .appName("PostgresToSpark") \
    .config("spark.jars", "/usr/local/spark/jars/postgresql-42.3.9.jar") \
    .getOrCreate()

# 2. Configurer les paramètres de connexion
print("Configuration des paramètres de connexion JDBC pour PostgreSQL...")
jdbc_url = "jdbc:postgresql://postgres:5432/DB_Mastodon"
properties = {
    "user": "fadi",       
    "password": "fadi",   
    "driver": "org.postgresql.Driver"  
}

# 3. Lire les données depuis la table 'Mostodon_SILVER_bis'
print("Lecture des données depuis la table 'Mostodon_SILVER_bis' dans PostgreSQL...")
df_silver = spark.read.jdbc(url=jdbc_url, table="Mostodon_SILVER_bis", properties=properties)
print(f"Nombre de lignes lues : {df_silver.count()}")

# 4. Calcul de nouvelles colonnes
print("Calcul des nouvelles colonnes...")

# Vérification de la présence d'un lien dans la colonne 'content'
# Utilisation de l'expression régulière pour détecter les liens (http/https)
df_with_calculations = df_silver.withColumn("engagement_score", 
    (col("favourites_count") * 0.5) + 
    (col("reblogs_count") * 1.0) + 
    (col("replies_count") * 1.5)
).withColumn("content_length", length(col("content"))
).withColumn("contains_link", 
    when(regexp_extract(col("content"), r'(http|https)://[^\s]+', 0) != "", True).otherwise(False)
)

# 5. Connexion à PostgreSQL via psycopg2 pour la table Mostodon_GOLD_bis
print("Connexion à PostgreSQL avec psycopg2...")
conn = psycopg2.connect(
    host="postgres",
    database="DB_Mastodon",
    user="fadi",
    password="fadi"
)
cursor = conn.cursor()

# 6. Création de la table 'Mostodon_GOLD_bis' si elle n'existe pas déjà
print("Création de la table 'Mostodon_GOLD_bis' avec des colonnes supplémentaires si elle n'existe pas déjà...")
create_table_query = '''
CREATE TABLE IF NOT EXISTS Mostodon_GOLD_bis (
    id BIGINT PRIMARY KEY,
    username TEXT NOT NULL,
    display_name TEXT NOT NULL,
    content TEXT NOT NULL,
    favourites_count INT NOT NULL,
    reblogs_count INT NOT NULL,
    replies_count INT NOT NULL,
    tags TEXT,
    engagement_score FLOAT NOT NULL,
    content_length INT NOT NULL,
    contains_link BOOLEAN NOT NULL,
    total_posts INT DEFAULT 0,
    avg_engagement_score FLOAT DEFAULT 0.0
);
'''
cursor.execute(create_table_query)
conn.commit()

# 7. Lire les IDs existants dans la table Mostodon_GOLD_bis
existing_ids_gold = spark.read.jdbc(url=jdbc_url, table="Mostodon_GOLD_bis", properties=properties) \
    .select("id") \
    .rdd.flatMap(lambda x: x).collect()

# 8. Filtrer le DataFrame pour exclure les enregistrements existants
df_filtered = df_with_calculations.filter(~col("id").isin(existing_ids_gold))

# 9. Insertion des données dans la table Mostodon_GOLD_bis
print("Insertion des données dans la table 'Mostodon_GOLD_bis' via Spark JDBC...")

# Convertir la colonne tags (array) en texte avant d'écrire dans la base
df_filtered = df_filtered.withColumn("tags", col("tags").cast("string"))

# Insérer les données filtrées
df_filtered.write.jdbc(url=jdbc_url, table="Mostodon_GOLD_bis", mode="append", properties=properties)
print("Insertion terminée.")

# 10. Calcul des agrégations (total_posts et avg_engagement_score)
print("Calcul des agrégations pour 'total_posts' et 'avg_engagement_score'...")
aggregated_df = df_with_calculations.groupBy("username").agg(
    count("id").alias("total_posts"),
    avg("engagement_score").alias("avg_engagement_score")
)

# 11. Mise à jour des résultats agrégés dans la table Mostodon_GOLD_bis
print("Mise à jour des agrégations dans la table 'Mostodon_GOLD_bis'...")
for row in aggregated_df.collect():
    username = row.username
    total_posts = row.total_posts
    avg_engagement_score = row.avg_engagement_score

    update_query = '''
    UPDATE Mostodon_GOLD_bis
    SET total_posts = %s, avg_engagement_score = %s
    WHERE username = %s
    '''
    cursor.execute(update_query, (total_posts, avg_engagement_score, username))

conn.commit()

# 12. Compter le nombre total d'entrées dans la table 'Mostodon_GOLD_bis'
print("Calcul du nombre total d'entrées dans la table 'Mostodon_GOLD_bis'...")
cursor.execute("SELECT COUNT(*) FROM Mostodon_GOLD_bis;")
count_gold = cursor.fetchone()[0]
print(f"Nombre total d'entrées dans la table 'Mostodon_GOLD_bis' : {count_gold}")

# 13. Fermer le curseur et la connexion PostgreSQL
cursor.close()
conn.close()
print("Connexion à PostgreSQL fermée.")

# 14. Fermer la session Spark
print("Fermeture de la session Spark...")
spark.stop()
print("Session Spark fermée.")


Création de la session Spark...
Configuration des paramètres de connexion JDBC pour PostgreSQL...
Lecture des données depuis la table 'Mostodon_SILVER_bis' dans PostgreSQL...
Nombre de lignes lues : 27231
Calcul des nouvelles colonnes...
Connexion à PostgreSQL avec psycopg2...
Création de la table 'Mostodon_GOLD_bis' avec des colonnes supplémentaires si elle n'existe pas déjà...
Insertion des données dans la table 'Mostodon_GOLD_bis' via Spark JDBC...
Insertion terminée.
Calcul des agrégations pour 'total_posts' et 'avg_engagement_score'...
Mise à jour des agrégations dans la table 'Mostodon_GOLD_bis'...
Calcul du nombre total d'entrées dans la table 'Mostodon_GOLD_bis'...
Nombre total d'entrées dans la table 'Mostodon_GOLD_bis' : 27231
Connexion à PostgreSQL fermée.
Fermeture de la session Spark...
Session Spark fermée.
