In [1]:
import findspark
findspark.init()

In [2]:
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import *
import pyspark.sql.functions as F
import warnings
from pyspark.sql.types import *
import pandas as pd

warnings.filterwarnings('ignore')

In [3]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
# Initializing a Spark session
spark = SparkSession.builder \
    .appName('BBG Case Study Case 2') \
    .getOrCreate()

### Loading data from files

In [5]:
# Loading the sales data into Spark Cluster
sales_df = spark.read.csv('F:/usecase/DATA/sales_and_traffic_data.csv', header=True)
sales_df.show(5, truncate=False)

+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+---------------------+-----------+
|#  |child_asin|sessions|page_views|units_ordered|units_ordered_b2b|ordered_products_sale|ordered_products_sales_b2b|total_ordered_items|total_ordered_items_b2b|region|shop_name            |report_date|
+---+----------+--------+----------+-------------+-----------------+---------------------+--------------------------+-------------------+-----------------------+------+---------------------+-----------+
|1  |B00H5155LG|1       |1         |0            |0                |116.64               |62.49                     |0                  |0                      |UK    |Elektronik-Star      |2022-03-20 |
|2  |B002P7L4R4|1       |2         |0            |0                |121.55               |227.24                    |0                  |0                      |UK    |Elektronik-Star     

## Loading JSON file via Python Code using pandas and then converted to pyspark

In [6]:
import json


json_file_path = 'F:/usecase/DATA/campaign_object.json'

# Opening the JSON file and load its contents
with open(json_file_path, 'r') as f:
    json_data = json.load(f)

# Now 'data' contains the contents of your JSON file
#print(json_data)

In [7]:
# Converting JSON data to a PySpark DataFrame
spark_df = spark.createDataFrame(json_data['data'], json_data['columns'])
# Showing the contents of the Spark DataFrame
spark_df.show(5)

