### III - Installing PySpark on Colab

#### 1 - Install Apache Spark and PySpark

In [6]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j
!pip install -q pymongo matplotlib seaborn

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://cli.github.com/packages stable InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:9 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
53 packages can be upgraded. Run 'apt list --upgradable' to see them.
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
^C


#### 2 - Configure the environment

In [7]:
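import os
import sys
import findspark
# JAVA_HOME and SPARK_HOME are left unset here, so findspark falls back to the
# pip-installed PySpark. Uncomment both lines to use the Spark 3.2.1 download instead.
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
findspark.init()
findspark.find()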

'/usr/local/lib/python3.12/dist-packages/pyspark'
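Since `findspark.find()` resolved to the pip-installed PySpark rather than the downloaded archive, it can help to check what the environment actually provides before starting a session. The following is a minimal sketch using only the standard library; `spark_env_report` is a hypothetical helper name, not part of findspark or PySpark.

```python
import os
import shutil


def spark_env_report(env=None):
    """Summarize the settings that findspark and PySpark rely on."""
    # Default to the real process environment; a dict can be passed for testing.
    env = os.environ if env is None else env
    spark_home = env.get("SPARK_HOME")
    return {
        "JAVA_HOME": env.get("JAVA_HOME"),
        "SPARK_HOME": spark_home,
        # True only when SPARK_HOME is set and points to an existing directory.
        "spark_home_exists": bool(spark_home) and os.path.isdir(spark_home),
        # Looks for a `java` executable on the PATH of the current process.
        "java_on_path": shutil.which("java") is not None,
    }


report = spark_env_report()
for key, value in report.items():
    print(f"{key}: {value}")
```

If `SPARK_HOME` is reported as `None`, the session below will most likely run on the pip-installed PySpark, which is consistent with the path returned above.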

#### 3 - Start a Spark session

In [8]:
import pyspark
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("ColabSpark")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)
print("Spark was configured successfully!")

Spark was configured successfully!


#### 4 - First example

In [9]:
# Check the Spark session by printing its version
print(spark.version)

4.0.1


In [10]:
# Create a simple DataFrame from the following records
data = [(1, "Alice", 23), (2, "Bob", 30), (3, "Charlie", 29)]
columns = ["id", "nom", "age"]
df = spark.createDataFrame(data, columns)

In [11]:
# Display the contents of the DataFrame
df.show()

+---+-------+---+
| id|    nom|age|
+---+-------+---+
|  1|  Alice| 23|
|  2|    Bob| 30|
|  3|Charlie| 29|
+---+-------+---+



In [12]:
# Some basic operations
df.printSchema()  # Structure of the DataFrame
df.select("nom", "age").show()  # Column selection
df.filter(df.age > 25).show()  # Row filtering

root
 |-- id: long (nullable = true)
 |-- nom: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|    nom|age|
+-------+---+
|  Alice| 23|
|    Bob| 30|
|Charlie| 29|
+-------+---+

+---+-------+---+
| id|    nom|age|
+---+-------+---+
|  2|    Bob| 30|
|  3|Charlie| 29|
+---+-------+---+



#### 5 - Loading and manipulating data with Spark
##### a. Load a CSV dataset and analyze it with Spark

In [14]:
df = spark.read.csv("/content/transactions.csv", header=True, inferSchema=True)
df.show(5)

+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
|Transaction ID|Sender Account ID|Receiver Account ID|Transaction Amount|Transaction Type|          Timestamp|Transaction Status|Fraud Flag|Geolocation (Latitude/Longitude)|Device Used|Network Slice ID|Latency (ms)|Slice Bandwidth (Mbps)|PIN Code|
+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
| TXN9520068950|         ACC14994|           ACC16656|             495.9|         Deposit|2025-01-17 10:14:00|            Failed|      true|            34.0522 N, -74.006 W|    Desktop|          Slice3|          10|                   179|    3075|
| TXN941

In [15]:
# Display the schema of the data
df.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Sender Account ID: string (nullable = true)
 |-- Receiver Account ID: string (nullable = true)
 |-- Transaction Amount: double (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Transaction Status: string (nullable = true)
 |-- Fraud Flag: boolean (nullable = true)
 |-- Geolocation (Latitude/Longitude): string (nullable = true)
 |-- Device Used: string (nullable = true)
 |-- Network Slice ID: string (nullable = true)
 |-- Latency (ms): integer (nullable = true)
 |-- Slice Bandwidth (Mbps): integer (nullable = true)
 |-- PIN Code: integer (nullable = true)
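Several column names in this schema contain spaces, parentheses, and slashes, which forces bracket access such as `df["Transaction Amount"]`. One option is to normalize the names once after loading. The following is a minimal sketch; `to_snake_case` is a hypothetical helper, and the list holds a few names copied from the schema above.

```python
import re


def to_snake_case(name):
    """Turn a header such as 'Latency (ms)' into 'latency_ms'."""
    # Collapse every run of non-alphanumeric characters into one underscore,
    # then drop leading/trailing underscores and lowercase the result.
    cleaned = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return cleaned.strip("_").lower()


# A few column names copied from the printed schema.
columns = [
    "Transaction ID",
    "Transaction Amount",
    "Geolocation (Latitude/Longitude)",
    "Latency (ms)",
    "Slice Bandwidth (Mbps)",
]

renamed = [to_snake_case(c) for c in columns]
print(renamed)
```

Assuming `df` is the DataFrame loaded above, `df.toDF(*[to_snake_case(c) for c in df.columns])` would return a copy with the normalized names.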



In [16]:
# Keep only transactions with an amount greater than 1000
df.filter(df["Transaction Amount"] > 1000).show()

+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
|Transaction ID|Sender Account ID|Receiver Account ID|Transaction Amount|Transaction Type|          Timestamp|Transaction Status|Fraud Flag|Geolocation (Latitude/Longitude)|Device Used|Network Slice ID|Latency (ms)|Slice Bandwidth (Mbps)|PIN Code|
+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
| TXN2214150284|         ACC48650|           ACC76457|           1129.88|        Transfer|2025-01-17 10:56:00|           Success|      true|            34.0522 N, -74.006 W|     Mobile|          Slice3|          10|                   127|    6374|
| TXN310

In [17]:
# Compute the total transaction amount per transaction type
df.groupBy("Transaction Type").sum("Transaction Amount").show()

+----------------+-----------------------+
|Transaction Type|sum(Transaction Amount)|
+----------------+-----------------------+
|         Deposit|     252042.61999999988|
|        Transfer|      291776.5500000002|
|      Withdrawal|     227346.12000000002|
+----------------+-----------------------+
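The long fractional tails in these sums (for example `252042.61999999988`) most likely come from binary floating-point addition rather than from the data, since the amounts shown carry at most two decimals. A minimal standard-library sketch of the same effect, using made-up values instead of the transactions file:

```python
import math

# Ten payments of 0.1 each: illustrative values, not taken from the dataset.
amounts = [0.1] * 10

naive_total = sum(amounts)        # plain left-to-right float addition
exact_total = math.fsum(amounts)  # tracks intermediate rounding errors

print(naive_total)            # 0.9999999999999999
print(exact_total)            # 1.0
print(round(naive_total, 2))  # 1.0
```

In Spark, the aggregate can be rounded for display in a similar way, for instance by wrapping `pyspark.sql.functions.sum` in `pyspark.sql.functions.round` inside `agg`.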



In [18]:
# Sort transactions by amount in descending order
df.orderBy(df["Transaction Amount"].desc()).show(5)

+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
|Transaction ID|Sender Account ID|Receiver Account ID|Transaction Amount|Transaction Type|          Timestamp|Transaction Status|Fraud Flag|Geolocation (Latitude/Longitude)|Device Used|Network Slice ID|Latency (ms)|Slice Bandwidth (Mbps)|PIN Code|
+--------------+-----------------+-------------------+------------------+----------------+-------------------+------------------+----------+--------------------------------+-----------+----------------+------------+----------------------+--------+
| TXN1696527060|         ACC44804|           ACC26269|           1497.76|        Transfer|2025-01-17 10:54:00|           Success|      true|            55.7558 N, -118.2...|    Desktop|          Slice2|          10|                   113|    5668|
| TXN874

### IV - Case study: Integrating Spark with MongoDB Atlas

In [19]:
# Install pymongo, the MongoDB Python driver (the MongoDB Spark Connector is a separate package)
!pip install pymongo
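pymongo connects to Atlas through a connection string. Below is a minimal sketch of assembling one, assuming an SRV-style URI. `build_atlas_uri`, the user, the password, and the cluster host are placeholders invented for illustration; the real values come from the Atlas console.

```python
from urllib.parse import quote_plus


def build_atlas_uri(user, password, host):
    """Build a mongodb+srv URI, percent-encoding the credentials."""
    # Reserved characters such as '@' or '/' in the credentials must be
    # escaped, otherwise the URI would be split at the wrong place.
    return f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/"


# Hypothetical credentials and host, for illustration only.
uri = build_atlas_uri("demo_user", "p@ss/word", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string is what would be passed to `pymongo.MongoClient(uri)`. Keeping the password out of the notebook itself, for example by reading it from an environment variable, avoids committing credentials by accident.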

