# **Configuration de PySpark**

Installation de Pyspark

In [None]:
!pip install pyspark

# Installation de Findspark

In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Configurationde Spark

In [None]:
import findspark
findspark.init()

# Simulation d'un flux de données

In [None]:
import time
import csv

# Simuler un flux en écrivant périodiquement des lignes dans un fichier CSV
def simulate_stream(file_name):
    data = [
        {"name": "Alice", "age": 22, "major": "Engineering"},
        {"name": "Bob", "age": 20, "major": "Science"},
        {"name": "Charlie", "age": 25, "major": "Mathematics"}
    ]
    with open(file_name, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=["name", "age", "major"])
        writer.writeheader()
        for row in data:
            writer.writerow(row)
            time.sleep(2)  # Ajout d'une nouvelle ligne toutes les 2 secondes


# **Création d'une dataframe**

In [None]:
import pandas as pd

# Données initiales
data = {
    "name": ["Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Hank", " Irene", "Jack", "Karen", "Leo", "Mia", "Noah", "Tom", "Lucie", "Clara", "Bill", "Carl", "Patricia"],
    "age": [22, 20, 25, 19, 24, 23, 21, 26, 22, 20, 23, 24, 21, 25, 22, 20, 23, 25, 18, 20],
    "major": ["Engineering", "Science", "Mathematics", "Arts", "Engineering", "Science", " Arts", "Engineering", "Mathematics", "Science", "Arts", "Engineerings", "Science", "Mathematics", "Arts", "Engineerings", "Science", "Mathematics", "Arts", "Mathematiics"]
}

# Création d'un DataFrame
df = pd.DataFrame(data)

# Sauvegarde au format CSV
df.to_csv("students.csv", index=False)
print("Fichier students.csv créé !")


Fichier students.csv créé !


# Lecture du fichier CSV pour le traitement batch

In [None]:
from pyspark.sql import SparkSession

# Créer une session Spark
spark = SparkSession.builder.appName("ETL_Pipeline").getOrCreate()

# Lire le fichier CSV
df = spark.read.csv("students.csv", header=True, inferSchema=True)

# Afficher les données
df.show()


+--------+---+------------+
|    name|age|       major|
+--------+---+------------+
|   Alice| 22| Engineering|
|     Bob| 20|     Science|
| Charlie| 25| Mathematics|
|   David| 19|        Arts|
|     Eve| 24| Engineering|
|   Frank| 23|     Science|
|   Grace| 21|        Arts|
|    Hank| 26| Engineering|
|   Irene| 22| Mathematics|
|    Jack| 20|     Science|
|   Karen| 23|        Arts|
|     Leo| 24|Engineerings|
|     Mia| 21|     Science|
|    Noah| 25| Mathematics|
|     Tom| 22|        Arts|
|   Lucie| 20|Engineerings|
|   Clara| 23|     Science|
|    Bill| 25| Mathematics|
|    Carl| 18|        Arts|
|Patricia| 20|Mathematiics|
+--------+---+------------+



# Applications des transformations

Convertissons les noms en majuscules

In [None]:
from pyspark.sql.functions import upper

df_transformed = df.withColumn("name", upper(df["name"]))
df_transformed.show()


+--------+---+------------+
|    name|age|       major|
+--------+---+------------+
|   ALICE| 22| Engineering|
|     BOB| 20|     Science|
| CHARLIE| 25| Mathematics|
|   DAVID| 19|        Arts|
|     EVE| 24| Engineering|
|   FRANK| 23|     Science|
|   GRACE| 21|        Arts|
|    HANK| 26| Engineering|
|   IRENE| 22| Mathematics|
|    JACK| 20|     Science|
|   KAREN| 23|        Arts|
|     LEO| 24|Engineerings|
|     MIA| 21|     Science|
|    NOAH| 25| Mathematics|
|     TOM| 22|        Arts|
|   LUCIE| 20|Engineerings|
|   CLARA| 23|     Science|
|    BILL| 25| Mathematics|
|    CARL| 18|        Arts|
|PATRICIA| 20|Mathematiics|
+--------+---+------------+



Filtrons les étudiants de plus de 20 ans

In [None]:
df_filtered = df_transformed.filter(df["age"] > 20)
df_filtered.show()

+-------+---+------------+
|   name|age|       major|
+-------+---+------------+
|  ALICE| 22| Engineering|
|CHARLIE| 25| Mathematics|
|    EVE| 24| Engineering|
|  FRANK| 23|     Science|
|  GRACE| 21|        Arts|
|   HANK| 26| Engineering|
|  IRENE| 22| Mathematics|
|  KAREN| 23|        Arts|
|    LEO| 24|Engineerings|
|    MIA| 21|     Science|
|   NOAH| 25| Mathematics|
|    TOM| 22|        Arts|
|  CLARA| 23|     Science|
|   BILL| 25| Mathematics|
+-------+---+------------+



Calculons la moyenne d'age