+----------+---------+--------------------+
|CAMPAIGNID|STARTDATE|            CREATIVE|
+----------+---------+--------------------+
|  54356000| 20220326|{'brandName': 'Pa...|
|  32659511| 20220326|{'brandName': 'Pa...|
|  14600371| 20221004|{'brandName': 'Pa...|
|  68849012| 20221004|{'brandName': 'Pa...|
|  39939835| 20221004|{'brandName': 'Pa...|
+----------+---------+--------------------+
only showing top 5 rows



In [8]:
spark_df.printSchema()

root
 |-- CAMPAIGNID: long (nullable = true)
 |-- STARTDATE: long (nullable = true)
 |-- CREATIVE: string (nullable = true)



## Task 1 - Separating the column CREATIVE to multiple columns ( brandName , brandLogoAssetID , headline , asins , brandLogoUrl ).

In [9]:
# Defining a UDF to extract values from the JSON string
def extract_values(creative):
    try:
        # If the string contains single quotes ('), it is replaced with double quotes (") to ensure it is in the correct JSON format
        creative_dict = json.loads(creative.replace("'", '"'))
        return [creative_dict.get('brandName'), creative_dict.get('brandLogoAssetID'), creative_dict.get('headline'), creative_dict.get('asins'), creative_dict.get('brandLogoUrl')]
    except:
        return [None, None, None, None, None]
# Defining the schema for the new columns
schema = ArrayType(StringType())
# Registering the UDF
extract_values_udf = udf(extract_values, schema)

# Applying the UDF to create new columns
spark_df = spark_df.withColumn("brandName", extract_values_udf(spark_df['CREATIVE'])[0]) \
       .withColumn("brandLogoAssetID", extract_values_udf(spark_df['CREATIVE'])[1]) \
       .withColumn("headline", extract_values_udf(spark_df['CREATIVE'])[2]) \
       .withColumn("asins", extract_values_udf(spark_df['CREATIVE'])[3]) \
       .withColumn("brandLogoUrl", extract_values_udf(spark_df['CREATIVE'])[4])

# Showing the resulting DataFrame
spark_df.show()

+----------+---------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|CAMPAIGNID|STARTDATE|            CREATIVE|brandName|    brandLogoAssetID|            headline|               asins|        brandLogoUrl|
+----------+---------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+
|  54356000| 20220326|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  32659511| 20220326|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  14600371| 20221004|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  68849012| 20221004|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6, B074...|https://m.media-a...|
|  39939835| 20221004|{'brandName'

## Task 2 to split the column named 'asins' into 3 asins named as asin_1, asin_2 and asin_3

In [10]:
from pyspark.sql.functions import split

# Separating the asins column into three new columns
spark_df = spark_df.withColumn("asin_1", split(spark_df['asins'], ',').getItem(0)) \
                  .withColumn("asin_2", split(spark_df['asins'], ',').getItem(1)) \
                  .withColumn("asin_3", split(spark_df['asins'], ',').getItem(2)) \
                  .select("CAMPAIGNID", "STARTDATE", "CREATIVE", "brandName", "brandLogoAssetID", "headline", "asin_1", "asin_2", "asin_3", "brandLogoUrl")


# Showing the resulting DataFrame
spark_df.show()


+----------+---------+--------------------+---------+--------------------+--------------------+-----------+------------+------------+--------------------+
|CAMPAIGNID|STARTDATE|            CREATIVE|brandName|    brandLogoAssetID|            headline|     asin_1|      asin_2|      asin_3|        brandLogoUrl|
+----------+---------+--------------------+---------+--------------------+--------------------+-----------+------------+------------+--------------------+
|  54356000| 20220326|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6|  B074F576VM| B078VLNF5K]|https://m.media-a...|
|  32659511| 20220326|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6|  B074F576VM| B078VLNF5K]|https://m.media-a...|
|  14600371| 20221004|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar...|Per il tuo futuro...|[B06W5543D6|  B074F576VM| B078VLNF5K]|https://m.media-a...|
|  68849012| 20221004|{'brandName': 'Pa...|   Pamara|amzn1.assetlibrar

## Task 3 - Getting list of distinct ASINs

In [11]:
# Extracting distinct ASIN values
sales_df = spark.read.csv('F:/usecase/DATA/sales_and_traffic_data.csv', header=True)
distinct_asin_df = sales_df.select("child_asin").distinct()
# Showing the distinct ASIN DataFrame
distinct_asin_df.show()

+----------+
|child_asin|
+----------+
|B08HVGLWJM|
|B07R621F4H|
|B00TKHT08M|
|B0747M22NQ|
|B08TRK1YCX|
|B07ZKW56QG|
|B0924NDG7G|
|B07VFVZL12|
|B01HXLIGIG|
|B074M96TMX|
|B00F85E0BC|
|B08K3L7531|
|B07T22YGXX|
|B07DHPR4CZ|
|B092R2MB5B|
|B01MY4KWIE|
|B08C59MLCP|
|B00S9SMLDY|
|B07WS884YH|
|B00LIHFPWC|
+----------+
only showing top 20 rows



## Task 4 - Optimized Approach where we are finding active_asin 

In [12]:
from pyspark.sql.functions import when

# Collecting child_asin values as a list
child_asin_list = distinct_asin_df.select("child_asin").rdd.flatMap(lambda x: x).collect()

In [13]:
from pyspark.sql.functions import expr, when

# Broadcasting the child_asin_list
child_asin_list_bc = spark.sparkContext.broadcast(set(child_asin_list))

# Conditions for finding active_asin
conditions = [
    when((col("asin_1").isin(child_asin_list_bc.value)), col("asin_1")),
    when((col("asin_2").isin(child_asin_list_bc.value)), col("asin_2")),
    when((col("asin_3").isin(child_asin_list_bc.value)), col("asin_3"))
]

'''
 The coalesce function is used to find the first non-null value among the conditions specified in the conditions list. 
 The * operator is used to unpack the list and provide its elements as separate arguments to the coalesce function.
'''

# Using the coalesce function to find the first non-null value
distinct_asin_df_with_active_asin = spark_df.withColumn(
    "active_asin",
    coalesce(*conditions)
).filter(col("active_asin").isNotNull())

# Showing the resulting DataFrame
distinct_asin_df_with_active_asin.show()


+----------+---------+--------+---------+----------------+--------+------+------+------+------------+-----------+
|CAMPAIGNID|STARTDATE|CREATIVE|brandName|brandLogoAssetID|headline|asin_1|asin_2|asin_3|brandLogoUrl|active_asin|
+----------+---------+--------+---------+----------------+--------+------+------+------+------------+-----------+
+----------+---------+--------+---------+----------------+--------+------+------+------+------------+-----------+



## Task 4 - first approach which didn't stopped while running

In [None]:
# Initialize an empty list to store the results
results = []
# Loop through the list and check for matches in spark_df
for child_asin in child_asin_list:
    active_asin = None
    for row in spark_df.collect():
        asin_1 = row.asin_1
        asin_2 = row.asin_2
        asin_3 = row.asin_3
        if child_asin == asin_1 or child_asin == asin_2 or child_asin == asin_3:
            active_asin = child_asin
            break
    results.append((child_asin, active_asin))


In [None]:
# Create a new DataFrame from the results list
distinct_asin_df_with_active_asin = spark.createDataFrame(results, ["child_asin", "active_asin"])

# Show the resulting DataFrame
distinct_asin_df_with_active_asin.show()