In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import expr, regexp_replace, col, monotonically_increasing_id, upper, initcap, when, count, isnan, isnull, split, explode, trim, lit
from pyspark import SparkContext, SparkConf

In [2]:
# Create (or reuse, via getOrCreate) the SparkSession for this pipeline,
# connected to the standalone cluster master.
spark = (
    SparkSession.builder
    .appName("iabsa-pipeline")
    .master('spark://spark-master:7077')
    .config("spark.executor.memory", "512mb")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/24 21:50:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/24 21:50:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load every IABSA CSV file with a header row and inferred column types.
# escape='"' makes a doubled quote inside a quoted field read as a literal
# quote character. An index_id column is added later, during cleaning.
iabsa_df = (
    spark.read
    .option("escape", '"')
    .csv("./raw_data/iabsa/*.csv", header=True, inferSchema=True)
)

                                                                                

In [4]:
# Clean the raw listings (610,782 rows after dropping description).
iabsa_df = (
    iabsa_df
    .drop('description')
    # title-case free-text attributes
    .withColumn('color', initcap(col('color')))
    .withColumn('product_name', initcap(col('product_name')))
    .withColumn('product_category', initcap(col('product_category')))
    # upper-case the comma-separated size lists
    .withColumn('total_sizes', upper(col('total_sizes')))
    .withColumn('available_size', upper(col('available_size')))
    # strip the '$' from the price and convert it to float
    .withColumn('mrp', regexp_replace('mrp', r'\$', '').cast('float'))
    # remove duplicated records (148,020 rows remained on the recorded run)
    .dropDuplicates(subset=['product_name', 'mrp', 'pdp_url', 'total_sizes', 'available_size', 'color'])
    .drop('pdp_url')
    # standardize brand_name spelling. (?i) makes each pattern case-insensitive,
    # so e.g. "B.Tempt'd" and "VICTORIA'S SECRET" are matched as well.
    # NOTE(review): in the B.Tempt'd pattern '.' requires exactly one character
    # between "b" and "tempt", so "btemptd" is not matched -- confirm whether it occurs.
    .withColumn('brand_name',
                when(col('brand_name').rlike(r"(?i)b.tempt('|%27)?d"), "B.Tempt'd")
                .when(col('brand_name').rlike(r"(?i)^victoria'?s(\s+|-)secret"), "Victoria's Secret")
                .when(col('brand_name').rlike(r"(?i)^hanky(\s*|-)panky"), "Hanky Panky")
                .when(col('brand_name').rlike(r"(?i)^calvin(\s*|-)klein"), "Calvin Klein")
                .when(col('brand_name').rlike(r"(?i)^wacoal"), "Wacoal")
                .otherwise(col('brand_name')))
    # unique (not consecutive) row id
    .withColumn('index_id', monotonically_increasing_id())
    # missing colors become the string 'None'
    .fillna('None', subset=['color'])
    # dropDuplicates(subset) keeps an arbitrary row per key, so brand_name /
    # product_category / retailer may differ each time the plan is recomputed
    # (a likely cause of the count drift between runs). Caching keeps every
    # downstream action on the same rows.
    .cache()
)


def count_sizes(df, list_col, size_col, count_col):
    """Explode a comma-separated size list and count listings per size.

    Args:
        df: cleaned listings frame.
        list_col: column holding the comma-separated sizes.
        size_col: name of the output column holding a single trimmed size.
        count_col: name of the output count column.

    Returns:
        One row per (product attributes, size, color) with the number of
        listings containing that size in `count_col`.

    explode() emits no rows for a null list, and split()/trim() never yield
    null elements, so the only missing entries left are empty strings (e.g.
    from a trailing comma); those are dropped.
    """
    product_keys = ['product_name', 'mrp', 'brand_name', 'product_category', 'retailer']
    return (
        df
        .withColumn(size_col, explode(split(list_col, ',')))
        .withColumn(size_col, trim(size_col))
        .filter(col(size_col) != '')
        .groupBy(*product_keys, size_col, 'color')
        .agg(count(lit(1)).alias(count_col))
    )


# Observed counts varied between runs (e.g. 170,982 vs 171,113 size rows and
# 113,231 vs 113,376 available-size rows).
iabsa_size_df = count_sizes(iabsa_df, 'total_sizes', 'size', 'total_size')
iabsa_available_size_df = count_sizes(iabsa_df, 'available_size', 'available_size', 'total_available_size')



In [5]:
# verify if there is null or nan value in each column
iabsa_df.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in iabsa_df.columns]).show() # other way col(c).isNull()
#iabsa_df.where(col('color').isNull()).show()