In [None]:
df_filtered.groupBy("major").avg("age").show()

+------------+------------------+
|       major|          avg(age)|
+------------+------------------+
|     Science|22.333333333333332|
| Engineering|              24.0|
|        Arts|              21.0|
|Engineerings|              24.0|
|        Arts|              22.5|
| Mathematics|             24.25|
+------------+------------------+



# **Simulation d'un flux en temps réel**

In [None]:
# prompt: mkdir /content/input_files
# mv /content/disney_movies.csv /content/input_files/

!mkdir /content/input_files
!mv /content/disney_movies.csv /content/input_files/

In [None]:
# prompt: df_stream = spark.readStream.schema(schema).csv("/content/input_files/", header=True)

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema for your CSV data
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("major", StringType(), True)
])

# Create a SparkSession
spark = SparkSession.builder.appName("StreamingCSV").getOrCreate()

# Read the stream
df_stream = spark.readStream.schema(schema).csv("/content/input_files/", header=True)

# Print the schema
df_stream.printSchema()

# Start the query and print the results to the console
query = df_stream.writeStream.outputMode("append").format("console").start()
query.awaitTermination(10) # Wait for 10 seconds

# Stop the query
query.stop()

# Stop the SparkSession
spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- major: string (nullable = true)



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

# Créer une session Spark
spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Schéma (à ajuster selon votre CSV)
schema = "name STRING, age INT, grade STRING"

# Lire le flux depuis un répertoire
df_stream = spark.readStream.schema(schema).csv("/content/input_files/", header=True)

# Transformation : convertir les noms en majuscules
df_stream_transformed = df_stream.withColumn("name", upper(df_stream["name"]))

# Écriture des résultats sur la console
query = df_stream_transformed.writeStream.format("console").outputMode("append").start()

# Attente de la fin du streaming
query.awaitTermination()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
!pip install kafka-python

from kafka import KafkaProducer
import json
import time

# Kafka configuration
bootstrap_servers = 'kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362'  # Replace with your Kafka broker address
topic_name = 'examen'  # Replace with your desired topic name



In [None]:
# prompt: configuration du producer

# Kafka configuration
bootstrap_servers = 'kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362'  # Replace with your Kafka broker address
topic_name = 'examen'  # Replace with your desired topic name
security_protocol = 'SSL'
ssl_cafile = '/content/ca.pem' # Remplacez par le chemin vers votre fichier ca.pem
ssl_certfile = '/content/service.cert' # Remplacez par le chemin vers votre fichier service.cert
ssl_keyfile = '/content/service.key' # Remplacez par le chemin vers votre fichier service.key


# Create a Kafka producer
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    ssl_cafile=ssl_cafile,
    ssl_certfile=ssl_certfile,
    ssl_keyfile=ssl_keyfile,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)


# Example usage (replace with your actual data)

data = {
    "name": ["Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Hank", " Irene", "Jack", "Karen", "Leo", "Mia", "Noah", "Tom", "Lucie", "Clara", "Bill", "Carl", "Patricia"],
    "age": [22, 20, 25, 19, 24, 23, 21, 26, 22, 20, 23, 24, 21, 25, 22, 20, 23, 25, 18, 20],
    "major": ["Engineering", "Science", "Mathematics", "Arts", "Engineering", "Science", " Arts", "Engineering", "Mathematics", "Science", "Arts", "Engineerings", "Science", "Mathematics", "Arts", "Engineerings", "Science", "Mathematics", "Arts", "Mathematiics"]
}


# Send the data to Kafka
producer.send(topic_name, value=data)
producer.flush() # Important : assurez-vous que les messages sont envoyés

print(f"Message sent to Kafka topic '{topic_name}'")

In [None]:
import json
import csv
from kafka import KafkaProducer

# Configuration Kafka
bootstrap_servers = 'kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362'  # Replace with your Kafka broker address
topic_name = 'examen'  # Replace with your desired topic name
security_protocol = 'SSL'
ssl_cafile = '/content/ca.pem'  # Remplacez par le chemin vers votre fichier ca.pem
ssl_certfile = '/content/service.cert'  # Remplacez par le chemin vers votre fichier service.cert
ssl_keyfile = '/content/service.key'  # Remplacez par le chemin vers votre fichier service.key

# Créer un Kafka producer
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    ssl_cafile=ssl_cafile,
    ssl_certfile=ssl_certfile,
    ssl_keyfile=ssl_keyfile,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Chemin vers le fichier CSV
csv_file_path = '/content/students.csv'  # Remplacez par le chemin réel de votre fichier CSV

# Lire le fichier CSV et envoyer les données
try:
    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)  # Utilisation de DictReader pour lire le CSV en tant que dictionnaire
        for row in reader:
            # Envoyer chaque ligne sous forme JSON à Kafka
            producer.send(topic_name, value=row)
            print(f"Message sent: {row}")

    producer.flush()  # Assurez-vous que tous les messages sont envoyés
    print(f"All messages from '{csv_file_path}' have been sent to Kafka topic '{topic_name}'.")
