In [261]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, count, format_number, monotonically_increasing_id, col, lower, length, udf, lit
from pyspark.sql.types import FloatType, array
from pyspark.ml.linalg import *
import numpy as np

In [2]:
sp = SparkSession.builder.config('spark.driver.host','localhost').appName('AmazonFashionDiscovery').getOrCreate()

In [17]:
df = sp.read.options(samplingRatio=0.5).json('tops_fashion.json', multiLine=True)

In [18]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- author: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- availability_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- editorial_reivew: string (nullable = true)
 |-- editorial_review: string (nullable = true)
 |-- formatted_price: string (nullable = true)
 |-- large_image_url: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- medium_image_url: string (nullable = true)
 |-- model: string (nullable = true)
 |-- product_type_name: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sku: string (nullable = true)
 |-- small_image_url: string (nullable = true)
 |-- title: string (nullable = true)



In [19]:
df.head(1)

[Row(asin='B016I2TS4W', author=None, availability=None, availability_type=None, brand='FNC7C', color=None, editorial_reivew=None, editorial_review="Minions Como Superheroes Ironman Women's O Neck Long Sleeve T-shirt,it Is Made Of 100% Cotton,well Printed Images By Environmental Healthy Inks,Customized T Shirts Will Be The Best And Sincere Gifts For Your Family And Friends.", formatted_price=None, large_image_url='https://images-na.ssl-images-amazon.com/images/I/41cfoWwna2L.jpg', manufacturer=None, medium_image_url='https://images-na.ssl-images-amazon.com/images/I/41cfoWwna2L._SL160_.jpg', model=None, product_type_name='SHIRT', publisher=None, reviews=['false', 'https://www.amazon.com/reviews/iframe?akid=AKIAIAKOAEV2HN4GKVDQ&alinkCode=xm2&asin=B016I2TS4W&atag=123456000e-20&exp=2017-09-05T06%3A59%3A21Z&v=2&sig=LDmdE4MmvXmyykwLtlen%252FtB3%252FlWVYIR%252FyWrzDeDtp%252BI%253D'], sku=None, small_image_url='https://images-na.ssl-images-amazon.com/images/I/41cfoWwna2L._SL75_.jpg', title='Mini

In [20]:
nrows = df.count()
ncols = len(df.columns)

print("# datapoints {}, # features {}".format(nrows, ncols))

# datapoints 183138, # features 19


In [21]:
columns = df.columns
columns

['asin',
 'author',
 'availability',
 'availability_type',
 'brand',
 'color',
 'editorial_reivew',
 'editorial_review',
 'formatted_price',
 'large_image_url',
 'manufacturer',
 'medium_image_url',
 'model',
 'product_type_name',
 'publisher',
 'reviews',
 'sku',
 'small_image_url',
 'title']

In [25]:
df = df.select(['asin','brand','color', 'product_type_name', 'title', 'formatted_price'])

In [26]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- product_type_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- formatted_price: string (nullable = true)



In [27]:
df.head(1)

[Row(asin='B016I2TS4W', brand='FNC7C', color=None, product_type_name='SHIRT', title='Minions Como Superheroes Ironman Long Sleeve Round Neck T-Shirt For Women', formatted_price=None)]

In [28]:
df.rdd.getNumPartitions()

1

In [29]:
df = df.repartition(4)

In [30]:
df.rdd.getNumPartitions()

4

In [34]:
df.show()

+----------+-------------------+--------+-----------------+--------------------+---------------+
|      asin|              brand|   color|product_type_name|               title|formatted_price|
+----------+-------------------+--------+-----------------+--------------------+---------------+
|B0725WJ391|          La Sophia|    null|            SHIRT|La Sophia Women S...|           null|
|B0039GZQ2Y|     Paradise Found|    null|            SHIRT|Monstera Paradise...|           null|
|B0752DBWK9|   Silver Jeans Co.|    null|            SHIRT|Silver Jeans Co. ...|           null|
|B06WV8HLXD|           Junarose|    null|            SHIRT|Junarose Women's ...|           null|
|B01LDMDQTU|Custom Apparel R Us|    null|            SHIRT|Sports Apparel Fo...|           null|
|B074MHZ1JK|   signaturetshirts|Charcoal|            SHIRT|SignatureTshirts ...|           null|
|B01N7M9A5X|              Flank|   Black|            SHIRT|Flank Women Loose...|           null|
|B018T3OL0E|Mexican Clothing C

In [31]:
nrows = df.count()
ncols = len(df.columns)

print("# datapoints {}, # features {}".format(nrows, ncols))

# datapoints 183138, # features 6


In [376]:
# df.describe().show()

In [104]:
#for each row count missing values
(
sp.createDataFrame(
    df
    .rdd
    .map(lambda row: (row[0], sum([c == None for c in row])))
    .filter(lambda r: r[1]>1)
    .collect(), ['asin', 'CountMissing'])
    .orderBy('CountMissing', ascending=False)
    .show(5)
)

+----------+------------+
|      asin|CountMissing|
+----------+------------+
|B01M1HXQ0O|           3|
|B00JENH1XS|           3|
|B00YFV81VU|           3|
|B0742QZV3H|           3|
|B0742R1H7P|           3|
+----------+------------+
only showing top 5 rows



In [105]:
#show row with 3 missing values
df.where('asin == "B016ARDT72"').show()

+----------+-----+-----+-----------------+--------------------+---------------+
|      asin|brand|color|product_type_name|               title|formatted_price|
+----------+-----+-----+-----------------+--------------------+---------------+
|B016ARDT72| null| null|            SHIRT|Women's Tunic Top...|           null|
+----------+-----+-----+-----------------+--------------------+---------------+



In [106]:
# % data missing in each column
cols = ["brand", "color", "product_type_name", "title", "formatted_price"]
op = [( format_number(1 - (count(df[col]) / count("*") ) , 2) ).alias(col + "_mis") for col in cols]
df.select(op).show()

+---------+---------+---------------------+---------+-------------------+
|brand_mis|color_mis|product_type_name_mis|title_mis|formatted_price_mis|
+---------+---------+---------------------+---------+-------------------+
|     0.00|     0.65|                 0.00|     0.00|               0.84|
+---------+---------+---------------------+---------+-------------------+



In [110]:
df = df.dropna(subset=['color','formatted_price'])

In [111]:
nrows = df.count()
ncols = len(df.columns)

print("# datapoints {}, # features {}".format(nrows, ncols))

# datapoints 28385, # features 6


In [118]:
#unique value in each categorical feature
cols = ['product_type_name', 'brand', 'color', 'formatted_price']
op   = [countDistinct(df[col]).alias("unique_" + col) for col in cols]
df.select(op).show()

+------------------------+------------+------------+----------------------+
|unique_product_type_name|unique_brand|unique_color|unique_formatted_price|
+------------------------+------------+------------+----------------------+
|                      56|        3636|        4527|                  3133|
+------------------------+------------+------------+----------------------+



In [113]:
df.groupBy('product_type_name').count().sort('count',ascending=False).show()

+--------------------+-----+
|   product_type_name|count|
+--------------------+-----+
|               SHIRT|21504|
|BOOKS_1973_AND_LATER| 3333|
|             APPAREL| 1053|
|           ACCESSORY|  559|
|      SPORTING_GOODS|  391|
|           OUTERWEAR|  340|
|               DRESS|  177|
|             SWEATER|  155|
|              BLAZER|  127|
|OUTDOOR_RECREATIO...|  106|
|HEALTH_PERSONAL_CARE|   79|
|      TOYS_AND_GAMES|   79|
|          MISC_OTHER|   62|
|           UNDERWEAR|   60|
|           SLEEPWEAR|   49|
|      AUTO_ACCESSORY|   46|
|               PANTS|   40|
|         ETHNIC_WEAR|   34|
|POWERSPORTS_PROTE...|   30|
|               SKIRT|   24|
+--------------------+-----+
only showing top 20 rows



In [121]:
df.groupBy('brand').count().sort('count',ascending=False).show()

+--------------------+-----+
|               brand|count|
+--------------------+-----+
|           TOOGOO(R)|  177|
|           SODIAL(R)|  133|
|               Reiss|  125|
|            Ash City|  124|
|          G by GUESS|  121|
|    Black Temptation|  121|
|           Anna-Kaci|  117|
|               Nanon|  115|
|          Absolutely|  115|
|         Worthington|  106|
|           DSQUARED2|  106|
|   Head Case Designs|  100|
|        Vitamina USA|  100|
|           City Chic|  100|
|                PERI|   98|
|        Horace Small|   96|
|               H'nan|   96|
|       susana monaco|   95|
|Aip Yep Novelty F...|   94|
|                null|   93|
+--------------------+-----+
only showing top 20 rows



In [122]:
df.groupBy('color').count().sort('count',ascending=False).show()

+-------------+-----+
|        color|count|
+-------------+-----+
|        Black| 5181|
|        White| 3220|
|         Blue| 1351|
|          Red| 1095|
|         Pink|  810|
|       Purple|  531|
|         Grey|  478|
|        Green|  469|
|         Gray|  463|
|         Navy|  369|
|        Multi|  330|
|       Yellow|  271|
|       Orange|  239|
|  Multi-color|  208|
|        Beige|  199|
|        Brown|  198|
|        Ivory|  173|
|Multicoloured|  157|
| Multicolored|  124|
|  Black/White|  122|
+-------------+-----+
only showing top 20 rows



In [123]:
df.groupBy('formatted_price').count().sort('count',ascending=False).show()

+---------------+-----+
|formatted_price|count|
+---------------+-----+
|         $19.99|  945|
|          $9.99|  748|
|          $9.50|  601|
|         $14.99|  472|
|          $7.50|  463|
|         $24.99|  414|
|         $29.99|  370|
|          $8.99|  343|
|          $9.01|  336|
|         $16.99|  316|
|         $12.99|  293|
|         $34.99|  272|
|         $17.99|  272|
|         $15.99|  254|
|         $18.99|  244|
|         $54.99|  242|
|         $44.99|  239|
|         $39.99|  218|
|         $11.99|  213|
|         $21.99|  169|
+---------------+-----+
only showing top 20 rows



In [124]:
df.groupBy('title').count().sort('count',ascending=False).show()

+--------------------+-----+
|               title|count|
+--------------------+-----+
|FINEJO Casual Wom...|   47|
|Girlzwalk Women C...|   43|
|Victoria Scoop Ne...|   40|
|Long Sleeve Mock ...|   31|
|Women's FOOTBALL ...|   30|
|Women's Sherlock ...|   30|
|WenHong Women Cut...|   28|
|Crazy Girls Women...|   28|
|LJT Women's 2016 ...|   25|
|LJT Women's 2016 ...|   24|
|Fashion Womens Lo...|   23|
|Tri-Mountain Wome...|   22|
|Yosshita & Neha M...|   17|
|Ash City Core 365...|   17|
|Women's Basic Amb...|   17|
|Ladies Lightweigh...|   17|
|Cutter & Buck L/S...|   16|
|Niceda Women's Ow...|   15|
|Niceda Women's Ca...|   15|
|GUESS Factory Wom...|   14|
+--------------------+-----+
only showing top 20 rows



In [125]:
#handle duplicate data

In [126]:
df.count(), df.select(['brand','color', 'product_type_name', 'title', 'formatted_price']).distinct().count()

(28385, 27015)

In [127]:
#num duplicate rows
28385 - 27015

1370

In [133]:
#some of the duplicated rows
df.groupBy(['brand','color', 'product_type_name', 'title', 'formatted_price']).count().filter('count > 1').show(5)

+----------+-------------+-----------------+--------------------+---------------+-----+
|     brand|        color|product_type_name|               title|formatted_price|count|
+----------+-------------+-----------------+--------------------+---------------+-----+
|G by GUESS|Midnight Wine|            SHIRT|GUESS Factory Wom...|         $17.50|    2|
|     ZAMME|      White E|            SHIRT|ZAMME Women's Sho...|         $26.99|    4|
|    Boohoo|        Ivory|            SHIRT|Boohoo Womens Pet...|         $36.00|    2|
|     Reiss|    Off White|            SHIRT|Reiss Womens Sann...|         $87.99|    2|
| Academyus|        Black|            SHIRT|Academyus Craft M...|          $6.99|    6|
+----------+-------------+-----------------+--------------------+---------------+-----+
only showing top 5 rows



In [135]:
#example duplicate entries
df.where('brand == "Academyus" And color == "Black"').show()

+----------+---------+-----+-----------------+--------------------+---------------+
|      asin|    brand|color|product_type_name|               title|formatted_price|
+----------+---------+-----+-----------------+--------------------+---------------+
|B071KKT2WJ|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
|B071KKSCZW|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
|B071XHXWHP|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
|B072KRWLYG|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
|B0721XY3YV|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
|B071KKSLT3|Academyus|Black|            SHIRT|Academyus Craft M...|          $6.99|
+----------+---------+-----+-----------------+--------------------+---------------+



In [157]:
#drop duplicate entries

df = (
    df.select(df.asin)
    .withColumn("id", monotonically_increasing_id())
    .join(df.select(['brand','color', 'product_type_name', 'title', 'formatted_price'])
          .withColumn("id", monotonically_increasing_id())
          .dropDuplicates(subset=['brand','color', 'product_type_name', 'title', 'formatted_price']), 'id')
    .drop('id')
)


In [158]:
#check duplicates
df.count(), df.select(['brand','color', 'product_type_name', 'title', 'formatted_price']).distinct().count()

(27015, 27015)

In [161]:
df.show(5)

+----------+-------------+-----------+-----------------+--------------------+---------------+
|      asin|        brand|      color|product_type_name|               title|formatted_price|
+----------+-------------+-----------+-----------------+--------------------+---------------+
|B06XBHLW8H|   Isaac Liev|   Burgundy|            SHIRT|Isaac Liev Women'...|         $17.99|
|B01LKD9TK2|Amuse Society|Casa Blanca|            SHIRT|Amuse Society Wom...|         $44.50|
|B0714C9M7S|        Sofra|      White|            SHIRT|Womens Tank Top -...|          $9.94|
|B00CA6LVR8| Cecilia Pink|      Green|            SHIRT|Cecilia Pink Wome...|          $9.99|
|B012XYG53G|    SODIAL(R)|      Black|   SPORTING_GOODS|SODIAL(R) Women's...|         $16.72|
+----------+-------------+-----------+-----------------+--------------------+---------------+
only showing top 5 rows



In [183]:
#remove records with short titles
df.select(length(col('title')).alias("len")).orderBy('len').show(5)

+---+
|len|
+---+
|  6|
|  6|
|  6|
|  6|
|  7|
+---+
only showing top 5 rows



In [None]:
# df.filter(length(col('title')) > 4).show(5)

In [159]:
#remove duplicate titles
df.count(), df.select(['title']).distinct().count()

(27015, 26060)

In [160]:
#num duplicate titles
27015 - 26060

955

In [162]:
df.groupby('title').count().filter('count > 1').orderBy('count', ascending=False).show(5)

+--------------------+-----+
|               title|count|
+--------------------+-----+
|WenHong Women Cut...|   28|
|FINEJO Casual Wom...|   28|
|Crazy Girls Women...|   25|
|Girlzwalk Women C...|   25|
|Yosshita & Neha M...|   14|
+--------------------+-----+
only showing top 5 rows



In [185]:
#drop duplicate titles
df = (
    df.select(df.asin)
    .withColumn("id", monotonically_increasing_id())
    .join(df.select(['brand','color', 'product_type_name', 'title', 'formatted_price'])
          .withColumn("id", monotonically_increasing_id())
          .dropDuplicates(subset=['title']), 'id')
    .drop('id')
)

df.cache()


DataFrame[asin: string, brand: string, color: string, product_type_name: string, title: string, formatted_price: string]

In [186]:
#check operation status
df.count(), df.select(['title']).distinct().count()

(26060, 26060)

In [187]:
#Todo
#remove entries with similar title

In [188]:
#Text normalization

In [202]:
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer, IDF, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [204]:
tokenizer = Tokenizer(inputCol='title', outputCol='token')
stop_word_remover = StopWordsRemover(inputCol='token', outputCol='token_no_stop_words')
word_vectorizer = CountVectorizer(inputCol='token_no_stop_words', outputCol='c_vec')
tfidf = IDF(inputCol='c_vec', outputCol='tfidf')

brand_indexer = StringIndexer(inputCol='brand', outputCol='brand_index')
brand_encoder = OneHotEncoder(inputCol='brand_index', outputCol='brand_vec')

color_indexer = StringIndexer(inputCol='color', outputCol='color_index')
color_encoder = OneHotEncoder(inputCol='color_index', outputCol='color_vec')

assembler = VectorAssembler(inputCols=['tfidf', 'brand_vec', 'color_vec'], outputCol='features')


In [205]:
pipeline = Pipeline(stages=[tokenizer, stop_word_remover, word_vectorizer, tfidf, brand_indexer, brand_encoder, color_indexer, color_encoder, assembler])

In [347]:
# data_cleaning_pipe = pipeline.fit(df.sample(False, 0.01, 42))
data_cleaning_pipe = pipeline.fit(df)

In [348]:
clean_data = data_cleaning_pipe.transform(df)
# clean_data.cache()

In [349]:
clean_data.show(5)

+----------+-------------+--------------+-----------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------+-------------------+-----------+-------------------+--------------------+
|      asin|        brand|         color|product_type_name|               title|formatted_price|               token| token_no_stop_words|               c_vec|               tfidf|brand_index|          brand_vec|color_index|          color_vec|            features|
+----------+-------------+--------------+-----------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+-----------+-------------------+-----------+-------------------+--------------------+
|B01NBQSBMN|   Royal Silk|        Orange|            DRESS|Sleek Tahiti Tuni...|         $85.00|[sleek, tahiti, t...|[sleek, tahiti, t...|(16195,[8,29,35,5...|(16195,[8,29,35,5...|     1188.0|(3628,[118

In [372]:
query = clean_data.where('asin ="B01NBQSBMN"').rdd.map(lambda row: row.features).take(1)[0]

In [373]:
result = clean_data.rdd.map(lambda row:  (row.asin, row.features.squared_distance(query) )).take(200)

In [363]:
best_match = sorted(result, key=lambda x: x[1])[:3]

In [364]:
asins = []
for asin, _ in best_match:
    asins.append(asin)

In [374]:
asins

['B01NBQSBMN', 'B01EUKOEL6', 'B01F2UDTT6']

In [375]:
df.filter(col('asin').isin(asins)).show()

+----------+--------------+------+-----------------+--------------------+---------------+
|      asin|         brand| color|product_type_name|               title|formatted_price|
+----------+--------------+------+-----------------+--------------------+---------------+
|B01NBQSBMN|    Royal Silk|Orange|            DRESS|Sleek Tahiti Tuni...|         $85.00|
|B01F2UDTT6| Toby Nicholas|   Red|            SHIRT|Summer Fashion Re...|         $22.00|
|B01EUKOEL6|Cynthia Rowley| White|            SHIRT|Silk Georgette Bl...|        $352.00|
+----------+--------------+------+-----------------+--------------------+---------------+



In [377]:
sp.stop()