In [5]:
import pyspark
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("Currency_conversion").getOrCreate()

In [6]:
sales_and_traffic_data = spark.read.csv("C:/Users/LENOVO/Downloads/Skill_Test_SDE/Skill_Test/DATA/sales_and_traffic_data.csv", inferSchema=True, header=True)

In [7]:
# analyse the data and its schema
sales_and_traffic_data.printSchema()
sales_and_traffic_data.select("child_asin", "sessions","page_views","units_ordered","units_ordered_b2b","ordered_products_sale", "ordered_products_sales_b2b").show(20,  False)
sales_and_traffic_data.select("total_ordered_items","total_ordered_items_b2b", "region", "shop_name", "report_date").show(20, False)           

root
 |-- #: integer (nullable = true)
 |-- child_asin: string (nullable = true)
 |-- sessions: integer (nullable = true)
 |-- page_views: integer (nullable = true)
 |-- units_ordered: integer (nullable = true)
 |-- units_ordered_b2b: integer (nullable = true)
 |-- ordered_products_sale: double (nullable = true)
 |-- ordered_products_sales_b2b: double (nullable = true)
 |-- total_ordered_items: integer (nullable = true)
 |-- total_ordered_items_b2b: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- shop_name: string (nullable = true)
 |-- report_date: timestamp (nullable = true)

+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+
|child_asin|sessions|page_views|units_ordered|units_ordered_b2b|ordered_products_sale|ordered_products_sales_b2b|
+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+
|B00H5155LG|1       |1         |0            |0     

In [8]:
sales_and_traffic_data.show(3)

+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+--------------------+-------------------+
|  #|child_asin|sessions|page_views|units_ordered|units_ordered_b2b|ordered_products_sale|ordered_products_sales_b2b|total_ordered_items|total_ordered_items_b2b|region|           shop_name|        report_date|
+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+--------------------+-------------------+
|  1|B00H5155LG|       1|         1|            0|                0|               116.64|                     62.49|                  0|                      0|    UK|     Elektronik-Star|2022-03-20 00:00:00|
|  2|B002P7L4R4|       1|         2|            0|                0|               121.55|                    227.24|                  0|                      0

In [9]:
amazon_shop_mapping = spark.read.csv("C:/Users/LENOVO/Downloads/Skill_Test_SDE/Skill_Test/DATA/amazon_shop_mapping.csv", inferSchema=True, header=True)

In [10]:
amazon_shop_mapping.show(10, False)
amazon_shop_mapping.printSchema()

+----------+-------+----+--------+
|shop_name |country|id  |currency|
+----------+-------+----+--------+
|Flyweight |IT     |5823|EUR     |
|Flyweight |SE     |4213|SEK     |
|Flyweight |PL     |621 |PLN     |
|Flyweight |TR     |8772|TRY     |
|Flyweight |FR     |9257|EUR     |
|Flyweight |NL     |7731|EUR     |
|Flyweight |ES     |7735|EUR     |
|Flyweight |DE     |4583|EUR     |
|Flyweight |UK     |9144|GBP     |
|Scandfield|IT     |3807|EUR     |
+----------+-------+----+--------+
only showing top 10 rows

root
 |-- shop_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- currency: string (nullable = true)



In [11]:
amazon_shop_mapping.show(10, False)
amazon_shop_mapping.printSchema()

+----------+-------+----+--------+
|shop_name |country|id  |currency|
+----------+-------+----+--------+
|Flyweight |IT     |5823|EUR     |
|Flyweight |SE     |4213|SEK     |
|Flyweight |PL     |621 |PLN     |
|Flyweight |TR     |8772|TRY     |
|Flyweight |FR     |9257|EUR     |
|Flyweight |NL     |7731|EUR     |
|Flyweight |ES     |7735|EUR     |
|Flyweight |DE     |4583|EUR     |
|Flyweight |UK     |9144|GBP     |
|Scandfield|IT     |3807|EUR     |
+----------+-------+----+--------+
only showing top 10 rows

root
 |-- shop_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- currency: string (nullable = true)



# Get the latest Euro based exchange rate from Curreny api

