# Sample Code

In [1]:
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Suggested packages:
  libnss-mdns fonts-dejavu-extra fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jre-headless
0 upgraded, 1 newly installed, 0 to remove and 37 not upgraded.
Need to get 28.2 MB of archives.
After this operation, 104 MB of additional disk space will be used.
Ign:1 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u292-b10-0ubuntu1~18.04
Err:1 http://security.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u292-b10-0ubuntu1~18.04
  404  Not Found [IP: 91.189.88.152 80]
E: Failed to fetch http://security.ubuntu.com/ubuntu/pool/universe/o/openjdk-8/openjdk-8-jre-headless_8u292-b10-0ubuntu1~18.04_amd64.deb  404  Not Found [IP: 91.189.88.152 80]
E: Unable to fetch some archives, maybe run apt-get update o

In [2]:
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import col, regexp_extract, regexp_replace, lit, when
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from pyspark import StorageLevel

import sys
from datetime import datetime, timedelta
from functools import reduce
from itertools import chain

In [3]:
spark = SparkSession.builder \
    .appName("content-based") \
    .config("spark.sql.files.ignoreCorruptFiles", "true") \
    .config("spark.sql.session.timeZone", "Asia/Taipei") \
    .getOrCreate()

## 基礎建設

In [4]:
import pandas as pd
import gzip, json

def parse(path):
    g = gzip.open(path, 'rb')
    for l in g:
        yield json.loads(l)

def getDF(path):
    i = 0
    df = {}
    for d in parse(path):
        df[i] = d
        i += 1
    return pd.DataFrame.from_dict(df, orient='index')

## 載入資料

In [5]:
!wget http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/All_Beauty.csv
!wget http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles2/meta_All_Beauty.json.gz

--2021-12-26 10:10:48--  http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/All_Beauty.csv
Resolving deepyeti.ucsd.edu (deepyeti.ucsd.edu)... 169.228.63.50
Connecting to deepyeti.ucsd.edu (deepyeti.ucsd.edu)|169.228.63.50|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15499476 (15M) [application/octet-stream]
Saving to: ‘All_Beauty.csv.7’


2021-12-26 10:10:49 (23.2 MB/s) - ‘All_Beauty.csv.7’ saved [15499476/15499476]

--2021-12-26 10:10:49--  http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles2/meta_All_Beauty.json.gz
Resolving deepyeti.ucsd.edu (deepyeti.ucsd.edu)... 169.228.63.50
Connecting to deepyeti.ucsd.edu (deepyeti.ucsd.edu)|169.228.63.50|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10329961 (9.9M) [application/octet-stream]
Saving to: ‘meta_All_Beauty.json.gz.7’


2021-12-26 10:10:50 (18.3 MB/s) - ‘meta_All_Beauty.json.gz.7’ saved [10329961/10329961]



In [6]:
# schema_rating = StructType([
#                     StructField("asin", StringType())
#                       , StructField("reviewerID", StringType())
#                       , StructField("overall", StringType())
#                       , StructField("unixReviewTime", StringType())
#                       ])

metadata = spark.read.json("file:////content/meta_All_Beauty.json.gz")
# ratings = spark.read.format("csv").option("header", True).schema(schema_rating).load("file:///content/All_Beauty.csv")

# metadata = getDF('/content/meta_All_Beauty.json.gz')
ratings = pd.read_csv('/content/All_Beauty.csv', names=['asin', 'reviewerID', 'overall', 'unixReviewTime'], header=None)
# metadata.head()
# ratings.head()

## 資料整理

