## ‚ú® 1. Connexion PySpark ‚Üî MinIO

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("TP Bronze Silver Gold") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "60000") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "60000") \
    .config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000") \
    .config("spark.hadoop.fs.s3a.multipart.purge.age", "60000") \
    .config("spark.hadoop.fs.s3a.v2", "false") \
    .getOrCreate()

print(f"‚úÖ SparkSession cr√©√©e - Version : {spark.version}")
print(f"üìä Spark UI : http://localhost:4040")

‚úÖ SparkSession cr√©√©e - Version : 4.0.1
üìä Spark UI : http://localhost:4040


## üì¶ 0. Pr√©paration : Installation et upload du fichier CSV dans MinIO

Cette √©tape utilise boto3 pour v√©rifier et uploader le fichier CSV directement dans MinIO.

In [2]:
# Installation de boto3
!pip install boto3 -q
print("‚úÖ boto3 install√©")

‚úÖ boto3 install√©


In [3]:
import boto3
from botocore.client import Config
import io

# Configuration du client S3 pour MinIO
s3_client = boto3.client(
    's3',
    endpoint_url='http://minio:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin',
    config=Config(signature_version='s3v4'),
    region_name='us-east-1'
)

print("‚úÖ Client boto3 configur√© pour MinIO")

‚úÖ Client boto3 configur√© pour MinIO


In [4]:
# Cr√©er le bucket 'datalake' s'il n'existe pas
bucket_name = 'datalake'

try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f"‚úÖ Bucket '{bucket_name}' existe d√©j√†")
except:
    s3_client.create_bucket(Bucket=bucket_name)
    print(f"‚úÖ Bucket '{bucket_name}' cr√©√©")

# Lister les buckets pour v√©rifier
buckets = s3_client.list_buckets()
print(f"\nüìÇ Buckets disponibles : {[b['Name'] for b in buckets['Buckets']]}")

‚úÖ Bucket 'datalake' existe d√©j√†

üìÇ Buckets disponibles : ['datalake']


In [5]:
# Cr√©er le fichier CSV en m√©moire et l'uploader
csv_data = """id,produit,prix,quantite
1,Stylo,1.20,10
2,Cahier,2.50,5
3,Gomme,0.80,20
4,Stylo,1.20,15"""

# Upload du fichier dans MinIO
s3_client.put_object(
    Bucket='datalake',
    Key='bronze/ventes/ventes.csv',
    Body=csv_data.encode('utf-8'),
    ContentType='text/csv'
)

print("‚úÖ Fichier ventes.csv upload√© dans datalake/bronze/ventes/ventes.csv")

# V√©rifier que le fichier existe
response = s3_client.list_objects_v2(Bucket='datalake', Prefix='bronze/ventes/')
if 'Contents' in response:
    print(f"\nüìÅ Fichiers dans bronze/ventes/ :")
    for obj in response['Contents']:
        print(f"  - {obj['Key']} ({obj['Size']} bytes)")
else:
    print("‚ö†Ô∏è  Aucun fichier trouv√©")

‚úÖ Fichier ventes.csv upload√© dans datalake/bronze/ventes/ventes.csv

üìÅ Fichiers dans bronze/ventes/ :
  - bronze/ventes/parquet/_SUCCESS (0 bytes)
  - bronze/ventes/parquet/part-00000-8b189a87-8cca-47c6-b7ac-8e94af3d4f78-c000.snappy.parquet (1375 bytes)
  - bronze/ventes/ventes.csv (88 bytes)


## ü•â 2. Layer Bronze : Ingestion brute

Lecture du CSV depuis MinIO et affichage des donn√©es.

In [6]:
# Lecture du CSV depuis MinIO
df_bronze = spark.read.csv(
    "s3a://datalake/bronze/ventes/ventes.csv",
    header=True,
    inferSchema=True  # Inf√©rence automatique des types
)

print("ü•â Layer BRONZE - Donn√©es brutes :")
df_bronze.show()
print("\nüìã Sch√©ma :")
df_bronze.printSchema()
print(f"\nüìä Nombre de lignes : {df_bronze.count()}")

ü•â Layer BRONZE - Donn√©es brutes :
+---+-------+----+--------+
| id|produit|prix|quantite|
+---+-------+----+--------+
|  1|  Stylo| 1.2|      10|
|  2| Cahier| 2.5|       5|
|  3|  Gomme| 0.8|      20|
|  4|  Stylo| 1.2|      15|
+---+-------+----+--------+


