In [None]:
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, avg, round
spark = SparkSession.builder.appName("MyApp").getOrCreate()
print("Connexion réussie avec Pyspark !")

Connexion réussie avec Pyspark !


In [None]:
# Lire un fichier CSV
df = spark.read.csv("/content/drive/MyDrive/students.csv", header=True, inferSchema=True)

# Afficher le schéma et les premières lignes
df.printSchema()
df.show(5)

root
 |-- nom: string (nullable = true)
 |-- prenom: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- filiere: string (nullable = true)

+--------+------+---+--------+
|     nom|prenom|age| filiere|
+--------+------+---+--------+
|Williams|  Paul| 25|Biologie|
|   Jones| Marie| 19|Biologie|
|   Brown| Marie| 18|Biologie|
|  Garcia| Emily| 27|Biologie|
|   Jones|  John| 29|Histoire|
+--------+------+---+--------+
only showing top 5 rows



In [None]:
# Transformation 1 : Convertir les noms des étudiants en majuscules
df_transformed = df.withColumn("nom", upper(col("nom")))
print("Après conversion des noms en majuscules :")
df_transformed.show(5)

Après conversion des noms en majuscules :
+--------+------+---+--------+
|     nom|prenom|age| filiere|
+--------+------+---+--------+
|WILLIAMS|  Paul| 25|Biologie|
|   JONES| Marie| 19|Biologie|
|   BROWN| Marie| 18|Biologie|
|  GARCIA| Emily| 27|Biologie|
|   JONES|  John| 29|Histoire|
+--------+------+---+--------+
only showing top 5 rows



In [None]:
# Transformation 2 : Filtrer les étudiants ayant plus de 20 ans
df_filtered = df_transformed.filter(col("age") > 20)
print("Après filtrage des étudiants ayant plus de 20 ans :")
df_filtered.show(10)

Après filtrage des étudiants ayant plus de 20 ans :
+--------+-------+---+------------+
|     nom| prenom|age|     filiere|
+--------+-------+---+------------+
|WILLIAMS|   Paul| 25|    Biologie|
|  GARCIA|  Emily| 27|    Biologie|
|   JONES|   John| 29|    Histoire|
|   SMITH| Sophia| 23|    Biologie|
|  GARCIA|Michael| 30|    Économie|
|WILLIAMS|  Marie| 23|      Chimie|
|   BROWN| Sophia| 26|Informatique|
| JOHNSON|  David| 27|    Physique|
|  MILLER| Sophia| 28|    Histoire|
|   JONES|   Anna| 21| Littérature|
+--------+-------+---+------------+
only showing top 10 rows



In [None]:
# Transformation 3 : Calculer la moyenne d'âge par filière et l'arrondir
df_stats = df_filtered.groupBy("filiere").agg(round(avg("age"), 1).alias("moyenne_age"))
print("Moyenne d'âge par filière (arrondie) :")
df_stats.show()

Moyenne d'âge par filière (arrondie) :
+-------------+-----------+
|      filiere|moyenne_age|
+-------------+-----------+
| Informatique|       25.9|
|       Chimie|       25.5|
|Mathématiques|       25.9|
|     Économie|       25.5|
|     Physique|       25.9|
|     Histoire|       25.6|
|  Philosophie|       26.1|
|  Littérature|       25.7|
|     Biologie|       26.0|
+-------------+-----------+



In [None]:
df_transformed.show()

+--------+-------+---+-------------+
|     nom| prenom|age|      filiere|
+--------+-------+---+-------------+
|WILLIAMS|   Paul| 25|     Biologie|
|   JONES|  Marie| 19|     Biologie|
|   BROWN|  Marie| 18|     Biologie|
|  GARCIA|  Emily| 27|     Biologie|
|   JONES|   John| 29|     Histoire|
| JOHNSON|   Paul| 18| Informatique|
|   SMITH| Sophia| 18| Informatique|
|     DOE|Michael| 18|  Philosophie|
|   SMITH| Sophia| 23|     Biologie|
|  GARCIA|Michael| 30|     Économie|
|WILLIAMS|  Marie| 23|       Chimie|
|   BROWN| Sophia| 26| Informatique|
| JOHNSON|  David| 27|     Physique|
|  MILLER| Sophia| 28|     Histoire|
|   JONES|   Anna| 21|  Littérature|
|   SMITH|   John| 24|     Biologie|
|  MILLER|   Jane| 24|     Biologie|
|   SMITH|   Jane| 19|  Littérature|
|   NGOMA|  David| 26|     Biologie|
|   BROWN|Michael| 27|Mathématiques|
+--------+-------+---+-------------+
only showing top 20 rows



In [None]:
# Sauvegarder les données transformées dans un fichier CSV localement dans Colab
output_path_local = "/content/students_transformed.csv"  # Emplacement temporaire dans l'environnement Colab

# Sauvegarder le DataFrame transformé en CSV
df_transformed.coalesce(1).write.csv(output_path_local, header=True, mode="overwrite")

print(f"Données transformées sauvegardées avec succès")

Données transformées sauvegardées avec succès


In [None]:
# Installation des dépendances nécessaires
!pip install pyspark confluent_kafka

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper
from confluent_kafka import Producer, Consumer, KafkaError

# Définir les chemins des certificats SSL
ssl_cafile = "/content/drive/MyDrive/ca.pem"
ssl_certfile = "/content/drive/MyDrive/service.cert"
ssl_keyfile = "/content/drive/MyDrive/service.key"

# Vérification des fichiers de certificat
print("CA File exists:", os.path.exists(ssl_cafile))
print("Cert File exists:", os.path.exists(ssl_certfile))
print("Key File exists:", os.path.exists(ssl_keyfile))

