<a href="https://colab.research.google.com/github/mrudulamadhavan/My-Projects/blob/main/MapUp/Web_Server_Log_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Web Server Log Analysis - Python Take-Home Assessment

## Overview
This assessment involves analyzing the Calgary HTTP dataset, which contains approximately one year's worth of HTTP requests to the University of Calgary's Computer Science web server. You'll work with real-world web server log data to extract meaningful insights and demonstrate your Python data analysis skills.

## Part 1: Data Loading and Cleaning

In [1]:
!wget ftp://ita.ee.lbl.gov/traces/calgary_access_log.gz

--2025-06-04 17:41:09--  ftp://ita.ee.lbl.gov/traces/calgary_access_log.gz
           => ‘calgary_access_log.gz’
Resolving ita.ee.lbl.gov (ita.ee.lbl.gov)... 131.243.2.164, 2620:83:8000:102::a4
Connecting to ita.ee.lbl.gov (ita.ee.lbl.gov)|131.243.2.164|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done.    ==> PWD ... done.
==> TYPE I ... done.  ==> CWD (1) /traces ... done.
==> SIZE calgary_access_log.gz ... 5435681
==> PASV ... done.    ==> RETR calgary_access_log.gz ... done.
Length: 5435681 (5.2M) (unauthoritative)


2025-06-04 17:41:15 (1.33 MB/s) - ‘calgary_access_log.gz’ saved [5435681]



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('calgary_access_log_analysis').getOrCreate()
print(f'spark version: {spark.version}')

spark version: 3.5.1


In [3]:
log_file_path="calgary_access_log.gz"
raw_data = spark.read.text(log_file_path)

In [4]:
from pyspark.sql.functions import regexp_extract, col, to_timestamp, when, split
from pyspark.sql.types import IntegerType

# Regular expressions for log parsing
host_pattern = r'^(\S+)'  # First token
timestamp_pattern = r'\[(.*?)\]'  # Between square brackets
method_endpoint_pattern = r'"(GET|POST|HEAD|PUT|DELETE)\s+(\S+)'  # HTTP method + endpoint
status_pattern = r'"\s(\d{3})\s'  # HTTP status
size_pattern = r'\s(\d+|-)$'  # Last token is content size or "-"

# Extract structured columns from raw log lines
logs_df = (
    raw_data
    .withColumn("host", regexp_extract("value", host_pattern, 1))
    .withColumn("timestamp_str", regexp_extract("value", timestamp_pattern, 1))
    .withColumn("method", regexp_extract("value", method_endpoint_pattern, 1))
    .withColumn("endpoint", regexp_extract("value", method_endpoint_pattern, 2))
    .withColumn("file_extension", regexp_extract("endpoint", r'\.([a-zA-Z0-9]+)$', 1))
    .withColumn("status", regexp_extract("value", status_pattern, 1).cast(IntegerType()))
    .withColumn("content_size_raw", regexp_extract("value", size_pattern, 1))
)


logs_df.show(5,truncate=False)

+-------------------------------------------------------------------------+-----+--------------------------+------+----------+--------------+------+----------------+
|value                                                                    |host |timestamp_str             |method|endpoint  |file_extension|status|content_size_raw|
+-------------------------------------------------------------------------+-----+--------------------------+------+----------+--------------+------+----------------+
|local - - [24/Oct/1994:13:41:41 -0600] "GET index.html HTTP/1.0" 200 150 |local|24/Oct/1994:13:41:41 -0600|GET   |index.html|html          |200   |150             |
|local - - [24/Oct/1994:13:41:41 -0600] "GET 1.gif HTTP/1.0" 200 1210     |local|24/Oct/1994:13:41:41 -0600|GET   |1.gif     |gif           |200   |1210            |
|local - - [24/Oct/1994:13:43:13 -0600] "GET index.html HTTP/1.0" 200 3185|local|24/Oct/1994:13:43:13 -0600|GET   |index.html|html          |200   |3185            |
|loc

In [5]:
logs_df.printSchema()

root
 |-- value: string (nullable = true)
 |-- host: string (nullable = true)
 |-- timestamp_str: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- file_extension: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- content_size_raw: string (nullable = true)