In [17]:
from pyspark.sql.functions import  substring, col
import requests

# Define the API URL
api_url = "http://apilayer.net/api/live"
params = {
    "access_key": "64766765c7abaeb4ad69da724e4dbff2",
    "source": "EUR",
    "format": "1"
}

# Make the API call
response = requests.get(api_url, params=params)

# Check if the API call was successful
if response.status_code == 200:
    # Parse the JSON response
    data = response.json()
    
    # Extract the currency exchange rates from the response
    quotes = data.get("quotes", {})
    
    # Create a list of tuples containing currency code and exchange rate
    currency_quotes = [(currency, float(rate)) for currency, rate in quotes.items()]
    # Create a Spark DataFrame from the currency quotes with explicit schema
    #schema = [("CurrencyCode", StringType()), ("ExchangeRate", StringType())]
    quotes_df = spark.createDataFrame(currency_quotes, ["code", "ExchangeRate"])
    euro_currency_exchangeRate = quotes_df.withColumn("currency_code", substring(col("code"), -3, 3))
    euro_currency_exchangeRate.printSchema()
    # Show the DataFrame
    euro_currency_exchangeRate.show()
else:
    print("Failed to retrieve data from the API")


root
 |-- code: string (nullable = true)
 |-- ExchangeRate: double (nullable = true)
 |-- currency_code: string (nullable = true)

+------+------------+-------------+
|  code|ExchangeRate|currency_code|
+------+------------+-------------+
|EURAED|     3.84581|          AED|
|EURAFN|   80.848051|          AFN|
|EURALL|  106.067869|          ALL|
|EURAMD|  426.259391|          AMD|
|EURANG|    1.887784|          ANG|
|EURAOA|  867.478546|          AOA|
|EURARS|  366.519001|          ARS|
|EURAUD|    1.658037|          AUD|
|EURAWG|    1.887308|          AWG|
|EURAZN|    1.779041|          AZN|
|EURBAM|    1.955614|          BAM|
|EURBBD|    2.117058|          BBD|
|EURBDT|  115.488998|          BDT|
|EURBGN|    1.955784|          BGN|
|EURBHD|    0.394691|          BHD|
|EURBIF| 2970.089399|          BIF|
|EURBMD|     1.04705|          BMD|
|EURBND|    1.440101|          BND|
|EURBOB|     7.23838|          BOB|
|EURBRL|    5.410735|          BRL|
+------+------------+-------------+
only 

In [18]:
euro_currency_exchangeRate.printSchema()
euro_currency_exchangeRate.show()

root
 |-- code: string (nullable = true)
 |-- ExchangeRate: double (nullable = true)
 |-- currency_code: string (nullable = true)

+------+------------+-------------+
|  code|ExchangeRate|currency_code|
+------+------------+-------------+
|EURAED|     3.84581|          AED|
|EURAFN|   80.848051|          AFN|
|EURALL|  106.067869|          ALL|
|EURAMD|  426.259391|          AMD|
|EURANG|    1.887784|          ANG|
|EURAOA|  867.478546|          AOA|
|EURARS|  366.519001|          ARS|
|EURAUD|    1.658037|          AUD|
|EURAWG|    1.887308|          AWG|
|EURAZN|    1.779041|          AZN|
|EURBAM|    1.955614|          BAM|
|EURBBD|    2.117058|          BBD|
|EURBDT|  115.488998|          BDT|
|EURBGN|    1.955784|          BGN|
|EURBHD|    0.394691|          BHD|
|EURBIF| 2970.089399|          BIF|
|EURBMD|     1.04705|          BMD|
|EURBND|    1.440101|          BND|
|EURBOB|     7.23838|          BOB|
|EURBRL|    5.410735|          BRL|
+------+------------+-------------+
only 

# Case 1.1 

1. Convert the sales data from sales_and_traffic_data.csv to euro by using amazon_shop_mapping.csv and the currency conversion api presented or any forex api from your choice (python packages not allowed).

In [19]:
# Rename the column shop_name to avoid confusion in groupBy and select clauses 

sales_and_traffic_data = sales_and_traffic_data.withColumnRenamed("shop_name","sales_shop_name")


