In [9]:
# Create the Spark session
from pyspark.sql import SparkSession


# Local Spark session for this notebook. The Kafka SQL connector is requested
# through spark.jars.packages and shuffles are limited to 4 partitions.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Streaming from spring-boot") \
    .config("spark.sql.shuffle.partitions", 4) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .getOrCreate()

# Bare expression so the notebook renders the session summary.
spark

In [10]:
# Streaming reader on the Kafka topic "telecom_events": starts from the
# earliest offsets, with failOnDataLoss disabled.
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "telecom_events") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()


In [11]:
# Print the column layout of the raw Kafka source (key/value are binary).
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, lower, trim, coalesce, lit, date_format, when, window,sum as _sum,round
from pyspark.sql.types import *
# Session for the telecom billing pipeline, with the PostgreSQL JDBC jar on
# the classpath.
# NOTE(review): if a session created by an earlier cell is still active,
# getOrCreate() hands that one back - confirm that the app name and the
# spark.jars setting below actually take effect in that case.
spark = (
    SparkSession.builder
    .appName("TelecomPipeline")
    .config("spark.jars", "jars/postgresql-42.2.27.jar")
    .getOrCreate()
)

# Expected schema of the JSON document carried in each Kafka message value.
# Every field is nullable (StructField default).
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("event_id", StringType()),
        ("timestamp", StringType()),
        ("event_type", StringType()),
        ("user_id", StringType()),
        ("destination_id", StringType()),
        ("duration", DoubleType()),
        ("volume", LongType()),
        ("cell_id", StringType()),
        ("technology", StringType()),
        ("status", StringType()),
        ("error_code", StringType()),
    )
])
# Streaming read of the "telecom_events" Kafka topic.
# NOTE(review): unlike the earlier reader in this notebook, no startingOffsets
# or failOnDataLoss option is set here - confirm the defaults are intended.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "telecom_events")
    .load()
)


# Decode the binary Kafka value as a string, parse it as JSON against
# `schema`, and flatten the parsed struct into top-level columns.
json_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)


In [45]:
# Clean and normalize the parsed events:
#   - event_type is trimmed and lower-cased,
#   - a missing status defaults to "completed",
#   - timestamp is cast from string to a Spark timestamp,
#   - rows without an event_id or without a castable timestamp are dropped.
clean_df = (
    json_df
    .withColumn("event_type", lower(trim(col("event_type"))))
    .withColumn("status", coalesce(col("status"), lit("completed")))
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .where(col("event_id").isNotNull() & col("timestamp").isNotNull())
)

In [46]:
# Apply the base rating rules.
#   tarif        : voice -> duration * 0.01, data -> volume / 1024 / 1024 * 0.005,
#                  sms -> flat 0.1, anything else -> 0.0
#   status_tarif : "error" for failed events, "rated" when the price is
#                  strictly positive, "rejected" otherwise
rated_df = (
    clean_df
    .withColumn(
        "tarif",
        when(col("event_type") == "voice", col("duration") * 0.01)
        .when(col("event_type") == "data", (col("volume") / 1024 / 1024) * 0.005)
        .when(col("event_type") == "sms", lit(0.1))
        .otherwise(lit(0.0)),
    )
    .withColumn(
        "status_tarif",
        when(col("status") == "failed", "error")
        .when(col("tarif") > 0, "rated")
        .otherwise("rejected"),
    )
)