In [6]:
# Number of rows
num_rows = logs_df.count()

# Number of columns
num_cols = len(logs_df.columns)

print(f"Rows: {num_rows}, Columns: {num_cols}")


Rows: 726739, Columns: 8


In [7]:
# Statistical summary
logs_df.describe().show()

+-------+--------------------+------+--------------------+------+------------------+------------------+------------------+------------------+
|summary|               value|  host|       timestamp_str|method|          endpoint|    file_extension|            status|  content_size_raw|
+-------+--------------------+------+--------------------+------+------------------+------------------+------------------+------------------+
|  count|              726739|726739|              726739|726739|            726739|            726739|            724991|            726739|
|   mean|                NULL|  NULL|                NULL|  NULL|       5330.885375|18.542857142857144|226.31063558030377|11914.615843876227|
| stddev|                NULL|  NULL|                NULL|  NULL|3993.0734647323266| 91.96013933705598|53.286361129344805|103570.74453754502|
|    min|local      index....| local|                    |      |                  |                  |               200|                  |
|    m

In [8]:
from pyspark.sql.functions import to_timestamp

logs_df = logs_df.withColumn("timestamp",to_timestamp("timestamp_str", "dd/MMM/yyyy:HH:mm:ss Z"))

In [9]:
logs_df.select("timestamp_str", "timestamp").show(5, truncate=False)

+--------------------------+-------------------+
|timestamp_str             |timestamp          |
+--------------------------+-------------------+
|24/Oct/1994:13:41:41 -0600|1994-10-24 19:41:41|
|24/Oct/1994:13:41:41 -0600|1994-10-24 19:41:41|
|24/Oct/1994:13:43:13 -0600|1994-10-24 19:43:13|
|24/Oct/1994:13:43:14 -0600|1994-10-24 19:43:14|
|24/Oct/1994:13:43:15 -0600|1994-10-24 19:43:15|
+--------------------------+-------------------+
only showing top 5 rows



In [10]:
# Clean content size: convert to int, handle "-"
logs_df = logs_df.withColumn("content_size",when(col("content_size_raw") == "-", None).otherwise(col("content_size_raw").cast(IntegerType())))

In [11]:
logs_df = logs_df.drop("timestamp_str","content_size_raw","value")

In [12]:
logs_df.select("host", "timestamp", "method", "endpoint", "file_extension", "status", "content_size").show(15, truncate=False)

+-----+-------------------+------+----------+--------------+------+------------+
|host |timestamp          |method|endpoint  |file_extension|status|content_size|
+-----+-------------------+------+----------+--------------+------+------------+
|local|1994-10-24 19:41:41|GET   |index.html|html          |200   |150         |
|local|1994-10-24 19:41:41|GET   |1.gif     |gif           |200   |1210        |
|local|1994-10-24 19:43:13|GET   |index.html|html          |200   |3185        |
|local|1994-10-24 19:43:14|GET   |2.gif     |gif           |200   |2555        |
|local|1994-10-24 19:43:15|GET   |3.gif     |gif           |200   |36403       |
|local|1994-10-24 19:43:17|GET   |4.gif     |gif           |200   |441         |
|local|1994-10-24 19:46:45|GET   |index.html|html          |200   |3185        |
|local|1994-10-24 19:46:45|GET   |2.gif     |gif           |200   |2555        |
|local|1994-10-24 19:46:47|GET   |3.gif     |gif           |200   |36403       |
|local|1994-10-24 19:46:50|G

In [13]:
from pyspark.sql.functions import col, when, create_map, lit
from itertools import chain

# Step 1: List of valid file extensions
valid_extensions = [
    "html", "htm", "jpg", "jpeg", "png", "gif", "css", "js", "json",
    "xml", "txt", "pdf", "doc", "docx", "xls", "xlsx", "mp3", "wav",
    "mp4", "avi", "mov", "zip", "rar", "tar", "gz", "svg", "ico", "bmp",
    "tiff", "ps", "pcx", "rgb", "qt"
]