In [7]:
metadata = metadata.select(
                col("asin")
                , col("title")
                , func.when(col("title") == '', 0).otherwise(1).alias("if_title")
                , col("feature")
                , func.when(func.size("feature") == 0, 0).otherwise(1).alias("if_feature")
                , col("description")
                , func.when(func.size("description") == 0, 0).otherwise(1).alias("if_description")
                , func.translate(col("price"), ',$', '').cast(DoubleType()).alias("price")
                , col("imageURL"), col("imageURLHighRes")
                , func.when(func.size("imageURL") == 0, 0).otherwise(1).alias("if_imageURL")
                , func.when(func.size("imageURLHighRes") == 0, 0).otherwise(1).alias("if_imageURLHighRes")
                , col("also_buy"), col("also_view")
                , func.translate(func.split(col("rank"), ' in ')[0], ',', '').cast(IntegerType()).alias("rank_sub_category")
                , func.translate(func.split(col("rank"), ' in ')[1], '()"];', '').alias("sub_category")
                , col("brand")
                , col("tech1")
                , func.when(func.length("tech1") == 0, 0).otherwise(1).alias("if_tech1")
                , col("similar_item"), col("date")
                , col("details.\n    Item Weight: \n    ").alias("item_weight")
                , col("details.\n    Product Dimensions: \n    ").alias("product_dimensions")
                , col("details.Batteries").alias("batteries")
                , func.when(col("details.Batteries").isNull(), 0).otherwise(1).alias("if_batteries")
                , col("details.Discontinued by manufacturer:").alias("discountedByManufacturer")
                , func.when(col("details.Discontinued by manufacturer:").isNull(), 0).otherwise(1).alias("if_discountedByManufacturer")
                , col("details.Domestic Shipping: ").alias("domestic_shipping")
                , col("details.International Shipping: ").alias("international_shipping")
                , col("details.Item model number:").alias("item_model_no")
                , col("details.Shipping Advisory:").alias("shipping_advisory")
                , col("details.Shipping Weight:").alias("shipping_weight")
                )

In [8]:
metadata = metadata.select(col("*"), when(col("sub_category") == 'Beauty & Personal Care ', 'Beauty & Personal Care')
                  .when(col("sub_category") == 'Beauty & Personal Care See Top 100', 'Beauty & Personal Care')
                  .when(col("sub_category") == 'Beauty & Personal Care See top 100', 'Beauty & Personal Care')
                  .when(col("sub_category") == 'Beauty &amp Personal Care ', 'Beauty & Personal Care')
                  .when(col("sub_category") == 'Grocery & Gourmet Food ', 'Grocery & Gourmet Food')
                  .when(col("sub_category") == 'Grocery & Gourmet Food See Top 100', 'Grocery & Gourmet Food')
                  .when(col("sub_category") == 'Grocery &amp Gourmet Food ', 'Grocery & Gourmet Food')
                  .when(col("sub_category") == 'Health & Household ', 'Health & Household')
                  .when(col("sub_category") == 'Health &amp Household ', 'Health & Household')
                  .when(col("sub_category").isNull(), 'no Category')
                  .otherwise(col("sub_category")).alias("subcategory"))

In [9]:
# if the info on below variables are not null, then sum as 1
# log10 rank by sub_category and multiply 100 as score
metadata_scored = metadata.withColumn('score_if', func.sum(col("if_title")+col("if_feature")+col("if_description")+col("if_imageURL") \
                                      +col("if_imageURLHighRes")+col("if_tech1")+col("if_batteries") \
                                      +col("if_discountedByManufacturer")).over(Window.partitionBy("asin"))) \
                          .withColumn('score_rank', (100 / func.log10(col("rank_sub_category"))).alias("score_rank")) \
                          .withColumn('product_score', col("score_if") + col("score_rank"))
metadata_selected = metadata_scored.select("asin", "brand", "subcategory", "product_score")

In [10]:
# metadata.stat.corr("rank_sub_category", "score_if")
# metadata_scored.summary().show(10, False)
metadata_selected.summary().show(10, False)