df1 =sales_and_traffic_data.join(amazon_shop_mapping, sales_and_traffic_data['region'] == amazon_shop_mapping['country'])

# Build dataframe with name euro_currency_exchangeRate with current exchange Rate for calculating sales in Euro
df2 = df1.join(euro_currency_exchangeRate, df1['currency'] == euro_currency_exchangeRate['currency_code'])


final_df = df2.withColumn("ordered_products_sale_euro", col("ordered_products_sale") / col("ExchangeRate"))
final_df = final_df.withColumn("ordered_products_sales_b2b_euro", col("ordered_products_sales_b2b") / col("ExchangeRate"))

# Show the resulting DataFrame
final_df.show()

final_df.printSchema()


+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+---------------+-------------------+------------------+-------+----+--------+------+------------+-------------+--------------------------+-------------------------------+
|  #|child_asin|sessions|page_views|units_ordered|units_ordered_b2b|ordered_products_sale|ordered_products_sales_b2b|total_ordered_items|total_ordered_items_b2b|region|sales_shop_name|        report_date|         shop_name|country|  id|currency|  code|ExchangeRate|currency_code|ordered_products_sale_euro|ordered_products_sales_b2b_euro|
+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+---------------+-------------------+------------------+-------+----+--------+------+------------+-------------+--------------------------+----------------

2. Calculate the total revenue.

In [21]:
from pyspark.sql.functions import sum 

# Aggregate the total ordder sales and b2b sales

tatal_revenue_df = final_df.agg(sum(col("ordered_products_sale_euro") + col("ordered_products_sales_b2b_euro")).alias("Total_revenue"))
tatal_revenue_df.show()


+-----------------+
|    Total_revenue|
+-----------------+
|7292236.939195005|
+-----------------+



3. In separate Dataframes present the total revenue per country and per shop and per month.

In [22]:
# Generate new column for proper aggrigation on the basis of Month
final_df = final_df.withColumn("year", substring(col("report_date"), 0, 4))
final_df = final_df.withColumn("month", substring(col("report_date"), 6, 2))
#final_df.select("report_date","year", "month").show(3000)
# asssuming all data belongs to same year else we need to add year in a groupby clause
df_by_country_shop = final_df.groupBy(["country", "shop_name","month"]).agg(sum(col("ordered_products_sale_euro") + col("ordered_products_sales_b2b_euro")).alias("Total_revenue"))
df_by_country_shop.sort("country").show() # apply sorting for proper analyzing the data

+-------+------------------+-----+------------------+
|country|         shop_name|month|     Total_revenue|
+-------+------------------+-----+------------------+
|     CA|The Friendly Swede|   02| 4623.840863064573|
|     CA|          Gramercy|   01|3433.6577627951788|
|     CA|        Superlunar|   02| 4623.840863064573|
|     CA|      Spielehelden|   03| 4682.491746450399|
|     CA|               BBG|   02| 4623.840863064573|
|     CA|      Spielehelden|   01|3433.6577627951788|
|     CA|         Accubuddy|   03| 4682.491746450399|
|     CA|The Friendly Swede|   01|3433.6577627951788|
|     CA|               BBG|   03| 4682.491746450399|
|     CA|         Accubuddy|   02| 4623.840863064573|
|     CA|         Guetewerk|   03| 4682.491746450399|
|     CA|         Guetewerk|   01|3433.6577627951788|
|     CA|        Fox & Fern|   03| 4682.491746450399|
|     CA|      Spielehelden|   02| 4623.840863064573|
|     CA|               BBG|   01|3433.6577627951788|
|     CA|The Friendly Swede|

4. Replicate the above questions in SQL.

In [23]:
# SQL 1
sales_and_traffic_data.createOrReplaceTempView("sales_and_traffic_data")
amazon_shop_mapping.createOrReplaceTempView("amazon_shop_mapping")
euro_currency_exchangeRate.createOrReplaceTempView("euro_currency_exchangeRate")

