In [1]:
from pyspark.sql import SparkSession
from pathlib import Path

LOG_FILE_PATH = Path("./web_server_logs.csv")


spark = SparkSession.builder.appName("Analyzer HTTP Logs").getOrCreate()

df = spark.read.csv(LOG_FILE_PATH.as_posix(), header=True, sep=",", inferSchema=True)
df.show()

+---------------+-------------------+------+--------------------+-------------+-------------+
|             ip|          timestamp|method|                 url|response_code|response_size|
+---------------+-------------------+------+--------------------+-------------+-------------+
|122.205.103.124|2024-07-25 16:32:55|DELETE|             explore|          500|         3548|
|   78.33.252.60|2024-07-10 09:06:49|DELETE|          categories|          500|         9618|
| 92.101.177.214|2024-08-26 08:27:26|DELETE|      categories/app|          500|         2131|
|   2.162.44.128|2024-11-02 19:42:31|   GET|     wp-content/list|          200|         4606|
|  144.64.33.139|2024-11-15 05:48:49|  POST|       tag/posts/tag|          200|          612|
|  26.185.128.78|2024-01-31 09:57:29|   PUT|list/posts/catego...|          301|         1879|
| 219.217.205.17|2024-08-27 17:27:39|   PUT|          wp-content|          500|         3553|
| 118.249.252.62|2024-05-26 13:01:21|   PUT|list/categories/

In [5]:
from pyspark.sql import functions as f

df_ip_high_requests = df.groupBy("ip").agg(
    f.count(f.col("ip")).alias("requests_count")
).select("ip", "requests_count").sort(f.col("requests_count").desc())
df_ip_high_requests.show(10)

+--------------+--------------+
|            ip|requests_count|
+--------------+--------------+
| 29.105.228.97|             2|
|   1.58.178.83|             2|
| 130.9.244.188|             2|
| 170.8.224.192|             1|
|114.239.220.25|             1|
|   7.0.242.187|             1|
| 84.241.30.145|             1|
|   4.60.52.253|             1|
|97.241.137.188|             1|
|154.224.85.202|             1|
+--------------+--------------+


In [6]:
df_http_methods = df.groupBy("method").agg(
    f.count(f.col("method")).alias("method_count")
).select("method", "method_count").sort(f.col("method_count").desc())
df_http_methods.show()

+------+------------+
|method|method_count|
+------+------------+
|  POST|       25165|
|   GET|       25055|
|DELETE|       24929|
|   PUT|       24851|
+------+------------+


In [10]:
count_404_response = df.where(
    f.col("response_code") == 404
).count()
print(f"Number of 404 response codes: {count_404_response}")

Number of 404 response codes: 24932


In [17]:
df_total_response_size_dates = df.withColumn("date", f.to_date(f.col("timestamp"))).groupBy("date").agg(
    f.sum(f.col("response_size")).alias("total_response_size")
).sort("date")
df_total_response_size_dates.show()
# df.printSchema()

+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2024-01-01|            1579594|
|2024-01-02|            1511013|
|2024-01-03|            1406932|
|2024-01-04|            1433647|
|2024-01-05|            1474578|
|2024-01-06|            1455497|
|2024-01-07|            1426465|
|2024-01-08|            1486051|
|2024-01-09|            1377264|
|2024-01-10|            1278870|
|2024-01-11|            1507364|
|2024-01-12|            1335366|
|2024-01-13|            1428527|
|2024-01-14|            1357803|
|2024-01-15|            1528435|
|2024-01-16|            1260900|
|2024-01-17|            1405764|
|2024-01-18|            1453248|
|2024-01-19|            1347440|
|2024-01-20|            1502185|
+----------+-------------------+
