In [None]:
!pip install pyspark psycopg2-binary


In [None]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, IntegerType, ArrayType, FloatType
)
from pyspark.sql.functions import (
    from_json, col, lower, regexp_replace, window, avg, count, length, explode, concat_ws
)
import psycopg2
from psycopg2 import sql

# Configure le logging
logging.basicConfig(level=logging.INFO)

# Paramètres de connexion à PostgreSQL
db_host = "some-postgres"
db_port = "5432"
db_name = "mastodon_data"
db_user = "postgres"
db_password = "mysecretpassword"
db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
db_properties = {
    "user": db_user,
    "password": db_password,
    "driver": "org.postgresql.Driver"
}


# Fonction pour créer la base de données si elle n'existe pas
def create_database():
    print("Vérification et création de la base de données si nécessaire...")
    try:
        # Se connecte à la base de données 
        conn = psycopg2.connect(
            dbname='postgres',
            user=db_properties["user"],
            password=db_properties["password"],
            host=db_host,
            port=db_port
        )
        conn.autocommit = True
        cursor = conn.cursor()

        # Vérifie si la base de données existe
        cursor.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{db_name}'")
        exists = cursor.fetchone()
        if not exists:
            # Crée la base de données
            cursor.execute(f'CREATE DATABASE {db_name}')
            print(f"Base de données '{db_name}' créée avec succès.")
        else:
            print(f"La base de données '{db_name}' existe déjà.")

        cursor.close()
        conn.close()
    except Exception as e:
        logging.error(f"Erreur lors de la vérification/création de la base de données: {e}")
        print(f"Erreur lors de la vérification/création de la base de données: {e}")


# Fonction pour créer les tables dans PostgreSQL
def create_tables():
    print("Création des tables dans PostgreSQL...")
    try:
        conn = psycopg2.connect(
            dbname=db_name,
            user=db_properties["user"],
            password=db_properties["password"],
            host=db_host,
            port=db_port
        )
        conn.autocommit = True
        cursor = conn.cursor()

        create_filtered_toots_table = """
        CREATE TABLE IF NOT EXISTS filtered_toots (
            toot_id VARCHAR(255) PRIMARY KEY,
            timestamp TIMESTAMP,
            text TEXT,
            user_id VARCHAR(255),
            language VARCHAR(10),
            hashtags TEXT,
            reblogs_count INTEGER,
            favourites_count INTEGER,
            replies_count INTEGER
        );
        """

        create_time_window_table = """
        CREATE TABLE IF NOT EXISTS toots_time_window_aggregates (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            total_toots INTEGER,
            PRIMARY KEY (window_start, window_end)
        );
        """

        create_user_avg_length_table = """
        CREATE TABLE IF NOT EXISTS user_avg_length (
            user_id VARCHAR(255) PRIMARY KEY,
            average_length FLOAT
        );
        """

        create_hashtag_avg_length_table = """
        CREATE TABLE IF NOT EXISTS hashtag_avg_length (
            hashtag VARCHAR(255) PRIMARY KEY,
            average_length FLOAT
        );
        """

        cursor.execute(create_filtered_toots_table)
        cursor.execute(create_time_window_table)
        cursor.execute(create_user_avg_length_table)
        cursor.execute(create_hashtag_avg_length_table)

        print("Tables créées avec succès dans PostgreSQL.")

        cursor.close()
        conn.close()

    except Exception as e:
        logging.error(f"Erreur lors de la création des tables dans PostgreSQL: {e}")
        print(f"Erreur lors de la création des tables: {e}")

# Crée la base de données si elle n'existe pas
create_database()
# Crée les tables
create_tables()

# Configure SparkSession avec le package Kafka et le driver JDBC PostgreSQL
print("Initialisation de SparkSession...")
spark = SparkSession.builder \
    .appName("MastodonStreaming") \
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,"
        "org.postgresql:postgresql:42.2.25"
    ) \
    .getOrCreate()
print("SparkSession initialisée.")

# Défini les paramètres de Kafka
kafka_brokers = "kafka:9092"
topic_name = "mastodon_stream"

