In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, ceil

spark = SparkSession.builder\
         .appName("RatingEngine")\
         .config("spark.jars", r".conf\postgresql-42.7.6.jar")\
         .config("spark.driver.extraClassPath", r".\conf\postgresql-42.7.6.jar")\
         .config("spark.executor.extraClassPath", r".\conf\\postgresql-42.7.6.jar")\
         .config("spark.sql.shuffle.partitions", "50")\
         .config("spark.sql.autoBroadcastJoinThreshold", "10485760")\
         .config("spark.driver.memory", "6g")\
         .getOrCreate()

In [27]:
data_path = r".\mediated_data"

cleanedCdrsDf  = spark.read.format("parquet") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(data_path)

In [28]:
customersDf = spark.read \
    .format("jdbc")\
    .option("url",      "jdbc:postgresql://localhost:5432/telecomdb")\
    .option("dbtable",  "customer_subscriptions")\
    .option("user",     "postgres")\
    .option("password", "0000")\
    .load() 

customersDf.show(5)

+------------+-----------------+---------------+-----------------+------+--------------------+------------+
| customer_id|    customer_name|activation_date|subscription_type|status|              region|rate_plan_id|
+------------+-----------------+---------------+-----------------+------+--------------------+------------+
|212603516508|   Noura El Fassi|     2024-04-03|         postpaid|active|Tanger-Tétouan-Al...|    Titanium|
|212754643912| Hamza Benjelloun|     2023-09-07|         postpaid|active|      Drâa-Tafilalet|        Gold|
|212646849604|Zahra El Khattabi|     2023-10-14|         postpaid|active|          Fès-Meknès|        Gold|
|212637298216|     Zahra Kabbaj|     2025-01-09|         postpaid|active|         Souss-Massa|      Silver|
|212698457056|    Othman Daoudi|     2024-09-29|         postpaid|active|Dakhla-Oued Ed-Dahab|        Gold|
+------------+-----------------+---------------+-----------------+------+--------------------+------------+
only showing top 5 rows



In [29]:
catalogDf = spark.read \
    .format("jdbc")\
    .option("url",      "jdbc:postgresql://localhost:5432/telecomdb")\
    .option("dbtable",  "productCatalog")\
    .option("user",     "postgres")\
    .option("password", "0000")\
    .load() 

catalogDf.show(5)

+------------+------------+------+---------+--------------------+
|product_code|service_type|  unit|rate_type|         description|
+------------+------------+------+---------+--------------------+
|   VOICE_NAT|       voice|second|     flat|Appel vocal local...|
|   VOICE_INT|       voice|second|     flat|Appel vocal inter...|
|     SMS_STD|         sms|   sms|     flat|Envoi SMS standar...|
|    DATA_STD|        data|    MB|     flat|Connexion Interne...|
|  DATA_ULTRA|        data|    MB|   tiered|Connexion Interne...|
+------------+------------+------+---------+--------------------+



In [30]:
from pyspark.sql.functions import col

try:
    ratePlansDf = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/telecomdb") \
        .option("dbtable", "rate_plans") \
        .option("user", "postgres") \
        .option("password", "0000") \
        .load() \
        .withColumn("unit_price", col("unit_price").cast("double")) \
        .withColumn("free_units", col("free_units").cast("int")) \
        .withColumn("tier_threshold", col("tier_threshold").cast("int")) \
        .withColumn("tier_price", col("tier_price").cast("double"))
    
    ratePlansDf.show(5)
    print("Successfully loaded rate_plans table")
    
except Exception as e:
    print(f"Error loading data: {e}")

+------------+------------+------------+----------+----------+--------------+----------+
|rate_plan_id|product_code|service_type|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+----------+----------+--------------+----------+
|      Silver|   VOICE_NAT|       voice|      0.05|        30|          NULL|      NULL|
|      Silver|   VOICE_INT|       voice|      0.25|         0|          NULL|      NULL|
|      Silver|    DATA_STD|        data|      0.01|       100|          NULL|      NULL|
|      Silver|  DATA_ULTRA|        data|     0.005|       200|           500|     0.002|
|        Gold|   VOICE_NAT|       voice|     0.035|        60|          NULL|      NULL|
+------------+------------+------------+----------+----------+--------------+----------+
only showing top 5 rows