# Étape 1 : Lecture et transformation des données avec PySpark
spark = SparkSession.builder.appName("KafkaSparkPipeline").getOrCreate()
print("Connexion réussie avec Aiven kafka !")

Collecting confluent_kafka
  Downloading confluent_kafka-2.6.1-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Downloading confluent_kafka-2.6.1-cp310-cp310-manylinux_2_28_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m34.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: confluent_kafka
Successfully installed confluent_kafka-2.6.1
CA File exists: True
Cert File exists: True
Key File exists: True
Connexion réussie avec Aiven kafka !


In [None]:
from pyspark.sql import SparkSession
from confluent_kafka import Consumer, KafkaError

# Initialisation de Spark
spark = SparkSession.builder.master("local[*]").appName("KafkaPipeline").getOrCreate()

# Charger les données CSV
input_csv_path = "/content/drive/MyDrive/students.csv"
df = spark.read.csv(input_csv_path, header=True, inferSchema=True)

# Prendre les 10 premières lignes
df = df.limit(10)

# Afficher les 10 premiers étudiants sélectionnés
print("10 premiers étudiants sélectionnés :")
df.show()

# Configuration du consommateur Kafka
consumer_conf = {
    'bootstrap.servers': 'kafka-1a2fc4df-mvuezoloraphael0-6122.g.aivencloud.com:11343',
    'security.protocol': 'SSL',
    'ssl.ca.location': ssl_cafile,
    'ssl.certificate.location': ssl_certfile,
    'ssl.key.location': ssl_keyfile,
    'group.id': 'student-consumer',
    'auto.offset.reset': 'earliest'
}

# Créer le consommateur Kafka
consumer = Consumer(consumer_conf)
consumer.subscribe(['Goro'])

# Liste pour stocker les messages consommés
consumed_data = []
message_count = 0

# Afficher les messages envoyés à Goro dans le format voulu
print("\nMessages envoyés à Goro sont :")
try:
    while message_count < 5:  # Consommer exactement 5 messages
        msg = consumer.poll(1.0)  # Attente d'une seconde
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Fin des messages\n')
                break
            else:
                print(f'Erreur: {msg.error()}')
                break

        # Ajouter le message consommé à la liste
        message = msg.value().decode('utf-8')
        print(f"Message brut reçu envoyé à Goro : {message}")

        # Décomposer le message reçu
        message_data = message.strip("[]").split(",")  # Supprimer les crochets et séparer par les virgules

        if len(message_data) != 4:
            print(f"Format inattendu pour le message : {message_data}. Ignoré.")
            continue  # Ignorer les messages mal formés

        nom, prenom, age, filiere = message_data

        try:
            # Convertir l'âge en entier
            age = int(age.strip())  # Nettoyer les espaces et convertir en entier
        except ValueError:
            print(f"Erreur de conversion de l'âge pour {nom} {prenom}. Ignoré.")
            continue  # Passer au message suivant si l'âge n'est pas un entier

        # Appliquer le filtrage (âge > 20) et la transformation
        if age > 20:
            transformed_message = [nom.upper(), prenom, age, filiere]
            consumed_data.append(transformed_message)
            message_count += 1

except Exception as e:
    print(f"Erreur pendant la consommation : {e}")
finally:
    consumer.close()

# Afficher les messages consommés et transformés
print("\nMessages consommés et transformés sont :")
for transformed_message in consumed_data:
    print(f"Message consommé et transformé : {transformed_message}")

# Espacer les sections
print("\n" + "-"*50 + "\n")

# Afficher le nombre de messages transformés
print(f"{message_count} messages consommés et transformés.\n")

# Étape 4 : Sauvegarder les données consommées dans un fichier CSV
if message_count > 0:
    output_csv_path = "/content/students_transformed_by_kafka.csv"
    columns = ["nom", "prenom", "age", "filiere"]  # Correspond à vos colonnes réelles

    # Fusionner toutes les partitions en une seule et écrire un fichier unique
    consumed_df = spark.createDataFrame(consumed_data, columns)
    consumed_df.coalesce(1).write.csv(output_csv_path, header=True, mode="overwrite")

    print(f"Données consommées sauvegardées dans : {output_csv_path}")
else:
    print("Aucun message ne correspond aux critères (âge > 20). Aucun fichier sauvegardé.")


10 premiers étudiants sélectionnés :
+--------+-------+---+------------+
|     nom| prenom|age|     filiere|
+--------+-------+---+------------+
|Williams|   Paul| 25|    Biologie|
|   Jones|  Marie| 19|    Biologie|
|   Brown|  Marie| 18|    Biologie|
|  Garcia|  Emily| 27|    Biologie|
|   Jones|   John| 29|    Histoire|
| Johnson|   Paul| 18|Informatique|
|   Smith| Sophia| 18|Informatique|
|     Doe|Michael| 18| Philosophie|
|   Smith| Sophia| 23|    Biologie|
|  Garcia|Michael| 30|    Économie|
+--------+-------+---+------------+


Messages envoyés à Goro sont :
Message brut reçu envoyé à Goro : [Williams, Paul, 25, Biologie]
Message brut reçu envoyé à Goro : [Jones, Marie, 19, Biologie]
Message brut reçu envoyé à Goro : [Brown, Marie, 18, Biologie]
Message brut reçu envoyé à Goro : [Garcia, Emily, 27, Biologie]
Message brut reçu envoyé à Goro : [Jones, John, 29, Histoire]
Message brut reçu envoyé à Goro : [Johnson, Paul, 18, Informatique]
Message brut reçu envoyé à Goro : [Smith,

# Merci pour votre attention Mr #