print("Lecture des flux de Kafka...")
# Li les flux de Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "latest") \
    .load()
print("Flux de Kafka configuré.")

# Schéma pour les données JSON
schema = StructType([
    StructField("id", StringType()),
    StructField("created_at", TimestampType()),
    StructField("content", StringType()),
    StructField("user_id", StringType()),
    StructField("language", StringType()),  
    StructField("tags", ArrayType(StringType())),
    StructField("reblogs_count", IntegerType()),
    StructField("favourites_count", IntegerType()),
    StructField("replies_count", IntegerType())
])

print("Parsing des données JSON...")
# Parser les données JSON
json_stream = raw_stream.selectExpr("CAST(value AS STRING) as json_value")
parsed_stream = json_stream.withColumn("data", from_json(col("json_value"), schema)).select("data.*")
print("Parsing des données JSON terminé.")

# Nettoi les données
print("Nettoyage des données...")
cleaned_stream = parsed_stream \
    .filter(col("content").isNotNull()) \
    .withColumn("text", regexp_replace(col("content"), r"<[^>]+>", "")) \
    .withColumn("text", lower(col("text"))) \
    .withColumn("text", regexp_replace(col("text"), r"http\S+", "")) \
    .withColumn("text", regexp_replace(col("text"), r"\s+", " ")) \
    .withColumn("text", regexp_replace(col("text"), r"[^\w\s#]", "")) \
    .withColumn("text_length", length(col("text")))
print("Nettoyage des données terminé.")

# Extrai user_id depuis account.id
cleaned_stream = cleaned_stream.withColumn("user_id", col("user_id"))

# Extrai les hashtags sous forme de tableau de chaînes
cleaned_stream = cleaned_stream.withColumn("hashtags", concat_ws(",", col("tags")))

# filtre par langue
cleaned_stream = cleaned_stream.filter(col("language") == "fr")


# Prépare les données pour la table des toots filtrés
filtered_toots = cleaned_stream.select(
    col('id').alias('toot_id'),
    col('created_at').alias('timestamp'),
    col('text'),
    col('user_id'),
    col('language'),
    col('hashtags'),
    col('reblogs_count'),
    col('favourites_count'),
    col('replies_count')
)

# Converti les hashtags en chaîne de caractères pour stockage
filtered_toots = filtered_toots.withColumn("hashtags", concat_ws(",", col("hashtags")))

# Agrégation des données en fenêtres temporelles
print("Agrégation des données en fenêtres temporelles...")
windowed_stream = cleaned_stream \
    .withWatermark("created_at", "1 hour") \
    .groupBy(window(col("created_at"), "1 hour")) \
    .agg(
        count("*").alias("total_toots")
    )
print("Agrégation des données terminée.")

# Prépare les données pour la table des agrégats de fenêtres temporelles
windowed_aggregates = windowed_stream.select(
    col('window.start').alias('window_start'),
    col('window.end').alias('window_end'),
    col('total_toots')
)

# Calcul de la longueur moyenne des toots par utilisateur
user_avg_length = cleaned_stream \
    .filter(col("user_id").isNotNull()) \
    .groupBy('user_id') \
    .agg(
        avg('text_length').alias('average_length')
    )


# Calcul de la longueur moyenne des toots par hashtag
hashtag_avg_length = cleaned_stream \
    .withColumn('hashtag', explode(col('tags'))) \
    .groupBy('hashtag') \
    .agg(
        avg('text_length').alias('average_length')
    )


# Fonction pour écrire dans PostgreSQL
def write_to_postgres(df, epoch_id, table_name, save_mode="append"):
    print(f"Tentative d'écriture du batch {epoch_id} dans la table '{table_name}'...")
    try:
        df.write \
            .format("jdbc") \
            .option("url", db_url) \
            .option("dbtable", table_name) \
            .option("user", db_user) \
            .option("password", db_password) \
            .option("driver", db_properties["driver"]) \
            .mode(save_mode) \
            .save()
        logging.info(f"Batch {epoch_id} écrit avec succès dans '{table_name}'.")
        print(f"Batch {epoch_id} écrit avec succès dans '{table_name}'.")
    except Exception as e:
        logging.error(f"Erreur lors de l'écriture dans '{table_name}' : {e}")
        print(f"Erreur lors de l'écriture dans '{table_name}' : {e}")

