# Recommendation system

## Imports and db connection

In [1]:
import sqlite3
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
#import streamlit as st
import time
import matplotlib.pyplot as plt

Set up of Spark environment

In [2]:
# import findspark
import findspark
# initialize findspark with spark directory
findspark.init("/home/juliendesmedt/spark/")
# import pyspark
import pyspark
# create spark context
sc = pyspark.SparkContext()
# create spark session
spark = pyspark.sql.SparkSession(sc)

22/12/02 10:38:02 WARN Utils: Your hostname, JdsThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp58s0)
22/12/02 10:38:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/02 10:38:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Cache memory

In [6]:
from functools import lru_cache
import time


url = "../Data/title_basics.tsv"

dict{query1 : cache_memory}


@lru_cache()
def get_csv1(url):
    print("Fetching csv from local...")
    basic = pd.read_csv(url, sep= "\t")
    return basic


# Execution start time
begin = time.time()
url = "../Data/title_basics.tsv"
get_csv1(url)
  
# Execution end time
end = time.time()

# Execution start time 2
begin_cache = time.time()
url = "../Data/title_basics.tsv"
get_csv1(url)
  
# Execution end time
end_cache = time.time()

print('no cache :'+ str(end-begin))
print('cache :'+ str(end_cache -begin_cache ) )


Fetching csv from local...


  basic = pd.read_csv(url, sep= "\t")


In [11]:
# Execution start time
begin_cache = time.time()
url = "../Data/title_basics.tsv"
get_csv1(url)
  
# Execution end time
end_cache = time.time()

print('no cache :'+ str(end-begin))
print('cache :'+ str(end_cache -begin_cache ) )

no cache :24.450178623199463
cache :0.0018494129180908203


Loading of data set under Spark

In [3]:
#/home/juliendesmedt/Documents/becode/recomendation_imdb/Data/
# import basic data
title_basics = spark.read.csv("../Data/title_basics.tsv", header=True, inferSchema=True, sep= "\t")

# import crew data
title_crew = spark.read.csv("../Data/title_crew.tsv", header=True, inferSchema=True, sep= "\t")

# import rating data
title_ratings = spark.read.csv("../Data/title_ratings.tsv", header=True, inferSchema=True, sep= "\t")

                                                                                

In [4]:
# inspect tale
title_ratings.show(5)
# inspect number of partitions of data
title_ratings.rdd.getNumPartitions()
# inspect data types
title_ratings.printSchema()


+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1924|
|tt0000002|          5.8|     259|
|tt0000003|          6.5|    1737|
|tt0000004|          5.6|     174|
|tt0000005|          6.2|    2550|
+---------+-------------+--------+
only showing top 5 rows

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



### Preprocessing of data in order to suit SQL spark

To read in the data with an SQL statement, the classes of all columns need to be defined manually with the `StructType` and `StructField` command. The latter has three parameters:
- `name` of the column
- `dataType` of the column
- `nullable`which defines whether the column can be null: true/false

Note the difference in  duration between the spark and SQL command.

In [5]:
# import sql types
from pyspark.sql.types import *

In [6]:
# define the schemas
title_basicsSchema  = StructType([StructField('tconst', StringType(), True),
                                StructField('titleType', StringType(), True),
                                StructField('primaryTitle', StringType(), True),
                                StructField('originalTitle', StringType(), True),
                                StructField('isAdult', BooleanType(), True),
                                StructField('startYear', IntegerType(), True),
                                StructField('endYear', IntegerType(), True),
                                StructField('runtimeMinutes', IntegerType(), True),
                                StructField('genres', StringType(), True)])

title_crewSchema  = StructType([StructField('tconst', StringType(), True),
                              StructField('directors', StringType(), True),
                              StructField('writers', StringType(), True)])

title_ratingsSchema  = StructType([StructField('tconst', StringType(), True),
                              StructField('averageRating', DoubleType(), True),
                              StructField('numVotes', IntegerType(), True)]) 


In [7]:
# import basic data
title_basics = spark.read.csv("../Data/title_basics.tsv", header=True, inferSchema=True, sep= "\t", schema=title_basicsSchema)

# import crew data
title_crew = spark.read.csv("../Data/title_crew.tsv", header=True, inferSchema=True, sep= "\t", schema=title_crewSchema)

# import rating data
title_ratings = spark.read.csv("../Data/title_ratings.tsv", header=True, inferSchema=True, sep= "\t",schema=title_ratingsSchema)

In [8]:
# inspect tale
title_ratings.show(5)
# inspect number of partitions of data
title_ratings.rdd.getNumPartitions()
# inspect data types
title_ratings.printSchema()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1924|
|tt0000002|          5.8|     259|
|tt0000003|          6.5|    1737|
|tt0000004|          5.6|     174|
|tt0000005|          6.2|    2550|
+---------+-------------+--------+
only showing top 5 rows

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [9]:
# import sql functions
import pyspark.sql.functions as F