# Step 2: Dictionary of known typos and strange values
corrections = {
    "jpeg1": "jpeg",
    "htmls": "html",
    "hthml": "html",
    "telnet": "unknown",
    "httpd": "unknown",
    "brian": "unknown",
    "katie": "unknown",
    "3": "unknown",
    "ss": "unknown",
    "map": "unknown",
    "stereograms": "unknown",
    "f94": "unknown",
    "il": "unknown"
}

# Step 3: Create Spark map expression from corrections
mapping_expr = create_map([lit(x) for x in chain(*corrections.items())])

# Step 4: Add cleaned column to DataFrame
logs_df = logs_df.withColumn("file_extension_clean",when(col("file_extension").isin(valid_extensions), col("file_extension"))
    .otherwise(mapping_expr[col("file_extension")]))

## Replace nulls with 'unknown' after applying corrections
logs_df = logs_df.withColumn(
    "file_extension_clean",
    when(col("file_extension_clean").isNull(), "unknown").otherwise(col("file_extension_clean")))

logs_df = logs_df.drop("file_extension").withColumnRenamed("file_extension_clean", "file_extension")

In [14]:
logs_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- content_size: integer (nullable = true)
 |-- file_extension: string (nullable = true)



In [15]:
# File extension distribution
logs_df.groupBy("file_extension").count().orderBy("count", ascending=False).show(20, truncate=False)

+--------------+------+
|file_extension|count |
+--------------+------+
|html          |341555|
|gif           |298799|
|jpg           |42238 |
|unknown       |29133 |
|ps            |6265  |
|txt           |3142  |
|jpeg          |2681  |
|qt            |1718  |
|rgb           |695   |
|htm           |238   |
|gz            |111   |
|tiff          |70    |
|bmp           |48    |
|pdf           |25    |
|pcx           |8     |
|zip           |6     |
|doc           |3     |
|tar           |3     |
|wav           |1     |
+--------------+------+



In [18]:
# Min and Max timestamps
from pyspark.sql.functions import min, max

timestamps = logs_df.select(min("timestamp"), max("timestamp")).collect()[0]
print(f"Timestamp range: {timestamps[0]} to {timestamps[1]}")


Timestamp range: 1994-10-24 19:41:41 to 1995-10-11 20:14:17


In [19]:
# HTTP methods and counts
from pyspark.sql.functions import when, col, trim

logs_df = logs_df.withColumn("method",when(trim(col("method")) == "", "GET").otherwise(col("method")))

logs_df.groupBy("method").count().orderBy("count", ascending=False).show(25, truncate=False)

+------+------+
|method|count |
+------+------+
|GET   |726039|
|HEAD  |597   |
|POST  |103   |
+------+------+



In [20]:
# Status codes distribution
from pyspark.sql.functions import when, col

logs_df = logs_df.withColumn("status",when(col("status").isNull(), 200).otherwise(col("status")))

logs_df.groupBy("status").count().orderBy("count", ascending=False).show(25, truncate=False)

+------+------+
|status|count |
+------+------+
|200   |570161|
|304   |97796 |
|302   |30300 |
|404   |23593 |
|403   |4743  |
|401   |46    |
|501   |43    |
|500   |42    |
|400   |15    |
+------+------+



In [21]:
# Set NULL values to 0 if status is 404, else median

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

median_size = logs_df.approxQuantile("content_size", [0.5], 0.01)[0]

logs_df = logs_df.withColumn("content_size",when(col("content_size").isNull() & (col("status") == 404), 0)
    .when(col("content_size").isNull(), median_size).otherwise(col("content_size")))

logs_df = logs_df.withColumn("content_size", col("content_size").cast(IntegerType()))
logs_df.groupBy("content_size").count().orderBy("count", ascending=False).show(25, truncate=False)

+------------+------+
|content_size|count |
+------------+------+
|0           |122700|
|2032        |39371 |
|2555        |21209 |
|36403       |19229 |
|2881        |12108 |
|441         |7971  |
|3185        |6860  |
|7259        |5518  |
|2877        |5352  |
|479         |4577  |
|31977       |4398  |
|16384       |4371  |
|3020        |4222  |
|8192        |3931  |
|768         |3755  |
|147         |3730  |
|1234        |3429  |
|191         |3275  |
|2868        |3269  |
|1512        |3082  |
|469         |3009  |
|1567        |2974  |
|906         |2699  |
|506         |2649  |
|1268        |2644  |
+------------+------+
only showing top 25 rows



