In [1]:
# Create Pyspark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Web_Traffic") \
    .getOrCreate()

In [23]:
import os

# Directory path where CSV files are located
directory_path = "C:\\Projects\\Web_Traffic"

# Get the list of CSV files in the directory
csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]

# Display how many CSV files are detected
num_csvs = len(csv_files)
print(f"{num_csvs} CSV(s) detected.")

# Read each CSV file and assign it to variables dynamically
dfs = []
for i, csv_file in enumerate(csv_files, start=1):
    df_name = f"df{i}"
    df = spark.read.option("header", "true").csv(os.path.join(directory_path, csv_file))
    globals()[df_name] = df
    dfs.append(df)

# Example usage
for i, df in enumerate(dfs, start=1):
    print(f"Dataframe df{i}:")
    df.show()

1 CSV(s) detected.
Dataframe df1:
+----------+------+----------------------+-------------------+-----------+------------------------+---------------+----------------+-----------------------------+--------------------+-----------------+----------------+----------------+----------------------+------------------+--------------------+--------------------+--------------------+------------------------------+------------------+-----------------+-----------------+---------------+-----------------+-------------------------+
|      Date|Visits|Ad spends(in Hundreds)|Total Pages visited|Bounce Rate|Average Session Duration|Longest Session|Shortest Session|Percentage of Return Visitors|Most Visited country|Highest Load Time|Lowest Load Time|Total Page Loads|Total Sessions Created|Number of Sign Ups|    Ratings Received|    Reviews Received|   Page Most visited|Top most searched related word|Visited by Desktop|Visited by Mobile|Visited by Chrome|Visited by Edge|Visited by Safari|Visited by Other Br

In [26]:
from pyspark.sql.functions import *

def clean_dataframe(df):
    df = df.withColumn("Date", date_format(to_date(df["Date"], "dd/MM/yyyy"), "dd/MM/yyyy"))
    df = df.withColumn("Visits", df["Visits"].cast("int"))
    df = df.withColumn("Ad spends", (col("Ad spends(in Hundreds)") * 100).cast("double")).drop("Ad spends(in Hundreds)")
    df = df.withColumn("Total Pages visited", df["Total Pages visited"].cast("int"))
    df = df.withColumn("Bounce Rate", df["Bounce Rate"].cast("double"))
    df = df.withColumn("Average Session Duration", substring(df["Average Session Duration"], 1, 8))
    df = df.withColumn("Longest Session", substring(df["Longest Session"], 1, 8))
    df = df.withColumn("Shortest Session", substring(df["Shortest Session"], 1, 8))
    df = df.withColumn("Percentage of Return Visitors", df["Percentage of Return Visitors"].cast("double"))
    df = df.withColumn("Highest Load Time", df["Highest Load Time"].cast("double"))
    df = df.withColumn("Lowest Load Time", df["Lowest Load Time"].cast("double"))
    df = df.withColumn("Total Page Loads", df["Total Page Loads"].cast("int"))
    df = df.withColumn("Total Sessions Created", df["Total Sessions Created"].cast("int"))
    df = df.withColumn("Number of Sign Ups", df["Number of Sign Ups"].cast("int"))

    df = df.fillna("Unknown", subset=["Most Visited country", "Page Most visited", "Top most searched related word"])
    df = df.fillna(0, subset=["Visits", "Ad spends", "Total Pages visited", "Bounce Rate",
                              "Percentage of Return Visitors", "Highest Load Time", "Lowest Load Time",
                              "Total Page Loads", "Total Sessions Created", "Number of Sign Ups",
                              "Ratings Received", "Reviews Received"])

    print('Cleaning DF done')
    return df

for i, csv_file in enumerate(csv_files, start=1):
    df = clean_dataframe(df)

df.show()


