## CA4022 - Amazon Recommender System

This notebook reads in the data, builds the model and produces recommendations for a new user.

We start by installing PySpark and importing the relevant libraries.

In [1]:
!pip3 install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a17d3ec1d723e65e90a893bbacd79262deb20c016e08f41ecdc5b33f1e9623f5
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


Import Libraries

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

from pyspark.sql import SparkSession, functions as F

import pandas as pd
import numpy as np
import pickle

Take the separate TSV files and concatenate them into one TSV that we can read in as a pyspark dataframe.

In [21]:
from google.colab import drive
import pandas as pd
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
books = pd.read_csv("drive/MyDrive/CA4022/total_Books.tsv", sep="\t", header = 0, quoting=3)

In [6]:
videos = pd.read_csv("drive/MyDrive/CA4022/total_Video DVD.tsv", sep="\t", header = 0, quoting=3)



In [7]:
games = pd.read_csv("drive/MyDrive/CA4022/total_Video Games.tsv", sep="\t", header = 0, quoting=3)

In [8]:
total = pd.concat([books, videos, games])
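The read-and-concatenate step can be expressed as one helper. A minimal sketch, assuming small in-memory sources in place of the Drive files; the helper name `read_and_concat` is hypothetical. `quoting=3` in the cells above is the constant `csv.QUOTE_NONE`, which makes pandas treat quote characters inside titles as ordinary text, and `ignore_index=True` gives the combined frame a fresh 0..n-1 index instead of repeating each file's own index.

```python
import csv
import io

import pandas as pd


def read_and_concat(sources):
    """Read several tab-separated sources and stack them into one DataFrame.

    `sources` may be file paths or file-like objects (hypothetical helper).
    """
    frames = [
        # QUOTE_NONE (== 3) keeps stray quote characters in titles as literal text
        pd.read_csv(src, sep="\t", header=0, quoting=csv.QUOTE_NONE)
        for src in sources
    ]
    # ignore_index=True renumbers rows 0..n-1 instead of repeating each file's index
    return pd.concat(frames, ignore_index=True)


# Usage with small in-memory stand-ins for the category files
books_tsv = io.StringIO("customer_id\tproduct_title\n1\tHatchet\n")
games_tsv = io.StringIO('customer_id\tproduct_title\n2\t"Quoted title\n')
combined = read_and_concat([books_tsv, games_tsv])
print(combined)
```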

Now we drop the rows with missing values.

In [9]:
# Drop the 2 rows that have no values present
total = total[~total['product_parent'].isna()]

The product id is an alphanumeric string, which our ALS model does not support (ALS needs numeric ids), so we need to convert it.

In [10]:
# ALS needs numeric item ids, so we map each alphanumeric product_id to an integer.
# Spark's zipWithIndex and zipWithUniqueId were the closest built-in options, but they number
# rows rather than distinct values, so duplicate product ids would get different numbers.
# The new id is simply the item's position when enumerating the set of distinct ids.
# Set order depends on string hashing, so the numbering can differ between sessions.
old_id = set(total["product_id"].values)

old_new_map = {k: v for v, k in enumerate(old_id)}

In [11]:
# Look up each row's new integer id in the mapping
total["new_product_id"] = total["product_id"].map(old_new_map)
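As a swapped-in alternative to the manual mapping, `pd.factorize` assigns one integer per distinct value in a single call. It numbers values in order of first appearance, so the result does not depend on set iteration order. A minimal sketch with a few illustrative ids; inside Spark, `pyspark.ml.feature.StringIndexer` serves a similar purpose.

```python
import pandas as pd

# Illustrative alphanumeric product ids, including a duplicate
product_ids = pd.Series(["B000CELOVI", "0240807316", "B000CELOVI", "B00065GWZQ"])

# codes: one integer per row; uniques: the distinct ids in order of first appearance
codes, uniques = pd.factorize(product_ids)

# Old id -> new id, playing the same role as old_new_map above
old_new_map = {old: new for new, old in enumerate(uniques)}
print(codes.tolist())  # [0, 1, 0, 2]
```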

### Consideration before training

Users rate differently: some are harsher than others, some more lenient. It may therefore be worth normalizing each user's ratings by subtracting that user's mean rating, so that we capture how much they liked an item compared to how they usually rate items.
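The cells below compute the normalization with a per-row `apply`. A vectorized equivalent uses `groupby(...).transform("mean")`, which broadcasts each user's mean back onto that user's rows. A minimal sketch using the eight ratings of customer 52970208 from the table below, plus a hypothetical user 1 with a single rating:

```python
import pandas as pd

# Ratings of customer 52970208 (from the table below) plus a hypothetical user 1
total = pd.DataFrame({
    "customer_id": [52970208] * 8 + [1],
    "rating": [5.0, 2.0, 1.0, 2.0, 2.0, 4.0, 2.0, 1.0, 3.0],
})

# Each row's user mean, aligned with the original rows
user_mean = total.groupby("customer_id")["rating"].transform("mean")
total["normalized_rating"] = total["rating"] - user_mean
print(total)
```

Customer 52970208's mean is 19 / 8 = 2.375, which reproduces the `normalized_rating` values in the table below; a user with only one rating always ends up at exactly 0.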

In [12]:
average_rating = total[["customer_id", "rating"]].groupby("customer_id").mean()

In [13]:
average_rating = average_rating.reset_index()

In [14]:
user_dict = {k: v for k, v in average_rating[["customer_id", "rating"]].values}

In [15]:
total['normalized_rating'] = total.apply(lambda row : row['rating'] - user_dict[row["customer_id"]], axis = 1)

Now we can look at one user and check that their ratings have been normalized.

In [25]:
total[total["customer_id"] == 52970208]

| | marketplace | customer_id | review_id | product_id | product_parent | product_title | product_category | rating | helpful_votes | total_votes | new_product_id | normalized_rating |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1975472 | US | 52970208 | R1CET7QY7O0NTD | B000CELOVI | 95055651.0 | Charles in Charge: Season 1 | Video DVD | 5.0 | 2.0 | 4.0 | 617414 | 2.625 |
| 1975473 | US | 52970208 | R2W6WGFMG2395B | B00065GWZQ | 689385145.0 | Paradise | Video DVD | 2.0 | 21.0 | 25.0 | 544544 | -0.375 |
| 1975474 | US | 52970208 | R3PK4I41ZDM98U | B0001ENX5E | 75542491.0 | Sex and the City: Season 6, Part 1 | Video DVD | 1.0 | 34.0 | 41.0 | 661840 | -1.375 |
| 2967235 | US | 52970208 | R20JM855NQEEW1 | B0001CCXZW | 526357477.0 | Gilmore Girls: Season 1 | Video DVD | 2.0 | 17.0 | 29.0 | 613186 | -0.375 |
| 2967236 | US | 52970208 | R2EAMKHFWLLZSG | B00003CXTG | 283264319.0 | Pearl Harbor | Video DVD | 2.0 | 2.0 | 3.0 | 1069719 | -0.375 |
| 2967237 | US | 52970208 | R330BNV0XU4H6M | B00005Q4DZ | 774205929.0 | Deeply | Video DVD | 4.0 | 3.0 | 7.0 | 207206 | 1.625 |
| 3959114 | US | 52970208 | R1QFNUXQ9ZS2TJ | B00012QM8G | 335815469.0 | Schindler's List (Widescreen Edition) | Video DVD | 2.0 | 1.0 | 7.0 | 533230 | -0.375 |
| 3959115 | US | 52970208 | R3I3OB7J7BBKUR | B000HC2LVW | 254221913.0 | Chronicles of Narnia - The Lion, the Witch & t... | Video DVD | 1.0 | 18.0 | 54.0 | 904152 | -1.375 |


In [26]:
total.to_csv("drive/MyDrive/CA4022/total.tsv", sep="\t", index = False)

In [27]:
total['new_product_id'] = total['new_product_id'].astype('int')

Now let us create a mapping from the product id to the product title so we can find the item from the id.

In [28]:
# Create a dictionary of pairs so we can get the name of products later
item_dict = {k: v for k, v in total[["new_product_id", "product_title"]].values}

with open('drive/MyDrive/CA4022/item_dict.pickle', 'wb') as p:
    pickle.dump(item_dict, p, protocol=pickle.HIGHEST_PROTOCOL)

In [29]:
# Titles are not unique: different product ids can share the same name (possibly different sellers)
vals = [v for v in item_dict.values() if "Shrek" in v]
print(len(set(vals)), len(vals))

111 149


In [30]:
reviews = total[["new_product_id", "rating"]].groupby("new_product_id").count()
reviews.sort_values("rating", ascending=False)

| new_product_id | rating |
|---|---|
| 835354 | 10361 |
| 525702 | 4625 |
| 65039 | 4620 |
| 450165 | 4463 |
| 311100 | 4402 |
| ... | ... |
| 241198 | 1 |
| 677208 | 1 |
| 241204 | 1 |
| 241207 | 1 |


In [31]:
# Note: 4402 is the review count of product 311100 in the table above, not a product id
total[total["new_product_id"] == 4402]

| | marketplace | customer_id | review_id | product_id | product_parent | product_title | product_category | rating | helpful_votes | total_votes | new_product_id | normalized_rating |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 403706 | US | 50122160 | R3M8T3ZOT0G8I0 | 962965146 | 854243470.0 | The Litter of Leaving: Collected Poems | Books | 5.0 | 1.0 | 1.0 | 4402 | 0.001931 |


## Training Model
Initialise Spark Session

In [4]:
spark = SparkSession.builder.appName("amazon_recommender").getOrCreate()

Read the total data into a spark dataframe

In [5]:
df = spark.read.options(delimiter='\t').csv('drive/MyDrive/CA4022/total.tsv', inferSchema=True, header=True)

In [7]:
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+------+-------------+-----------+--------------+-----------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|rating|helpful_votes|total_votes|new_product_id|normalized_rating|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+------+-------------+-----------+--------------+-----------------+
|         US|   12068447|R1XHTJ69WFA79N|0240807316|  2.09406469E8|Sports Media:  Re...|           Books|   5.0|          8.0|        8.0|       1005097|              0.0|
|         US|   12069735|R3GBNWUL5TX127|B0009A0GXW|  3.55154473E8|The Secret Goldfi...|           Books|   5.0|          3.0|        3.0|        384052|              0.0|
|         US|   12070636|R3VK6TOXURV3Q2|0316107719|  5.29689635E8|The Baby Sleep Bo...|           Books|   3.0|        111.0|      120.0|        

In [6]:
df = df.withColumn("customer_id",df.customer_id.cast('int'))

In [7]:
df = df.withColumn("new_product_id",df.new_product_id.cast('int'))

In [8]:
df = df.withColumn("rating",df.rating.cast('double'))

In [9]:
df = df.drop("marketplace","review_id","product_id", "product_parent", "helpful_votes", "total_votes", "rating")

In [14]:
df.show()

+-----------+--------------------+----------------+--------------+------------------+
|customer_id|       product_title|product_category|new_product_id| normalized_rating|
+-----------+--------------------+----------------+--------------+------------------+
|   12068447|Sports Media:  Re...|           Books|       1005097|               0.0|
|   12069735|The Secret Goldfi...|           Books|        384052|               0.0|
|   12070636|The Baby Sleep Bo...|           Books|        942355|               0.0|
|   12070768|Firehouse Food: C...|           Books|        330029|               0.0|
|   12070940|             Hatchet|           Books|        551123|               0.0|
|   12071875|The Zartarbia Tal...|           Books|       1091547|               0.0|
|   12072043|Gerbils: The Comp...|           Books|        958107|               0.0|
|   12072415|Introducing Micro...|           Books|        165881|               0.0|
|   12073588|The Case for Hill...|           Books|   

Split data into train and test

In [17]:
# rating was dropped in an earlier cell, so filter on the column ALS actually uses
df = df.filter(~df.normalized_rating.isNull())

In [13]:
train, test = df.randomSplit([0.8, 0.2], seed=42)

Build ALS model.

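itemCol needs care: ALS requires numeric ids, so we cannot use the alphanumeric product_id as is and use new_product_id instead.

coldStartStrategy = 'drop' drops rows whose user or item was not seen during training (such as new users), since the model has no factors for them and would otherwise predict NaN.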


In [10]:
als = ALS(maxIter=12, regParam=0.2, rank=12, userCol="customer_id", itemCol="new_product_id", ratingCol='normalized_rating', coldStartStrategy='drop', nonnegative=False)
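For intuition about what `rank`, `regParam` and `maxIter` control, here is a minimal NumPy sketch of plain alternating least squares on a tiny dense matrix with missing entries. It is an illustration only, not Spark's implementation: Spark distributes the work and scales `regParam` by the number of ratings of each user or item (ALS-WR), which this sketch does not do. The function name `als_sketch` and the toy data are hypothetical.

```python
import numpy as np


def als_sketch(R, mask, rank=2, reg=0.1, iters=20, seed=0):
    """Factor R ~= U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    eye = np.eye(rank)
    for _ in range(iters):  # plays the role of maxIter
        # Hold V fixed and solve a ridge regression for each user's factors
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg * eye, Vu.T @ R[u, mask[u]])
        # Hold U fixed and solve a ridge regression for each item's factors
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg * eye, Ui.T @ R[mask[:, i], i])
    return U, V


# Toy rank-2 "ratings" with roughly a quarter of the entries hidden
rng = np.random.default_rng(42)
R = rng.normal(size=(6, 2)) @ rng.normal(size=(2, 5))
mask = rng.random(R.shape) > 0.25

U, V = als_sketch(R, mask, rank=2, reg=0.01, iters=50)
rmse = np.sqrt(np.mean((U @ V.T - R)[mask] ** 2))
baseline = np.sqrt(np.mean(R[mask] ** 2))  # error of always predicting 0
print(rmse, baseline)
```

Each half-step is an independent least-squares solve per user or per item, which is why the algorithm parallelizes well in Spark.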

The following cells were for tuning the parameters, but Colab does not seem to have enough compute power to run the grid (GPU instances are available, but I was unable to get PySpark to use the GPU).

In [24]:
param_grid = ParamGridBuilder().addGrid(als.rank, [8, 10, 12]).addGrid(als.maxIter, [10, 11, 12]).addGrid(als.regParam, [.1, .15, .2]).build()

In [16]:
# ALS is trained on normalized_rating (rating itself was dropped above)
eval = RegressionEvaluator(metricName="rmse", labelCol="normalized_rating", predictionCol='prediction')

In [17]:
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=eval)

In [None]:
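# Fit one model per parameter combination and keep the best (expensive)
tvs_model = tvs.fit(train)
best_model = tvs_model.bestModel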
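For a sense of the cost: `ParamGridBuilder` builds the Cartesian product of the listed values, and `TrainValidationSplit` fits one model per combination on its training split (75% of the input by default, `trainRatio=0.75`), then refits the best combination on the whole input. A plain-Python count for the grid above:

```python
from itertools import product

# The same values passed to addGrid above
ranks = [8, 10, 12]
max_iters = [10, 11, 12]
reg_params = [0.1, 0.15, 0.2]

grid = list(product(ranks, max_iters, reg_params))
# One fit per combination, plus one refit of the best combination
n_fits = len(grid) + 1
print(len(grid), n_fits)  # 27 28
```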

Now we can train our model

In [11]:
model = als.fit(train)

Let us make some sample predictions and see how accurate they seem.

In [14]:
pred = model.transform(test)

In [15]:
pred.show(10)

+-----------+--------------------+----------------+--------------+-------------------+-----------+
|customer_id|       product_title|product_category|new_product_id|  normalized_rating| prediction|
+-----------+--------------------+----------------+--------------+-------------------+-----------+
|   16462099|Julia Child! Amer...|       Video DVD|             3|0.33333333333333304| 0.21960293|
|   30607712|Julia Child! Amer...|       Video DVD|             3|               1.25|  0.8169154|
|   31594910|Julia Child! Amer...|       Video DVD|             3|                0.5| 0.21618445|
|   51257355|Masquerade: The C...|           Books|            12|                0.0|        0.0|
|   51856095|Masquerade: The C...|           Books|            12| 0.3793103448275863|0.118816815|
|   51894318|The Reagan Years ...|           Books|            31| 1.5588235294117645|  1.4034457|
|   52997019|Heaven Below: Ear...|           Books|            34|                0.0|        0.0|
|   530519

In [16]:
eval = RegressionEvaluator(metricName="rmse", labelCol="normalized_rating", predictionCol='prediction')

Below is the root mean squared error of the model. This is very poor. It may help to set a threshold for the number of ratings a user must have.

In [17]:
rmse = eval.evaluate(pred)
print(rmse)

0.43931763823993336
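One way to apply the threshold suggested above is to filter users in pandas before `total.tsv` is written. A minimal sketch: the helper name `filter_min_ratings`, the `min_ratings` value and the toy rows are hypothetical, and a suitable threshold would have to be chosen by re-running the evaluation. A threshold of 2 or more also removes single-rating users, whose normalized rating is always exactly 0.

```python
import pandas as pd


def filter_min_ratings(df, min_ratings, user_col="customer_id"):
    """Keep only rows of users with at least `min_ratings` ratings (hypothetical helper)."""
    # Number of ratings of each row's user, aligned with df's rows
    counts = df[user_col].map(df[user_col].value_counts())
    return df[counts >= min_ratings]


# Usage: user 1 has two ratings, user 2 only one
total = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "normalized_rating": [0.5, -0.5, 0.0],
})
kept = filter_min_ratings(total, min_ratings=2)
print(kept)
```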


In [None]:
# product_id was dropped from df earlier; the model needs the numeric new_product_id column
user_1 = test.filter(test['customer_id'] == 10206).select(['customer_id', 'new_product_id'])

In [None]:
user_1.show()

+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+
|marketplace|customer_id|    review_id|product_id|product_parent|       product_title|product_category|rating|helpful_votes|total_votes|
+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+
|         US|      10206|R1ALPEI2SO0D1|B00HNYWFMC|     805852390| Far Cry Compliation|     Video Games|     5|            0|          0|
|         US|      10206|RLQIPPSIW9KQ0|B0053WVBSA|     603885070|Happy Feet Two: T...|     Video Games|     3|            0|          1|
+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+



In [None]:
rec=model.transform(user_1)

In [None]:
rec.orderBy('prediction', ascending=False).show()

+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+----------+
|marketplace|customer_id|    review_id|product_id|product_parent|       product_title|product_category|rating|helpful_votes|total_votes|prediction|
+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+----------+
|         US|      10206|RLQIPPSIW9KQ0|B0053WVBSA|     603885070|Happy Feet Two: T...|     Video Games|     3|            0|          1|-0.5765556|
|         US|      10206|R1ALPEI2SO0D1|B00HNYWFMC|     805852390| Far Cry Compliation|     Video Games|     5|            0|          0| -0.988089|
+-----------+-----------+-------------+----------+--------------+--------------------+----------------+------+-------------+-----------+----------+



The cells below produce actual recommendations.

They take 3 users and recommend 3 products to each.

In [None]:
users = df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 3)

In [None]:
userSubsetRecs.show()

+-----------+--------------------+
|customer_id|     recommendations|
+-----------+--------------------+
|   12134452|[{40467685, 2.801...|
|   12182893|[{843965595, 5.08...|
|   12226553|[{92090520, 4.945...|
+-----------+--------------------+



In [20]:
# Save the model for use later
model.save("drive/MyDrive/CA4022/models/full_normal_rank12_iter12_reg0.2_model.dat")

## Streamlit interface

We use Streamlit to record ratings from a new user and make predictions for them.

The process records ratings for some items and stores them as a vector. Multiplying this vector by the transpose of the product (item-factor) matrix approximates the user's latent representation, and multiplying that by the product matrix again gives the recommendations.
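In matrix notation, under the convention that the item (product) factor matrix is $Q \in \mathbb{R}^{k \times n}$ with one rank-$k$ column $\mathbf{q}_j$ per item, and $\mathbf{r} \in \mathbb{R}^{1 \times n}$ holds the new user's ratings with zeros for unrated items:

$$
\mathbf{u} = \mathbf{r}\,Q^{\top} \in \mathbb{R}^{1 \times k},
\qquad
\hat{\mathbf{s}} = \mathbf{u}\,Q = \mathbf{r}\,Q^{\top} Q \in \mathbb{R}^{1 \times n},
\qquad
\hat{s}_j = \sum_{i} r_i \,\mathbf{q}_i^{\top}\mathbf{q}_j .
$$

The last form shows that each item's score is a rating-weighted sum of its factor dot products with the items the user rated; unrated items ($r_i = 0$) contribute nothing. In `compute_recommendations` in the `recommender.py` file written below, `array_features` corresponds to $Q^{\top}$.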

As this process is not supported by the model API and is run manually, it has some performance issues. In a large-scale deployment these would be addressed by using a sparse representation of the rating vector and caching the model's item features.

Our process has to load the model's item matrix and build a dense rating vector in NumPy, which is not optimized for sparse vectors. This manual evaluation is separate from the model and only for our own review, so these performance issues are not significant.
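A sketch of the sparse approach, assuming the item factors have already been collected into a NumPy array with one row per item (as `array_features` is in `recommender.py` below). Only the rated items are touched when forming the latent vector, and a dictionary built once replaces the linear `list.index` search used in `sparse_list`. The function names and toy data are hypothetical.

```python
import numpy as np


def build_index(item_ids):
    """Map each item id to its row in the factor matrix (build once, reuse)."""
    return {item_id: row for row, item_id in enumerate(item_ids)}


def score_new_user(ratings, row_of, item_factors):
    """Score every item for a new user given a sparse {item_id: rating} dict.

    item_factors has shape (n_items, rank), row-aligned with the ids in row_of.
    Ids missing from row_of are skipped.
    """
    # O(1) dictionary lookups instead of list.index, which scans the whole list
    rated = [(row_of[i], r) for i, r in ratings.items() if i in row_of]
    if not rated:
        return np.zeros(item_factors.shape[0])
    rows, values = zip(*rated)
    # Latent vector: rating-weighted sum of the rated items' factor rows only
    latent = np.asarray(values, dtype=float) @ item_factors[list(rows)]
    # One score per item; equals the dense rating_vector @ item_factors @ item_factors.T
    return item_factors @ latent


# Usage with a toy 4-item, rank-2 factor matrix
item_ids = [10, 20, 30, 40]
item_factors = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.5, -0.5]])
row_of = build_index(item_ids)
scores = score_new_user({30: 4, 10: 2}, row_of, item_factors)
print(scores.tolist())  # [6.0, 4.0, 10.0, 1.0]
```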

In [18]:
!pip install streamlit

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting streamlit
  Downloading streamlit-1.15.2-py2.py3-none-any.whl (9.2 MB)
Collecting semver
  Downloading semver-2.13.0-py2.py3-none-any.whl (12 kB)
Collecting rich>=10.11.0
  Downloading rich-12.6.0-py3-none-any.whl (237 kB)
Collecting pydeck>=0.1.dev5
  Downloading pydeck-0.8.0-py2.py3-none-any.whl (4.7 MB)
Collecting validators>=0.2
  Downloading validators-0.20.0.tar.gz (30 kB)
Collecting pympler>=0.9
  Downloading Pympler-1.0.1-py3-none-any.whl (164 kB)
Collecting gitpython!=3.1.19
  Downloading GitPython-3.1.29-py3-none-any.whl (182 kB)
Collecting watchdog
  Downloading

In [19]:
!pip install pyngrok==4.1.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyngrok==4.1.1
  Downloading pyngrok-4.1.1.tar.gz (18 kB)
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... done
  Created wheel for pyngrok: filename=pyngrok-4.1.1-py3-none-any.whl size=15982 sha256=65f56702aa565dbf7bee68febb720b3d46ab487e2b0dc8f7734d9e6f7c1ad235
  Stored in directory: /root/.cache/pip/wheels/5e/0a/51/8cb053ccd84481dd3233eba4cdb608bc7a885fd8ca418c0806
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-4.1.1


Streamlit will be run in the background using the file created below. We can then select items and rate them to build a full representation of a new user.

This rating vector is multiplied by the transpose of the item matrix to approximate the user's latent representation, and the result is multiplied by the item matrix again to get the recommendations.
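The projection described above is a heuristic. ALS itself computes a user's factors by solving a regularized least-squares problem over the items that user rated, so a swapped-in alternative for a new user is to run that single step with the item factors held fixed. A minimal NumPy sketch, assuming `item_factors` has one row per item; `fold_in_user` and `reg` are hypothetical, and Spark additionally scales `regParam` by the number of ratings, which this sketch ignores.

```python
import numpy as np


def fold_in_user(ratings, row_of, item_factors, reg=0.1):
    """Ridge-regression fold-in of a new user (hypothetical helper).

    Solves min_u sum_i (r_i - u . v_i)^2 + reg * |u|^2 over the rated items i only,
    with the item factors v_i (rows of item_factors) held fixed.
    """
    rank = item_factors.shape[1]
    rated = [(row_of[i], r) for i, r in ratings.items() if i in row_of]
    if not rated:
        return np.zeros(rank)
    rows, values = zip(*rated)
    V = item_factors[list(rows)]         # factors of the rated items only
    r = np.asarray(values, dtype=float)  # the matching ratings
    # Normal equations: (V^T V + reg * I) u = V^T r
    return np.linalg.solve(V.T @ V + reg * np.eye(rank), V.T @ r)


# Usage with a toy 3-item, rank-2 factor matrix
item_factors = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
row_of = {10: 0, 20: 1, 30: 2}
u = fold_in_user({10: 2.0, 20: -1.0}, row_of, item_factors, reg=0.0)
scores = item_factors @ u  # predicted score for every item
print(u.tolist(), scores.tolist())
```

Because the model was trained on `normalized_rating`, the new user's ratings would ideally be centered on their own mean in the same way before either method is applied.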

In [60]:
%%writefile recommender.py
import streamlit as st
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession
import pickle
import numpy as np
import pandas as pd

@st.cache(allow_output_mutation=True)
def persistrating():
    return {}
  
@st.cache(allow_output_mutation=True)
def persistitem():
    return []

def load_model():
  # ALSModel.load needs an active SparkSession, so create (or reuse) one first
  spark = SparkSession.builder.appName("amazon_recommender").getOrCreate()
  model = ALSModel.load("drive/MyDrive/CA4022/models/full_normal_rank12_iter12_reg0.2_model.dat")
  return model

def sparse_list(ratings, item_ids):
  output = [0] * len(item_ids)
  not_found = []
  for k, v in ratings.items():
    try:
      index = item_ids.index(int(k))
      output[index] = v
    except ValueError:
      not_found.append(k)

  return output, not_found

@st.experimental_memo
def retrieve_item_dict():
  with open('drive/MyDrive/CA4022/item_dict.pickle', 'rb') as p:
    item_dict = pickle.load(p)
  return item_dict

@st.experimental_memo
def retrieve_names():
  return [f"{v}---{k}" for k, v in retrieve_item_dict().items()]

def filter_items(values, text):
  return [i for i in values if text.casefold() in i.casefold()]

def save_filtered(filtered):
  with open('drive/MyDrive/CA4022/filtered.pickle', 'wb') as p:
      pickle.dump(filtered, p, protocol=pickle.HIGHEST_PROTOCOL)
  
def load_filtered():
  with open('drive/MyDrive/CA4022/filtered.pickle', 'rb') as p:
    return pickle.load(p)

@st.experimental_memo
def compute_recommendations(ratings, item_id_name):
  model = load_model()
  # Collect ids and features in one pass so the two lists stay row-aligned
  rows = model.itemFactors.collect()
  ids = [row["id"] for row in rows]
  rating_vector, not_found = sparse_list(ratings, ids)
  ratings = np.asarray(rating_vector)
  array_features = np.array([row["features"] for row in rows])  # shape (n_items, rank)
  # Approximate the new user's latent representation, shape (rank,)
  latent_rep = np.matmul(ratings, array_features)
  # Score every item against that latent representation
  recommendations = np.matmul(latent_rep, np.transpose(array_features))
  names = [item_id_name[item_id] for item_id in ids]

  # Return a plain list (not a lazy zip iterator) so the cached value is safe to reuse
  return list(zip(names, recommendations)), not_found


def main():
  # set_page_config must be the first Streamlit command on the page
  st.set_page_config(
    page_title="Amazon Recommender System",
    layout="wide"
  )

  st.header("Amazon Recommender System")

  item_id_name = retrieve_item_dict()
  names = retrieve_names()

  items_to_rate = persistitem()
  user_ratings = persistrating()
  filtered_names = []

  if "recommendations" not in locals():
    recommendations = None

  left, right = st.columns([2, 5])

  left.subheader("Please search for items and make a selection. Then rate the selected items")

  right.subheader("Your recommendations can be seen here once submitted")

  search_text = left.text_input("Search the item list here. Press Enter to search")

  if left.button("Search items"):

    save_filtered(filter_items(names, search_text))
  
  try:
    filtered_names = load_filtered()
    selection = left.selectbox("Select an item to add to your list of items to rate", filtered_names)

    if left.button("Add selection to list"):
      items_to_rate.append(selection)

  except Exception:
    # filtered.pickle does not exist until the first search has been run
    pass
  
  left.write(items_to_rate)
  
  if left.button("Clear the rating selection"):
    items_to_rate.clear()
    user_ratings.clear()
  
  expander =  left.expander("Rate your selected items", expanded=False)

  for item in items_to_rate:

    rating = expander.selectbox(f"Rating for {item}", [0,1,2,3,4,5])

    # Titles can themselves contain "---", so split on the last separator only
    _, item_id = item.rsplit("---", 1)

    user_ratings[item_id] = rating
    
  if left.checkbox("Compute recommendations"):
    output, not_found = compute_recommendations(user_ratings, item_id_name)
    right.dataframe(pd.DataFrame(output, columns=["Items", "Predicted relevance"]).sort_values(by=["Predicted relevance"], ascending=False))


if __name__ == '__main__':
  main()

Overwriting recommender.py


In [4]:
!streamlit run --server.port 8501 recommender.py &>/dev/null&

In [6]:
!pgrep streamlit

1699


In [9]:
!kill 1583  # replace 1583 with the PID printed by pgrep above

In [7]:
from pyngrok import ngrok
# Setup a tunnel to the streamlit port 8501

!ngrok authtoken <YOUR_AUTHTOKEN>  # get the auth token from your ngrok account

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [8]:
public_url = ngrok.connect(port='8501')

In [9]:
public_url

'http://2122-34-90-22-194.ngrok.io'