+-------+-------------------+--------+-----------+------------------+
|summary|asin               |brand   |subcategory|product_score     |
+-------+-------------------+--------+-----------+------------------+
|count  |32892              |32892   |32892      |32452             |
|mean   |8.525224324631579E9|Infinity|null       |19.32704697297634 |
|stddev |1.088695684419192E9|NaN     |null       |1.9722009771916256|
|min    |6546546450         |        |Automotive |16.286408057528053|
|25%    |7.41420479E9       |-417.0  |null       |17.912292819689   |
|50%    |8.867326759E9      |1907.0  |null       |19.21833163786428 |
|75%    |9.623402791E9      |Infinity|null       |20.272914982707913|
|max    |B01HJEGTYK         |zonman  |no Category|68.7639852180729  |
+-------+-------------------+--------+-----------+------------------+



In [11]:
metadata_pd = metadata_selected.toPandas()

In [12]:
ratings['DATE'] = pd.to_datetime(ratings['unixReviewTime'], unit='s')
ratings['avg_review'] = ratings.groupby(['asin'])['overall'].transform('mean')
# ratings = ratings.select("asin", "reviewerID", col("overall").cast(DoubleType()).alias("overall")
#               , func.from_unixtime("unixReviewTime","yyyy-MM-dd HH:mm:ss").alias("timestamp"))

In [13]:
ratings_metadata = pd.merge(ratings,metadata_pd,on='asin',how='left')
ratings_metadata.head(2)

Unnamed: 0,asin,reviewerID,overall,unixReviewTime,DATE,avg_review,brand,subcategory,product_score
0,143026860,A1V6B6TNIC10QE,1.0,1424304000,2015-02-19,4.117647,,,
1,143026860,A2F5GHSXFQ0W6J,4.0,1418860800,2014-12-18,4.117647,,,


In [14]:
ratings_metadata['brand_loyalty'] = ratings_metadata.groupby(["reviewerID", "brand"])["asin"].transform('count')
ratings_metadata['brand_loyalty'].fillna(1, inplace=True)
ratings_metadata['subcategory_loyalty'] = ratings_metadata.groupby(["reviewerID", "subcategory"])["asin"].transform('count')
ratings_metadata['subcategory_loyalty'].fillna(1, inplace=True)
ratings_metadata['product_popularity'] = ratings_metadata.groupby(["asin"])["reviewerID"].transform('count')
ratings_metadata['product_score'].fillna(1, inplace=True)
ratings_metadata.head(2)

Unnamed: 0,asin,reviewerID,overall,unixReviewTime,DATE,avg_review,brand,subcategory,product_score,brand_loyalty,subcategory_loyalty,product_popularity
0,143026860,A1V6B6TNIC10QE,1.0,1424304000,2015-02-19,4.117647,,,1.0,1.0,1.0,17
1,143026860,A2F5GHSXFQ0W6J,4.0,1418860800,2014-12-18,4.117647,,,1.0,1.0,1.0,17


In [15]:
ratings_metadata.describe()

Unnamed: 0,overall,unixReviewTime,avg_review,product_score,brand_loyalty,subcategory_loyalty,product_popularity
count,387654.0,387654.0,387654.0,387654.0,387654.0,387654.0,387654.0
mean,4.118012,1438586000.0,4.118012,23.070007,1.244739,1.470146,900.689311
std,1.358516,73180790.0,0.643092,7.057271,0.551563,1.086152,1945.473119
min,1.0,947462400.0,1.0,1.0,1.0,1.0,1.0
25%,4.0,1413936000.0,3.857143,20.085894,1.0,1.0,20.0
50%,5.0,1456186000.0,4.238095,21.776706,1.0,1.0,114.0
75%,5.0,1485302000.0,4.529889,24.849992,1.0,2.0,589.0
max,5.0,1538438000.0,5.0,68.763985,13.0,26.0,8672.0


## 資料切分

In [16]:
ratings_trainings = ratings_metadata[
    (ratings_metadata['DATE'] < '2018-09-01')
]
ratings_testings = ratings_metadata[
    (ratings_metadata['DATE'] >= '2018-09-01') & 
    (ratings_metadata['DATE'] <= '2018-09-30')
]
ratings_testings_by_user = ratings_testings.groupby('reviewerID').agg(list).reset_index()[['reviewerID', 'asin']].to_dict('records')
ratings_testings_by_user = { rating['reviewerID']: rating['asin'] for rating in ratings_testings_by_user }
users = list(ratings_testings_by_user.keys())

