In [25]:
import os
import sys
import pyspark
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, to_date, col
from pyspark.sql.types import StringType, StructType, StructField, DoubleType, NumericType, DecimalType, ArrayType

spark = SparkSession.builder.appName('SparkByExample.com').config("spark.driver.memory", "4g").config('spark.executor.cores', '4').getOrCreate()
print(spark)

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

<pyspark.sql.session.SparkSession object at 0x00000199482C0408>


# Case 1

In [26]:
def exchange_rates():
    response=requests.get('https://v6.exchangerate-api.com/v6/d95f46213104ffa309bca4f3/latest/EUR')
    if response.status_code !=200:
        print("Status Code :",response.status_code)
        raise Exception("There was error!")
    
    print("Header:",response.headers['Content-Type'])
    data=response.json()
    
    conversion_rates_all = data['conversion_rates']
    
    return conversion_rates_all

conversion_rates_all = exchange_rates()

Header: application/json


In [27]:
#Reading sales file

sales = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("sales_and_traffic_data.csv")

mapping = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("amazon_shop_mapping.csv")

In [28]:
mapping = mapping.withColumnRenamed("country", "region")
mapping = mapping.drop('id')

# Drop duplicate rows based on specific columns (keep the first occurrence)

columns_to_check_duplicates = ["shop_name", "region"]
mapping = mapping.dropDuplicates(subset=columns_to_check_duplicates)

In [29]:
#Joining on sale and mapping

join_columns = ["shop_name", "region"]

result_df = sales.join(mapping, on=join_columns, how="left")


In [30]:
#changig json to dataframe

schema = StructType([
    StructField("currency", StringType(), True),
    StructField("exchange_rate", DoubleType(), True)
])

# Convert the dictionary to a list of tuples with key-value pairs
data = [(currency, float(exchange_rate)) for currency, exchange_rate in conversion_rates_all.items()]

# Create a DataFrame
currenc_df = spark.createDataFrame(data, schema=schema)


In [31]:
#joining currenc_df with result_df

result_df = result_df.join(currenc_df, on='currency', how="left")

In [34]:
result_df.count()

5000

In [35]:
#Converting base on exchange rae
result_df = result_df.withColumn("ordered_products_sales_euro", col("ordered_products_sale") / col("exchange_rate"))
result_df = result_df.withColumn("ordered_products_sales_b2b_euro", col("ordered_products_sales_b2b") / col("exchange_rate"))

In [37]:
result_df.show()

+--------+---------------+------+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+-----------+-------------+---------------------------+-------------------------------+
|currency|      shop_name|region|  #|child_asin|sessions|page_views|units_ordered|units_ordered_b2b|ordered_products_sale|ordered_products_sales_b2b|total_ordered_items|total_ordered_items_b2b|report_date|exchange_rate|ordered_products_sales_euro|ordered_products_sales_b2b_euro|
+--------+---------------+------+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+-----------+-------------+---------------------------+-------------------------------+
|     GBP|Electronic-Star|    UK|  1|B00H5155LG|       1|         1|            0|                0|               116.64|                     62.49|           

In [38]:
#Adding new col by summation of newly generated cols

result_df = result_df.withColumn("Total_euro", col("ordered_products_sales_b2b_euro") + col("ordered_products_sales_euro"))

In [39]:
# Sum the values in the "Total_euro" column

total_sum = result_df.agg(sum("Total_euro")).collect()[0][0]

# Print the total sum
print("Total revenue of ordered_products_sales_b2b_euro:", total_sum)

Total revenue of ordered_products_sales_b2b_euro: 1402132.8410984552


In [40]:
#aggregation by region
agg_df = result_df.groupBy("region").agg(sum("Total_euro").alias("total_sales_by_region"))

In [41]:
agg_df.show()

+------+---------------------+
|region|total_sales_by_region|
+------+---------------------+
|    NL|    60722.72000000001|
|    PL|   1979.0659209123162|
|    MX|     407.814706370961|
|    CA|   12647.852548585655|
|    DE|    359060.0199999994|
|    ES|    78008.69000000003|
|    TR|                 null|
|    US|    409119.5933806148|
|    FR|   185737.00000000006|
|    IT|   177971.91999999984|
|    SE|   2436.5142472747343|
|    UK|   114041.65029469554|
+------+---------------------+



In [42]:
#aggregation by shop_name

agg_df = result_df.groupBy("shop_name").agg(sum("Total_euro").alias("total_sales_by_shop_name"))
agg_df.show()

