In [2]:
import datetime
import datetime
current_date = datetime.date.today().strftime('%Y/%m/%d')
filename=current_date + ".log"
filename

'2024/12/16.log'

In [26]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("log_analysis").getOrCreate()

In [98]:
from pyspark.sql.functions import to_timestamp,col,datediff,current_date,to_date,count,avg,round
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,TimestampType,DateType

In [46]:
schema=StructType([
    StructField("log_level",StringType(),True),
    StructField("message",StringType(),True),
    StructField("server_id",StringType(),True),
    StructField("timestamp",TimestampType(),True)
])

In [47]:
df=spark.read.format("json")\
    .option("multiline","True")\
    .option("schema",schema)\
    .load("sample_logs.json")

In [48]:
df.printSchema()

root
 |-- log_level: string (nullable = true)
 |-- message: string (nullable = true)
 |-- server_id: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [58]:
df=df.withColumn("timestamp",to_timestamp("timestamp","yyyy-MM-dd'T'HH:mm:ss"))

Identify the top 3 servers with the highest number of ERROR logs over the past week. 


In [88]:
filtered_df=df.filter(
    (col("log_level")=="ERROR") & 
    (datediff(current_date(),to_date("timestamp"))<=7)
)

filtered_df\
    .groupBy("server_id").agg(count(col("server_id")).alias("count"))\
    .orderBy(col("count").desc())\
    .show(3)


+---------+-----+
|server_id|count|
+---------+-----+
|Server_05|  324|
|Server_12|  321|
|Server_07|  318|
+---------+-----+
only showing top 3 rows



Calculate the average number of logs generated per day by each server over the past week. 

In [101]:
filtered_df1=df.filter(
    (datediff(current_date(),to_date("timestamp"))<=7)
)
filtered_df1\
    .withColumn("date",to_date("timestamp"))\
    .groupBy("date","server_id").agg(count("*").alias("total_count"))\
    .groupBy("server_id").agg(round(avg("total_count"),2).alias("avg_count"))\
    .orderBy(col("avg_count").desc())\
    .show()
    

+---------+---------+
|server_id|avg_count|
+---------+---------+
|Server_05|   136.43|
|Server_06|    132.0|
|Server_07|   131.43|
|Server_09|   130.57|
|Server_10|   130.14|
|Server_19|    130.0|
|Server_13|   129.43|
|Server_17|   129.14|
|Server_02|    129.0|
|Server_04|   127.71|
|Server_12|   126.43|
|Server_16|   125.86|
|Server_01|   125.29|
|Server_20|   124.86|
|Server_11|   124.71|
|Server_14|   124.29|
|Server_03|   124.14|
|Server_18|    124.0|
|Server_15|   123.57|
|Server_08|   117.86|
+---------+---------+



Provide a summary report of the most common log messages for each severity level. 

In [106]:
df.\
    groupBy("log_level","message").agg(count("*"))\
    .orderBy(col("log_level"))\
    .show(truncate=False)

+---------+--------------------------------------------+--------+
|log_level|message                                     |count(1)|
+---------+--------------------------------------------+--------+
|ERROR    |Failed to connect to the database.          |1332    |
|ERROR    |Critical security vulnerability detected.   |1324    |
|ERROR    |Disk write failure.                         |1385    |
|ERROR    |Disk space low.                             |1264    |
|ERROR    |Application crashed due to an unknown error.|1361    |
|INFO     |Service restarted successfully.             |1598    |
|INFO     |Server started successfully.                |1602    |
|INFO     |Backup completed successfully.              |1653    |
|INFO     |System maintenance completed.               |1709    |
|WARN     |Memory usage exceeded 80%.                  |2293    |
|WARN     |High CPU usage detected.                    |2203    |
|WARN     |High memory usage detected.                 |2276    |
+---------