In [1]:
import os
import json
import random
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Log Analysis").getOrCreate()

In [2]:
log_data = {
    "INFO": ["User logged in", "New user registered", "Data successfully processed"],
    "ERROR": ["Page not found", "Database connection lost", "File not found"],
    "WARNING": ["High memory usage detected", "Slow API response", "Deprecated function used"],
    "DEBUG": ["Function X executed", "Cache cleared", "Config file loaded"],
    "CRITICAL": ["System out of memory! Shutting down", "Power failure detected"],
    "FATAL": ["Kernel panic", "Unrecoverable database corruption"],
    "TRACE": ["Entering function process_data()", "Exiting function process_data()"],
    "SECURITY": ["Unauthorized login attempt", "Suspicious IP detected"],
    "ACCOUNT": ["User X changed password", "Account settings updated"]
}

json_file = os.path.join(os.getcwd(), "log_messages.json")
with open(json_file, "w") as f:
    json.dump(log_data, f, indent=4)

log_levels = "|".join(log_data.keys())

In [3]:
def generate_log_entry():
    log_level = random.choice(list(log_data.keys()))
    log_message = random.choice(log_data[log_level]).strip()
    timestamp = (datetime.now() - timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d %H:%M:%S")
    return f"{timestamp} {log_level} {log_message}"

log_file = os.path.join(os.getcwd(), "log_file.txt")
num_entries = 100000
with open(log_file, "w") as f:
    for _ in range(num_entries):
        f.write(generate_log_entry() + "\n")

In [6]:
log_df = spark.read.text(log_file)
log_df = log_df.withColumn("Timestamp", F.regexp_extract(F.col("value"), r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", 1))
log_df = log_df.withColumn("LogLevel", F.regexp_extract(F.col("value"), rf"\b({log_levels})\b", 1))
log_df = log_df.withColumn("Message", F.trim(F.regexp_extract(F.col("value"), rf"\b(?:{log_levels})\b (.*)", 1)))

log_df = log_df.drop("value").na.drop()
log_df = log_df.withColumn("Timestamp", F.col("Timestamp").cast(TimestampType()))

log_df.show(10, truncate=False)

+-------------------+--------+--------------------------+
|Timestamp          |LogLevel|Message                   |
+-------------------+--------+--------------------------+
|2025-02-17 09:04:36|FATAL   |Kernel panic              |
|2025-03-07 09:04:36|INFO    |New user registered       |
|2025-02-06 09:04:36|ERROR   |Page not found            |
|2025-02-13 09:04:36|SECURITY|Suspicious IP detected    |
|2025-03-06 09:04:36|ERROR   |File not found            |
|2025-02-26 09:04:36|SECURITY|Unauthorized login attempt|
|2025-03-05 09:04:36|SECURITY|Suspicious IP detected    |
|2025-02-13 09:04:36|CRITICAL|Power failure detected    |
|2025-02-11 09:04:36|ERROR   |Page not found            |
|2025-03-01 09:04:36|ACCOUNT |Account settings updated  |
+-------------------+--------+--------------------------+
only showing top 10 rows



In [7]:
df_count = log_df.groupBy("LogLevel").agg(F.count("LogLevel").alias("Count"))
df_count.show()


+--------+-----+
|LogLevel|Count|
+--------+-----+
|    INFO|11204|
| ACCOUNT|10925|
|   ERROR|11317|
|   FATAL|11132|
|   DEBUG|11166|
|   TRACE|11111|
|CRITICAL|11144|
|SECURITY|11089|
+--------+-----+



In [8]:
error_logs = log_df.where(F.col("LogLevel") == "ERROR")
error_logs.show(10, truncate=False)


+-------------------+--------+------------------------+
|Timestamp          |LogLevel|Message                 |
+-------------------+--------+------------------------+
|2025-02-06 09:04:36|ERROR   |Page not found          |
|2025-03-06 09:04:36|ERROR   |File not found          |
|2025-02-11 09:04:36|ERROR   |Page not found          |
|2025-02-08 09:04:36|ERROR   |Database connection lost|
|2025-02-12 09:04:36|ERROR   |Page not found          |
|2025-03-06 09:04:36|ERROR   |File not found          |
|2025-03-04 09:04:36|ERROR   |Database connection lost|
|2025-02-11 09:04:36|ERROR   |Page not found          |
|2025-02-06 09:04:36|ERROR   |Page not found          |
|2025-02-09 09:04:36|ERROR   |Database connection lost|
+-------------------+--------+------------------------+
only showing top 10 rows



In [9]:
logs_by_hour = log_df.withColumn("Hour", F.hour("Timestamp")).groupBy("Hour").count().orderBy("Hour")
logs_by_hour.show()


+----+------+
|Hour| count|
+----+------+
|   9|100000|
+----+------+

