In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, current_date, date_sub, col
import pandas as pd

spark = SparkSession.builder.appName("Log File Analysis").master("local[*]").getOrCreate()

file_path = "/FileStore/tables/sample_logs.json"  
df = spark.read.option("multiline", "True").json(file_path)

df.printSchema()

root
 |-- log_level: string (nullable = true)
 |-- message: string (nullable = true)
 |-- server_id: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
df = df.withColumn("timestamp", to_date("timestamp"))

df_last_week = df.filter(col("timestamp") >= date_sub(current_date(), 7))

In [0]:
# Task 1: Top 3 servers with the highest number of ERROR logs in the last 7 days
df_error_logs = df_last_week.filter(col("log_level") == "ERROR")
df_error_logs_grouped = df_error_logs.groupBy("server_id").count()

df_error_logs_grouped_pd = df_error_logs_grouped.toPandas()
top_3_servers = df_error_logs_grouped_pd.nlargest(3, 'count')

print("\nTop 3 servers with the most ERROR logs in the last 7 days:")
print(top_3_servers)


Top 3 servers with the most ERROR logs in the last 7 days:
   server_id  count
8  Server_05    185
1  Server_12    182
2  Server_04    174


In [0]:
# Task 2: Calculate the average number of logs generated per day by each server
df_logs_per_day = df_last_week.groupBy("server_id", "timestamp").count()

df_logs_per_day_pd = df_logs_per_day.toPandas()

avg_logs_per_day = df_logs_per_day_pd.groupby('server_id')['count'].mean().reset_index(name='avg_logs_per_day')

print("\nAverage number of logs generated per day by each server over the past week:")
print(avg_logs_per_day)


Average number of logs generated per day by each server over the past week:
    server_id  avg_logs_per_day
0   Server_01            113.50
1   Server_02            115.25
2   Server_03            111.00
3   Server_04            116.50
4   Server_05            132.25
5   Server_06            119.75
6   Server_07            117.00
7   Server_08            111.00
8   Server_09            121.75
9   Server_10            122.50
10  Server_11            110.25
11  Server_12            121.00
12  Server_13            117.25
13  Server_14            105.75
14  Server_15            111.25
15  Server_16            109.00
16  Server_17            118.75
17  Server_18            114.25
18  Server_19            123.00
19  Server_20            112.25


In [0]:
# Task 3: Provide a summary of the most common log messages for each severity level (INFO, WARN, ERROR)
df_log_summary = df_last_week.groupBy("log_level", "message").count()

df_log_summary_pd = df_log_summary.toPandas()

df_log_summary_pd['rank'] = df_log_summary_pd.groupby('log_level')['count'].rank(method='first', ascending=False)

top_messages = df_log_summary_pd[df_log_summary_pd['rank'] == 1][['log_level', 'message', 'count']]

print("\nMost common log messages for each severity level:")
print(top_messages)


Most common log messages for each severity level:
  log_level                        message  count
1      WARN     Memory usage exceeded 80%.   1068
4     ERROR            Disk write failure.    638
9      INFO  System maintenance completed.    775