except Exception as e:
    print(f"Message sent to Kafka topic '{topic_name}'")
finally:
    producer.close()  # Toujours fermer le producteur


Message sent to Kafka topic 'examen'


In [None]:
pip install pyspark kafka-python



# Configuration Spark Streaming

In [None]:
spark = SparkSession.builder \
    .appName("Kafka_Consumer") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .getOrCreate()


In [None]:
!pip install pyspark
!wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar -P $SPARK_HOME/jars


--2024-11-27 18:07:40--  https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 427253 (417K) [application/java-archive]
Saving to: ‘/jars/spark-sql-kafka-0-10_2.12-3.4.1.jar’


2024-11-27 18:07:40 (8.62 MB/s) - ‘/jars/spark-sql-kafka-0-10_2.12-3.4.1.jar’ saved [427253/427253]



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper

# Initialiser une session Spark avec le connecteur Kafka
spark = SparkSession.builder \
    .appName("Kafka_Consumer") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Configuration du journal Spark (optionnel)
spark.sparkContext.setLogLevel("DEBUG")  # DEBUG ou INFO pour plus de détails

try:
    # Lire les données du topic Kafka en streaming
    df_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362") \
        .option("subscribe", "examen") \
        .load()

    # Convertir les données Kafka en texte lisible
    df_value = df_stream.selectExpr("CAST(value AS STRING) as value")

    # Appliquer des transformations (Exemple : majuscules pour les noms)
    df_transformed = df_value.withColumn("value", upper(df_value["value"]))

    # Afficher les données en temps réel dans la console
    query = df_transformed.writeStream.format("console").outputMode("append").start()

    # Attendre la fin du traitement
    query.awaitTermination()

except Exception as e:
    print(f"Une erreur est survenue : {e}")

finally:
    # Arrêter la session Spark proprement
    spark.stop()


Une erreur est survenue : Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.


In [None]:
# Configuration Kafka (unchanged)
bootstrap_servers = 'kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362'
topic_name = 'examen'
security_protocol = 'SSL'
ssl_cafile = '/content/ca.pem'
ssl_certfile = '/content/service.cert'
ssl_keyfile = '/content/service.key'

# List Kafka files
!ls -l /content/*.pem /content/*.cert /content/*.key

-rw-r--r-- 1 root root 1537 Nov 27 17:33 /content/ca.pem
-rw-r--r-- 1 root root 1578 Nov 27 17:33 /content/service.cert
-rw-r--r-- 1 root root 2484 Nov 27 17:33 /content/service.key


In [None]:
# Code to create a Kafka consumer (replace existing incorrect code)
from kafka import KafkaConsumer
import json

bootstrap_servers = 'kafka-494d166-paulallanmeyesika-3d16.h.aivencloud.com:16362'  # Replace with your Kafka broker address
topic_name = 'examen'  # Replace with your desired topic name
security_protocol = 'SSL'
ssl_cafile = '/content/ca.pem' # Replace with the path to your ca.pem file
ssl_certfile = '/content/service.cert' # Replace with the path to your service.cert file
ssl_keyfile = '/content/service.key' # Replace with the path to your service.key file

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    security_protocol=security_protocol,
    ssl_cafile=ssl_cafile,
    ssl_certfile=ssl_certfile,
    ssl_keyfile=ssl_keyfile,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest', # Start reading from the beginning of the topic
    enable_auto_commit=True, # Enable auto commit of offsets
)

for message in consumer:
    print(f"Received message: {message.value}")



Received message: {'name': 'Alice', 'age': 30, 'major': 'Computer Science'}
Received message: {'name': 'Alice', 'age': 22, 'major': ' Engineering'}
Received message: {'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Hank', ' Irene', 'Jack', 'Karen', 'Leo', 'Mia', 'Noah', 'Tom', 'Lucie', 'Clara', 'Bill', 'Carl', 'Patricia'], 'age': [22, 20, 25, 19, 24, 23, 21, 26, 22, 20, 23, 24, 21, 25, 22, 20, 23, 25, 18, 20], 'major': ['Engineering', 'Science', 'Mathematics', 'Arts', 'Engineering', 'Science', ' Arts', 'Engineering', 'Mathematics', 'Science', 'Arts', 'Engineerings', 'Science', 'Mathematics', 'Arts', 'Engineerings', 'Science', 'Mathematics', 'Arts', 'Mathematiics']}
Received message: {'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Hank', ' Irene', 'Jack', 'Karen', 'Leo', 'Mia', 'Noah', 'Tom', 'Lucie', 'Clara', 'Bill', 'Carl', 'Patricia'], 'age': [22, 20, 25, 19, 24, 23, 21, 26, 22, 20, 23, 24, 21, 25, 22, 20, 23, 25, 18, 20], 'major': ['Eng

KeyboardInterrupt: 