## 產生推薦

In [23]:
product_df = ratings_trainings[['asin', 'product_score', 'product_popularity', 'avg_review']].drop_duplicates()
# review_df = ratings_trainings[['reviewerID', 'asin', 'avg_review']].drop_duplicates()

In [None]:
for user in users:
  # choose top 2 brands for user
  brand_list = []
  index = ratings_trainings[ratings_trainings['reviewerID'] == 'AZZZ5UJWUVCYZ']["brand_loyalty"].nlargest(2).index
  for id in index:
    brand_list.append(ratings_trainings['brand'][id])
  brand_list = list(dict.fromkeys(brand_list))

  product_list = []
  for brand in brand_list:
    prod_list = ratings_trainings[ratings_trainings['brand'] == brand]['asin'].tolist()
    prod_list = list(dict.fromkeys(prod_list))
    product_list.append(prod_list)
  product_list = list(chain(*product_list))

  # review_df = ratings_trainings[['reviewerID', 'asin', 'overall']]
  # product_score / avg_review / product_popularity
  top25_by_productScore = product_df[product_df['asin'].isin(product_list)].nlargest(25, "product_score")
  top15_by_avgReview = top25_by_productScore.nlargest(15, "avg_review")
  if len(top15_by_avgReview.asin) >= 10:
    bottom = top15_by_avgReview.nsmallest(5, "product_popularity")['asin'].tolist()
    top = top15_by_avgReview.nlargest(5, "product_popularity")['asin'].tolist()
    recommendation = bottom + top
  else:
    sample = len(top15_by_avgReview.asin)
    random_sample = 10 - len(top15_by_avgReview.asin)
    bottom = top15_by_avgReview.nsmallest(sample, "product_popularity")['asin'].tolist()
    top = product_df[product_df['asin'].isin(product_list)]['asin'].sample(n=random_sample).tolist()
    recommendation = bottom + top


In [19]:
def recommender(training_data, users=[], k=10):
    '''
    * training_data: dataframe 輸入的訓練資料集（2018-09-01 以前資料）
    * users: [] 需要被推薦的使用者
    * k: int 每個使用者需要推薦的商品數
    * recommendations: dict
      {
          使用者一： [推薦商品一, 推薦商品二, ...],
          使用者二： [...], ...
      }
    '''
    recommendations = {}
    '''
    ruled-based
    '''
    ratings_trainings = training_data
    recommendations = {user: ratings_trainings['asin'].sample(n=k).tolist() for user in users}
    return recommendations


ratings_by_user = recommender(ratings_trainings, users)
ratings_by_user