Successfully loaded rate_plans table


In [31]:
records = (cleanedCdrsDf
        .withColumn(
            "customer_id",
            when(col("record_type") == "voice", col("caller_id"))
            .when(col("record_type") == "sms",  col("sender_id"))
            .otherwise(col("user_id"))
        ))

In [32]:
records.count()

records.show()

+--------------------+--------------------+------------+----------+------------+-------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+--------+-----------+------------+
|           record_ID|           timestamp|     cell_id|technology|   caller_id|    callee_id|duration_sec|product_code|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type| customer_id|
+--------------------+--------------------+------------+----------+------------+-------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+--------+-----------+------------+
|65b1359e-f697-451...|2025-06-10 17:08:...|         UNK|        5G|        NULL|         NULL|        NULL|   DATA_PLUS|        ready|     NULL|       NULL|212626326086|                 589|       1639.17|       2|       data|212626326086|
|71e761ed-1038-402...|2025-06-10 17:08:.

In [33]:
# jointure client → plan 
cdrJoinPlan = records.join(
    customersDf.filter((col("status") == "active") &
                       (col("subscription_type") == "postpaid"))
               .select("customer_id", "rate_plan_id"),
    on="customer_id",
    how="inner"
)
cdrJoinPlan.show()

+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+
| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|product_code|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|rate_plan_id|
+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+
|212603516508|638b4663-510b-4a2...|2025-06-10 17:10:...|   KENITRA_03|        2G|        NULL|        NULL|        NULL|   DATA_PLUS| needs_review|        NULL|        NULL|212603516508|                 UNK|           UNK|      1

In [34]:
# jointure produit → (rate_type, service_type) 
cdrJoinCatalog = cdrJoinPlan.join(catalogDf, on="product_code", how="left")
cdrJoinCatalog.show()

+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------------+------+---------+--------------------+
|product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|rate_plan_id|service_type|  unit|rate_type|         description|
+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------------+------+---------+--------------------+
|   DATA_PLUS|212603516508|638b4663-510b-4a2...|2025-06-10 17:10:...|   KENI

In [35]:
cdrJoinRating = cdrJoinCatalog.join(ratePlansDf,
                   on=["rate_plan_id", "product_code"],
                   how="left")

In [36]:
cdrJoinRating.show()

+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------+---------+--------------------+------------+----------+----------+--------------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|service_type|  unit|rate_type|         description|service_type|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------

In [37]:
cdrJoinRating=cdrJoinRating.drop("service_type")
cdrJoinRating.show()

+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+---------

In [38]:
cdrJoinRating.show()

+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+---------

In [39]:
cdrJoinRating = cdrJoinRating.withColumn(
    "imputed_mb",
    when((col("data_volume_mb").isNull()) | (col("data_volume_mb")=="UNK"), lit(0))
    .otherwise(col("data_volume_mb"))
)

In [40]:
cdrJoinRating = cdrJoinRating.withColumn(
    "rating_status",
    when(col("rating_status")=="needs_review", "ready").otherwise(col("rating_status"))
)

In [41]:
cdrJoinRating.show()

+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|imputed_mb|
+------------+------------+------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+-------------------

In [42]:
cdrJoinRating = cdrJoinRating.withColumn(
    "dur_sec_imp",
    when(col("duration_sec").isNull() | (col("duration_sec") < 0), lit(0))
    .otherwise(col("duration_sec"))
)

In [43]:
cdrJoinRating = cdrJoinRating.withColumn(
    "billable_units",
    when(col("record_type") == "voice",
         ceil(col("dur_sec_imp")))
    .when(col("record_type") == "sms",
         lit(1))
    .otherwise(                    
         col("imputed_mb"))
)