sales_result = spark.sql("select s.*, m.country, m.shop_name, s.ordered_products_sale/e.ExchangeRate as ordered_products_sale_euro , s.ordered_products_sales_b2b/e.ExchangeRate as ordered_products_sales_b2b_euro  from sales_and_traffic_data s, amazon_shop_mapping m, euro_currency_exchangeRate e " + \
    "where s.region == m.country and m.currency == e.currency_code")
sales_result.createOrReplaceTempView("sales_result")

# SQL 2
spark.sql( " select sum(ordered_products_sale_euro) +sum(ordered_products_sales_b2b_euro) as Total_revenue from sales_result").show()

# SQL 3

spark.sql( " select country, shop_name, MONTH(report_date) AS month, sum(ordered_products_sale_euro) +sum(ordered_products_sales_b2b_euro) as Total_revenue " + \
    "From sales_result group by country, shop_name,month order by country, month").show()


+-----------------+
|    Total_revenue|
+-----------------+
|7292236.939195035|
+-----------------+

+-------+------------------+-----+------------------+
|country|         shop_name|month|     Total_revenue|
+-------+------------------+-----+------------------+
|     CA|The Friendly Swede|    1|3433.6577627951788|
|     CA|         Guetewerk|    1|3433.6577627951788|
|     CA|        Fox & Fern|    1|3433.6577627951788|
|     CA|         Accubuddy|    1|3433.6577627951788|
|     CA|        Superlunar|    1|3433.6577627951788|
|     CA|      Spielehelden|    1|3433.6577627951788|
|     CA|               BBG|    1|3433.6577627951788|
|     CA|          Gramercy|    1|3433.6577627951788|
|     CA|The Friendly Swede|    2| 4623.840863064573|
|     CA|          Gramercy|    2| 4623.840863064573|
|     CA|        Superlunar|    2| 4623.840863064573|
|     CA|         Accubuddy|    2| 4623.840863064573|
|     CA|               BBG|    2| 4623.840863064573|
|     CA|         Guetewerk|    2| 

In [24]:
campaign_object= spark.read.format('org.apache.spark.sql.json').load("C:/Users/LENOVO/Downloads/Skill_Test_SDE/Skill_Test/DATA/campaign_object.json")

In [30]:
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

#campaign_object.select(explode("data")).show()
cmp_df =campaign_object.withColumn("data", explode("data")) \
    .withColumn("CAMPAIGNID", col("data")[0]) \
    .withColumn("STARTDATE", col("data")[1]) \
    .withColumn("CREATIVE", col("data")[2]) 

cmp_df.printSchema()

#cmp_df.drop("columns","data").show(50, 150, False)