In [48]:
# Aggregate by customer and by billing cycle (calendar month, "yyyy-MM").
#
# Besides the public counters, two helper aggregates (prefixed with "_") are
# computed so the free SMS allowance can be applied correctly below:
#   _nombre_sms_factures : number of SMS that were actually rated
#   _sous_total_hors_sms : rated amount excluding SMS charges
billing_df = (
    rated_df
    .withColumn("cycle_facturation", date_format(col("timestamp"), "yyyy-MM"))
    .groupBy("cycle_facturation", "user_id")
    .agg(
        _sum(lit(1)).alias("nombre_total_evenements"),
        _sum(when(col("event_type") == "voice", lit(1)).otherwise(lit(0))).alias("nombre_appels"),
        _sum(when(col("event_type") == "sms", lit(1)).otherwise(lit(0))).alias("nombre_sms"),
        _sum(when(col("event_type") == "data", lit(1)).otherwise(lit(0))).alias("nombre_donnees"),
        # Gross rated amount: every rated event at list price, SMS included.
        _sum(when(col("status_tarif") == "rated", col("tarif")).otherwise(lit(0))).alias("sous_total"),
        _sum(
            when((col("status_tarif") == "rated") & (col("event_type") == "sms"), lit(1))
            .otherwise(lit(0))
        ).alias("_nombre_sms_factures"),
        _sum(
            when((col("status_tarif") == "rated") & (col("event_type") != "sms"), col("tarif"))
            .otherwise(lit(0))
        ).alias("_sous_total_hors_sms"),
    )
)

# Apply the free allowance (100 SMS included).
# Previously the excess charge was added on top of a subtotal that already
# billed every SMS, so SMS beyond the allowance were charged twice and the
# first 100 were never free. Now only rated SMS beyond the allowance are
# billed; failed SMS neither consume the allowance nor get charged.
billing_df = (
    billing_df
    .withColumn(
        "sms_excedent",
        when(col("_nombre_sms_factures") > 100, col("_nombre_sms_factures") - 100)
        .otherwise(lit(0)),
    )
    .withColumn("tarif_sms_excedent", col("sms_excedent") * 0.1)
    .withColumn(
        "sous_total_ajuste",
        col("_sous_total_hors_sms") + col("tarif_sms_excedent"),
    )
    # Helper aggregates are internal; drop them to keep the schema unchanged.
    .drop("_nombre_sms_factures", "_sous_total_hors_sms")
)
# Apply discounts on the adjusted subtotal. The rules are evaluated in order
# and only the first match applies:
#   user_id ending in "0"         -> 10 % (loyalty)
#   user_id containing "student"  -> 15 % (student)
#   billing cycle "2025-06"       ->  5 % (seasonal, June 2025)
#   otherwise                     -> no discount
billing_df = (
    billing_df
    .withColumn(
        "remise",
        when(col("user_id").endswith("0"), col("sous_total_ajuste") * 0.10)
        .when(col("user_id").contains("student"), col("sous_total_ajuste") * 0.15)
        .when(col("cycle_facturation") == "2025-06", col("sous_total_ajuste") * 0.05)
        .otherwise(lit(0.0)),
    )
    .withColumn("montant_avant_taxes", col("sous_total_ajuste") - col("remise"))
)

# Apply taxes: 20 % VAT on the pre-tax amount plus a fixed fee of 1.0, then
# round the grand total to 2 decimals.
billing_df = (
    billing_df
    .withColumn("tva", col("montant_avant_taxes") * 0.20)
    .withColumn("redevance", lit(1.0))
    .withColumn(
        "montant_total",
        round(col("montant_avant_taxes") + col("tva") + col("redevance"), 2),
    )
)
# Serialize each invoice row as one JSON document in a single `value` column
# (the JSON keys keep the French column names).
# Fix: the original expression contained `nombre_sms AS ,` - a dangling alias
# that makes the SQL expression fail to parse.
invoice_df = billing_df.selectExpr(
    """
    to_json(struct(
        user_id,
        cycle_facturation,
        nombre_total_evenements,
        nombre_appels,
        nombre_sms,
        nombre_donnees,
        sms_excedent,
        sous_total,
        tarif_sms_excedent,
        sous_total_ajuste,
        remise,
        montant_avant_taxes,
        tva,
        redevance,
        montant_total
    )) AS value
    """
)

In [None]:
# Publish the invoice stream to the Kafka topic "factures_telecom".
# Output mode is "complete", with checkpoints kept under
# /home/jovyan/checkpoints/factures.
query = (
    invoice_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "factures_telecom")
    .option("checkpointLocation", "/home/jovyan/checkpoints/factures")
    .outputMode("complete")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()