# Products and packages preprocessing

# 1. Import dependencies & declare variables

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, array_distinct, split, regexp_replace, collect_list, struct, create_map, concat, lit, expr, create_map, map_from_entries, first
from pyspark.sql.types import StructType, StructField, LongType, StringType, FloatType, MapType, IntegerType, ArrayType, BooleanType

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Process Product Data") \
    .getOrCreate()

24/08/06 21:48:58 WARN Utils: Your hostname, Miguels-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.13 instead (on interface en0)
24/08/06 21:48:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/06 21:49:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
packages_path = "../output/packages.parquet"
products_path = "../output/products.parquet"
# products_json_path = "../output/full_product_extraction_2.json"
products_json_path = "../output/full_product_extraction_sample.json"


# product json schema
# product_json_schema = StructType([
#     StructField("packages", ArrayType(StructType([
#         StructField("name", StringType(), True),
#         StructField("typeName", StringType(), True),
#         StructField("itemNo", StringType(), True),
#         StructField("articleNumber", StructType([
#             StructField("label", StringType(), True),
#             StructField("value", StringType(), True)
#         ]), True),
#         StructField("measurements", ArrayType(StructType([
#             StructField("label", StringType(), True),
#             StructField("type", StringType(), True),
#             StructField("text", StringType(), True),
#             StructField("value", FloatType(), True)
#         ])), True),
#         StructField("quantity", StructType([
#             StructField("label", StringType(), True),
#             StructField("value", IntegerType(), True)
#         ]), True),
#         StructField("multiPackDisclaimerText", BooleanType(), True)
#     ])), True),
#     StructField("maxMeasurments", StructType([
#         StructField("weightText", StringType(), True),
#         StructField("widthText", StringType(), True),
#         StructField("lengthText", StringType(), True),
#         StructField("heightText", StringType(), True),
#         StructField("diameterText", StringType(), True)
#     ]), True),
#     StructField("price", FloatType(), True),
#     StructField("currency", StringType(), True),
#     StructField("measurement_ensembled_text", StringType(), True),
#     StructField("url", StringType(), True),
#     StructField("name", StringType(), True),
#     StructField("category", StringType(), True)
# ])


# Define the schema for products and packages
# products_schema = StructType([
#     StructField("id", LongType(), nullable=False),
#     StructField("item_number", StringType(), nullable=False), -> Dimensionality inconsistency
#     StructField("ensembled_measurements", StringType(), nullable=True),
#     StructField("name", StringType(), nullable=False),
#     StructField("category", StringType(), nullable=False),
#     StructField("price", FloatType(), nullable=True),
#     StructField("currency", StringType(), nullable=True),
#     StructField("url", StringType(), nullable=True),
#     StructField("product_components", MapType(StringType(), IntegerType()), nullable=True)
# ])

# packages_schema = StructType([
#     StructField("id", LongType(), nullable=False),
#     StructField("products_item_number", StringType(), nullable=False),
#     StructField("width", FloatType(), nullable=False),
#     StructField("length", FloatType(), nullable=False),
#     StructField("height", FloatType(), nullable=False),
#     StructField("weight", FloatType(), nullable=False),
#     StructField("distance_units", StringType(), nullable=False),
#     StructField("weight_units", StringType(), nullable=False)
# ])

In [4]:
product_json_schema = StructType([
    StructField("price", FloatType(), True),
    StructField("currency", StringType(), True),
    StructField("measurement_ensembled_text", StringType(), True),
    StructField("url", StringType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("packages", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("typeName", StringType(), True),
        StructField("itemNo", StringType(), True),
        StructField("articleNumber", StringType(), True),
        StructField("measurements", StructType([
            StructField("dimensions", StructType([
                StructField("width", IntegerType(), True),
                StructField("height", IntegerType(), True),
                StructField("length", IntegerType(), True),
                StructField("unit", StringType(), True)
            ]), True),
            StructField("weight", StructType([
                StructField("value", FloatType(), True),
                StructField("unit", StringType(), True)
            ]), True),
            StructField("volume", StructType([
                StructField("value", IntegerType(), True),
                StructField("unit", StringType(), True)
            ]), True)
        ]), True),
        StructField("quantity", IntegerType(), True),
    ])), True)
])

## 2. Product processing

## 2.1 Bronce Layer

In [38]:
df_product_bronce = (
    spark
    .read
    .option("header", True)
    .format("json")
    .option("schema", product_json_schema)
    #.option("inferSchema", "True")
    .option("mode", "PERMISSIVE")
    .option("sep", ",")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
    .load(products_json_path)
    #.filter($"_corrupt_record".isNotNull).count()
)

In [39]:
df_product_bronce.show()