In [10]:
# Register the DataFrame
title_basics.createOrReplaceTempView("title_basicsSQL")
title_crew.createOrReplaceTempView("title_crewSQL")
title_ratings.createOrReplaceTempView("title_ratingsSQL")


df = spark.sql("""SELECT tb.tconst, tb.primaryTitle, tb.startYear, tb.genres, tc.directors, tr.averageRating, tr.numVotes
FROM title_basicsSQL tb
LEFT JOIN title_crewSQL tc
ON tb.tconst = tc.tconst
LEFT JOIN title_ratingsSQL tr
ON tb.tconst = tr.tconst
WHERE tr.averageRating >= 0.8 AND tr.numVotes >= 1000 """)


In [19]:
df.rdd.getNumPartitions()



10

In [12]:
df = df.withColumn("genres", F.regexp_replace(str=F.col("genres"), pattern=",", replacement=" ")) \
        .withColumn("genres", F.lower("genres")) \
        .withColumn("primaryTitle_lower", F.lower("primaryTitle")) \
        .withColumn("soup",F.concat_ws(' ','directors','genres'))

In [13]:
#Create a pandas dataframe
df.toPandas().head(6)

                                                                                

Unnamed: 0,tconst,primaryTitle,startYear,genres,directors,averageRating,numVotes,primaryTitle_lower,soup
0,tt0000008,Edison Kinetoscopic Record of a Sneeze,1894.0,documentary short,nm0005690,5.4,2068,edison kinetoscopic record of a sneeze,nm0005690 documentary short
1,tt0000015,Autour d'une cabine,1894.0,animation short,nm0721526,6.2,1033,autour d'une cabine,nm0721526 animation short
2,tt0000242,The Sign of the Cross,1899.0,fantasy horror short,nm0617588,6.4,1174,the sign of the cross,nm0617588 fantasy horror short
3,tt0000614,The Red Spectre,1907.0,fantasy horror short,"nm0159015,nm0954087",6.6,1068,the red spectre,"nm0159015,nm0954087 fantasy horror short"
4,tt0001223,Frankenstein,1910.0,fantasy horror sci-fi,nm0205986,6.4,4302,frankenstein,nm0205986 fantasy horror sci-fi
5,tt0001740,The Lonedale Operator,1911.0,drama romance short,nm0000428,6.5,1228,the lonedale operator,nm0000428 drama romance short


### Create a pipeline to process the data and create vectors

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StopWordsRemover, Tokenizer

In [16]:
import nltk
nltk.download("stopwords")
stopwordList = nltk.corpus.stopwords.words('english')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/juliendesmedt/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [17]:
# define the tokenizer
TO = Tokenizer(inputCol="soup", outputCol="soup_tok")
# define the stop word remover
SWR = StopWordsRemover(inputCol='soup_tok', outputCol='soup_cleaned', stopWords=stopwordList)
# define bow model
BOW = CountVectorizer(inputCol = 'soup_cleaned', outputCol = 'soup_vec')

# inspect the output of one of the feature extractors
temp_pipeline = Pipeline().setStages([TO, SWR, BOW]).fit(df)
df = temp_pipeline.transform(df)

                                                                                

In [18]:
from pyspark.sql.functions import monotonically_increasing_id 

df = df.withColumn("ID", monotonically_increasing_id())

In [17]:
df.show(10)



22/12/01 15:02:29 WARN DAGScheduler: Broadcasting large task binary with size 1039.7 KiB


[Stage 21:>                                                         (0 + 1) / 1]

+---------+--------------------+---------+--------------------+-------------------+-------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+---+
|   tconst|        primaryTitle|startYear|              genres|          directors|averageRating|numVotes|  primaryTitle_lower|                soup|            soup_tok|        soup_cleaned|            soup_vec| ID|
+---------+--------------------+---------+--------------------+-------------------+-------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+---+
|tt0000008|Edison Kinetoscop...|     1894|   documentary short|          nm0005690|          5.4|    2068|edison kinetoscop...|nm0005690 documen...|[nm0005690, docum...|[nm0005690, docum...|(28643,[13,20,179...|  0|
|tt0000015| Autour d'une cabine|     1894|     animation short|          nm0721526|          6.2|    1033| autour d'une cabine|nm0721526

                                                                                

In [66]:
# filter on the spark data set data with specific requirements
df.filter(df.primaryTitle=='Titanic').select("ID").toPandas().head(10)

                                                                                

Unnamed: 0,ID
0,17179871794
1,25769804075
2,25769812703
3,34359738916
4,42949674059


[Stage 174:>                                                        (0 + 1) / 1]

22/12/01 14:05:45 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Connecting to /10.40.10.169:45911 timed out (120000 ms)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:285)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:126)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:154)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:184)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolEx

