In [None]:
# Spark Code to process Monthly Data

'''
Input file:
    - ../data/monthly_data/*.json
      One JSON file is read per run. The leading part of its file name
      (before the first "." and "_") becomes the FILE_DATE column.

Produces the following CSV output directories under ../saved_data/monthly_data/:
    - engagements_data
    - traffic_sources
    - estimated_monthly_visits
'''

In [1]:
from pyspark.sql import SparkSession
import os
# Reuse the active SparkSession if there is one; otherwise start a new one
# named 'monthly_spark'.
spark = (
    SparkSession.builder
    .appName('monthly_spark')
    .getOrCreate()
)

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType, BooleanType, MapType, ArrayType
from pyspark.sql.functions import lit

# Explicit schema for the monthly JSON export, so Spark does not infer types.
# Field names must match the JSON keys exactly. The "Engagments" spelling and
# the "Paid Referrals" key (with a space) are both read successfully in the
# saved outputs below, so they mirror the input file. Do not "correct" them
# here without checking the source data.
schema = StructType([
    StructField("Version", IntegerType(), True),
    StructField("SiteName", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("TopCountryShares", ArrayType(StructType([
        StructField("Country", IntegerType(), True),
        StructField("CountryCode", StringType(), True),
        StructField("Value", DoubleType(), True),
    ])), True),
    StructField("Title", StringType(), True),
    # Engagement metrics are kept as strings and written through to CSV as-is.
    StructField("Engagments", StructType([
        StructField("BounceRate", StringType(), True),
        StructField("Month", StringType(), True),
        StructField("Year", StringType(), True),
        StructField("PagePerVisit", StringType(), True),
        StructField("Visits", StringType(), True),
        StructField("TimeOnSite", StringType(), True),
    ]), True),
    # Maps month (e.g. "2023-06-01") to a visit count. LongType rather than
    # IntegerType: counts above 2,147,483,647 cannot be represented as a 32-bit
    # int and would be lost when the JSON is parsed.
    StructField("EstimatedMonthlyVisits", MapType(StringType(), LongType()), True),
    StructField("GlobalRank", StructType([
        StructField("Rank", IntegerType(), True),
    ]), True),
    StructField("CountryRank", StructType([
        StructField("Country", StringType(), True),
        StructField("CountryCode", StringType(), True),
        StructField("Rank", IntegerType(), True),
    ]), True),
    StructField("CategoryRank", StructType([
        StructField("Rank", IntegerType(), True),
        StructField("Category", StringType(), True),
    ]), True),
    StructField("IsSmall", BooleanType(), True),
    StructField("Policy", IntegerType(), True),
    # Share of traffic per channel, as fractions.
    StructField("TrafficSources", StructType([
        StructField("Social", DoubleType(), True),
        StructField("Paid Referrals", DoubleType(), True),
        StructField("Mail", DoubleType(), True),
        StructField("Referrals", DoubleType(), True),
        StructField("Search", DoubleType(), True),
        StructField("Direct", DoubleType(), True),
    ]), True),
    StructField("Category", StringType(), True),
    StructField("LargeScreenshot", StringType(), True),
    StructField("IsDataFromGa", BooleanType(), True),
    StructField("Countries", ArrayType(StructType([
        StructField("Code", StringType(), True),
        StructField("UrlCode", StringType(), True),
        StructField("Name", StringType(), True),
    ])), True),
    StructField("Competitors", StructType([
        StructField("TopSimilarityCompetitors", ArrayType(StructType([
            StructField("Domain", StringType(), True),
        ])), True),
    ]), True),
])


In [3]:
MONTHLY_DATA_PATH = r'../data/monthly_data/'
files = os.listdir(MONTHLY_DATA_PATH)

FILE_DATE = None
try:
    # os.listdir order is arbitrary, so sort to make the choice deterministic.
    # FILE_DATE is the file-name prefix (e.g. "2023-09-24" in the outputs below).
    # Assuming every file follows that date-prefixed naming, lexicographic
    # order is chronological and the last entry is the newest file.
    # TODO confirm the naming convention.
    json_file = sorted(f for f in files if f.endswith('.json'))
    if not json_file:
        raise FileNotFoundError(f"No .json file found in {MONTHLY_DATA_PATH!r}")
    if len(json_file) > 1:
        print(f"Found {len(json_file)} JSON files; processing only {json_file[-1]!r}")
    file = json_file[-1]
    FILE_DATE = file.split('.')[0].split('_')[0]
    # Cache the DataFrame that the downstream cells actually select from.
    steam_traffic = (
        spark.read.json(
            os.path.join(MONTHLY_DATA_PATH, file),
            multiLine=True,
            schema=schema,
        )
        .withColumn("FILE_DATE", lit(FILE_DATE))
        .cache()
    )
except Exception as e:
    print("An error occurred while reading the JSON file:", e)
    # Every later cell needs steam_traffic; stop here rather than fail later
    # with a misleading NameError.
    raise


+-------+--------------------+--------------------+--------------------+----------------+--------------------+----------------------+----------+------------------+------------+-------+------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+
|Version|            SiteName|         Description|    TopCountryShares|           Title|          Engagments|EstimatedMonthlyVisits|GlobalRank|       CountryRank|CategoryRank|IsSmall|Policy|      TrafficSources|            Category|     LargeScreenshot|IsDataFromGa|           Countries|         Competitors|
+-------+--------------------+--------------------+--------------------+----------------+--------------------+----------------------+----------+------------------+------------+-------+------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+
|      1|store.steampowere...|steam is the ulti...|[{840, US, 0.1457..

In [4]:

# Flatten the nested "Engagments" struct into one column per metric, then
# stamp every row with the input file's date.
engagements_data = (
    steam_traffic
    .select(
        "Engagments.BounceRate",
        "Engagments.Month",
        "Engagments.Year",
        "Engagments.PagePerVisit",
        "Engagments.Visits",
        "Engagments.TimeOnSite",
    )
    .withColumn("FILE_DATE", lit(FILE_DATE))
)


+-------------------+-----+----+-----------------+------------------+------------------+----------+
|         BounceRate|Month|Year|     PagePerVisit|            Visits|        TimeOnSite| FILE_DATE|
+-------------------+-----+----+-----------------+------------------+------------------+----------+
|0.48138174732732564|    8|2023|4.426160884504654|168273664.85540068|190.13336013714985|2023-09-24|
+-------------------+-----+----+-----------------+------------------+------------------+----------+



In [5]:
from pyspark.sql.functions import explode

# Turn the EstimatedMonthlyVisits map into long format: one row per map entry,
# with the key in "Date" and the value in "Visits", plus the file date.
estimated_monthly_visits = (
    steam_traffic
    .select(explode("EstimatedMonthlyVisits").alias("Date", "Visits"))
    .withColumn("FILE_DATE", lit(FILE_DATE))
)

+----------+---------+
|Date      |Visits   |
+----------+---------+
|2023-06-01|172451708|
|2023-07-01|188136085|
|2023-08-01|168273664|
+----------+---------+



In [6]:
# Flatten the TrafficSources struct into one column per channel. The
# "Paid Referrals" key contains a space; the output below shows it resolves
# as a plain nested field name. Then stamp every row with the file date.
traffic_sources = (
    steam_traffic
    .select(
        "TrafficSources.Social",
        "TrafficSources.Paid Referrals",
        "TrafficSources.Mail",
        "TrafficSources.Referrals",
        "TrafficSources.Search",
        "TrafficSources.Direct",
    )
    .withColumn("FILE_DATE", lit(FILE_DATE))
)

+-------------------+---------------------+--------------------+-------------------+------------------+-------------------+----------+
|Social             |Paid Referrals       |Mail                |Referrals          |Search            |Direct             |FILE_DATE |
+-------------------+---------------------+--------------------+-------------------+------------------+-------------------+----------+
|0.06460781172767084|0.0016597399304503874|0.016175191632167206|0.09163391781301873|0.4531740415577455|0.37274929733894735|2023-09-24|
+-------------------+---------------------+--------------------+-------------------+------------------+-------------------+----------+



In [7]:
# Output directories. Spark writes each table as a directory of CSV part files.
engagements_path = r"../saved_data/monthly_data/engagements_data"
traffic_sources_path = r"../saved_data/monthly_data/traffic_sources"
estimated_monthly_visits_path = r"../saved_data/monthly_data/estimated_monthly_visits"

# Write each table as CSV with a header row, in the same order as before.
# NOTE(review): mode("overwrite") replaces whatever a previous run wrote to
# these directories. If rows from earlier months (tagged by FILE_DATE) should
# accumulate, this would need "append" -- confirm the intended behavior.
for frame, out_path in (
    (engagements_data, engagements_path),
    (traffic_sources, traffic_sources_path),
    (estimated_monthly_visits, estimated_monthly_visits_path),
):
    frame.write.format("csv").mode("overwrite").option("header", "true").save(out_path)

In [9]:
# Stop the SparkSession (and its underlying SparkContext) now that all output
# CSVs have been written. This also releases the cached steam_traffic data.
spark.stop()