+--------------------+------------------------+
|           shop_name|total_sales_by_shop_name|
+--------------------+------------------------+
|              Zelite|       690.9692671394798|
|           Klarstein|       65260.91481682656|
|     Electronic-Star|       84098.05564312954|
|                 BBG|       173.0496453900709|
|BBG Scandinavian Hub|                    null|
|             Feelino|       44839.60833930427|
|            Gramercy|        60459.3646544838|
|        Spielehelden|                  550.04|
|            slimpuro|      201142.67883492835|
|          Superlunar|                  746.84|
|           Casa Chic|       4489.829738818908|
|  The Friendly Swede|       870672.5663658577|
|             BeerCup|        69008.9237925745|
|            skullcap|                    null|
+--------------------+------------------------+



In [43]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [44]:
#aggregation by month

# Convert the "report_date" column to a date type
result_df = result_df.withColumn("report_date", to_date(col("report_date"), "MM/dd/yyyy"))

# Extract the month from the "report_date" column
result_df = result_df.withColumn("month", month("report_date"))

# Group by "month" and aggregate the sum of "ordered_products_sales_b2b_euro" for each month
agg_df = result_df.groupBy("month").agg(sum("Total_euro").alias("total_sales_by_month"))

# Show the aggregated DataFrame
agg_df.show()

+-----+--------------------+
|month|total_sales_by_month|
+-----+--------------------+
|    1|  362648.36560848355|
|    3|   597288.9980882284|
|    2|  442195.47740174085|
+-----+--------------------+



# Case 2

In [54]:
data = spark.read.json('campaign_object.json')

In [55]:
data

DataFrame[columns: array<string>, data: array<array<string>>]

In [58]:
# Extract column names and data rows
columns = data.select("columns").first()[0]
data_rows = data.select("data").first()[0]

# Create a DataFrame using the column names and data rows
df = spark.createDataFrame(data_rows, columns)

# Define a schema for the "CREATIVE" column
schema = StructType([
    StructField("brandName", StringType(), True),
    StructField("brandLogoAssetID", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("asins", ArrayType(StringType()), True),
    StructField("brandLogoUrl", StringType(), True),
])

# Extract values from the "CREATIVE" column into separate columns
df = df.withColumn("CREATIVE_JSON", col("CREATIVE").cast(StringType()))
df = df.withColumn("CREATIVE", from_json(col("CREATIVE_JSON"), schema))
df = df.select(columns + [col("CREATIVE.*")])

df.show(truncate=False)


+----------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------------------------------------------------------------------+-------------------------------------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+
|CAMPAIGNID|STARTDATE|CREATIVE                                                                                                                                                                                                                                                                    |brandName|brandLogoAssetID                                                     |headline               

In [59]:
#Adding asin cols

df = df.withColumn("asin_1", col("asins").getItem(0))
df = df.withColumn("asin_2", col("asins").getItem(1))
df = df.withColumn("asin_3", col("asins").getItem(2))

In [60]:
df.show()

+----------+---------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+----------+----------+----------+
|CAMPAIGNID|STARTDATE|            CREATIVE|brandName|    brandLogoAssetID|            headline|               asins|        brandLogoUrl|    asin_1|    asin_2|    asin_3|
+----------+---------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+----------+----------+----------+
|  54356000| 20220326|{Pamara, amzn1.as...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|B06W5543D6|B074F576VM|B078VLNF5K|
|  32659511| 20220326|{Pamara, amzn1.as...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|B06W5543D6|B074F576VM|B078VLNF5K|
|  14600371| 20221004|{Pamara, amzn1.as...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|B06W5543

In [61]:
#getting distinct asins from sales df
distinct_asins_df = result_df.select("child_asin").distinct()

In [62]:
distinct_asins_df.show()

+----------+
|child_asin|
+----------+
|B08HVGLWJM|
|B07R621F4H|
|B00TKHT08M|
|B0747M22NQ|
|B08TRK1YCX|
|B07ZKW56QG|
|B0924NDG7G|
|B07VFVZL12|
|B01HXLIGIG|
|B074M96TMX|
|B00F85E0BC|
|B08K3L7531|
|B07T22YGXX|
|B07DHPR4CZ|
|B092R2MB5B|
|B01MY4KWIE|
|B08C59MLCP|
|B00S9SMLDY|
|B07WS884YH|
|B00LIHFPWC|
+----------+
only showing top 20 rows

