In [4]:
# Glue Interactive Sessions configuration: idle timeout of 5 (minutes, per the
# kernel's echo), the Standard worker type, and 4 workers.
%idle_timeout 5
%worker_type Standard
%number_of_workers 4

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.6 
Current idle_timeout is None minutes.
idle_timeout has been set to 5 minutes.
Previous worker type: None
Setting new worker type to: Standard
Previous number of workers: None
Setting new number of workers to: 4


In [4]:
# Imported in this cell so the session can be created on a fresh kernel:
# the notebook's main import cell comes after this one.
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise create it.
# NOTE(review): the app name is spelled "Tansformation"; left unchanged in
# case anything looks the application up by this exact name.
spark = SparkSession.builder.appName("Amazon Product Data Tansformation").getOrCreate()




In [27]:
# Load libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StringType, NumericType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Tokenizer, HashingTF, IDF, VectorAssembler
from pyspark.sql.functions import col, log1p, percent_rank, length, sum, when

from pyspark.sql.window import Window
import pyspark.sql.functions as F




# 1. Load Data

In [5]:
# S3 locations used by this notebook. Every path is derived from the single
# bucket name so that switching buckets only requires changing one line.
bucket_name = 'amazon-product-dataset-2024'
categories_file_path = f's3://{bucket_name}/amazon_categories.csv'
products_cleaned_file_path = f's3://{bucket_name}/cleaned/'




In [6]:
# Read the category lookup CSV: the first row is the header and column
# types are inferred from the data.
csv_read_options = {"header": True, "inferSchema": True}
categories = spark.read.csv(categories_file_path, **csv_read_options)

# Preview the first five categories.
categories.show(n=5)

+---+--------------------+
| id|       category_name|
+---+--------------------+
|  1|Beading & Jewelry...|
|  2|   Fabric Decorating|
|  3|Knitting & Croche...|
|  4|Printmaking Supplies|
|  5|Scrapbooking & St...|
+---+--------------------+
only showing top 5 rows


In [16]:
# Load the cleaned product data from parquet.
parquet_reader = spark.read
df = parquet_reader.parquet(products_cleaned_file_path)

# Preview the first five products.
df.show(n=5)

+----------+--------------------+-----+-------+------------------+---------+-----------+------------+-----------------+-------------+
|      asin|               title|stars|reviews|             price|listPrice|category_id|isBestSeller|boughtInLastMonth|has_listPrice|
+----------+--------------------+-----+-------+------------------+---------+-----------+------------+-----------------+-------------+
|B014TMV5YE|Sion Softside Exp...|  4.5|      0|139.99000549316406|      0.0|        104|       False|             2000|        False|
|B07GDLCQXV|Luggage Sets Expa...|  4.5|      0|169.99000549316406|   209.99|        104|       False|             1000|         True|
|B07XSCCZYG|Platinum Elite So...|  4.6|      0|  365.489990234375|   429.99|        104|       False|              300|         True|
|B08MVFKGJM|Freeform Hardside...|  4.6|      0| 291.5899963378906|   354.37|        104|       False|              400|         True|
|B01DJLKZBA|Winfield 2 Hardsi...|  4.5|      0|174.99000549316

# 2. Transform

In [17]:
# Attach the category columns to each product; the left join keeps products
# whose category_id has no match in the lookup table.
category_match = df.category_id == categories.id
df = df.join(categories, on=category_match, how='left')




In [18]:
# The two join key columns are no longer needed after the join.
join_key_columns = ('category_id', 'id')
df = df.drop(*join_key_columns)




In [19]:
# Preview the first five rows after the join and column drop.
df.show(n=5)

