In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, to_timestamp, hour, col
import os

In [2]:
spark = SparkSession.builder \
       .appName("Log Analyzer") \
       .master("local[*]") \
       .getOrCreate()

In [3]:
logs_path = os.path.join("hdfs://hadoop-namenode:9000/logs/input/access.log")
raw_logs = spark.read.text(logs_path)
raw_logs.show(5, truncate=False)

+--------------------------------------------------------------------------------------+
|value                                                                                 |
+--------------------------------------------------------------------------------------+
|127.0.0.1 - - [15/Apr/2025:13:45:23 +0000] "GET /index.html HTTP/1.1" 200 1024        |
|192.168.1.10 - - [15/Apr/2025:13:45:45 +0000] "POST /api/login HTTP/1.1" 302 512      |
|203.0.113.5 - - [15/Apr/2025:13:46:01 +0000] "GET /dashboard HTTP/1.1" 200 2048       |
|198.51.100.23 - - [15/Apr/2025:13:46:15 +0000] "GET /reports/monthly HTTP/1.1" 404 256|
|172.16.0.2 - - [15/Apr/2025:13:47:05 +0000] "DELETE /api/user/123 HTTP/1.1" 403 128   |
+--------------------------------------------------------------------------------------+
only showing top 5 rows



In [4]:
log_pattern = r'^(\S+) - - \[(.*?)\] "(GET|POST|PUT|DELETE) (\S+) HTTP/\d.\d" (\d{3}) \d+$'

parsed_logs = raw_logs.withColumn("ip", regexp_extract("value", log_pattern, 1)) \
                      .withColumn("timestamp", regexp_extract("value", log_pattern, 2)) \
                      .withColumn("method", regexp_extract("value", log_pattern, 3)) \
                      .withColumn("url", regexp_extract("value", log_pattern, 4)) \
                      .withColumn("status_code", regexp_extract("value", log_pattern, 5).cast("int"))

clean_logs = parsed_logs.withColumn("datetime", to_timestamp("timestamp", "dd/MMM/yyyy:HH:mm:ss Z")) \
                        .withColumn("hour", hour("datetime")) 


In [5]:
request_by_hour = clean_logs.groupby("hour").count().orderBy("count")
top_ips = clean_logs.groupby("ip").count().orderBy(col("count").desc()).limit(10)
errors = clean_logs.filter((col("status_code") >= 400) & (col("status_code") < 600)) \
                   .groupBy("status_code").count().orderBy("status_code")


In [6]:
request_by_hour.show()
top_ips.show()
errors.show()

+----+-----+
|hour|count|
+----+-----+
|   0|    8|
|  20|   10|
|   6|   11|
|  12|   12|
|  16|   12|
|  23|   12|
|  17|   13|
|   4|   13|
|   2|   13|
|  22|   14|
|   1|   14|
|   7|   14|
|  18|   14|
|   3|   15|
|   8|   15|
|  10|   15|
|  21|   15|
|  11|   16|
|  14|   16|
|   9|   17|
+----+-----+
only showing top 20 rows

+--------------+-----+
|            ip|count|
+--------------+-----+
|  192.168.3.21|   10|
|  192.168.4.25|    8|
|      10.1.2.3|    8|
| 198.51.100.23|    6|
|  91.189.92.38|    6|
|   203.0.113.5|    6|
|      10.2.3.4|    6|
|  192.168.1.10|    6|
|    172.16.0.2|    6|
|104.244.42.129|    6|
+--------------+-----+

+-----------+-----+
|status_code|count|
+-----------+-----+
|        400|    4|
|        401|   11|
|        403|   16|
|        404|   12|
|        500|    5|
+-----------+-----+



In [7]:
errors.write.format("csv").save("hdfs://hadoop-namenode:9000/logs/output/errors.csv")
top_ips.write.format("csv").save("hdfs://hadoop-namenode:9000/logs/output/top_ips.csv")
request_by_hour.write.format("csv").save("hdfs://hadoop-namenode:9000/logs/output/requests_by_hour.csv")
