<a href="https://colab.research.google.com/github/vangdale-krishna/forage-jpmc-swe-task-1/blob/main/CourseWork_Task_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install PySpark in Colab
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=eac58a804cd73d02684de6295cfe031b45b1f29e4a325bdf83a2b262970d1677
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:

# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# NOTE: importing `sum` here shadows Python's built-in sum for the rest of the notebook.
from pyspark.sql.functions import avg, col, count, desc, isnull, lag, rank, regexp_extract, sum, when

# Start (or reuse) the Spark session shared by every cell below.
spark = SparkSession.builder.appName("WebLogParser").getOrCreate()

# Load the raw log; spark.read.text gives one row per line in a single string column named "value".
# NOTE(review): the saved output of this cell is an AnalysisException — confirm Google Drive is
# mounted in Colab and that this path exists before running.
weblog_file_data = spark.read.text("/content/drive/MyDrive/web.log")

# Line pattern. Capture groups, in order:
#   1 host, 2 timestamp (no whitespace), 3 method, 4 URL, 5 protocol version,
#   6 three-digit status, 7 byte count, 8 rest of the line.
# The ident/authuser fields must be the literal "- -".
# NOTE(review): a standard Common Log Format timestamp ("[dd/Mon/yyyy:HH:MM:SS +zzzz]") contains a
# space and would not match group 2 — confirm this pattern against the actual web.log format.
clf_pattern = r'^(\S+) - - (\S+) "(\S+) (\S+) (\S+)" (\d{3}) (\d+) (.+)$'

# Output column names, listed in capture-group order (group i + 1 -> entry i).
_log_field_names = [
    "Host making the request",
    "Timestamp",
    "HTTP Method",
    "URL",
    "HTTP Version",
    "HTTP Status Code",
    "Bytes in the reply",
    "Message",
]


def _extract_field(group_index, field_name):
    """Return the column holding capture group `group_index`, renamed to `field_name`.

    Only the byte count is cast to an integer; every other field stays a string.
    regexp_extract yields "" for lines that do not match, so such lines give empty
    strings here (and a null byte count after the cast).
    """
    field = regexp_extract("value", clf_pattern, group_index)
    if field_name == "Bytes in the reply":
        field = field.cast("integer")
    return field.alias(field_name)


web_log_df = weblog_file_data.select(
    *[_extract_field(position, name) for position, name in enumerate(_log_field_names, start=1)]
)

# Preview the first 20 parsed rows without truncating long values.
web_log_df.show(20, truncate=False)


AnalysisException: ignored

In [None]:
# Advanced query operations 1 - trial version (kept for reference; the next cell is the final version)
# Grouping by "HTTP Method"
# Aggregating the count of requests, sum of bytes, and calculating average bytes per request

# advanced_query_result = web_log_df.groupBy("HTTP Method").agg(
#     count("HTTP Method").alias("Request Count"),
#     sum("Bytes in the reply").alias("Total Bytes"),
#     (sum(col("Bytes in the reply")) / count("HTTP Method")).alias("Average Bytes")
# )
# advanced_query_result.show(truncate=False)

In [None]:
# Vrajraj Task 1 - Advanced query operations 1
# In this advanced query:
# Goal: understand the usage pattern and performance of the web service.
# I kept only successful requests, i.e. those whose status code equals 200.
# I grouped the data by "HTTP Method" and "URL".
# I aggregated the count of requests and the sum of bytes, and calculated the average bytes per request.
# I sorted the result by the number of requests in descending order.
# Summarise successful (HTTP 200) responses per (method, URL) pair, busiest first.
# "HTTP Status Code" is extracted as a string (no cast in the parsing cell) and compared with the integer 200.
reply_bytes = col("Bytes in the reply")

advanced_query_result = (
    web_log_df
    .where(col("HTTP Status Code") == 200)
    .groupBy("HTTP Method", "URL")
    .agg(
        count("*").alias("Request_Count"),
        sum(reply_bytes).alias("Total_Bytes"),
        # Mean reply size per request; the denominator counts every row in the group.
        (sum(reply_bytes) / count("*")).alias("Average_Bytes"),
    )
    .sort(col("Request_Count").desc())
)

