In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit, sha2, concat_ws, when
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType
from dotenv import load_dotenv
import os

load_dotenv()

DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")


In [4]:
jdbc_url = "jdbc:postgresql://localhost:5432/"+DB_NAME
jdbc_properties = {
    "user": DB_USER,
    "password": DB_PASSWORD,
    "driver": "org.postgresql.Driver"
}
nom_table = "normalized_cdr"

In [5]:
# Démarrer Spark
spark = SparkSession.builder \
    .appName("StreamingMediation") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")\
    .config("spark.jars", "/home/josh/.m2/repository/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar") \
    .config("spark.sql.shuffle.partitions", 8)\
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

25/06/08 14:47:43 WARN Utils: Your hostname, Joshua resolves to a loopback address: 127.0.0.1; using 192.168.11.113 instead (on interface wlo1)
25/06/08 14:47:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/josh/Big%20Data%20Projects/Projet%20Telecom/telecom_env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/josh/.ivy2/cache
The jars for the packages stored in: /home/josh/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7271e2b0-06a4-4768-ba20-5461be72139e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 1978ms :: artifacts dl 69ms

25/06/08 14:48:09 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# Lire depuis Kafka
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "cdr_topic") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()


In [7]:
df_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Définir le schéma brut (adapté aux 3 types de records)
schema = StructType() \
    .add("record_type", StringType()) \
    .add("timestamp", StringType()) \
    .add("caller_id", StringType()) \
    .add("callee_id", StringType()) \
    .add("sender_id", StringType()) \
    .add("receiver_id", StringType()) \
    .add("user_id", StringType()) \
    .add("duration_sec", IntegerType()) \
    .add("session_duration_sec", IntegerType()) \
    .add("data_volume_mb", DoubleType()) \
    .add("cell_id", StringType()) \
    .add("technology", StringType())

In [9]:
df_json = df_raw.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")


In [10]:
df_json.printSchema()

root
 |-- record_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- caller_id: string (nullable = true)
 |-- callee_id: string (nullable = true)
 |-- sender_id: string (nullable = true)
 |-- receiver_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- duration_sec: integer (nullable = true)
 |-- session_duration_sec: integer (nullable = true)
 |-- data_volume_mb: double (nullable = true)
 |-- cell_id: string (nullable = true)
 |-- technology: string (nullable = true)



In [11]:
# Normalisation du champ msisdn (priorité : caller_id > sender_id > user_id)
df_norm = df_json.withColumn("msisdn", 
    when(col("caller_id").isNotNull(), col("caller_id"))
    .when(col("sender_id").isNotNull(), col("sender_id"))
    .otherwise(col("user_id"))) \
    .withColumn("peer_id", 
    when(col("callee_id").isNotNull(), col("callee_id"))
    .when(col("receiver_id").isNotNull(), col("receiver_id"))
    .otherwise(lit(None)))


In [12]:
# Nettoyage & validation
valid_df = df_norm \
    .withColumn("status", 
        when(col("msisdn").isNull(), lit("error"))
        .when(~col("msisdn").rlike("^212[0-9]+"), lit("error"))
        .when(col("msisdn").startswith("999"), lit("error"))
        .when((col("duration_sec") < 0) | (col("data_volume_mb") < 0) | (col("session_duration_sec") < 0), lit("error"))
        .otherwise(lit("ok")))

# Déduplication via hash (possible en production avec stateful mapGroupsWithState)
valid_df = valid_df.withColumn("record_hash", sha2(concat_ws("|", col("timestamp"), col("msisdn"), col("record_type")), 256))
from pyspark.sql.functions import col

deduplicated_df = valid_df.dropDuplicates(["record_hash"])


In [13]:
clean_df = deduplicated_df.filter(col("status") == "ok")
clean_df.printSchema()

root
 |-- record_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- caller_id: string (nullable = true)
 |-- callee_id: string (nullable = true)
 |-- sender_id: string (nullable = true)
 |-- receiver_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- duration_sec: integer (nullable = true)
 |-- session_duration_sec: integer (nullable = true)
 |-- data_volume_mb: double (nullable = true)
 |-- cell_id: string (nullable = true)
 |-- technology: string (nullable = true)
 |-- msisdn: string (nullable = true)
 |-- peer_id: string (nullable = true)
 |-- status: string (nullable = false)
 |-- record_hash: string (nullable = true)



In [14]:
df_for_db = clean_df.select(
    "record_type", 
    col("timestamp").cast("timestamp"),  # cast string to timestamp
    "msisdn", "peer_id", 
    "duration_sec", "data_volume_mb", "session_duration_sec", 
    "cell_id", "technology", "status", "record_hash"
)
df_for_db.printSchema()

root
 |-- record_type: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- msisdn: string (nullable = true)
 |-- peer_id: string (nullable = true)
 |-- duration_sec: integer (nullable = true)
 |-- data_volume_mb: double (nullable = true)
 |-- session_duration_sec: integer (nullable = true)
 |-- cell_id: string (nullable = true)
 |-- technology: string (nullable = true)
 |-- status: string (nullable = false)
 |-- record_hash: string (nullable = true)



In [17]:
# Python function to write to multiple sinks
def device_data_output(df, batch_id):
    print("Batch id: "+ str(batch_id))
    
    # # Write to parquet
    # df.write.format("parquet").mode("append").save("data/output/device_data.parquet/")
    
    
    # Write to JDBC Postgres
    (
        df.write \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", nom_table) \
            .option("user", jdbc_properties["user"]) \
            .option("password", jdbc_properties["password"]) \
            .option("driver", jdbc_properties["driver"]) \
            .mode("append") \
            .save()
    
    )
    
    # Diplay
    df.show()
    

In [None]:
# Running foreachBatch
# Write the output to Multiple Sinks

(df_for_db
 .writeStream
 .foreachBatch(device_data_output)
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation", "checkpoint_dir_kafka")
 .start()
 .awaitTermination())

25/06/08 14:51:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/08 14:51:12 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/06/08 14:51:27 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 15775 milliseconds
                                                                                