In [44]:
cdrJoinRating.show()

+------------+------------+------------+--------------------+--------------------+------------+----------+------------+-------------+------------+-------------+---------+-----------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+----------+-----------+--------------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|     cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|imputed_mb|dur_sec_imp|billable_units|
+------------+------------+------------+--------------------+--------------------+------------+----------+------------+-------------+------------+-------------+---------+-----------+------------+--------------------+--------------+--------+------

In [45]:
from pyspark.sql.functions import coalesce

cdrJoinRating = (cdrJoinRating
    .withColumn("free_units",    coalesce(col("free_units"),    lit(0)))
    .withColumn("tier_threshold",coalesce(col("tier_threshold"),lit(0)))
    .withColumn("unit_price",    coalesce(col("unit_price"),    lit(0.0)))
    .withColumn("tier_price",    coalesce(col("tier_price"),    lit(0.0)))
)


In [46]:
cdrsRated = cdrJoinRating.withColumn(
    "cost",
    when(col("rate_type") == "flat",
         when(col("billable_units") <= col("free_units"), lit(0))
         .otherwise((col("billable_units") - col("free_units")) * col("unit_price")))
    .when(col("rate_type") == "tiered",
         when(col("billable_units") <= col("free_units"), lit(0))
         .when(col("billable_units") <= col("tier_threshold"),
               (col("billable_units") - col("free_units")) * col("unit_price"))
         .otherwise(
               (col("tier_threshold") - col("free_units")) * col("unit_price") +
               (col("billable_units") - col("tier_threshold")) * col("tier_price")))
)

cdrsRated = cdrsRated.withColumn(
    "rating_status",
    when(col("cost").isNull(),"unmatched")
    .otherwise("rated")
)


In [47]:
cdrsRated.select("record_ID","customer_id","product_code",
                 "billable_units","unit_price","cost","rating_status") \
         .show(truncate=False)


+------------------------------------+------------+------------+--------------+----------+------------------+-------------+
|record_ID                           |customer_id |product_code|billable_units|unit_price|cost              |rating_status|
+------------------------------------+------------+------------+--------------+----------+------------------+-------------+
|ce4ed4c0-4372-4061-890a-131e96bb6fef|212754643912|DATA_ULTRA  |880.97        |0.003     |1.89291           |rated        |
|08ed0b9d-b9fa-49e6-8f5b-938cb66907bb|212643052129|VOICE_INT   |465           |0.18      |81.89999999999999 |rated        |
|dbd5ac45-4316-4fce-9c59-33f8b8720bfb|212765464481|VOICE_INT   |299           |0.25      |74.75             |rated        |
|8a468587-cdeb-4050-b37b-e6a04ce053fb|212782121833|VOICE_INT   |102           |0.12      |8.64              |rated        |
|7a557c52-2392-4832-bc25-cc6dd87a7882|212668974708|VOICE_INT   |170           |0.25      |42.5              |rated        |
|91aebe1

In [48]:
from pyspark.sql.functions import date_format

cdrsRated = cdrsRated.withColumn(
    "billing_period",
    date_format(col("timestamp"), "yyyy-MM")
)

In [49]:
(
 cdrsRated
 .write
 .mode("overwrite")
 .partitionBy("billing_period")
 .parquet("rated_data/")
)

print("✓ Rating terminé : Parquet écrit dans rated_cdrs/")


✓ Rating terminé : Parquet écrit dans rated_cdrs/


In [50]:
cdrsRated.show()

+------------+------------+------------+--------------------+--------------------+------------+----------+------------+-------------+------------+-------------+---------+-----------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+----------+-----------+--------------+------------------+--------------+
|rate_plan_id|product_code| customer_id|           record_ID|           timestamp|     cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|imputed_mb|dur_sec_imp|billable_units|              cost|billing_period|
+------------+------------+------------+--------------------+--------------------+------------+----------+------------+-------------+------------+-------------+---------+--------