<a href="https://colab.research.google.com/github/zylammmmmm/T001/blob/main/CDS4005A1_Q2ZhengYingLam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [60]:
!pip install pyspark



In [61]:
from google.colab import drive
# Attach Google Drive to the Colab filesystem.
mount_point = '/content/drive/'
drive.mount(mount_point)

# Folder holding the notebook's data files; the trailing slash lets callers
# append a file name directly.
data_path = "/content/drive/MyDrive/Colab Notebooks/"

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [62]:
# create entry points to spark
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session running on a single worker thread.
ss = (
    SparkSession.builder
    .master("local[1]")
    .appName("sold product")
    .getOrCreate()
)

# NOTE: despite its name, `spark` holds the SparkContext, not the session.
spark = ss.sparkContext


In [63]:

# Load the sold-products file; the "multiline" option lets a single JSON
# record span several lines.
df = (
    ss.read
    .option("multiline", "true")
    .json(data_path + "sold_products.json")
)

# Prepare keyword mapping
# Keywords are listed per category code and flattened into a keyword -> code
# dict. The order of the groups and of the words inside each group is kept,
# because a dict iterates in insertion order.
_keywords_by_category = (
    ("e", ("smartphone", "charger", "laptop", "electronics")),
    ("f", ("sofa", "table", "bookshelf", "furniture")),
    ("a", ("bag", "accessories")),
    ("c", ("clothing", "shirt", "pants", "jacket")),
    ("o", ("other",)),
)
keywords = {
    word: code
    for code, words in _keywords_by_category
    for word in words
}

In [64]:
from pyspark.sql.functions import  lower, udf
def classifyProduct(product_name):
    """Map a product name to a single-letter category code.

    The name is lower-cased and tested against every key of the module-level
    ``keywords`` mapping, in the mapping's iteration order. The first key that
    occurs as a substring of the name decides the category.

    Args:
        product_name: The product name, or ``None``. Spark hands ``None`` to a
            Python UDF for null column values, so a missing name must not
            crash the job.

    Returns:
        The category code of the first matching keyword, or ``'o'`` when the
        name is ``None`` or no keyword matches.
    """
    # A missing name cannot match any keyword; use the default category.
    if product_name is None:
        return 'o'

    lowerProdName = product_name.lower()  # Convert to lowercase for matching
    for keyword, category in keywords.items():
        if keyword in lowerProdName:
            return category
    return 'o'  # Default category if no keywords matched

# Wrap the classifier so it can be applied to DataFrame columns. The return
# type is spelled out here; a string result is also what udf() defaults to.
classify_udf = udf(classifyProduct, returnType="string")

In [65]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col

# Pair each product name with the description at the same array position, so
# that a single explode keeps name and description together.
classified_df = df.select(
    col("product_id"),
    F.arrays_zip(col("product_names"), col("descriptions")).alias("NameNDescription")
)

# Explode classified_df into one row per (name, description) pair and create
# new columns. Column aliases carry no trailing spaces, so later stages can
# refer to them by their plain names.
seperate_df = classified_df.select(
    col("product_id"),
    F.explode(col("NameNDescription")).alias("productInfo")
).select(
    col("product_id"),
    lower(col("productInfo.product_names")).alias("product_name_lower"),
    col("productInfo.descriptions").alias("full_description")
)

# Classify products and normalise full_description: concat_ws yields a single
# string (NOTE(review): presumably descriptions are strings or arrays of
# strings — confirm against the input schema), then the literal text
# "blanket" is removed from it.
ordered_df = seperate_df.withColumn(
    "category_id", classify_udf(col("product_name_lower"))
).withColumn(
    "full_description",
    F.regexp_replace(F.concat_ws(' ', col("full_description")), "blanket", "")
)

# Put the columns in their final order.
rearrange_df = ordered_df.select(
    "product_id", "category_id", "product_name_lower", "full_description"
)

# Count words by splitting on runs of whitespace and discarding empty tokens.
# Splitting on a single space would count the gaps left behind by the
# "blanket" removal (and any doubled spaces) as extra words.
description_word_count = F.size(
    F.array_remove(F.split(col("full_description"), r"\s+"), "")
)

# Keep non-"Other" categories whose description has more than 50 words, and
# sort the DataFrame by product_id in ascending order.
final_df = rearrange_df.filter(
    (col("category_id") != 'o') & (description_word_count > 50)
).orderBy(col("product_id").asc())




In [66]:

# Write the result next to the input data instead of a placeholder path.
# Spark treats this path as an output directory and fills it with part files
# rather than producing one single JSON file.
output_path = data_path + "products.json"

# Save the DataFrame, replacing any output left by an earlier run
final_df.write.json(output_path, mode="overwrite")

# Show the results
final_df.show(truncate=False)

+----------+-----------+------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|produ