In [1]:
# Create a Spark session
from pyspark.sql import SparkSession


# Base builder: application name and a local master using all available cores.
session_builder = (
    SparkSession.builder
    .appName("Streaming from spring-boot")
    .master("local[*]")
)

# Session-level settings, applied one by one to the builder.
# NOTE(review): "spark.streaming.stopGracefullyOnShutdown" is a `spark.streaming.*`
# key — confirm it has an effect on Structured Streaming queries.
# NOTE(review): the Kafka connector coordinates (Scala 2.12 / 3.3.0) presumably
# have to match the installed Spark build — verify.
session_settings = {
    "spark.streaming.stopGracefullyOnShutdown": True,
    'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0',
    "spark.sql.shuffle.partitions": 4,
}
for setting_name, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Reuse an already running session if there is one, otherwise start a new one.
spark = session_builder.getOrCreate()

# Last expression of the cell: display the session summary.
spark

In [2]:
# Streaming source: subscribe to the "logsTopic" Kafka topic, starting from the
# earliest available offsets.
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "logsTopic",
    "startingOffsets": "earliest",
}

kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)


In [3]:
# Print the schema of the raw Kafka source DataFrame (key/value arrive as binary).
kafka_df.printSchema()
# NOTE(review): kafka_df is built from readStream; show() is a batch action and is
# expected to fail on a streaming DataFrame — confirm before re-enabling.
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Cast the Kafka value from binary to string, producing kafka_logs_df
from pyspark.sql.functions import expr

# SQL expression that re-types the `value` column as a string.
value_as_string = expr("cast(value as string)")

# Overwrite `value` in place; every other Kafka column is carried over unchanged.
kafka_logs_df = kafka_df.withColumn("value", value_as_string)

In [5]:
from pyspark.sql.functions import regexp_extract, col

# Pattern for one log line; the six capture groups are, in order:
#   1. date-time   "dddd-dd-dd dd:dd:dd"
#   2. level       upper-case letters inside square brackets
#   3. thread      a run of non-whitespace characters
#   4. logger      everything up to the next ':'
#   5. message     shortest text before " - "
#   6. context     the rest of the line after " - "
log_pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[([A-Z]+)] ([^\s]+) ([^:]+): (.+?) - (.+)"

# Output column names, listed in capture-group order (group 1 first).
log_field_names = [
    "timestamp",
    "logLevel",
    "threadName",
    "loggerName",
    "message",
    "contextData",
]

# Add one column per capture group, always extracting from the untouched `value`.
# Note: "timestamp" overwrites the Kafka record timestamp with the extracted
# string, and regexp_extract yields an empty string for lines that do not match.
parsed_kafka_logs_df = kafka_logs_df
for group_index, field_name in enumerate(log_field_names, start=1):
    parsed_kafka_logs_df = parsed_kafka_logs_df.withColumn(
        field_name,
        regexp_extract(col("value"), log_pattern, group_index),
    )



In [6]:
# Print the schema of the parsed logs: `value` and `timestamp` are strings and the
# extracted log fields (logLevel, threadName, loggerName, message, contextData) are added.
# To preview actual rows, temporarily change readStream to read on the source.
parsed_kafka_logs_df.printSchema()
# NOTE(review): `flattened_df` is not defined anywhere in the visible notebook;
# this disabled line looks like a leftover — confirm and remove.
#flattened_df.show(truncate=False)

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- logLevel: string (nullable = true)
 |-- threadName: string (nullable = true)
 |-- loggerName: string (nullable = true)
 |-- message: string (nullable = true)
 |-- contextData: string (nullable = true)



In [7]:
from pyspark.sql.functions import   lit, to_json,struct, collect_list, count
# Group the parsed logs by (loggerName, message). Despite the DataFrame's name,
# the log level is not part of the grouping key.
logs_grouped = parsed_kafka_logs_df.groupBy("loggerName", "message")

# Per-group aggregates: number of log lines and the list of their contextData values.
log_count = count("*").alias("logCount")
all_context_data = collect_list("contextData").alias("allContextData")

log_level_by_logger_df = logs_grouped.agg(log_count, all_context_data)

In [8]:
# Bundle the columns of each aggregated row into a single struct; to_json then
# serialises that struct as one JSON object per row.
log_details = struct(
    col("loggerName"),
    col("message"),
    col("allContextData"),
    col("logCount"),
)

# Shape the rows for the Kafka sink: a constant string key and the JSON string value.
aggregated_df = log_level_by_logger_df.select(
    lit("logLevelByLogger").alias("key"),
    to_json(log_details).alias("value"),
)

In [None]:
# Publish the aggregated JSON records to the "spark-out-put" Kafka topic.
kafka_sink_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",  # Kafka broker address
    "topic": "spark-out-put",  # destination topic
    "checkpointLocation": "checkpoint04",  # relative checkpoint directory
}

# The query runs in "update" output mode (not append).
streaming_query = (
    aggregated_df.writeStream
    .format("kafka")
    .options(**kafka_sink_options)
    .outputMode("update")
    .start()
)

# Block this cell until the streaming query terminates.
streaming_query.awaitTermination()