In [None]:
# STREAMING SPARK VERS POWER BI (SCRIPT FINAL)

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, col, regexp_extract, to_json, struct
import requests
import time

# 1. Initialisation de la session Spark avec Delta
spark = SparkSession.builder \
    .appName("Streaming SNCF vers Power BI") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 2. Chargement des données enrichies depuis Delta Lake
chemin_delta = "/home/jovyan/work/data/delta_sncf_enriched"
df_trafic = spark.read.format("delta").load(chemin_delta)

# 3. Nettoyage et transformation

df_trafic = df_trafic \
    .withColumn("heure_arrivee", from_unixtime(col("arrival"))) \
    .withColumn("heure_depart", from_unixtime(col("departure")))

# 4. Préparation des données à envoyer à Power BI
colonnes = [
    "stop_id",
    "stop_name",
    "trip_id",
    "start_date",
    "arrival",
    "departure",
    "heure_arrivee",
    "heure_depart"
]

# 5. Fonction d'envoi à Power BI
url_powerbi = "https://api.powerbi.com/beta/78613bf0-3424-4615-9325-8f6a7d76a04a/datasets/9e42dacb-5d36-4f40-a2b8-c01f63adc78c/rows?experience=fabric-developer&key=lwBq0dtI5wk9DxsFW9fl%2BSv9LVSit9oTMnNFpeqAZ4LhF1CH00iEUBgtaWSXEIiUxQe3CKfZxb3k3LgZWIuVdA%3D%3D"

def envoyer_ligne_powerbi(dictionnaire):
    headers = {"Content-Type": "application/json"}
    response = requests.post(url_powerbi, headers=headers, json=[dictionnaire])
    if response.status_code == 200:
        print("✔ Ligne envoyée avec succès")
    else:
        print("❌ Erreur:", response.status_code, response.text)

# 6. Simulation de streaming ligne par ligne

df_sample = df_trafic.select(colonnes).limit(20).toPandas()

for index, ligne in df_sample.iterrows():
    # Nettoyage des colonnes datetime
    ligne["heure_arrivee"] = ligne["heure_arrivee"].replace(" ", "T") + ".000Z"
    ligne["heure_depart"] = ligne["heure_depart"].replace(" ", "T") + ".000Z"
    envoyer_ligne_powerbi(ligne.to_dict())
    time.sleep(2)  # Délai simulé entre les lignes