üìã Sch√©ma :
root
 |-- id: integer (nullable = true)
 |-- produit: string (nullable = true)
 |-- prix: double (nullable = true)
 |-- quantite: integer (nullable = true)


üìä Nombre de lignes : 4


In [8]:
# Sauvegarde Bronze en Parquet
df_bronze.write.mode("overwrite").parquet("s3a://datalake/bronze/ventes/parquet/")

print("‚úÖ Donn√©es Bronze sauvegard√©es en Parquet dans s3a://datalake/bronze/ventes/parquet/")

# V√©rifier avec boto3
response = s3_client.list_objects_v2(Bucket='datalake', Prefix='bronze/ventes/parquet/')
if 'Contents' in response:
    print(f"\nüìÅ Fichiers Parquet cr√©√©s : {len(response['Contents'])} fichiers")
else:
    print("‚ö†Ô∏è  Erreur : aucun fichier Parquet cr√©√©")

‚úÖ Donn√©es Bronze sauvegard√©es en Parquet dans s3a://datalake/bronze/ventes/parquet/

üìÅ Fichiers Parquet cr√©√©s : 2 fichiers


## ü•à 3. Layer Silver : Nettoyage + typage

Transformations :
- Suppression des lignes nulles
- Typage correct des colonnes  
- Cr√©ation d'une colonne `montant_total`

In [9]:
from pyspark.sql.functions import col, expr

df_silver = (
    df_bronze
    .dropna()  # Suppression des lignes nulles
    .withColumn("prix", col("prix").cast("double"))  # Typage prix
    .withColumn("quantite", col("quantite").cast("int"))  # Typage quantit√©
    .withColumn("montant_total", expr("prix * quantite"))  # Calcul montant total
)

print("ü•à Layer SILVER - Donn√©es nettoy√©es et typ√©es :")
df_silver.show()
print("\nüìã Sch√©ma Silver :")
df_silver.printSchema()
print(f"\nüìä Nombre de lignes : {df_silver.count()}")

ü•à Layer SILVER - Donn√©es nettoy√©es et typ√©es :
+---+-------+----+--------+-------------+
| id|produit|prix|quantite|montant_total|
+---+-------+----+--------+-------------+
|  1|  Stylo| 1.2|      10|         12.0|
|  2| Cahier| 2.5|       5|         12.5|
|  3|  Gomme| 0.8|      20|         16.0|
|  4|  Stylo| 1.2|      15|         18.0|
+---+-------+----+--------+-------------+


üìã Sch√©ma Silver :
root
 |-- id: integer (nullable = true)
 |-- produit: string (nullable = true)
 |-- prix: double (nullable = true)
 |-- quantite: integer (nullable = true)
 |-- montant_total: double (nullable = true)


üìä Nombre de lignes : 4


In [10]:
# Sauvegarde Silver en Parquet
df_silver.write.mode("overwrite").parquet("s3a://datalake/silver/ventes/")

print("‚úÖ Donn√©es Silver sauvegard√©es en Parquet dans s3a://datalake/silver/ventes/")

# V√©rifier avec boto3
response = s3_client.list_objects_v2(Bucket='datalake', Prefix='silver/ventes/')
if 'Contents' in response:
    print(f"\nüìÅ Fichiers Parquet cr√©√©s : {len(response['Contents'])} fichiers")
else:
    print("‚ö†Ô∏è  Erreur : aucun fichier Parquet cr√©√©")

‚úÖ Donn√©es Silver sauvegard√©es en Parquet dans s3a://datalake/silver/ventes/

üìÅ Fichiers Parquet cr√©√©s : 2 fichiers


## ü•á 4. Layer Gold : Vue analytique - CA par produit

Agr√©gation pour calculer le chiffre d'affaires total par produit.

In [11]:
from pyspark.sql.functions import sum as _sum, count, round as _round

df_gold = (
    df_silver
    .groupBy("produit")
    .agg(
        _sum("montant_total").alias("chiffre_affaires"),
        _sum("quantite").alias("quantite_totale"),
        count("id").alias("nombre_ventes")
    )
    .withColumn("chiffre_affaires", _round(col("chiffre_affaires"), 2))
    .orderBy(col("chiffre_affaires").desc())
)

