In [1]:
"""
Module Name: Scalable Programming
Description: This file is to perform computations of dataset using Apache Spark.
Author: Shayam Shah
Date: 2024-07-29
"""

# This code cell is used to import all the dependencis that are used throughout the code
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, dayofweek, from_unixtime, lower, to_timestamp, to_date, date_format, avg, count
session = SparkSession.builder.appName('BGL_Analysis').getOrCreate()

24/07/29 14:59:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# This code cell is used to load the dataset in the DataFrame
dataset_path = "/home/hduser/notebooks/BGL.log"
df = spark.read.option("header", "false").csv(dataset_path, sep=" ")

                                                                                

In [4]:
df = df.withColumnRenamed('_c0', 'alert_flag') \
       .withColumnRenamed('_c1', 'timestamp') \
       .withColumnRenamed('_c2', 'date') \
       .withColumnRenamed('_c3', 'node') \
       .withColumnRenamed('_c4', 'date_time') \
       .withColumnRenamed('_c5', 'node_repeated') \
       .withColumnRenamed('_c6', 'message_type') \
       .withColumnRenamed('_c7', 'system_component') \
       .withColumnRenamed('_c8', 'level') \
       .withColumn('message_content', concat_ws(' ', col('_c9'), col('_c10'), col('_c11'), col('_c12'), col('_c13'))) \
       .withColumn('date', to_date(col('date'), 'yyyy.MM.dd')) \
       .withColumn('day_of_week', dayofweek(col('date')))

In [5]:
#This piece of code is used to print the schema of the dataset
df.printSchema()

root
 |-- alert_flag: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date: date (nullable = true)
 |-- node: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- node_repeated: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- system_component: string (nullable = true)
 |-- level: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- message_content: string (nullable = false)
 |-- day_of_week: integer (nullable = true)



In [6]:
#This code snippet is used to remove the unwanted columns from the dataset
columns_to_drop = ["_c9", "_c10", "_c11", "_c12", "_c13"]
correct_dataframe = df.drop(*columns_to_drop)

In [7]:
# Filter for kernel panic messages on Friday or Saturday
fri_sat_panic= df.drop(*columns_to_drop).filter((col('message_content') == 'kernel panic') &
                        ((col('day_of_week') == 6) | (col('day_of_week') == 7)))

print("Below are the instances as per the filteration")
fri_sat_panic.show()
print("The count of the data")
fri_sat_panic.count()

Below are the instances as per the filteration


                                                                                

+----------+----------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+---------------+-----------+
|alert_flag| timestamp|      date|               node|           date_time|      node_repeated|message_type|system_component|level|message_content|day_of_week|
+----------+----------+----------+-------------------+--------------------+-------------------+------------+----------------+-----+---------------+-----------+
|   KERNPAN|1124484960|2005-08-19|R43-M0-NC-I:J18-U01|2005-08-19-13.56....|R43-M0-NC-I:J18-U01|         RAS|          KERNEL|FATAL|   kernel panic|          6|
|   KERNPAN|1124484960|2005-08-19|R43-M0-N8-I:J18-U11|2005-08-19-13.56....|R43-M0-N8-I:J18-U11|         RAS|          KERNEL|FATAL|   kernel panic|          6|
|   KERNPAN|1126306502|2005-09-09|R05-M1-N4-I:J18-U11|2005-09-09-15.55....|R05-M1-N4-I:J18-U11|         RAS|          KERNEL|FATAL|   kernel panic|          6|
|   KERNPAN|1128113240|2005-09-30|R22-M0

                                                                                

6

In [19]:
#This code snippet is used to filter the message content
filtered_df = df.filter(col("message_content").contains("torus receiver z+ input"))

# Group by day_of_week and calculate the average number of seconds
result_df = filtered_df.groupBy("day_of_week").agg(avg("timestamp").alias("average_seconds"))

# Show the result
result_df.show()




+-----------+--------------------+
|day_of_week|     average_seconds|
+-----------+--------------------+
|          1|1.1295884162542372E9|
|          6| 1.126602118937799E9|
|          3| 1.129997737671875E9|
|          5|1.1304924833903227E9|
|          4|1.1224597884473684E9|
|          7|1.1294064528880596E9|
|          2|1.1244554479086537E9|
+-----------+--------------------+



                                                                                

In [22]:
# Order by count in descending order and select top 5
node_counts = df_dropped.groupBy("node").agg(count("node").alias("count"))
top_nodes = node_counts.orderBy(col("count").desc()).limit(5)

# Show the results
top_nodes.show(truncate=False)



+-------------------+------+
|node               |count |
+-------------------+------+
|R30-M0-N9-C:J16-U01|152329|
|NULL               |89296 |
|R02-M1-N0-C:J12-U11|64651 |
|R37-M1-NC-C:J02-U11|35288 |
|UNKNOWN_LOCATION   |27039 |
+-------------------+------+



                                                                                

In [37]:
# Filter rows where alert_flag is "APPBUSY" and count occurrences of each node
appbusy_counts = df.filter(col("alert_flag") == "APPBUSY") \
    .groupBy("node") \
    .agg(count("*").alias("count")) \
    .orderBy("count") \
    .limit(1)

# Show the result
appbusy_counts.show()



+-------------------+-----+
|               node|count|
+-------------------+-----+
|R15-M1-N0-I:J18-U11|    1|
+-------------------+-----+



                                                                                

In [38]:
# Get the earliest occurrence based on timestamp
filtered_df = df.filter(
    (col("system_component") == "KERNEL") &
    (col("level") == "FATAL") &
    (col("message_content").contains("Lustre mount FAILED"))
)

earliest_error = filtered_df.orderBy(col("timestamp")).first()

# Show the result
if earliest_error:
    print(f"Earliest fatal kernel error where the message contains 'Lustre mount FAILED':")
    print(f"Date: {earliest_error['date']}")
    print(f"Time: {earliest_error['date_time']}")
else:
    print("No fatal kernel errors found with message 'Lustre mount FAILED'.")

                                                                                

Earliest fatal kernel error where the message contains 'Lustre mount FAILED':
Date: 2005-08-03
Time: 2005-08-03-15.35.34.555500