Cleaning DF done
+----------+------+-------------------+-----------+------------------------+---------------+----------------+-----------------------------+--------------------+-----------------+----------------+----------------+----------------------+------------------+--------------------+--------------------+--------------------+------------------------------+------------------+-----------------+-----------------+---------------+-----------------+-------------------------+---------+
|      Date|Visits|Total Pages visited|Bounce Rate|Average Session Duration|Longest Session|Shortest Session|Percentage of Return Visitors|Most Visited country|Highest Load Time|Lowest Load Time|Total Page Loads|Total Sessions Created|Number of Sign Ups|    Ratings Received|    Reviews Received|   Page Most visited|Top most searched related word|Visited by Desktop|Visited by Mobile|Visited by Chrome|Visited by Edge|Visited by Safari|Visited by Other Browsers|Ad spends|
+----------+------+----------------

In [27]:
from pyspark.sql.functions import *
import math

# Average Visits
avg_visits = df.select(bround(avg("Visits"))).collect()[0][0]

# Total Visits
total_visits = df.select(sum(col("Visits"))).collect()[0][0]

# Total Ad Spends
total_ad_spends = df.select(sum(col("Ad spends"))).collect()[0][0]

# Average Bounce Rate
avg_bounce_rate = df.select(round(avg("Bounce Rate"), 2)).collect()[0][0]

# Cost per Visit
cost_per_visit = total_ad_spends / total_visits

# Device Usage Distribution
average_desktop = df.select(avg("Visited by Desktop")).collect()[0][0]
average_mobile = df.select(avg("Visited by Mobile")).collect()[0][0]
if average_desktop % 1 >= 0.5:
    average_desktop = math.ceil(average_desktop)
else:
    average_desktop = math.floor(average_desktop)

if average_mobile % 1 >= 0.5:
    average_mobile = math.ceil(average_mobile)
else:
    average_mobile = math.floor(average_mobile)

# Ratio of Visits to Percentage of Return Visitors

return_visitors = df.select(sum(col("Percentage of Return Visitors"))).collect()[0][0]
percentage_return_visitors = (return_visitors / df.count()) * 100
ratio_visits_to_return_visitors = avg_visits / percentage_return_visitors

# Average Session Duration (assuming duration is in minutes:seconds:milliseconds)
# Convert "Average Session Duration" from HH:MM:SS to milliseconds
duration_milliseconds = (df["Average Session Duration"].substr(1, 2).cast("int") * 60 * 1000) + \
                        (df["Average Session Duration"].substr(4, 2).cast("int") * 1000) + \
                        (df["Average Session Duration"].substr(7, 2).cast("int") )

# Calculate the average session duration in milliseconds
avg_session_duration_milliseconds = df.select(avg(duration_milliseconds)).collect()[0][0]

# Convert the average session duration back to HH:MM:SS format
avg_session_duration_seconds = avg_session_duration_milliseconds // 1000
avg_session_duration_minutes = avg_session_duration_seconds // 60
avg_session_duration_seconds_remaining = avg_session_duration_seconds % 60
avg_session_duration_milliseconds_remaining = avg_session_duration_milliseconds % 1000
avg_session_duration = f"{int(avg_session_duration_minutes):02}:{int(avg_session_duration_seconds_remaining):02}:{int(avg_session_duration_milliseconds_remaining):03}"


#Result Outputs
print("Average Visits:", avg_visits)
print("Total Visits:", total_visits)
print("Total Ad Spends:", total_ad_spends)
print("Average Bounce Rate:", avg_bounce_rate)
print("Cost per Visit:", cost_per_visit)
print("Average Visited through Desktop:", average_desktop)
print("Average Visited through Mobile:", average_mobile)
print("Ratio of Visits to Percentage of Return Visitors:", avg_visits, ":", percentage_return_visitors)
print("Average Session Duration:", avg_session_duration)

Average Visits: 7244.0
Total Visits: 50710
Total Ad Spends: 3840000.0
Average Bounce Rate: 23.4
Cost per Visit: 75.72470913034904
Average Visited through Desktop: 46
Average Visited through Mobile: 54
Ratio of Visits to Percentage of Return Visitors: 7244.0 : 2055.714285714286
Average Session Duration: 11:37:880
