In [1]:
from pyspark.sql import SparkSession
from kafka import KafkaConsumer
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, when , to_timestamp , lit

In [62]:
spark.stop()

In [2]:
spark = SparkSession.builder \
    .appName("TelecomStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .config("spark.jars","/home/omar/libs/postgresql-42.6.0.jar") \
    .getOrCreate()

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.0.181:9092,192.168.0.135:9092") \
    .option("subscribe", "voice") \
    .option("startingOffsets", "earliest") \
    .load()

25/06/10 17:42:37 WARN Utils: Your hostname, omar-VirtualBox resolves to a loopback address: 127.0.0.1; using 192.168.0.181 instead (on interface enp0s3)
25/06/10 17:42:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/omar/spark-3.5.5-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/omar/.ivy2/cache
The jars for the packages stored in: /home/omar/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1a6b5b7b-85eb-44a3-8113-f2e743c5f730;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 867ms :: artifacts dl 26ms
	:: modu

In [4]:
schema = StructType() \
    .add("record_type", StringType()) \
    .add("timestamp", StringType()) \
    .add("caller_id", StringType()) \
    .add("callee_id", StringType()) \
    .add("sender_id", StringType()) \
    .add("receiver_id", StringType()) \
    .add("user_id", StringType()) \
    .add("duration_sec", IntegerType()) \
    .add("session_duration_sec", DoubleType()) \
    .add("data_volume_mb", DoubleType()) \
    .add("cell_id", StringType()) \
    .add("technology", StringType())

parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

In [7]:
def write_to_postgres(batch_df, batch_id):
    dedup_df = batch_df.dropDuplicates()
    dedup_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.0.135:5432/telecom_db") \
        .option("dbtable", "clean_stream") \
        .option("user", "postgres") \
        .option("password", "123") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

In [8]:
mediated_df = parsed_df.withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("normalized_caller_id", when(col("record_type") == "voice", col("caller_id"))
                                         .when(col("record_type") == "sms", col("sender_id"))
                                         .when(col("record_type") == "data", col("user_id"))) \
    .withColumn("normalized_callee_id", when(col("record_type") == "voice", col("callee_id"))
                                         .when(col("record_type") == "sms", col("receiver_id"))
                                         .otherwise(None)) \
    .withColumn("normalized_duration", when(col("record_type") == "voice", col("duration_sec"))
                                        .when(col("record_type") == "data", col("session_duration_sec"))
                                        .otherwise(0)) \
    .withColumn("msisdn", when(col("record_type") == "voice", col("caller_id"))
    .when(col("record_type") == "sms", col("sender_id"))
    .when(col("record_type") == "data", col("user_id"))) \
    .withColumn("normalized_volume", when(col("record_type") == "data", col("data_volume_mb"))) \
    .withColumn("error_flag",
                    when(col("timestamp").isNull() |
                     col("record_type").isin("Television", "DSL", "Cloud", "") |
                     col("record_type").isNull() |
                     col("normalized_caller_id").isNull() |
                     col("normalized_caller_id").startswith("999") |
                     (col("normalized_duration") < 0) |
                     (col("data_volume_mb") < 0),
                     lit("True")).otherwise(lit("False")))

def log_errors(batch_df, batch_id):
    error_rows = batch_df.collect()  # Ici batch_df est un DataFrame batch (statique)
    with open('/home/omar/BigDataProject/error_records.log', 'a') as log_file:
        for row in error_rows:
            log_file.write(str(row) + '\n')

errors_df = mediated_df.filter(col("error_flag") == "True")
clean_df = mediated_df.filter(col("error_flag") == "False")
# 2. Filtrer les données erroné
errors_query = errors_df.writeStream \
    .foreachBatch(log_errors) \
    .outputMode("append") \
    .start()

clean_query = clean_df.writeStream \
    .foreachBatch(write_to_postgres) \
    .outputMode("append") \
    .option("checkpointLocation", "/home/omar/BigDataProject/checkpoints_clean_pg") \
    .start()
'''clean_query = clean_df.coalesce(1) \
    .writeStream \
    .outputMode("append") \
    .option("header", "true") \
    .format("csv") \
    .option("path", "/home/omar/BigDataProject/clean_records") \
    .option("checkpointLocation", "/home/omar/BigDataProject/checkpoints_clean") \
    .start()'''
errors_query.awaitTermination()
clean_query.awaitTermination()

25/06/10 17:44:44 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-36cabf7d-695d-444e-aab3-584720976212. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/10 17:44:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/10 17:44:44 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/06/10 17:44:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/10 17:44:44 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.pol

KeyboardInterrupt: 

In [None]:
query = mediated_df.repartition(1).writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "/home/omar/BigDataProject/output") \
    .option("checkpointLocation", "/home/omar/BigDataProject/checkpoints") \
    .option("header", "true") \
    .start()
query.awaitTermination()