print("ü•á Layer GOLD - Chiffre d'affaires par produit :")
df_gold.show()
print(f"\nüìä Nombre de produits : {df_gold.count()}")

ü•á Layer GOLD - Chiffre d'affaires par produit :
+-------+----------------+---------------+-------------+
|produit|chiffre_affaires|quantite_totale|nombre_ventes|
+-------+----------------+---------------+-------------+
|  Stylo|            30.0|             25|            2|
|  Gomme|            16.0|             20|            1|
| Cahier|            12.5|              5|            1|
+-------+----------------+---------------+-------------+


üìä Nombre de produits : 3


In [13]:
# Sauvegarde Gold en Parquet
df_gold.write.mode("overwrite").parquet("s3a://datalake/gold/ca_par_produit/")

print("‚úÖ Donn√©es Gold sauvegard√©es en Parquet dans s3a://datalake/gold/ca_par_produit/")

# V√©rifier avec boto3
response = s3_client.list_objects_v2(Bucket='datalake', Prefix='gold/ca_par_produit/')
if 'Contents' in response:
    print(f"\nüìÅ Fichiers Parquet cr√©√©s : {len(response['Contents'])} fichiers")
else:
    print("‚ö†Ô∏è  Erreur : aucun fichier Parquet cr√©√©")

‚úÖ Donn√©es Gold sauvegard√©es en Parquet dans s3a://datalake/gold/ca_par_produit/

üìÅ Fichiers Parquet cr√©√©s : 2 fichiers


## üéâ R√©capitulatif Final

Pipeline ETL Bronze-Silver-Gold termin√© avec succ√®s !

### üìä R√©sultats :

Ex√©cutez cette cellule pour voir un r√©capitulatif complet de toutes les donn√©es cr√©√©es dans MinIO.

In [14]:
# R√©capitulatif de tous les fichiers cr√©√©s dans MinIO
print("=" * 70)
print("üìÇ STRUCTURE COMPL√àTE DANS MinIO - Bucket: datalake")
print("=" * 70)

for prefix in ['bronze/', 'silver/', 'gold/']:
    print(f"\nüìÅ {prefix}")
    response = s3_client.list_objects_v2(Bucket='datalake', Prefix=prefix)
    if 'Contents' in response:
        for obj in response['Contents']:
            size_kb = obj['Size'] / 1024
            print(f"   - {obj['Key']} ({size_kb:.2f} KB)")
    else:
        print(f"   ‚ö†Ô∏è  Aucun fichier")

print("\n" + "=" * 70)
print("‚úÖ PIPELINE ETL BRONZE ‚Üí SILVER ‚Üí GOLD : TERMIN√â")
print("=" * 70)

print("\nüìä R√©sum√© :")
print(f"   Bronze : {df_bronze.count()} lignes")
print(f"   Silver : {df_silver.count()} lignes")
print(f"   Gold   : {df_gold.count()} produits")

# Arr√™ter la session Spark
spark.stop()
print("\n‚úÖ SparkSession arr√™t√©e")

üìÇ STRUCTURE COMPL√àTE DANS MinIO - Bucket: datalake

üìÅ bronze/
   - bronze/ventes/parquet/_SUCCESS (0.00 KB)
   - bronze/ventes/parquet/part-00000-1f1941d6-8424-4490-9382-1d7e4a5ff0d3-c000.snappy.parquet (1.34 KB)
   - bronze/ventes/ventes.csv (0.09 KB)

üìÅ silver/
   - silver/ventes/_SUCCESS (0.00 KB)
   - silver/ventes/part-00000-c100e879-192d-4af4-bc72-30be7ee94eed-c000.snappy.parquet (1.64 KB)

üìÅ gold/
   - gold/ca_par_produit/_SUCCESS (0.00 KB)
   - gold/ca_par_produit/part-00000-a9a4b879-bc06-4020-bc4d-23457b2ccb55-c000.snappy.parquet (1.43 KB)

‚úÖ PIPELINE ETL BRONZE ‚Üí SILVER ‚Üí GOLD : TERMIN√â

üìä R√©sum√© :
   Bronze : 4 lignes
   Silver : 4 lignes
   Gold   : 3 produits

‚úÖ SparkSession arr√™t√©e