# Display up to 20 rows with full, untruncated values.
advanced_query_result.show(20, truncate=False)

In [None]:
#Vrajraj Task 1 - Advanced Query 2
# Calculates the cumulative sum of bytes within each "HTTP Status Code" partition.
# Retrieves the lagged value of the "Bytes in the reply" column for each request within the "HTTP Status Code" partition.
# For each request within its status-code partition, this query provides the cumulative sum of bytes up to that point,
# the previous value of bytes, and the percentage change in bytes relative to that previous request.

# windowSpec = Window.partitionBy("HTTP Status Code").orderBy("Timestamp")
# advanced_query_two_2398922 = (
#     web_log_df
#     .withColumn("Cumulative_Sum_Bytes", sum("Bytes in the reply").over(windowSpec))
#     .withColumn("Previous_Bytes", lag("Bytes in the reply").over(windowSpec))
#     .select(
#         "HTTP Status Code",
#         "Bytes in the reply",
#         "Timestamp",
#         "Cumulative_Sum_Bytes",
#         "Previous_Bytes"
#     )
# )
# advanced_query_two_2398922.show(truncate=False)


# One window per status code, rows ordered by the Timestamp string.
# NOTE(review): Timestamp is a string column, so this ordering is lexicographic — confirm it is chronological.
# With an ORDER BY and no explicit frame, Spark's default frame runs from the partition start up to the
# current row by range, so rows with identical timestamps share the same cumulative sum.
windowSpec = Window.partitionBy("HTTP Status Code").orderBy("Timestamp")

# Reply size of the preceding row in the same window (null for the first row of each partition).
previous_bytes = lag("Bytes in the reply").over(windowSpec)

# Percentage change of the reply size relative to the preceding row. Null when there is no previous
# value; assuming Spark's default (non-ANSI) mode, also null when the previous value is 0.
percentage_change_subquery = (col("Bytes in the reply") - previous_bytes) / previous_bytes * 100

advanced_query_with_subquery = (
    web_log_df
    .withColumn("Cumulative_Sum_Bytes", sum("Bytes in the reply").over(windowSpec))
    .withColumn("Previous_Bytes", previous_bytes)
    # Missing percentage changes (see above) are reported as 0.
    .withColumn(
        "Percentage_Change",
        when(percentage_change_subquery.isNull(), 0).otherwise(percentage_change_subquery),
    )
    .select(
        "HTTP Status Code",
        "Bytes in the reply",
        "Timestamp",
        "Cumulative_Sum_Bytes",
        "Previous_Bytes",
        "Percentage_Change",
    )
)

# Display the first rows (default 20) without truncation.
advanced_query_with_subquery.show(truncate=False)

In [None]:
#SAI KRISHNA VANGDALE TASK-1 PRACTICE QUESTION :

#The purpose of this code is to calculate both the average bytes and the cumulative average bytes for each HTTP Method.
# Define a window specification partitioned by HTTP Method and ordered by Timestamp.
# Calculate the average Bytes in the reply for each HTTP Method.
# Calculate the cumulative average Bytes in the reply for each HTTP Method.
# Show both results: the average and the cumulative average Bytes in the reply for each HTTP Method.

# window_spec = Window.partitionBy("HTTP Method").orderBy("Timestamp")
# avg_bytes_df = web_log_df.withColumn(
#    "Average_Bytes",
#   avg("Bytes in the reply").over(window_spec)
# )
# cum_avg_bytes_df = web_log_df.withColumn(
#    "Cumulative_Avg_Bytes",
#   avg("Bytes in the reply").over(window_spec.rowsBetween(Window.unboundedPreceding, 0))
# )

# avg_bytes_by_method_df = avg_bytes_df.groupBy("HTTP Method").agg(
#    avg("Average_Bytes").alias("Average_Bytes")
# )
# cum_avg_bytes_by_method_df = cum_avg_bytes_df.groupBy("HTTP Method").agg(
#    avg("Cumulative_Avg_Bytes").alias("Cumulative_Avg_Bytes")
# )
# avg_bytes_by_method_df.show(truncate=False)
# cum_avg_bytes_by_method_df.show(truncate=False)

