In [27]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window


In [28]:
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

log_df = spark.read.text("./Dlogs.txt")


In [29]:
log_df = log_df.select(
    F.from_unixtime(F.unix_timestamp(F.split(F.col("value"), ",")[0], "yyyy-MM-dd'T'HH:mm:ss'Z'")).alias("timestamp"),
    F.split(F.col("value"), ",")[1].alias("ip_address"),
    F.split(F.col("value"), ",")[2].alias("request_method"),
    F.split(F.col("value"), ",")[3].alias("endpoint"),
    F.split(F.col("value"), ",")[4].cast("int").alias("response_code"),
    F.split(F.col("value"), ",")[5].cast("int").alias("response_time")
)

In [30]:
endpoint_counts = log_df.groupBy("endpoint").count().orderBy("count", ascending=False)

endpoint_counts.show()

+---------+-----+
| endpoint|count|
+---------+-----+
|    /home| 1279|
| /contact| 1252|
|   /about| 1238|
|   /login| 1231|
+---------+-----+



In [31]:
hourly_traffic = log_df.withColumn("hour", F.hour("timestamp")) \
                            .groupBy("hour", "request_method") \
                            .count()

hourly_traffic.show()

+----+--------------+-----+
|hour|request_method|count|
+----+--------------+-----+
|  10|           GET|   54|
|  13|           GET|   59|
|   1|           PUT|   52|
|   9|          POST|   53|
|  20|           PUT|   53|
|  19|           PUT|   47|
|   2|           PUT|   37|
|  20|        DELETE|   50|
|  22|          POST|   51|
|   1|          POST|   52|
|   2|        DELETE|   42|
|  15|          POST|   66|
|  21|           PUT|   47|
|   4|           PUT|   52|
|  16|           GET|   46|
|   4|           GET|   47|
|  19|        DELETE|   41|
|  22|           GET|   45|
|  23|          POST|   52|
|   7|          POST|   33|
+----+--------------+-----+
only showing top 20 rows



In [32]:
error_df = log_df.filter(F.col("response_code").between(400, 599))

error_by_endpoint = error_df.groupBy("endpoint").count()

error_by_endpoint.show()

+---------+-----+
| endpoint|count|
+---------+-----+
|   /login|  901|
|    /home|  974|
| /contact|  955|
|   /about|  940|
+---------+-----+



In [33]:
avg_response_times = log_df.groupBy("endpoint").avg("response_time")

avg_response_times.orderBy("avg(response_time)", ascending=False).show()

+---------+------------------+
| endpoint|avg(response_time)|
+---------+------------------+
| /contact|281.28514376996804|
|    /home| 278.8835027365129|
|   /login|270.77335499593823|
|   /about| 266.4668820678514|
+---------+------------------+



In [34]:
traffic_window = Window.partitionBy("ip_address").orderBy("timestamp")
ip_traffic_analysis = log_df.withColumn("next_timestamp", F.lead("timestamp").over(traffic_window)) \
                            .withColumn("time_diff", F.unix_timestamp("next_timestamp") - F.unix_timestamp("timestamp")) \
                            .filter("time_diff < 60")  
surge_traffic = ip_traffic_analysis.groupBy("ip_address").count().filter("count > 20") 

surge_traffic.show()

error_window = Window.partitionBy().orderBy("timestamp")
error_analysis = error_df.withColumn("next_timestamp", F.lead("timestamp").over(error_window)) \
                         .withColumn("time_diff", F.unix_timestamp("next_timestamp") - F.unix_timestamp("timestamp")) \
                         .filter("time_diff < 60") 
high_error_rate = error_analysis.groupBy("timestamp").count().filter("count > 0") 

high_error_rate.show()

+--------------+-----+
|    ip_address|count|
+--------------+-----+
| 192.168.1.105|   86|
+--------------+-----+

+-------------------+-----+
|          timestamp|count|
+-------------------+-----+
|2023-12-25 00:00:06|    1|
|2023-12-25 00:00:10|    1|
|2023-12-25 00:00:17|    1|
|2023-12-25 00:00:30|    1|
|2023-12-25 00:00:39|    1|
|2023-12-25 00:01:26|    1|
|2023-12-25 00:02:22|    1|
|2023-12-25 00:02:55|    1|
|2023-12-25 00:03:09|    1|
|2023-12-25 00:03:36|    1|
|2023-12-25 00:04:26|    1|
|2023-12-25 00:04:42|    1|
|2023-12-25 00:05:19|    1|
|2023-12-25 00:05:24|    1|
|2023-12-25 00:05:33|    1|
|2023-12-25 00:05:37|    1|
|2023-12-25 00:05:44|    1|
|2023-12-25 00:06:30|    1|
|2023-12-25 00:07:12|    1|
|2023-12-25 00:07:58|    1|
+-------------------+-----+
only showing top 20 rows