# I tried to reproduced the cosin function but without success

In [None]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(df).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()

### Option 1 with sql commands (main issue is that doing it on all vector is too big)

In [18]:
dot_udf = F.udf(lambda x,y: float(x.dot(y)), DoubleType())
df.alias("i").join(df.alias("j"), F.col("i.ID") < F.col("j.ID"))\
    .select(
        F.col("i.ID").alias("i"), 
        F.col("j.ID").alias("j"), 
        dot_udf("i.soup_vec", "j.soup_vec").alias("dot"))\
    .sort("i", "j")\
    .limit(10) \
    .show()



22/12/01 15:08:30 WARN DAGScheduler: Broadcasting large task binary with size 1070.1 KiB


ERROR:root:KeyboardInterrupt while sending command.               (0 + 4) / 100]
Traceback (most recent call last):
  File "/home/juliendesmedt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/juliendesmedt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

### Option 2 DataFrame

In [69]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

movies_names = []

mat = IndexedRowMatrix(
    df.select("ID", "soup_vec")\
        .rdd.map(lambda row: IndexedRow(row.ID, row.soup_vec.toArray()))).toBlockMatrix()

for i in movies_names:
    dot = mat[i].multiply(mat.transpose())
    dot.toLocalMatrix().toArray()



22/12/01 14:37:22 WARN DAGScheduler: Broadcasting large task binary with size 1044.2 KiB


                                                                                

22/12/01 14:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1055.2 KiB


ERROR:root:KeyboardInterrupt while sending command.                 (0 + 4) / 5]
Traceback (most recent call last):
  File "/home/juliendesmedt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/juliendesmedt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

Next of non pyspark code 

In [54]:
# Compute the Cosine Similarity matrix based on the count_matrix
cosine_sim2 = cosine_similarity(count_matrix, count_matrix)

In [None]:
print(cosine_sim2.shape)
print(cosine_sim2.dtypes)

print(cosine_sim2)

In [56]:
# Function that takes in movie title as input and outputs most similar movies
def get_recommendations(title, cosine_sim=cosine_sim2):
    # Get the index of the movie that matches the title
    idx = df_clean.index[df_clean['primaryTitle'] == title]
    print(f"1. idx : {idx}")
    print(idx)

    # Get the pairwsie similarity scores of all movies with that movie
    sim_scores = cosine_sim2[idx]
    print(f"2. sim_scores : {sim_scores}")
    print(sim_scores.shape)
    df_sim=pd.DataFrame(sim_scores.reshape(-1),columns=['cos'])

    df_sim.reset_index()

    df_sim.columns
    # print(1000000000000000000)
    # print(df_sim.head())

    rslt_df = df_sim.sort_values(by='cos',ascending=False)

    # print(rslt_df[0:11])
    rec_list=rslt_df.reset_index()
    list1=[]
    list1=rec_list['index'][1:11].values

    # Return the top 10 most similar movies
    return df_clean['primaryTitle'].iloc[list1]

(1, 14891)

In [None]:
# Function that takes in movie title as input and outputs most similar movies
def get_recommendations_list(liked_movies_list, cosine_sim=cosine_sim2):
    # Get the index of the movie that matches the title
    
    idx_list=[]
    for m in liked_movies_list:
        
        idx = df_clean.index[df_clean['primaryTitle'] == m]
        idx_list.append(idx)

    df_sim_list=pd.DataFrame()
    print(idx_list)
    for m in idx_list:
        sim_scores = cosine_sim2[m]
        df_sim=pd.DataFrame(sim_scores.reshape(-1),columns=[m])
        df_sim_list[m]=df_sim



        
    df_sim_list['average']=df_sim_list.mean(numeric_only=True, axis=1)

    rslt_df = df_sim_list.sort_values(by='average',ascending=False)

    # print(rslt_df[0:11])
    rec_list=rslt_df.reset_index()
    list1=[]
    list1=rec_list['index'][0:11].values

    # Return the top 10 most similar movies
    return df_clean['primaryTitle'].iloc[list1]

In [60]:
var = get_recommendations('The Kid')
print(var)

1. idx : Int64Index([3], dtype='int64')
2. sim_scores : [(0, array([0.25     , 0.       , 0.25     , ..., 0.2236068, 0.       ,
       0.       ]))]
3. sorted sim_scores : [(0, array([0.25     , 0.       , 0.25     , ..., 0.2236068, 0.       ,
       0.       ]))]


AttributeError: 'list' object has no attribute 'iloc'