In [None]:
#SAI KRISHNA VANGDALE TASK-1 / QUERY- 1:
#This query is for ranking rows based on the amount of data "Bytes in the reply" within each HTTP method
# and  calculating the running total of bytes for each row within its HTTP method partition.
#This query helps to gain insights into the distribution and accumulation of the data over time and across different HTTP methods.

#I applied the rank function to each row within its HTTP method:
#the rank window is partitioned by "HTTP Method" and
#ordered by "Bytes in the reply" in descending order.
#I computed "Running_Total_Bytes" as the sum of "Bytes in the reply" for each row within its "HTTP Method" partition, ordered by "Timestamp".

# Rank window: rows within each HTTP method, largest reply first. rank() gives tied rows the same
# rank and leaves gaps after them. col(...).desc() is used because `desc` is not needed as a
# separate import this way.
rank_window_spec = Window.partitionBy("HTTP Method").orderBy(col("Bytes in the reply").desc())

# Running-total window: rows within each HTTP method ordered by Timestamp, counting every row from
# the start of the partition up to and including the current one (ROWS frame, not RANGE).
# NOTE(review): Timestamp is a string column, so the ordering is lexicographic — confirm it is chronological.
running_total_window_spec = (
    Window.partitionBy("HTTP Method")
    .orderBy("Timestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Stand-alone variants that each add a single column (kept so their names remain available).
ranked_rows_df = web_log_df.withColumn("Rank", rank().over(rank_window_spec))
running_total_df = web_log_df.withColumn(
    "Running_Total_Bytes",
    sum("Bytes in the reply").over(running_total_window_spec),
)

# Main query: both window columns side by side, alongside the fields they are derived from.
complex_query_df = web_log_df.select(
    "Timestamp",
    "HTTP Method",
    "Bytes in the reply",
    rank().over(rank_window_spec).alias("Rank"),
    sum("Bytes in the reply").over(running_total_window_spec).alias("Running_Total_Bytes"),
)

# Show the result
complex_query_df.show(truncate=False)

In [None]:
#SAI KRISHNA VANGDALE TASK -1 / QUERY 2:
#This query is to find the average no. of bytes for each "HTTP Method" and the percentage of entries in each HTTP method group where the "Bytes in the reply" is above the calculated average.

#In this query I first created a window specification where the data is partitioned by "HTTP Method".
#The window function operates independently for each unique "HTTP Method".
#I calculated the average value of "Bytes in the reply" for each row within its respective HTTP method group.
#Next I calculated the percentage of rows in each HTTP method group where the "Bytes in the reply" is greater than the calculated average,
#aggregating with the sum and count functions to determine the percentage.

# Whole-partition window (no ORDER BY): every row sees all rows sharing its HTTP method.
window_spec = Window.partitionBy("HTTP Method")

# Attach the per-method mean reply size to every row.
# (avg ignores null byte counts, e.g. from lines the parsing regex did not match.)
avg_bytes_df = (
    web_log_df
    .withColumn("Average_Bytes", avg("Bytes in the reply").over(window_spec))
)

# 1 when the row's reply is strictly larger than its method's mean, otherwise 0 (including nulls).
is_above_avg = when(col("Bytes in the reply") > col("Average_Bytes"), 1).otherwise(0)

# Percentage (0-100, matching the column name and Percentage_Change elsewhere) of rows per method
# whose reply is above that method's mean. The denominator counts every row in the partition.
percentage_above_avg_df = (
    avg_bytes_df
    .withColumn(
        "Above_Avg_Percentage",
        sum(is_above_avg).over(window_spec) / count("*").over(window_spec) * 100,
    )
    .select("HTTP Method",
            "Bytes in the reply",
            "Average_Bytes",
            "Above_Avg_Percentage")
)

# Show the result
percentage_above_avg_df.show(truncate=False)