In [22]:
from pyspark.sql.functions import when, col, lit

logs_df = logs_df.withColumn("endpoint",when(col("endpoint").isNull() | (col("endpoint") == ""), lit("unknown")).otherwise(col("endpoint")))

In [23]:
for col_name in logs_df.columns:
    print(f"\n📊 Unique values and counts for column: {col_name}")
    logs_df.groupBy(col_name).count().orderBy("count", ascending=False).show(truncate=False)




📊 Unique values and counts for column: host
+------+------+
|host  |count |
+------+------+
|local |375132|
|remote|351607|
+------+------+


📊 Unique values and counts for column: method
+------+------+
|method|count |
+------+------+
|GET   |726039|
|HEAD  |597   |
|POST  |103   |
+------+------+


📊 Unique values and counts for column: endpoint
+----------+------+
|endpoint  |count |
+----------+------+
|index.html|140252|
|3.gif     |24006 |
|2.gif     |23606 |
|4.gif     |8018  |
|244.gif   |5149  |
|5.html    |5010  |
|4097.gif  |4874  |
|8870.jpg  |4493  |
|6733.gif  |4278  |
|8472.gif  |3843  |
|8308.gif  |3478  |
|8216.gif  |3254  |
|8492.gif  |3072  |
|8493.gif  |3014  |
|93.gif    |2833  |
|167.html  |2768  |
|990.html  |2297  |
|8333.gif  |2243  |
|304.xbm   |2153  |
|222.gif   |2113  |
+----------+------+
only showing top 20 rows


📊 Unique values and counts for column: status
+------+------+
|status|count |
+------+------+
|200   |570161|
|304   |97796 |
|302   |30300 |


In [24]:
# missing value count per column
from pyspark.sql.functions import col, sum

logs_df.select([sum(col(c).isNull().cast("int")).alias(f"{c}") for c in logs_df.columns]).show()

+----+------+--------+------+---------+------------+--------------+
|host|method|endpoint|status|timestamp|content_size|file_extension|
+----+------+--------+------+---------+------------+--------------+
|   0|     0|       0|     0|     1614|           0|             0|
+----+------+--------+------+---------+------------+--------------+



## Part 2: Analysis Questions

### Q1) Count the total number of HTTP requests in the log file.

In [25]:
def total_log_records() -> int:
    """
    Q1: Count of total log records.

    Objective:
        Determine the total number of HTTP log entries in the dataset.
        Each line in the log file represents one HTTP request.

    Returns:
        int: Total number of log entries.
    """
    # Count total rows in logs_df DataFrame
    total_records = logs_df.count()
    return total_records


answer1 = total_log_records()
print("Answer 1:",answer1)


Answer 1: 726739


### Q2) Determine the number of distinct hosts (IP addresses or domain names) that accessed the server.

In [26]:
def unique_host_count() -> int:
    """
    Q2: Count of unique hosts.

    Objective:
        Determine how many distinct hosts accessed the server.

    Returns:
        int: Number of unique hosts.
    """
    unique_hosts = logs_df.select("host").distinct().count()
    return unique_hosts

answer2 = unique_host_count()
print("Answer 2:",answer2)
logs_df.select("host").distinct().show(truncate=False)


Answer 2: 2
+------+
|host  |
+------+
|local |
|remote|
+------+



### Q3) For each date, count how many unique filenames were requested.

In [27]:
from pyspark.sql.functions import date_format, countDistinct, col