+------------+---+----------+----------------+--------+-----------+--------------+-----+--------+
|product_name|mrp|brand_name|product_category|retailer|total_sizes|available_size|color|index_id|
+------------+---+----------+----------------+--------+-----------+--------------+-----+--------+
|           0|  0|         0|               0|       0|         40|            26|    0|       0|
+------------+---+----------+----------------+--------+-----------+--------------+-----+--------+



                                                                                

In [7]:
# Number of (product, color, available size) rows; observed 113,231 and 113,376 on different runs.
iabsa_available_size_df.count()

                                                                                

113376

In [8]:
# Rows left after cleaning and deduplication (148,020 on the recorded run).
iabsa_df.count()

                                                                                

148020

In [9]:
# Number of (product, color, size) rows; observed 170,982 and 171,113 on different runs.
iabsa_size_df.count()

                                                                                

171113

In [9]:

# Merge total size with available size. The left join keeps every listed size
# and matches on product attributes, color and size (available_size is renamed
# to size so both sides share the join keys). Sizes with no available
# counterpart get a null total_available_size.
inventory_join_keys = ['product_name', 'mrp', 'brand_name', 'product_category', 'retailer', 'color', 'size']
inventory_df = (
    iabsa_size_df
    .join(iabsa_available_size_df.withColumnRenamed('available_size', 'size'),
          on=inventory_join_keys, how='left')
    .select(*inventory_join_keys, 'total_size', 'total_available_size')
)

In [10]:
# Sizes with no matching available-size row are null after the left join; count them as 0 available.
inventory_df = inventory_df.fillna({'total_available_size': 0})

In [11]:
# Listed sizes that never appear as available.
inventory_df.where(col('total_available_size') == 0).show()

[Stage 56:>                                                         (0 + 1) / 1]

+--------------------+----+------------+------------------+------------+--------------+-------+----------+--------------------+
|        product_name| mrp|  brand_name|  product_category|    retailer|         color|   size|total_size|total_available_size|
+--------------------+----+------------+------------------+------------+--------------+-------+----------+--------------------+
|'b Delighted' Con...|40.0|   B.Tempt'd|Skin-tone Lingerie|Nordstrom US|         Night|  32DDD|         2|                   0|
|'bodysuede' Lace ...|18.0|      Wacoal|   Women's Panties|Nordstrom US|         Ivory|      8|         1|                   0|
|'ciao Bella' Unde...|38.0|   B.Tempt'd|              Bras|Nordstrom US|         Ivory|   38DD|         1|                   0|
|'ciao Bella' Unde...|38.0|   B.Tempt'd|              Bras|Nordstrom US|         Night|  36DDD|         1|                   0|
|'ciao Bella' Unde...|38.0|   B.Tempt'd|              Bras|Nordstrom US|       Peacoat|  30DDD|         

                                                                                

In [76]:
# Sanity check: rows where a size is available in more listings than it is
# listed in. NOTE(review): presumably this should never happen; any rows here
# suggest inconsistent source records -- confirm.
inventory_df.where(col('total_available_size') > col('total_size')).show()

                                                                                