{'A100XQFWKQ30O2': ['B018WCT01C',
  'B016I2W7E0',
  'B015UVIX84',
  'B0064GQHNS',
  'B01HD23OJG',
  'B00RTTSPIO',
  'B00528OB14',
  'B01BHM6K1C',
  'B0067F28ZW',
  'B01FNJ9MOW'],
 'A103T1QOGFCSEH': ['B018J05XSQ',
  'B005C18UTK',
  'B0012Y0ZG2',
  'B00RNKER4U',
  'B001MKP8C2',
  'B001AJ6YS2',
  'B01AISX5HS',
  'B00FBFSOKC',
  'B000URXP6E',
  'B00M8EWFX6'],
 'A106UKKSJ2KXPF': ['B00U7AIC62',
  'B00JDZ3O2O',
  'B00YQ28IQQ',
  'B00BOJRGGM',
  'B0091OCA86',
  'B01CM29QF8',
  'B000WYJTZG',
  'B00PMQAWMI',
  'B01DJI7796',
  'B018ZLUTTW'],
 'A10A7GV4D5A11V': ['B00COGVXNQ',
  'B01B6YTFRC',
  'B000GG7NU8',
  'B0017TNZZU',
  'B0000530HU',
  'B00AFCOWJS',
  'B000050FDY',
  'B000FED5DU',
  'B00B5OXGXI',
  'B006WYJM8Y'],
 'A1119JJ37ZLB8R': ['B01BY2V64Q',
  'B000VV1YOY',
  'B01ARKX9ZU',
  'B0014H4YDW',
  'B00KGC1TD4',
  'B01DSA4QIK',
  'B00005JS5C',
  'B0001TMDF0',
  'B01BUH44RQ',
  'B01CTQC9J2'],
 'A113UOOLBSZN52': ['B00ED5HCXG',
  'B00EP2ZX7Y',
  'B0140HC1NC',
  'B00435KHQA',
  'B0070Z6Y2G',
  'B012

## 結果評估

In [20]:
def evaluate(ratings_testings_by_user={}, ratings_by_user={}, method=None):
    '''
    * ratings_testings_by_user: dict 真實被購買的商品資料（2018-09-01 以後資料）
    * ratings_by_user: dict 利用訓練資料學習的推薦商品
    * method: str
    * score: float
    '''
    total = 0
    for d in ratings_testings_by_user:
        if d in ratings_by_user:
            total += len(set(ratings_by_user[d]) & set(ratings_testings_by_user[d]))

    score = total / len(ratings_testings)
    return score

evaluate(ratings_testings_by_user, ratings_by_user)

0.013559322033898305

In [21]:
def recommender_random(training_data, users=[], k=2):
    '''
    * training_data: dataframe 輸入的訓練資料集（2018-09-01 以前資料）
    * users: [] 需要被推薦的使用者
    * k: int 每個使用者需要推薦的商品數
    * recommendations: dict
      {
          使用者一： [推薦商品一, 推薦商品二, ...],
          使用者二： [...], ...
      }
    '''
    recommendations = {}
    '''
    random-based
    '''
    ratings_trainings = training_data
    recommendations = {user: ratings_trainings['asin'].sample(n=k).tolist() for user in users}
    return recommendations
ratings_by_user = recommender_random(ratings_trainings, users)
ratings_by_user

{'A100XQFWKQ30O2': ['B001AJ8YGC', 'B00P088748'],
 'A103T1QOGFCSEH': ['B00IIZG80U', 'B008SK2D54'],
 'A106UKKSJ2KXPF': ['B00H1M8R3C', 'B008U1Q4DI'],
 'A10A7GV4D5A11V': ['B006GDB518', 'B00021DJ1E'],
 'A1119JJ37ZLB8R': ['B01ERY5TW8', 'B0067F28ZW'],
 'A113UOOLBSZN52': ['B00YI0XFUU', 'B00IO7WB26'],
 'A12M4U7WK4ALCR': ['B005IHT94S', 'B0067F28ZW'],
 'A12T8YTW6VWT7S': ['B00999HMS6', 'B0002JHI1I'],
 'A1364JXGKB46MM': ['B00LS6FEDI', 'B00QUBLCX2'],
 'A137DALOQFKBTI': ['B01GLA54SA', 'B000WZH8EY'],
 'A13FEZ3WV7S2EY': ['B01BJZ7WV4', 'B00005JS5C'],
 'A13IV4I1B0RXMG': ['B00CFCUZME', 'B000050B6B'],
 'A13JU88JAHN72I': ['B00W259T7G', 'B00VIO80N6'],
 'A13K55R6VH1OOD': ['B017OXQKYA', 'B00NT0AR7E'],
 'A13P7VFU075A': ['B000050FDY', 'B001QY8QXM'],
 'A13SWYE4QLB6NG': ['B00020UR4C', 'B00VF344X0'],
 'A13ZTQ0Q4ATA41': ['B0089A9YZU', 'B001UQ70NS'],
 'A142EDN04OD62U': ['B006WYJM8Y', 'B00ZYYIL2Q'],
 'A142I22FIC8MZK': ['B001EWEP9A', 'B004K4J95O'],
 'A14834QTII5TLT': ['B004E4GHRS', 'B015PV6SWC'],
 'A14A447VPACTBC': ['B