+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+-------------+
|      asin|               title|stars|reviews|             price|listPrice|isBestSeller|boughtInLastMonth|has_listPrice|category_name|
+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+-------------+
|B014TMV5YE|Sion Softside Exp...|  4.5|      0|139.99000549316406|      0.0|       False|             2000|        False|    Suitcases|
|B07GDLCQXV|Luggage Sets Expa...|  4.5|      0|169.99000549316406|   209.99|       False|             1000|         True|    Suitcases|
|B07XSCCZYG|Platinum Elite So...|  4.6|      0|  365.489990234375|   429.99|       False|              300|         True|    Suitcases|
|B08MVFKGJM|Freeform Hardside...|  4.6|      0| 291.5899963378906|   354.37|       False|              400|         True|    Suitcases|
|B01DJLKZBA|Winfield 2 Hardsi...|  4.5|      0|1

In [24]:
# Encode the flag columns as integers: 1 where the value equals True,
# 0 for everything else.
for flag_column in ("has_listPrice", "isBestSeller"):
    flag_as_int = when(col(flag_column) == True, 1).otherwise(0)
    df = df.withColumn(flag_column, flag_as_int)




In [28]:
# rating_weighted_reviews: product of the star rating and the review count.
weighted_reviews = col('stars') * col('reviews')
df = df.withColumn('rating_weighted_reviews', weighted_reviews)




In [29]:
# isPopular: flag products whose review count ranks above the 75th percentile.
POPULAR_PERCENT_RANK_THRESHOLD = 0.75

# Order all rows by review count; the window has no partitionBy, so the
# ranking spans the whole DataFrame.
windowSpec = Window.orderBy('reviews')

# percent_rank: relative rank of each row's review count within that ordering.
reviews_percent_rank = F.percent_rank().over(windowSpec)
df = df.withColumn("percent_rank", reviews_percent_rank)

# isPopular: 1 when percent_rank exceeds the threshold, 0 otherwise.
exceeds_threshold = F.col("percent_rank") > POPULAR_PERCENT_RANK_THRESHOLD
df = df.withColumn("isPopular", exceeds_threshold.cast("int"))




In [30]:
# price_log: log1p (log of 1 + value) applied to the price column.
# TODO: decide whether this transform is actually needed.
price_log_expr = log1p(col('price'))
df = df.withColumn('price_log', price_log_expr)




In [31]:
# title_length: character length of the product title.
title_length_expr = length(col('title'))
df = df.withColumn('title_length', title_length_expr)




In [32]:
# Preview the first five rows with the engineered feature columns.
df.show(n=5)

+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+--------------------+-----------------------+------------+---------+------------------+------------+
|      asin|               title|stars|reviews|             price|listPrice|isBestSeller|boughtInLastMonth|has_listPrice|       category_name|rating_weighted_reviews|percent_rank|isPopular|         price_log|title_length|
+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+--------------------+-----------------------+------------+---------+------------------+------------+
|B0BNN7RWWV|Jerify 4 Pcs Foam...|  4.5|      0|24.989999771118164|      0.0|           0|               50|            0|Sports & Outdoor ...|                    0.0|         0.0|        0| 3.257711839846864|         136|
|B0CD2BQ7TK|48 Pieces Mini St...|  4.5|      0| 9.989999771118164|      0.0|           0|                0|     

In [34]:
# List the column names currently present on the DataFrame.
column_names = df.columns
print(column_names)

['asin', 'title', 'stars', 'reviews', 'price', 'listPrice', 'isBestSeller', 'boughtInLastMonth', 'has_listPrice', 'category_name', 'rating_weighted_reviews', 'percent_rank', 'isPopular', 'price_log', 'title_length']


In [37]:
# Summary statistics for the DataFrame, printed without truncating cell values.
summary_stats = df.describe()
summary_stats.show(truncate=False)