def datewise_unique_filename_counts() -> dict[str, int]:
    """
    Q3: Date-wise unique filename counts.

    Objective:
        For each date, count the number of unique filenames that accessed the server.
        The date should be in 'dd-MMM-yyyy' format (e.g., '01-Jul-1995').

    Returns:
        dict: A dictionary mapping each date to its count of unique filenames.
              Example: {'01-Jul-1995': 123, '02-Jul-1995': 150}
    """
    # Format the timestamp to desired date string format
    date_df = logs_df.withColumn("date", date_format(col("timestamp"), "dd-MMM-yyyy"))

    # Group by date and count distinct endpoints
    grouped_df = date_df.groupBy("date").agg(countDistinct("endpoint").alias("unique_filenames"))

    # Collect the results to driver as dictionary
    result = {row["date"]: row["unique_filenames"] for row in grouped_df.collect()}

    return result


answer3 = datewise_unique_filename_counts()
print("Answer 3:\n",answer3)


Answer 3:
 {'03-Nov-1994': 452, '11-Nov-1994': 307, '26-Feb-1995': 517, '08-Jan-1995': 137, '06-Oct-1995': 900, '24-Jan-1995': 406, '11-Aug-1995': 458, '06-Nov-1994': 200, '11-Feb-1995': 390, '21-Aug-1995': 548, '30-Sep-1995': 645, '06-Apr-1995': 778, '16-Jun-1995': 503, '14-Nov-1994': 345, '11-Apr-1995': 838, '04-Sep-1995': 312, '04-May-1995': 702, '08-Oct-1995': 519, '09-Nov-1994': 326, '25-Jun-1995': 660, '13-Sep-1995': 779, '17-Mar-1995': 490, '05-Aug-1995': 500, '02-Sep-1995': 310, '01-Mar-1995': 552, '31-Mar-1995': 1014, '19-Apr-1995': 633, '09-Jan-1995': 397, '17-Aug-1995': 529, '12-Sep-1995': 778, '08-Feb-1995': 632, '03-Jul-1995': 463, '15-Jun-1995': 511, '18-Mar-1995': 358, '18-Apr-1995': 425, '30-Jul-1995': 432, '24-Aug-1995': 550, '07-Dec-1994': 358, '26-Jan-1995': 372, '20-Sep-1995': 770, '17-Jun-1995': 381, '21-Mar-1995': 768, '23-Nov-1994': 342, '19-May-1995': 528, '05-Jul-1995': 647, '16-Sep-1995': 593, '11-Dec-1994': 164, '15-Apr-1995': 430, '25-Aug-1995': 586, '08-Jul

### Q4) Number of 404 response codes

In [28]:
# Count how many HTTP requests resulted in a 404 (Not Found) response.
from pyspark.sql.functions import col

def count_404_errors() -> int:
    """
    Q4: Number of 404 response codes.

    Objective:
        Count how many times the HTTP 404 Not Found status appears in the logs.

    Returns:
        int: Number of 404 errors.
    """
    count_404 = logs_df.filter(col("status") == 404).count()
    return count_404

answer4 = count_404_errors()
print("Answer 4:",answer4)


Answer 4: 23593


### Q5)  Top 15 filenames with 404 responses

In [29]:
# Find the 15 most requested URLs that resulted in a 404 error, sorted by frequency.
from pyspark.sql.functions import col

def top_15_filenames_with_404() -> list[tuple[str, int]]:
    """
    Q5: Top 15 filenames with 404 responses.

    Objective:
        Identify which requested URLs most frequently resulted in a 404 error.
        Return the top 15 filenames sorted by frequency.

    Returns:
        list: A list of tuples (filename, count), sorted by count in descending order.
              Example: [('index.html', 200), ...]
    """
    top_404_df = (
        logs_df
        .filter(col("status") == 404)
        .groupBy("endpoint")
        .count()
        .orderBy(col("count").desc())
        .limit(15)
    )

    # Collect results into a Python list of tuples
    result = [(row["endpoint"], row["count"]) for row in top_404_df.collect()]
    return result


answer5 = top_15_filenames_with_404()
print("Answer 5:\n",answer5)


Answer 5:
 [('index.html', 4737), ('4115.html', 902), ('1611.html', 649), ('5698.xbm', 585), ('710.txt', 408), ('2002.html', 259), ('2177.gif', 193), ('10695.ps', 161), ('6555.html', 153), ('487.gif', 152), ('151.html', 149), ('488.gif', 148), ('40.html', 148), ('3414.gif', 148), ('9678.gif', 142)]


### Q6) Top 15 file extension with 404 responses

In [30]:
from pyspark.sql.functions import col

def top_15_ext_with_404() -> list[tuple[str, int]]:
    """
    Q6: Top 15 file extensions with 404 responses.

    Objective:
        Find which file extensions generated the most 404 errors.
        Return the top 15 sorted by number of 404s.

    Returns:
        list: A list of tuples (extension, count), sorted by count in descending order.
              Example: [('html', 45), ...]
    """
    top_404_ext_df = (logs_df.filter(col("status") == 404).groupBy("file_extension").count()
        .orderBy(col("count").desc()).limit(15))

    result = [(row["file_extension"], row["count"]) for row in top_404_ext_df.collect()]
    return result


answer6 = top_15_ext_with_404()
print("Answer 6:\n",answer6)


Answer 6:
 [('html', 12193), ('gif', 7205), ('unknown', 2209), ('ps', 754), ('jpg', 527), ('txt', 496), ('htm', 108), ('bmp', 28), ('rgb', 21), ('jpeg', 19), ('pcx', 8), ('gz', 7), ('pdf', 6), ('tiff', 6), ('tar', 3)]


### Q7) Total bandwidth transferred per day for the month of July 1995

In [31]:
from pyspark.sql.functions import col, to_date, sum, year, month, date_format
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

# Assuming Spark session and logs_df are already defined outside the function

def total_bandwidth_per_day() -> dict[str, int]:
    """
    Q7: Total bandwidth transferred per day for the month of July 1995.

    Objective:
        Sum the number of bytes transferred per day.
        Skip entries where the byte field is missing or '-'.

    Returns:
        dict: A dictionary mapping each date to total bytes transferred.
              Example: {'01-Jul-1995': 123456789, ...}
    """
    # Cast content_size to Integer (handle '-' or bad data outside this)
    df_clean = logs_df.withColumn("content_size", col("content_size").cast(IntegerType()))

    # Filter for July 1995 and non-null content_size
    july_df = df_clean.filter(
        (col("content_size").isNotNull()) &
        (year("timestamp") == 1995) &
        (month("timestamp") == 7)
    )

    # Extract date formatted as dd-MMM-yyyy (e.g. 01-Jul-1995)
    july_df = july_df.withColumn("date_formatted", date_format("timestamp", "dd-MMM-yyyy"))

    # Group by formatted date and sum content_size
    daily_bandwidth_df = july_df.groupBy("date_formatted") \
        .agg(sum("content_size").alias("total_bandwidth")) \
        .orderBy("date_formatted")

    # Convert to dict[str, int]
    bandwidth_dict = {row["date_formatted"]: row["total_bandwidth"]
        for row in daily_bandwidth_df.collect()}

    return bandwidth_dict



answer7 = total_bandwidth_per_day()
print("Answer 7:")
print(answer7)

Answer 7:
{'01-Jul-1995': 17176718, '02-Jul-1995': 7964456, '03-Jul-1995': 11848912, '04-Jul-1995': 25281666, '05-Jul-1995': 22616402, '06-Jul-1995': 20594119, '07-Jul-1995': 9656497, '08-Jul-1995': 5538242, '09-Jul-1995': 4359408, '10-Jul-1995': 13323182, '11-Jul-1995': 22872167, '12-Jul-1995': 17997766, '13-Jul-1995': 16065902, '14-Jul-1995': 16292286, '15-Jul-1995': 17997640, '16-Jul-1995': 8217684, '17-Jul-1995': 18621348, '18-Jul-1995': 18104512, '19-Jul-1995': 16314394, '20-Jul-1995': 25673588, '21-Jul-1995': 26150302, '22-Jul-1995': 6390566, '23-Jul-1995': 10241345, '24-Jul-1995': 20728586, '25-Jul-1995': 23626657, '26-Jul-1995': 26882549, '27-Jul-1995': 23100954, '28-Jul-1995': 37632768, '29-Jul-1995': 16393207, '30-Jul-1995': 21269643, '31-Jul-1995': 29983159}


### Q8) Hourly request distribution

In [32]:
# Count how many HTTP requests occurred during each hour (0–23).

from pyspark.sql.functions import hour, col
from pyspark.sql import SparkSession

def hourly_request_distribution() -> dict[int, int]:
    """
    Q8: Hourly request distribution.

    Objective:
        Count the number of requests made during each hour (00 to 23).
        Useful for understanding traffic peaks.

    Returns:
        dict: A dictionary mapping hour (int) to request count.
              Example: {0: 120, 1: 90, ..., 23: 80}
    """

     # Filter out rows with null timestamps to avoid None keys
    filtered_df = logs_df.filter(col("timestamp").isNotNull())

    # Extract hour from timestamp
    hourly_df = filtered_df.withColumn("hour", hour(col("timestamp")))

    # Group by hour and count requests
    hourly_counts_df = hourly_df.groupBy("hour").count().orderBy("hour")

    # Convert to dictionary {hour: count}
    hourly_dict = {row["hour"]: row["count"] for row in hourly_counts_df.collect()}

    return hourly_dict


answer8 = hourly_request_distribution()
print("Answer 8:\n")
print(answer8)


Answer 8:

{0: 39619, 1: 32690, 2: 30750, 3: 28184, 4: 26032, 5: 22860, 6: 19874, 7: 17081, 8: 13892, 9: 11447, 10: 10588, 11: 10439, 12: 12325, 13: 15200, 14: 22101, 15: 30932, 16: 38039, 17: 46336, 18: 45779, 19: 50070, 20: 51183, 21: 52938, 22: 50530, 23: 46236}


### Q9) Top 10 most requested filenames

In [33]:
# Identify the top 10 most frequently requested filenames,regardless of status code.
from pyspark.sql.functions import col

def top_10_most_requested_filenames() -> list[tuple[str, int]]:
    """
    Q9: Top 10 most requested filenames.

    Objective:
        Identify the most commonly requested URLs (irrespective of status code).

    Returns:
        list: A list of tuples (filename, count), sorted by count in descending order.
              Example: [('index.html', 500), ...]
    """

    # Group by endpoint (filename) and count requests
    filename_counts_df = logs_df.groupBy("endpoint").count().orderBy(col("count").desc())

    # Take top 10 results and convert to list of tuples
    top_10_list = [(row["endpoint"], row["count"]) for row in filename_counts_df.take(10)]

    return top_10_list


answer9 = top_10_most_requested_filenames()
print("Answer 9:")
print(answer9)


Answer 9:
[('index.html', 140252), ('3.gif', 24006), ('2.gif', 23606), ('4.gif', 8018), ('244.gif', 5149), ('5.html', 5010), ('4097.gif', 4874), ('8870.jpg', 4493), ('6733.gif', 4278), ('8472.gif', 3843)]


### Q10) HTTP response code distribution

In [34]:
# Count the occurrences of each HTTP response status code (e.g., 200, 404).

def response_code_distribution() -> dict[int, int]:
    """
    Q10: HTTP response code distribution.

    Objective:
        Count how often each HTTP status code appears in the logs.

    Returns:
        dict: A dictionary mapping HTTP status codes (as int) to their frequency.
              Example: {200: 150000, 404: 3000}
    """

    # Cast status to integer (handle any bad data)
    df_clean = logs_df.withColumn("status", col("status").cast(IntegerType()))

    # Filter out rows with null status to avoid None keys
    df_filtered = logs_df.filter(col("status").isNotNull())

    # Group by status and count occurrences
    status_counts_df = df_filtered.groupBy("status").count().orderBy("status")

    # Convert to dict {status_code: count}
    status_dict = {row["status"]: row["count"] for row in status_counts_df.collect()}

    return status_dict


answer10 = response_code_distribution()
print("Answer 10:")
print(answer10)

Answer 10:
{200: 570161, 302: 30300, 304: 97796, 400: 15, 401: 46, 403: 4743, 404: 23593, 500: 42, 501: 43}


--------------
--------------

#### **Mrudula A P**

Youtube Link : https://youtu.be/K1VvCuUy9w0