In [3]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9
Note: you may need to restart the kernel to use updated packages.


In [None]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, IntegerType, ArrayType
)
from pyspark.sql.functions import from_json, col, lower, regexp_replace, window, length, avg
import psycopg2

# Configurer le logging
logging.basicConfig(level=logging.INFO)


# Configurer SparkSession
print("Création de SparkSession...")
spark = SparkSession.builder \
    .appName("MastodonUserStreaming") \
    .config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,"
    "org.postgresql:postgresql:42.2.25"
) \
    .getOrCreate()
print("SparkSession créée.")

# Paramètres Kafka
kafka_brokers = "kafka:9093"
topic_name = "message_kafka"

print("Lecture du flux Kafka...")
# Lecture du flux Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "earliest") \
    .load()
print("Flux Kafka prêt.")

# Schéma pour les données JSON
schema = StructType([
    StructField("id", StringType(), True),
    StructField("user", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("followers_count", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    StructField("content", StringType(), True),
    StructField("language", StringType(), True),
    StructField("hashtags", ArrayType(StringType()), True),
    StructField("favourites_count", IntegerType(), True),
    StructField("reblogs_count", IntegerType(), True)
])

# Parser les données JSON
print("Parsing des données JSON...")
json_stream = kafka_stream.selectExpr("CAST(value AS STRING) as json_value")
parsed_stream = json_stream.select(from_json(col("json_value"), schema).alias("data")).select("data.*")
print("Parsing terminé.")

# Extraction et transformation des données
print("Extraction et transformation des données...")
# Nettoyage du contenu du toot
processed_stream = parsed_stream.withColumn("content", regexp_replace(col("content"), r"<[^>]+>", ""))

# Filtrer par langue (par exemple, français)
filtered_stream = processed_stream.filter(col("language") == "fr")


# Préparation des données utilisateurs, y compris le texte
users_data = filtered_stream.select(
    col("id"),
    col("user_id"),
    col("user"),
    col("followers_count"),
    col("timestamp"),
    col("content"),
    col("language"),
    col("hashtags"),
    col("favourites_count"),
    col("reblogs_count"),
)

# Filtrer les enregistrements où user_id est NULL
users_data = users_data.filter(col("user").isNotNull())

# Fenêtres temporelles
windowed_stream = users_data.groupBy(
    window(col("timestamp"), "1 hour")  # Fenêtre d'une heure
).count()

# Calcul de la longueur moyenne des pouets par utilisateur
length_avg_by_user = users_data.withColumn("content_length", length(col("content"))) \
    .groupBy("user").agg(avg("content_length").alias("avg_length"))


# Paramètres de connexion à PostgreSQL
db_host = "projetfinal5spar-postgres-1"
db_port = "5432"
db_name = "mastodon_data"
db_user = "user"
db_password = "password"
db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
db_properties = {
    "user": db_user,
    "password": db_password,
    "driver": "org.postgresql.Driver"
}


# Fonction pour initialiser la table mastodon dans PostgreSQL
def initialize_mastodon_table():
    print("Initialisation de la table mastodon dans PostgreSQL...")
    try:
        conn = psycopg2.connect(
            dbname=db_name,
            user=db_properties["user"],
            password=db_properties["password"],
            host=db_host,
            port=db_port
        )
        conn.autocommit = True
        cursor = conn.cursor()

        create_table_sql = """
        CREATE TABLE IF NOT EXISTS mastodon (
            "id" VARCHAR(255),
            "user_id" VARCHAR(255),
            "user" VARCHAR(255),
            "followers_count" INT,
            timestamp TIMESTAMP,
            content TEXT,
            language VARCHAR(10),
            hashtags TEXT[],
            favourites_count INT,
            reblogs_count INT,
            PRIMARY KEY ("id")
        );
        """

        cursor.execute(create_table_sql)
        print("Table mastodon initialisée avec succès dans PostgreSQL.")

        # Ajout d'une contrainte d'unicité si nécessaire
        cursor.execute("""
        ALTER TABLE mastodon ADD CONSTRAINT unique_mastodon UNIQUE ("id");
        """)

        cursor.close()
        conn.close()

    except Exception as e:
        logging.error(f"Erreur lors de l'initialisation de la table mastodon dans PostgreSQL: {e}")
        print(f"Erreur lors de l'initialisation de la table mastodon: {e}")


# Initialiser la table mastodon
initialize_mastodon_table()


# Fonction pour réaliser un upsert dans PostgreSQL en utilisant INSERT ... ON CONFLICT DO UPDATE
def upsert_to_postgresql(dataframe, batch_id):
    print(f"Traitement du batch {batch_id} pour upsert dans PostgreSQL...")
    try:
        # Convertir le DataFrame Spark en DataFrame Pandas
        pandas_df = dataframe.toPandas()

        # Connexion à PostgreSQL
        conn = psycopg2.connect(
            dbname=db_name,
            user=db_properties["user"],
            password=db_properties["password"],
            host=db_host,
            port=db_port
        )
        conn.autocommit = True
        cursor = conn.cursor()

        # Requête d'upsert
        for _, row in pandas_df.iterrows():
            cursor.execute("""
                INSERT INTO mastodon (id, user_id, "user", followers_count, timestamp, content, language, hashtags, favourites_count, reblogs_count)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT ("id") DO UPDATE SET
                "user_id" = EXCLUDED."user_id",
                "user" = EXCLUDED."user",
                followers_count = EXCLUDED."followers_count",
                timestamp = EXCLUDED.timestamp,
                content = EXCLUDED.content,
                language = EXCLUDED.language,
                hashtags = EXCLUDED.hashtags,
                favourites_count = EXCLUDED.favourites_count,
                reblogs_count = EXCLUDED.reblogs_count
            """, (
                row['id'],
                row['user_id'],
                row['user'],
                row['followers_count'],
                row['timestamp'],
                row['content'],
                row['language'],
                row['hashtags'],
                row['favourites_count'],
                row['reblogs_count']
            ))

        print(f"Batch {batch_id} traité avec succès dans 'mastodon'.")

        cursor.close()
        conn.close()

    except Exception as e:
        logging.error(f"Erreur lors de l'upsert dans 'mastodon' : {e}")
        print(f"Erreur lors de l'upsert dans 'mastodon' : {e}")


# Écriture en streaming
print("Démarrage de l'écriture en streaming vers PostgreSQL...")
streaming_query = users_data.writeStream \
    .outputMode("append") \
    .foreachBatch(upsert_to_postgresql) \
    .start()

# Afficher les fenêtres temporelles sur la console
print("Affichage des fenêtres temporelles...")
windowed_stream_query = windowed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Afficher la longueur moyenne des pouets par utilisateur
print("Affichage de la longueur moyenne des pouets par utilisateur...")
length_avg_by_user_query = length_avg_by_user.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Attendre la fin du streaming
streaming_query.awaitTermination()
print("Streaming terminé.")

print("Résultat des statistiques.")
windowed_stream_query.awaitTermination()
length_avg_by_user_query.awaitTermination()

Création de SparkSession...
SparkSession créée.
Lecture du flux Kafka...
Flux Kafka prêt.
Parsing des données JSON...
Parsing terminé.
Extraction et transformation des données...
Initialisation de la table mastodon dans PostgreSQL...
Table mastodon initialisée avec succès dans PostgreSQL.
Démarrage de l'écriture en streaming vers PostgreSQL...
Affichage des fenêtres temporelles...
Affichage de la longueur moyenne des pouets par utilisateur...


INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 0 pour upsert dans PostgreSQL...
Batch 0 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 1 pour upsert dans PostgreSQL...
Traitement du batch 20 pour upsert dans PostgreSQL...
Batch 1 traité avec succès dans 'mastodon'.
Batch 20 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 21 pour upsert dans PostgreSQL...
Traitement du batch 2 pour upsert dans PostgreSQL...
Batch 21 traité avec succès dans 'mastodon'.
Batch 2 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 22 pour upsert dans PostgreSQL...
Traitement du batch 3 pour upsert dans PostgreSQL...
Batch 22 traité avec succès dans 'mastodon'.
Batch 3 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 4 pour upsert dans PostgreSQL...
Traitement du batch 23 pour upsert dans PostgreSQL...
Batch 4 traité avec succès dans 'mastodon'.
Batch 23 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 24 pour upsert dans PostgreSQL...
Traitement du batch 5 pour upsert dans PostgreSQL...
Batch 24 traité avec succès dans 'mastodon'.
Batch 5 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 25 pour upsert dans PostgreSQL...
Traitement du batch 6 pour upsert dans PostgreSQL...
Batch 6 traité avec succès dans 'mastodon'.
Batch 25 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 7 pour upsert dans PostgreSQL...
Traitement du batch 26 pour upsert dans PostgreSQL...
Batch 7 traité avec succès dans 'mastodon'.
Batch 26 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 8 pour upsert dans PostgreSQL...
Traitement du batch 27 pour upsert dans PostgreSQL...
Batch 8 traité avec succès dans 'mastodon'.
Batch 27 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 28 pour upsert dans PostgreSQL...
Traitement du batch 9 pour upsert dans PostgreSQL...
Batch 28 traité avec succès dans 'mastodon'.
Batch 9 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 29 pour upsert dans PostgreSQL...
Traitement du batch 10 pour upsert dans PostgreSQL...
Batch 29 traité avec succès dans 'mastodon'.
Batch 10 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 11 pour upsert dans PostgreSQL...
Traitement du batch 30 pour upsert dans PostgreSQL...
Batch 11 traité avec succès dans 'mastodon'.
Batch 30 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 12 pour upsert dans PostgreSQL...
Traitement du batch 31 pour upsert dans PostgreSQL...
Batch 12 traité avec succès dans 'mastodon'.
Batch 31 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 32 pour upsert dans PostgreSQL...
Traitement du batch 13 pour upsert dans PostgreSQL...
Batch 32 traité avec succès dans 'mastodon'.
Batch 13 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 14 pour upsert dans PostgreSQL...
Traitement du batch 33 pour upsert dans PostgreSQL...
Batch 14 traité avec succès dans 'mastodon'.
Batch 33 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 34 pour upsert dans PostgreSQL...
Traitement du batch 15 pour upsert dans PostgreSQL...
Batch 34 traité avec succès dans 'mastodon'.
Batch 15 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 16 pour upsert dans PostgreSQL...
Traitement du batch 35 pour upsert dans PostgreSQL...
Batch 35 traité avec succès dans 'mastodon'.
Batch 16 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 36 pour upsert dans PostgreSQL...
Traitement du batch 17 pour upsert dans PostgreSQL...
Batch 17 traité avec succès dans 'mastodon'.
Batch 36 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 18 pour upsert dans PostgreSQL...
Traitement du batch 37 pour upsert dans PostgreSQL...
Batch 18 traité avec succès dans 'mastodon'.
Batch 37 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 19 pour upsert dans PostgreSQL...Traitement du batch 38 pour upsert dans PostgreSQL...

Batch 19 traité avec succès dans 'mastodon'.
Batch 38 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 39 pour upsert dans PostgreSQL...
Traitement du batch 20 pour upsert dans PostgreSQL...
Batch 39 traité avec succès dans 'mastodon'.
Batch 20 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 21 pour upsert dans PostgreSQL...
Traitement du batch 40 pour upsert dans PostgreSQL...
Batch 21 traité avec succès dans 'mastodon'.
Batch 40 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 41 pour upsert dans PostgreSQL...
Traitement du batch 22 pour upsert dans PostgreSQL...
Batch 41 traité avec succès dans 'mastodon'.
Batch 22 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 23 pour upsert dans PostgreSQL...
Traitement du batch 42 pour upsert dans PostgreSQL...
Batch 23 traité avec succès dans 'mastodon'.
Batch 42 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 24 pour upsert dans PostgreSQL...
Traitement du batch 43 pour upsert dans PostgreSQL...
Batch 24 traité avec succès dans 'mastodon'.
Batch 43 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 25 pour upsert dans PostgreSQL...
Traitement du batch 44 pour upsert dans PostgreSQL...
Batch 25 traité avec succès dans 'mastodon'.
Batch 44 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 45 pour upsert dans PostgreSQL...
Traitement du batch 26 pour upsert dans PostgreSQL...
Batch 45 traité avec succès dans 'mastodon'.
Batch 26 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 27 pour upsert dans PostgreSQL...
Traitement du batch 46 pour upsert dans PostgreSQL...
Batch 27 traité avec succès dans 'mastodon'.
Batch 46 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 28 pour upsert dans PostgreSQL...
Traitement du batch 47 pour upsert dans PostgreSQL...
Batch 28 traité avec succès dans 'mastodon'.
Batch 47 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


Traitement du batch 48 pour upsert dans PostgreSQL...
Traitement du batch 29 pour upsert dans PostgreSQL...
Batch 29 traité avec succès dans 'mastodon'.
Batch 48 traité avec succès dans 'mastodon'.


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p1


Traitement du batch 49 pour upsert dans PostgreSQL...
Traitement du batch 30 pour upsert dans PostgreSQL...
Batch 49 traité avec succès dans 'mastodon'.
Batch 30 traité avec succès dans 'mastodon'.
