In [0]:
# Set Azure Storage configurations
storage_account_name = "projectid3"
container_name = "project"
storage_account_key = "Sk4YdDVhS/tdri3VMFs09NGRrmuZTMFawnqXBq40hXi6qGzzh7cWLLxMLTf9WES0vlwWGVd3RCkg+AStmYEfQA=="

# Configure Spark to access Azure Blob Storage using the account key
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

# Define the file path to the CSV file in Azure Blob Storage
file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/Application Logs.csv"

# Read the CSV file into a DataFrame
logs_df = spark.read.option("header", "true").csv(file_path)

# Show the first few rows to confirm data is loaded correctly
logs_df.show(5)


+-----------+-------------------+------------+--------+-------------+
|      Level|      Date and Time|      Source|Event ID|Task Category|
+-----------+-------------------+------------+--------+-------------+
|Information|19-12-2024 09:29:56| MSSQLSERVER|   17890|       Server|
|Information|19-12-2024 09:28:32|  edgeupdate|       0|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
+-----------+-------------------+------------+--------+-------------+
only showing top 5 rows



In [0]:
# 1. Loading the Delta Table
# Load the Delta table from DBFS
logs_df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/application_logs")
logs_df.createOrReplaceTempView("logs_view")  # Create a temporary view for SQL queries

# Display first 5 rows for a quick check
logs_df.show(5)


+-----------+-------------------+------------+--------+-------------+
|      Level|      Date and Time|      Source|Event ID|Task Category|
+-----------+-------------------+------------+--------+-------------+
|Information|19-12-2024 09:29:56| MSSQLSERVER|   17890|       Server|
|Information|19-12-2024 09:28:32|  edgeupdate|       0|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
|Information|19-12-2024 09:26:40|MsiInstaller|    1035|         None|
+-----------+-------------------+------------+--------+-------------+
only showing top 5 rows



In [0]:
# 2. Check the Schema of the DataFrame
# This will help you understand the structure and available columns in your dataset.
# Check the schema of the DataFrame to understand the available columns
logs_df.printSchema()


root
 |-- Level: string (nullable = true)
 |-- Date and Time: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Event ID: long (nullable = true)
 |-- Task Category: string (nullable = true)



In [0]:
# 3. Calculating the Error Rate by Level
# Assuming Level represents the severity of the log (e.g., "error", "info"), this step calculates the error rate.
# Calculate error rate by 'Level' (assuming 'Level' is the severity column)
error_count = logs_df.filter(logs_df["Level"] == "error").count()
total_count = logs_df.count()
error_rate = (error_count / total_count) * 100

print(f"Error rate: {error_rate}%")


Error rate: 0.0%


In [0]:
# 4. Trend Analysis Over Time (Using Date and Time Column)
# This step extracts the date and hour components from the Date and Time column to analyze the error trends over time.
from pyspark.sql.functions import col, to_date, hour

# Extract date and hour directly from 'Date and Time' column
logs_df = logs_df.withColumn("date", to_date(col("Date and Time")))
logs_df = logs_df.withColumn("hour", hour(col("Date and Time")))

# Group by date and hour to find trends
trend_df = logs_df.groupBy("date", "hour", "Level").count().orderBy("date", "hour")
trend_df.show(10)


+----+----+--------------------+-----+
|date|hour|               Level|count|
+----+----+--------------------+-----+
|NULL|NULL|C:\WINDOWS\system...|    7|
|NULL|NULL|App: C:\Program F...|    7|
|NULL|NULL|\t -s ""MSSQLSERV...|    7|
|NULL|NULL|13: 8a292df8-d653...|    7|
|NULL|NULL|Application Id=55...|    7|
|NULL|NULL|dbv = 1568.230.50...|    1|
|NULL|NULL|    10.0.22621.4601"|    7|
|NULL|NULL|[16] 0.001881 -0....|    1|
|NULL|NULL|[1] 0.040380 -0.0...|    1|
|NULL|NULL|               [5] -|    1|
+----+----+--------------------+-----+
only showing top 10 rows



In [0]:
# 5. Anomaly Detection Based on Error Count Spikes
# Here, we detect anomalies by flagging times when the error count exceeds a predefined threshold (e.g., twice the average error count).
from pyspark.sql import functions as F

# Aggregating error counts by date and hour
error_counts = logs_df.filter(logs_df.Level == "error") \
    .groupBy("date", "hour") \
    .agg(F.count("*").alias("error_count"))

# Calculate the average error count
avg_error_count = error_counts.agg(F.avg("error_count")).collect()[0][0]

# Check if the average value is None (no errors in the logs), and set a default threshold if needed
if avg_error_count is None:
    print("No error logs found.")
    threshold = 10  # Set a reasonable default threshold if no errors are found
else:
    # Define a threshold for anomaly detection (e.g., twice the average error count)
    threshold = avg_error_count * 2  # Double the average count

# Flag anomalies where error count exceeds the threshold
anomalies = error_counts.filter(error_counts.error_count > threshold)
anomalies.show(10)



No error logs found.
+----+----+-----------+
|date|hour|error_count|
+----+----+-----------+
+----+----+-----------+



In [0]:
from pyspark.sql.functions import regexp_extract

# Extract timestamp, log level, and message
df_parsed = logs_df.withColumn("timestamp", regexp_extract("Level", r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", 1)) \
              .withColumn("log_level", regexp_extract("Level", r"\b(INFO|ERROR|WARN|DEBUG)\b", 1)) \
              .withColumn("message", regexp_extract("Level", r"(?:INFO|ERROR|WARN|DEBUG)\s+(.*)", 1))

df_parsed.show(truncate=False)

output_path = "dbfs:/user/hive/warehouse/processed_logs"

# Save as Parquet
df_parsed.write.mode("overwrite").parquet(output_path)

# Save as CSV
df_parsed.write.mode("overwrite").csv(output_path)



+-----------+-------------------+------------+--------+-------------+----+----+---------+---------+-------+
|Level      |Date and Time      |Source      |Event ID|Task Category|date|hour|timestamp|log_level|message|
+-----------+-------------------+------------+--------+-------------+----+----+---------+---------+-------+
|Information|19-12-2024 09:29:56|MSSQLSERVER |17890   |Server       |NULL|NULL|         |         |       |
|Information|19-12-2024 09:28:32|edgeupdate  |0       |None         |NULL|NULL|         |         |       |
|Information|19-12-2024 09:26:40|MsiInstaller|1035    |None         |NULL|NULL|         |         |       |
|Information|19-12-2024 09:26:40|MsiInstaller|1035    |None         |NULL|NULL|         |         |       |
|Information|19-12-2024 09:26:40|MsiInstaller|1035    |None         |NULL|NULL|         |         |       |
|Information|19-12-2024 09:26:40|MsiInstaller|1035    |None         |NULL|NULL|         |         |       |
|Information|19-12-2024 09:2

In [0]:
# Define the output path for processed data
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/processed_data/"

# Write the processed DataFrame back as a CSV file, overwrite existing data if necessary
logs_df.write.mode("overwrite").option("header", "true").csv(output_path)

# Print confirmation
print(f"Data successfully written to: {output_path}")

Data successfully written to: wasbs://project@projectid3.blob.core.windows.net/processed_data/