+--------------------+----+----------+--------------------+--------+--------------------+----+----------+--------------------+
|        product_name| mrp|brand_name|    product_category|retailer|               color|size|total_size|total_available_size|
+--------------------+----+----------+--------------------+--------+--------------------+----+----------+--------------------+
|B.active Low-impa...|44.0| B.Tempt'd|Women - Lingerie ...|Macys US|Blue Depths/teaberry| 32D|         3|                   4|
|B.active Low-impa...|44.0| B.Tempt'd|Women - Lingerie ...|Macys US| Night/deep Sea Blue|32DD|         4|                   5|
+--------------------+----+----------+--------------------+--------+--------------------+----+----------+--------------------+



In [66]:
# Count missing values per column of the merged inventory.
# isnull() applies to every column; isnan() is only applied to float/double
# columns, since on other types it relies on an implicit cast to double.
# when(cond, c) returns the literal string c for matching rows, so count()
# counts exactly the rows where cond holds.
inventory_missing_counts = [
    count(when(isnull(c) | isnan(c) if dtype in ('float', 'double') else isnull(c), c)).alias(c)
    for c, dtype in inventory_df.dtypes
]
inventory_df.select(inventory_missing_counts).show()

[Stage 643:>                                                        (0 + 4) / 4]

+------------+---+----------+----------------+--------+-----+----+----------+--------------------+
|product_name|mrp|brand_name|product_category|retailer|color|size|total_size|total_available_size|
+------------+---+----------+----------------+--------+-----+----+----------+--------------------+
|           0|  0|         0|               0|       0|    0|   0|         0|               60755|
+------------+---+----------+----------------+--------+-----+----+----------+--------------------+



                                                                                

In [77]:
# Row count of the merged inventory. Both sides are grouped on exactly the join
# keys, so each key occurs at most once per side and this left join should
# return iabsa_size_df.count() rows. NOTE(review): the recorded 170,942 differs
# from the recorded 171,113 -- presumably the upstream frames were recomputed
# differently between runs.
inventory_df.count()

                                                                                

170942

In [8]:
# Preview the first 20 rows (show() default) of the cleaned listings.
iabsa_df.show()

[Stage 16:>                                                         (0 + 4) / 4]

+--------------------+----+-----------------+--------------------+------------------+--------------------+--------------------+--------------------+--------+
|        product_name| mrp|       brand_name|    product_category|          retailer|         total_sizes|      available_size|               color|index_id|
+--------------------+----+-----------------+--------------------+------------------+--------------------+--------------------+--------------------+--------+
|  Aerie String Thong| 7.5|            AERIE|Everyday Loves Un...|             Ae US|XXS, XS, S, M, L,...|XS, S, M, L, XL, XXL|                Buff|       0|
|Aerie Lace Back S...| 7.5|            AERIE|              Thongs|             Ae US|XXS, XS, S, M, L,...|XS, S, M, L, XL, XXL|        Dark Heather|       1|
| Aerie String Bikini| 7.5|            AERIE|             Bikinis|             Ae US|XS, S, M, L, XL, XXL|XS, S, M, L, XL, XXL|        Dark Heather|       2|
|        Aerie Cheeky| 7.5|            AERIE|Everyda

                                                                                

In [10]:
# Inspect the cleaned schema (mrp is float after the cast; color and index_id are non-nullable).
iabsa_df.printSchema()

root
 |-- product_name: string (nullable = true)
 |-- mrp: float (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- retailer: string (nullable = true)
 |-- total_sizes: string (nullable = true)
 |-- available_size: string (nullable = true)
 |-- color: string (nullable = false)
 |-- index_id: long (nullable = false)



In [11]:
# Number of cleaned listings per retailer, largest first.
iabsa_df.groupBy("retailer").count().orderBy(col("count").desc()).show()



+------------------+------+
|          retailer| count|
+------------------+------+
|Victoriassecret US|113818|
|             Ae US| 11236|
|         Amazon US|  9056|
|          Macys US|  4976|
|     Hankypanky US|  3521|
|   Calvin Klein US|  2669|
|      Nordstrom US|  1247|
|        Topshop US|  1041|
|        Btemptd US|   456|
+------------------+------+



                                                                                

In [12]:
# Stop the SparkSession (and its underlying SparkContext) once the pipeline is done.
spark.stop()