# Define the schema for the JSON data
json_schema = StructType([
    StructField("brandName", StringType(), True),
    StructField("brandLogoAssetID", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("asins", ArrayType(StringType()), True),
    StructField("brandLogoUrl", StringType(), True)
])

df = cmp_df.withColumn("parsed_data", from_json(col("CREATIVE"), json_schema))

# Extract specific attributes from the parsed data
df = df.withColumn("brandName", col("parsed_data.brandName"))
df = df.withColumn("brandLogoAssetID", col("parsed_data.brandLogoAssetID"))
df = df.withColumn("headline", col("parsed_data.headline"))
df = df.withColumn("asins", col("parsed_data.asins"))
df = df.withColumn("brandLogoUrl", col("parsed_data.brandLogoUrl"))

df.drop("columns","data", "CREATIVE", "parsed_data").show(20, 20, False)

root
 |-- columns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- CAMPAIGNID: string (nullable = true)
 |-- STARTDATE: string (nullable = true)
 |-- CREATIVE: string (nullable = true)

+----------+---------+---------+--------------------+--------------------+--------------------+--------------------+
|CAMPAIGNID|STARTDATE|brandName|    brandLogoAssetID|            headline|               asins|        brandLogoUrl|
+----------+---------+---------+--------------------+--------------------+--------------------+--------------------+
|  54356000| 20220326|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  32659511| 20220326|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  14600371| 20221004|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|


# Case 1.2

1. Transform the campaign_object.csv by separating the column CREATIVE to multiple columns ( brandName , brandLogoAssetID , headline , asins , brandLogoUrl ).


In [34]:
df.select("brandName", "brandLogoAssetID", "headline","asins", "brandLogoUrl").dropna("all").show() # remove rows if every column is null

+---------+--------------------+--------------------+--------------------+--------------------+
|brandName|    brandLogoAssetID|            headline|               asins|        brandLogoUrl|
+---------+--------------------+--------------------+--------------------+--------------------+
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|   Pamara|amzn1.assetlibrar...|Per il t

2. Separate the new formatted column asins to 3 new columns asin_1 , asin_2 , asin_2

In [36]:
df20 = df.withColumn("asin_1", col("asins")[0])
df20 = df20.withColumn("asin_2", col("asins")[1])
df20 = df20.withColumn("asin_3", col("asins")[2])
df20.select("asin_1", "asin_2", "asin_3").dropna("all").show()

+----------+----------+----------+
|    asin_1|    asin_2|    asin_3|
+----------+----------+----------+
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B074F576VM|B078VLNF5K|      null|
|B074F576VM|B078VLNF5K|      null|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B06W5543D6|B074F576VM|B078VLNF5K|
|B074PRYDMV|B074PRTQ1N|B074PQZ4CM|
|B0749RJM8P|B07589QRGJ|B073QVW1CL|
|B00VEAW58K|B07D3BBW1L|B07N2TXXF1|
|B073P78667|B07KWS1WK5|B07CVGTN5H|
|B01N94QWSV|B00Q74XCBW|B00BBW45NO|
|B07V3F81MV|B01N3B6DJN|B012H7F8CS|
+----------+----------+----------+
only showing top 20 rows



3. By using sales_and_traffic_data.csv, extract the distinct asin list and save it into a dataframe.

In [37]:
distinct_asin = sales_and_traffic_data.select("child_asin").distinct()
distinct_asin.printSchema()
distinct_asin.show()

root
 |-- child_asin: string (nullable = true)

+----------+
|child_asin|
+----------+
|B08HVGLWJM|
|B07R621F4H|
|B00TKHT08M|
|B0747M22NQ|
|B08TRK1YCX|
|B07ZKW56QG|
|B0924NDG7G|
|B07VFVZL12|
|B01HXLIGIG|
|B074M96TMX|
|B00F85E0BC|
|B08K3L7531|
|B07T22YGXX|
|B07DHPR4CZ|
|B092R2MB5B|
|B01MY4KWIE|
|B08C59MLCP|
|B00S9SMLDY|
|B07WS884YH|
|B00LIHFPWC|
+----------+
only showing top 20 rows



In [43]:
from pyspark.sql.functions import coalesce, lit

new_campaign_df=df20.withColumn(
    "active_asin",
    coalesce(
        df20["asin_1"],
        df20["asin_2"],
        df20["asin_3"],
        lit("No Active ASIN")  # Default value if none of the asins exist
    )
)

# Show the updated DataFrame
new_campaign_df.dropna("all").show(2, 7, False)

#new_campaign_df.select("asin_1", "asin_2", "asin_3", "active_asin").show(200)

+-------+-------+----------+---------+--------+-----------+---------+----------------+--------+-------+------------+-------+-------+-------+-----------+
|columns|   data|CAMPAIGNID|STARTDATE|CREATIVE|parsed_data|brandName|brandLogoAssetID|headline|  asins|brandLogoUrl| asin_1| asin_2| asin_3|active_asin|
+-------+-------+----------+---------+--------+-----------+---------+----------------+--------+-------+------------+-------+-------+-------+-----------+
|[CAM...|[543...|   5435...|  2022...| {'br...|    {Pam...|   Pamara|         amzn...| Per ...|[B06...|     http...|B06W...|B074...|B078...|    B06W...|
|[CAM...|[326...|   3265...|  2022...| {'br...|    {Pam...|   Pamara|         amzn...| Per ...|[B06...|     http...|B06W...|B074...|B078...|    B06W...|
+-------+-------+----------+---------+--------+-----------+---------+----------------+--------+-------+------------+-------+-------+-------+-----------+
only showing top 2 rows

+----------+----------+----------+--------------+
|    as