In [1]:
# Import necessary modules from PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, concat_ws, lit, monotonically_increasing_id

# Step 1: Create a Spark session
# This is like starting the Spark engine
spark = SparkSession.builder \
    .appName("BeginnerFriendlyLogProcessing") \
    .master("local[2]") \
    .getOrCreate()

# Step 2: Load your CSV files into a Spark DataFrame
# 'header=True' tells Spark the first row has column names
df = spark.read.option("header", True).csv("../Data/raw_logs/Friday-WorkingHours-Afternoon-DDos.csv")

# Step 3: Transform the raw CSV into a structured log DataFrame
# We'll create 5 new columns for log processing:
# - log_level: INFO for BENIGN, ERROR for attacks
# - error: copy of the original Label column
# - event: human-readable message about the flow
# - timestamp: unique ID for each row (acts as a timestamp)
# - time: the flow duration from CSV

log_df = df.withColumn(
    "log_level",
    when(col("Label") == "BENIGN", "INFO").otherwise("ERROR")  # set log level
).withColumn(
    "error",
    col("Label")  # copy the attack type / label
).withColumn(
    "event",
    concat_ws(" ",                                  # combine columns into one string
              lit("Flow on port"),                # static text
              col("Destination Port"),           # port number
              lit("with total packets"),         # static text
              col("Total Fwd Packets") + col("Total Backward Packets"))  # sum packets
).withColumn(
    "timestamp",
    monotonically_increasing_id()  # generates a unique ID for each row
).withColumn(
    "time",
    col("Flow Duration")  # use flow duration as the time column
)

# Step 4: Show the first 5 rows of the structured log DataFrame
# truncate=False ensures we can see the full event message
log_df.select("log_level", "timestamp", "error", "event", "time").show(5, truncate=False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Label` cannot be resolved. Did you mean one of the following? [` Label`, ` Idle Max`, ` Idle Min`, ` Idle Std`, `Idle Mean`].;
'Project [ Destination Port#17,  Flow Duration#18,  Total Fwd Packets#19,  Total Backward Packets#20, Total Length of Fwd Packets#21,  Total Length of Bwd Packets#22,  Fwd Packet Length Max#23,  Fwd Packet Length Min#24,  Fwd Packet Length Mean#25,  Fwd Packet Length Std#26, Bwd Packet Length Max#27,  Bwd Packet Length Min#28,  Bwd Packet Length Mean#29,  Bwd Packet Length Std#30, Flow Bytes/s#31,  Flow Packets/s#32,  Flow IAT Mean#33,  Flow IAT Std#34,  Flow IAT Max#35,  Flow IAT Min#36, Fwd IAT Total#37,  Fwd IAT Mean#38,  Fwd IAT Std#39,  Fwd IAT Max#40, ... 56 more fields]
+- Relation [ Destination Port#17, Flow Duration#18, Total Fwd Packets#19, Total Backward Packets#20,Total Length of Fwd Packets#21, Total Length of Bwd Packets#22, Fwd Packet Length Max#23, Fwd Packet Length Min#24, Fwd Packet Length Mean#25, Fwd Packet Length Std#26,Bwd Packet Length Max#27, Bwd Packet Length Min#28, Bwd Packet Length Mean#29, Bwd Packet Length Std#30,Flow Bytes/s#31, Flow Packets/s#32, Flow IAT Mean#33, Flow IAT Std#34, Flow IAT Max#35, Flow IAT Min#36,Fwd IAT Total#37, Fwd IAT Mean#38, Fwd IAT Std#39, Fwd IAT Max#40,... 55 more fields] csv


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, concat_ws, lit, monotonically_increasing_id

# 1️⃣ Start Spark session
spark = SparkSession.builder \
    .appName("BeginnerFriendlyLogProcessing") \
    .master("local[*]") \
    .getOrCreate()

# 2️⃣ Load new dataset
df = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv("../Data/raw_logs/logdata.csv")

# 3️⃣ Strip spaces from column names (safe step)
df = df.toDF(*[c.strip() for c in df.columns])

# 4️⃣ Transform into structured log DataFrame (UPDATED LOGIC)
log_df = df.withColumn(
    "log_level",
    when(col("LogLevel") == "INFO", "INFO").otherwise("ERROR")
).withColumn(
    "error",
    col("LogLevel")
).withColumn(
    "event",
    concat_ws(
        " | ",
        lit("Service:"), col("Service"),
        lit("Message:"), col("Message"),
        lit("ClientIP:"), col("ClientIP")
    )
).withColumn(
    "timestamp",
    col("Timestamp")
).withColumn(
    "time",
    col("TimeTaken")
)

# 5️⃣ Show first 5 rows
log_df.select(
    "log_level",
    "timestamp",
    "error",
    "event",
    "time"
).show(5, truncate=False)


+---------+--------------------------+-------+---------------------------------------------------------------------------------+----+
|log_level|timestamp                 |error  |event                                                                            |time|
+---------+--------------------------+-------+---------------------------------------------------------------------------------+----+
|ERROR    |2023-11-20 08:40:50.672154|DEBUG  |Service: | ServiceA | Message: | File I/O | ClientIP: | 192.168.1.219            |55ms|
|ERROR    |2023-11-20 08:40:50.688973|ERROR  |Service: | ServiceA | Message: | Critical Errors | ClientIP: | 192.168.1.185     |72ms|
|ERROR    |2023-11-20 08:40:50.697002|ERROR  |Service: | ServiceB | Message: | Critical Errors | ClientIP: | 192.168.1.194     |56ms|
+---------+--------------------------+-------+---------------------------------------------------------------------------------+----+
only showing top 5 rows

