In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=fdaad27a90726a0d29c6d0383eccf2591927a53170a98afb3154b1325062c609
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr
from pyspark.sql.types import FloatType
import datetime

# Initialize a Spark session
spark = SparkSession.builder.appName("ECommerceCaseStudy").getOrCreate()

# Load the data files into a PySpark DataFrame
sales_df = spark.read.csv("sales_and_traffic_data.csv", header=True)
mapping_df = spark.read.csv("amazon_shop_mapping.csv", header=True)
campaign_df = spark.read.json("campaign_object.json")

# Define a User-Defined Function (UDF) to convert currency
def convert_to_euro(amount, currency):
    try:
        # Convert currency to Euro based on a fixed exchange rate
        exchange_rate = 0.85
        amount_euro = float(amount) * exchange_rate
        return round(amount_euro, 2)
    except ValueError:
        return 0.0  # Handle non-numeric or missing values

# Register the UDF for conversion
convert_udf = spark.udf.register("convert_to_euro", convert_to_euro, FloatType())

# Join the sales_df with mapping_df to get currency information
joined_df = sales_df.join(mapping_df, "shop_name", "left")

# Perform currency conversion using the UDF
sales_df_with_euro = joined_df.withColumn(
    "ordered_products_sale_euro",
    convert_udf(col("ordered_products_sale"), col("currency"))
)

# Calculate the total revenue
total_revenue = sales_df_with_euro.selectExpr("round(sum(ordered_products_sale_euro), 2) as total_revenue").collect()[0]["total_revenue"]

# total revenue per country
revenue_per_country = sales_df_with_euro.groupBy("country").sum("ordered_products_sale_euro")
revenue_per_country = revenue_per_country.withColumnRenamed("sum(ordered_products_sale_euro)", "total_revenue_per_country")


# total revenue per shop
revenue_per_shop = sales_df_with_euro.groupBy("shop_name").sum("ordered_products_sale_euro")
revenue_per_shop = revenue_per_shop.withColumnRenamed("sum(ordered_products_sale_euro)", "total_revenue_per_shop")



# the report_date column to a date format
sales_df_with_euro = sales_df_with_euro.withColumn("report_date", col("report_date").cast("date"))

sales_df_with_euro = sales_df_with_euro.withColumn("month", expr("month(report_date)"))

# total revenue per month
revenue_per_month = sales_df_with_euro.groupBy("month").sum("ordered_products_sale_euro")
revenue_per_month = revenue_per_month.withColumnRenamed("sum(ordered_products_sale_euro)", "total_revenue_per_shop")

# Print Results
print("Total Revenue in Euros:", total_revenue)
revenue_per_country.show()
revenue_per_shop.show()
revenue_per_month.show()

# Stop the Spark session
spark.stop()


Total Revenue in Euros: 803962.01
+-------+-------------------------+
|country|total_revenue_per_country|
+-------+-------------------------+
|     NL|       20841.619942605495|
|     PL|       20841.619942605495|
|   NULL|        616387.4300203435|
|     DE|       20841.619942605495|
|     ES|       20841.619942605495|
|     TR|       20841.619942605495|
|     FR|       20841.619942605495|
|     IT|       20841.619942605495|
|     SE|       20841.619942605495|
|     UK|       20841.619942605495|
+-------+-------------------------+

+--------------------+----------------------+
|           shop_name|total_revenue_per_shop|
+--------------------+----------------------+
|         BBG BeerCup|    30144.650010395795|
|BBG The Friendly ...|     386635.7399078831|
|     Elektronik-Star|     34396.09006881714|
|    BBG Spielehelden|    249.25999450683594|
| Berlin Brands Group|                 50.75|
|            SLIMPURO|     87197.68994675577|
|Klarstein Deutsch...|    26476.269998028874|
|

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, split, when
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import col, explode
# Create a Spark session
spark = SparkSession.builder.appName("CampaignAnalysis").getOrCreate()

# Load the campaign_object.json data
campaign_data = spark.read.json("campaign_object.json")

# Extract the "CREATIVE" JSON string from the "data" array
campaign_df = campaign_data.select(col("data")[0][2].alias("CREATIVE"))

In [28]:
# Define the schema for the "CREATIVE" column
creative_schema = StructType([
    StructField("brandName", StringType(), True),
    StructField("brandLogoAssetID", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("asins", StringType(), True),  # Assuming it's a string
    StructField("brandLogoUrl", StringType(), True)
])

# Parse the "CREATIVE" column as JSON using the specified schema
campaign_data = campaign_data.withColumn("CREATIVE_JSON", from_json(col("data")[0][2], creative_schema))

# Access the keys in the "CREATIVE_JSON" struct
campaign_data.select(
    "CREATIVE_JSON.brandName",
    "CREATIVE_JSON.brandLogoAssetID",
    "CREATIVE_JSON.headline",
    "CREATIVE_JSON.asins",  # Access it as a string
    "CREATIVE_JSON.brandLogoUrl"
).show(truncate=False)

+---------+---------------------------------------------------------------------+--------------------------+----------------------------------------+------------------------------------------------------------------------------------------------+
|brandName|brandLogoAssetID                                                     |headline                  |asins                                   |brandLogoUrl                                                                                    |
+---------+---------------------------------------------------------------------+--------------------------+----------------------------------------+------------------------------------------------------------------------------------------------+
|Pamara   |amzn1.assetlibrary.asset1.198f5d4d083135d4acff704bf947a982:version_v1|Per il tuo futuro luminoso|["B06W5543D6","B074F576VM","B078VLNF5K"]|https://m.media-amazon.com/images/S/al-eu-726f4d26-7fdb/1b6aeff7-5d01-458e-9e26-5afb917cf8aa.png|
+---------+-

In [30]:
asins_df=campaign_data.select('CREATIVE_JSON.asins')
asins_df.show()

+--------------------+
|               asins|
+--------------------+
|["B06W5543D6","B0...|
+--------------------+



In [31]:
from pyspark.sql.functions import split, col

# Split the "asins" column into an array of strings
df = asins_df.withColumn("asins_array", split(col("asins"), ","))

# Create columns for each element in the array
for i in range(3):  # Change the range as needed
    df = df.withColumn(f"asins_{i + 1}", col("asins_array")[i])

# Select the desired columns
df = df.select("asins_1", "asins_2", "asins_3")
df.show()

+-------------+------------+-------------+
|      asins_1|     asins_2|      asins_3|
+-------------+------------+-------------+
|["B06W5543D6"|"B074F576VM"|"B078VLNF5K"]|
+-------------+------------+-------------+



In [32]:
from pyspark.sql.functions import col, coalesce, split, lit


sales_df = spark.read.csv("sales_and_traffic_data.csv", header=True)

# Task 3: Extract distinct ASINs from sales_and_traffic_data.csv
distinct_asin_df = sales_df.select("child_asin").distinct()

# Task 4: Create the "active_asin" column based on distinct ASINs
distinct_asin_list = [row.child_asin for row in distinct_asin_df.collect()]

campaign_df = df.withColumn("active_asin", coalesce(
    col("asins_1"),
    when(col("asins_2").isin(distinct_asin_list), col("asins_2")),
    when(col("asins_3").isin(distinct_asin_list), col("asins_3")),
    lit(None)
))

# Show the transformed campaign DataFrame
campaign_df.show(5)

+-------------+------------+-------------+-------------+
|      asins_1|     asins_2|      asins_3|  active_asin|
+-------------+------------+-------------+-------------+
|["B06W5543D6"|"B074F576VM"|"B078VLNF5K"]|["B06W5543D6"|
+-------------+------------+-------------+-------------+

