In [None]:
# data source https://nijianmo.github.io/amazon/index.html

import pyspark
from pyspark.sql import functions as F
import utils

# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Amazon Product Simplifier")
    .getOrCreate()
)

# Case-sensitive column resolution gets rid of the COLUMN ALREADY EXISTS error
# (presumably the JSON has keys that differ only by letter case -- TODO confirm).
spark.conf.set('spark.sql.caseSensitive', True)
# Start from a clean slate: drop anything cached by an earlier run of this session.
spark.catalog.clearCache()


# Dataset locations, relative to this notebook.
REVIEW_DATA = '../dataset/Clothing_Shoes_and_Jewelry.json'
PRODUCT_DATA = '../dataset/meta_Clothing_Shoes_and_Jewelry.json'
SAMPLE_PRODUCTDATA = '../dataset/sample2.json'

In [None]:
# Read the product metadata and discard the columns this notebook does not use.
# ('similar_item' is deliberately kept for now.)
UNUSED_COLUMNS = ('imageURL', 'imageURLHighRes', 'date', 'tech1', 'tech2', 'details', 'fit')

product_data = spark.read.json(PRODUCT_DATA).drop(*UNUSED_COLUMNS)

# Preview the first 10 rows.
product_data.show(n=10)

In [None]:
# Row count of the trimmed product table (triggers a full Spark job).
entry_total = product_data.count()
print(f'There are {entry_total} entries.')

In [5]:
# Sales rank: capture the number that precedes "in Clothing, Shoes & Jewelry",
# strip its thousands separators and cast to integer. Only the rank within this
# main category is kept; rows without such a rank become null.
# NOTE(review): re-running this cell on the already-cast `rank` column will most
# likely null it out, since a bare number no longer matches the pattern.
EXP = r'(\d*\,*\d*\,*\d+)\s*in\s*Clothing,\s*Shoes\s*\&*\s*Jewelry'
product_data = product_data.withColumn(
    'rank',
    F.regexp_replace(F.regexp_extract('rank', EXP, 1), ',', '').cast('int'),
)

# Price: capture the first amount in the string and cast to float.
# The previous pattern (\d+\.*\d+) needed at least two digits, so "$5" produced
# null, and it could not span a thousands separator, so "$1,299.99" was read as
# 299.99. The pattern below accepts a single digit, optional comma groups and an
# optional decimal part; commas are stripped before the cast.
# Strings with no digits still yield an empty match and therefore a null price.
EXP = r'\$?(\d[\d,]*(?:\.\d+)?)'
product_data = product_data.withColumn(
    'price',
    F.regexp_replace(F.regexp_extract('price', EXP, 1), ',', '').cast('float'),
)

In [6]:
# Null handling.
# The later text analysis relies heavily on `description` and `feature`, so any
# row missing either of them is dropped.
# (The main_cat fallback to "Clothing_shoes_and_jewelry" mentioned in the
# original notes is not applied in this cell.)
product_data = product_data.dropna(subset=['description', 'feature'])

# Collapse `description`, `feature` and `category` into comma-joined strings
# (per the notes in this notebook these are array columns -- TODO confirm schema).
for text_col in ('description', 'feature', 'category'):
    product_data = product_data.withColumn(text_col, F.concat_ws(',', text_col))

In [None]:
# Preview the cleaned table using show()'s defaults, spelled out explicitly.
product_data.show(n=20, truncate=True)

In [7]:
# Build `category_1`: replace the root "Clothing, Shoes & Jewelry" label, every
# run of non-word characters and every run of digits with a single space, then
# lower-case the result.
EXP = r'Clothing, Shoes & Jewelry|\W+|\d+'

category_spaced = F.regexp_replace('category', EXP, ' ')
product_data = product_data.withColumn('category_1', F.lower(category_spaced))

In [329]:
# clean up the categories a bit
# product_data.select('category').distinct().show(20,truncate=0)

# within each element remove white space?
# remove numerical numbers 
# nlp to identify relevant words

In [None]:
# Text pipeline over the cleaned category string:
#   tokenize -> remove stop words -> term counts -> TF-IDF weights.
# Background reading:
#   https://www.johnsnowlabs.com/unleashing-the-power-of-text-tokenization-with-spark-nlp/
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF


# Tokenize: split `category_1` on runs of non-word characters.
tokenizer = RegexTokenizer(inputCol="category_1", outputCol="words", pattern="\\W+")
df = tokenizer.transform(product_data.select('category_1'))

# Remove stop words (default StopWordsRemover word list).
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df = remover.transform(df)

# Term frequency. CountVectorizer is used instead of HashingTF because hashed
# features cannot be mapped back to the words they came from.
vectorizer = CountVectorizer(inputCol='filtered', outputCol='rawFeatures')
cv_model = vectorizer.fit(df)
df_vect = cv_model.transform(df)

# Keep the fitted vocabulary: position i in `rawFeatures`/`features` is vocabulary[i].
# Previously the fitted CountVectorizer was stored in `model` and then overwritten
# by the IDF model, which threw this lookup away.
vocabulary = cv_model.vocabulary

# Inverse document frequency weighting on top of the raw counts.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df_vect)
tfidf_data = idf_model.transform(df_vect)

# Backward-compatible alias: `model` still ends up bound to the fitted IDF model.
model = idf_model




In [None]:
# Tokens next to their TF-IDF vectors, untruncated.
# (For raw counts instead, select 'words' and 'rawFeatures' from df_vect.)
(
    tfidf_data
    .select('words', 'features')
    .show(truncate=False)
)

In [None]:
# Same preview with the TF-IDF vector shown first, untruncated.
(
    tfidf_data
    .select('features', 'words')
    .show(truncate=False)
)

In [None]:
# Cleaned category string next to its TF-IDF vector, untruncated.
(
    tfidf_data
    .select("category_1", "features")
    .show(truncate=False)
)

In [None]:
# Stop-word-filtered tokens next to their raw term counts, untruncated.
# `rawFeatures` is added by the CountVectorizer transform, so it exists on
# `df_vect` and not on `df`; selecting it from `df` fails with an unresolved column.
df_vect.select('filtered', 'rawFeatures').show(truncate=False)

In [None]:
# clean up any white space character and numbers 
# delete repetitions

In [None]:
# Show the first 10 rows with no column truncation.
product_data.show(n=10, truncate=False)


# Working notes for the cleaning steps in this notebook:
#   - turn `description` into a plain string instead of an array
#   - drop nulls
#   - drop the image columns
#   - make `price` numeric
#   - clean up `rank`
#   - convert `category`, `description`, `feature` from array to string type
#       - note: `feature` could be parsed further because it contains key/value pairs

where

- asin - ID of the product, e.g. 0000031852
- title - name of the product
- feature - bullet-point format features of the product
- description - description of the product
- price - price in US dollars (at time of crawl)
- imageURL - url of the product image
- imageURLHighRes - url of the high resolution product image
- related - related products (also bought, also viewed, bought together, buy after viewing)
- salesRank - sales rank information
- brand - brand name
- categories - list of categories the product belongs to
- tech1 - the first technical detail table of the product
- tech2 - the second technical detail table of the product
- similar - similar product table

In [16]:
# End the Spark session; run this last, since cells that use `spark` need a live session.
spark.stop()