+--------------------+--------+--------------------------+--------------+--------------------+------+--------------------+
|            category|currency|measurement_ensembled_text|          name|            packages| price|                 url|
+--------------------+--------+--------------------------+--------------+--------------------+------+--------------------+
|            Librería|       €|              80x28x202 cm|         BILLY|[{002.638.50, 002...| 59.99|https://www.ikea....|
|Librería con puer...|       €|              80x30x202 cm|BILLY / OXBERG|[{692.817.76, 692...|139.99|https://www.ikea....|
|Combinación libre...|       €|           95/95x28x202 cm|         BILLY|[{993.959.36, 993...|139.97|https://www.ikea....|
+--------------------+--------+--------------------------+--------------+--------------------+------+--------------------+



## 2.2 Silver Layer

In [40]:
df_product_silver = (
    df_product_bronce
    .withColumn("product_id", concat(lit("pro-"), expr("uuid()")))
    .withColumn("package_explode", explode(col("packages")))
    .withColumn("product_components", create_map(col("package_explode.itemNo"), col("package_explode.quantity")))
    #.withColumn("package_measurements", explode(col("package_explode")))
    .select(
        col("product_id"),
        col("package_explode.name").alias("name"),
        col("category"),
        col("package_explode.typeName").alias("type_name"),
        col("package_explode.itemNo").alias("item_number"),
        col("package_explode.articleNumber").alias("article_number"),
        col("package_explode.measurements.dimensions.width").alias("width"),
        col("package_explode.measurements.dimensions.height").alias("height"),
        col("package_explode.measurements.dimensions.length").alias("length"),
        col("package_explode.measurements.weight.value").alias("weight"),
        col("package_explode.measurements.volume.value").alias("volume"),       
        col("package_explode.quantity").alias("package_quantity"),
        col("price"),
        col("currency"),
        col("url"),
        col("product_components")
    )
)

df_product_silver.show()

+--------------------+--------------+--------------------+--------------------+-----------+--------------+-----+------+------+------+------+----------------+------+--------+--------------------+------------------+
|          product_id|          name|            category|           type_name|item_number|article_number|width|height|length|weight|volume|package_quantity| price|currency|                 url|product_components|
+--------------------+--------------+--------------------+--------------------+-----------+--------------+-----+------+------+------+------+----------------+------+--------+--------------------+------------------+
|pro-8c8c077b-4966...|         BILLY|            Librería|            Librería|   00263850|    002.638.50|   29|    13|   206|  37.9| 77662|               1| 59.99|       €|https://www.ikea....|   {00263850 -> 1}|
|pro-e4879f36-2905...|BILLY / OXBERG|Librería con puer...|Librería con puer...|   69281776|    692.817.76| NULL|  NULL|  NULL|  NULL|  NULL|    

24/08/07 06:23:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 990317 ms exceeds timeout 120000 ms
24/08/07 06:23:13 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/07 06:39:38 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

## 2.3 Gold Layer

### 2.3.1 Package Gold 

In [17]:
# packages_schema = StructType([
#     StructField("id", LongType(), nullable=False),                     -> done
#     StructField("products_item_number", StringType(), nullable=False), ???
#     StructField("width", FloatType(), nullable=False),                 -> done
#     StructField("length", FloatType(), nullable=False),                -> done
#     StructField("height", FloatType(), nullable=False),                -> done
#     StructField("weight", FloatType(), nullable=False),                -> done
#     StructField("distance_units", StringType(), nullable=False),       -> TBC
#     StructField("weight_units", StringType(), nullable=False)          -> TBC

In [36]:
df_package_gold = (
    df_product_silver
    .withColumn("stock_quantity", lit(100))
    .select(
        col("item_number").alias("package_id"),
        col("name"),
        col("width"),
        col("height"),
        col("length"),
        col("volume"),
        col("stock_quantity")
    )
)
df_package_gold.show()

+----------+--------------+-----+------+------+------+--------------+
|package_id|          name|width|height|length|volume|stock_quantity|
+----------+--------------+-----+------+------+------+--------------+
|  00263850|         BILLY|   29|    13|   206| 77662|           100|
|  69281776|BILLY / OXBERG| NULL|  NULL|  NULL|  NULL|           100|
|  00263850|         BILLY|   29|    13|   206| 77662|           100|
|  50275558|        OXBERG|   40|     3|   211| 25320|           100|
|  99395936|         BILLY| NULL|  NULL|  NULL|  NULL|           100|
|  50263838|         BILLY|   39|     7|   207| 43470|           100|
|  40104109|         BILLY|   20|     1|    22|   440|           100|
+----------+--------------+-----+------+------+------+--------------+



### 2.3.2 Product Gold

In [None]:
# products_schema = StructType([
#     StructField("id", LongType(), nullable=False),                                               -> done
#     StructField("item_number", StringType(), nullable=False),                                    -> Dimensionality inconsistency
#     StructField("ensembled_measurements", StringType(), nullable=True),                          -> TBC
#     StructField("name", StringType(), nullable=False),                                           -> done
#     StructField("category", StringType(), nullable=False),                                       -> done
#     StructField("price", FloatType(), nullable=True),                                            -> done
#     StructField("currency", StringType(), nullable=True),                                        -> done
#     StructField("url", StringType(), nullable=True),                                             -> done
#     StructField("product_components", MapType(StringType(), IntegerType()), nullable=True)       -> done
# ])

In [32]:
df_product_gold = (
    df_product_silver
    .groupBy("product_id")
    .agg(
        first("name").alias("name"),
        first("category").alias("category"),
        #first("measurement_ensembled_text").alias("measurement_ensembled_text"),
        first("url").alias("url"),
        first("price").alias("price"),
        first("currency").alias("currency"),
        collect_list("product_components").alias("product_components"),
    )
)

df_product_gold.show()


+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+
|          product_id|          name|            category|                 url| price|currency|  product_components|
+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+
|pro-e8476f25-ce62...|BILLY / OXBERG|Librería con puer...|https://www.ikea....|139.99|       €|[{69281776 -> 1},...|
|pro-0a6c542e-7716...|         BILLY|            Librería|https://www.ikea....| 59.99|       €|   [{00263850 -> 1}]|
|pro-c733eda6-1e8c...|         BILLY|Combinación libre...|https://www.ikea....|139.97|       €|[{99395936 -> 1},...|
+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+