# Démarre les écritures en streaming
print("Début de l'écriture des toots filtrés...")
query_filtered_toots = filtered_toots.writeStream \
    .outputMode("append") \
    .foreachBatch(lambda df, epoch_id: write_to_postgres(df, epoch_id, "filtered_toots")) \
    .start()
print("Écriture des toots filtrés démarrée.")

print("Début de l'écriture des agrégats de fenêtres temporelles...")
query_time_window = windowed_aggregates.writeStream \
    .outputMode("append") \
    .foreachBatch(lambda df, epoch_id: write_to_postgres(df, epoch_id, "toots_time_window_aggregates")) \
    .start()
print("Écriture des agrégats de fenêtres temporelles démarrée.")

print("Début de l'écriture de la longueur moyenne des toots par utilisateur...")
query_user_avg_length = user_avg_length.writeStream \
    .outputMode("complete") \
    .foreachBatch(lambda df, epoch_id: write_to_postgres(df, epoch_id, "user_avg_length", save_mode="overwrite")) \
    .start()
print("Écriture de la longueur moyenne des toots par utilisateur démarrée.")

print("Début de l'écriture de la longueur moyenne des toots par hashtag...")
query_hashtag_avg_length = hashtag_avg_length.writeStream \
    .outputMode("complete") \
    .foreachBatch(lambda df, epoch_id: write_to_postgres(df, epoch_id, "hashtag_avg_length", save_mode="overwrite")) \
    .start()
print("Écriture de la longueur moyenne des toots par hashtag démarrée.")

# Attend la fin des requêtes
query_filtered_toots.awaitTermination()
query_time_window.awaitTermination()
query_user_avg_length.awaitTermination()
query_hashtag_avg_length.awaitTermination()

print("Streaming terminé.")


Vérification et création de la base de données si nécessaire...
La base de données 'mastodon_data' existe déjà.
Création des tables dans PostgreSQL...
Tables créées avec succès dans PostgreSQL.
Initialisation de SparkSession...
SparkSession initialisée.
Lecture des flux de Kafka...
Flux de Kafka configuré.
Parsing des données JSON...
Parsing des données JSON terminé.
Nettoyage des données...
Nettoyage des données terminé.
Agrégation des données en fenêtres temporelles...
Agrégation des données terminée.
Début de l'écriture des toots filtrés...
Écriture des toots filtrés démarrée.
Début de l'écriture des agrégats de fenêtres temporelles...
Écriture des agrégats de fenêtres temporelles démarrée.
Début de l'écriture de la longueur moyenne des toots par utilisateur...


INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p4


Écriture de la longueur moyenne des toots par utilisateur démarrée.
Début de l'écriture de la longueur moyenne des toots par hashtag...
Écriture de la longueur moyenne des toots par hashtag démarrée.
Tentative d'écriture du batch 0 dans la table 'filtered_toots'...


INFO:root:Batch 0 écrit avec succès dans 'filtered_toots'.
INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p5
INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p6


Batch 0 écrit avec succès dans 'filtered_toots'.
Tentative d'écriture du batch 0 dans la table 'toots_time_window_aggregates'...
Tentative d'écriture du batch 0 dans la table 'user_avg_length'...


INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p7


Tentative d'écriture du batch 0 dans la table 'hashtag_avg_length'...


INFO:root:Batch 0 écrit avec succès dans 'toots_time_window_aggregates'.


Batch 0 écrit avec succès dans 'toots_time_window_aggregates'.


INFO:root:Batch 0 écrit avec succès dans 'user_avg_length'.


Batch 0 écrit avec succès dans 'user_avg_length'.


INFO:root:Batch 0 écrit avec succès dans 'hashtag_avg_length'.


Batch 0 écrit avec succès dans 'hashtag_avg_length'.