+-------+--------------------+--------------------+------------------+------------------+--------------------+------------------+--------------------+------------------+-------------------+--------------------+-----------------------+-------------------+-------------------+-------------------+------------------+
|summary|                asin|               title|             stars|           reviews|               price|         listPrice|        isBestSeller| boughtInLastMonth|      has_listPrice|       category_name|rating_weighted_reviews|       percent_rank|          isPopular|          price_log|      title_length|
+-------+--------------------+--------------------+------------------+------------------+--------------------+------------------+--------------------+------------------+-------------------+--------------------+-----------------------+-------------------+-------------------+-------------------+------------------+
|  count|             1372587|             1372587|       

In [None]:
# Columns for which mean / min / max are computed.
numeric_columns = [
    'stars',
    'reviews',
    'price',
    'listPrice',
    'rating_weighted_reviews',
    'percent_rank',
    'price_log',
    'title_length',
]

# (alias prefix, aggregate function) pairs. Iterating statistic-first keeps
# the result ordered as all means, then all mins, then all maxes.
summary_statistics = (('mean', F.mean), ('min', F.min), ('max', F.max))

# One aliased aggregation expression per (statistic, column) pair,
# named '<statistic>_<column>'.
agg_exprs = [
    stat_fn(column).alias(f'{stat_name}_{column}')
    for stat_name, stat_fn in summary_statistics
    for column in numeric_columns
]

# Aggregate over the whole DataFrame; the result is a single-row DataFrame.
agg_results = df.agg(*agg_exprs)

In [41]:
# Collect the aggregated statistics to the driver as a pandas DataFrame.
summary_pandas = agg_results.toPandas()




In [45]:
# Reshape the wide summary into long form: one row per statistic column.
melted_summary = summary_pandas.melt()




In [46]:
# Give the two melted columns readable labels.
summary_column_labels = ['Statistic', 'Value']
melted_summary.columns = summary_column_labels




In [47]:
# Render floats with two decimal places in pandas output.
two_decimal_format = '{:.2f}'.format
pd.set_option('display.float_format', two_decimal_format)

# Display the long-form summary table.
melted_summary

                       Statistic      Value
0                     mean_stars       4.38
1                   mean_reviews     183.20
2                     mean_price      42.85
3                 mean_listPrice      12.27
4   mean_rating_weighted_reviews     821.57
5              mean_percent_rank       0.19
6                 mean_price_log       3.19
7              mean_title_length     120.72
8                      min_stars       1.00
9                    min_reviews       0.00
10                     min_price       0.01
11                 min_listPrice       0.00
12   min_rating_weighted_reviews       0.00
13              min_percent_rank       0.00
14                 min_price_log       0.01
15              min_title_length       1.00
16                     max_stars       4.90
17                   max_reviews  346563.00
18                     max_price   19731.81
19                 max_listPrice     999.99
20   max_rating_weighted_reviews 1490221.00
21              max_percent_rank

In [48]:
# Preview the transformed DataFrame before saving (show() with its default row count).
df.show()

+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+-------------+-----------------------+------------+---------+------------------+------------+
|      asin|               title|stars|reviews|             price|listPrice|isBestSeller|boughtInLastMonth|has_listPrice|category_name|rating_weighted_reviews|percent_rank|isPopular|         price_log|title_length|
+----------+--------------------+-----+-------+------------------+---------+------------+-----------------+-------------+-------------+-----------------------+------------+---------+------------------+------------+
|B08DDFKXKC|Motorola MB8611 D...|  4.4|      0|177.99000549316406|   219.99|           0|                0|            1|    Computers|                    0.0|         0.0|        0|  5.18732996904819|         181|
|B015CH1PJU|SanDisk 128GB Ult...|  4.7|      0|11.989999771118164|      0.0|           0|                0|            0|    Computers|     

# 3. Save

In [49]:
# Destination prefix for the transformed dataset, derived from the bucket_name
# defined in the load section so all S3 paths stay in sync with one setting.
output_path = f"s3://{bucket_name}/transformed/"




In [50]:
# Write the transformed DataFrame as parquet to output_path, in overwrite mode.
parquet_writer = df.write.format("parquet").mode("overwrite")
parquet_writer.save(output_path)


