In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName("Movie_Recommendation_System").getOrCreate()
movies_path = "./data/movies.csv"
ratings_path = "./data/ratings.csv"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/18 11:44:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#Open the 2 csv files
movies_df = spark.read.csv(movies_path, header=True, inferSchema=True)
ratings_df = spark.read.csv(ratings_path, header=True, inferSchema=True)

#Joining the 2 df in a single one and deleting the duplicated column
df = movies_df.join(ratings_df, movies_df.movieId == ratings_df.movieId, "inner").drop(ratings_df["movieId"])



In [4]:
movies_df.show(5, truncate=False)
count_movies = movies_df.count()
print(f"Nb of lines in the movies_df : {count_movies}")

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

Nb of lines in the movies_df : 9742


In [5]:
ratings_df.show(5, truncate=False)
count_ratings = ratings_df.count()
print(f"Nb of lines in the ratings_df : {count_ratings}")

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |1      |4.0   |964982703|
|1     |3      |4.0   |964981247|
|1     |6      |4.0   |964982224|
|1     |47     |5.0   |964983815|
|1     |50     |5.0   |964982931|
+------+-------+------+---------+
only showing top 5 rows

Nb of lines in the ratings_df : 100836


In [6]:
df.show(100, truncate=False)
count_df = df.count()
print(f"Nb of lines in df : {count_df}")

+-------+------------------------------------------------------------------------------+--------------------------------------------------+------+------+---------+
|movieId|title                                                                         |genres                                            |userId|rating|timestamp|
+-------+------------------------------------------------------------------------------+--------------------------------------------------+------+------+---------+
|1      |Toy Story (1995)                                                              |Adventure|Animation|Children|Comedy|Fantasy       |1     |4.0   |964982703|
|3      |Grumpier Old Men (1995)                                                       |Comedy|Romance                                    |1     |4.0   |964981247|
|6      |Heat (1995)                                                                   |Action|Crime|Thriller                             |1     |4.0   |964982224|
|47     |Seven (

In [7]:
#Loading Vanessa's model
from pyspark.ml.recommendation import ALS

model = ALS().load("./model/ALS_Movie_Rec_model/")


In [8]:
#Select a single user for the df
single_user = df.filter(df['userId']==12).select(['movieId','userId', 'title'])
#single_user.show(100,truncate=False)
#title = df.filter(df['movieId']==39).select('title')
#title.show()

#Take the recommendations for that user
recommendations = model.fit(df).transform(single_user)
#Show the recommendations for that user
title_rec = recommendations.orderBy('prediction',ascending=False).select('title')

title_rec.show()

type(df)


23/04/18 11:44:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/04/18 11:44:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/04/18 11:44:25 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/04/18 11:44:25 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
+--------------------+
|               title|
+--------------------+
|        Shine (1996)|
| Billy Elliot (2000)|
|Love Actually (2003)|
| First Knight (1995)|
|Notebook, The (2004)|
|  'burbs, The (1989)|
|         Emma (1996)|
|Pride & Prejudice...|
|10 Things I Hate ...|
|      Titanic (1997)|
|Circle of Friends...|
|Gone with the Win...|
|Romeo and Juliet ...|
|Groundhog Day (1993)|
|Four Weddings and...|
|What Women Want (...|
| Little Women (1994)|
|Sweet Home Alabam...|
|So I Married an A...|
|    

pyspark.sql.dataframe.DataFrame

In [9]:
from pyspark.sql.functions import split
titles = df.select('title')
df1 = df.withColumn("year", split(df.title, r'[()]').getField(2))
#df1.select("year").show(truncate=False)
df1.select("year").distinct().show(truncate=False)

+----------------+
|year            |
+----------------+
|null            |
| Alone          |
| Redo           |
| Days of Summer |
| Advance        |
|"               |
|                |
|                |
+----------------+



In [10]:
from pyspark.sql.functions import regexp_extract, col

df = df.withColumn('year', regexp_extract(col('title'), r'\((\d{4})\)$', 1))
df = df.withColumn('title_only', split(df.title, r'[()]').getItem(0))
df = df.withColumn('title_only', split(df.title_only, ',').getItem(0))
df.show(truncate=False)

+-------+-----------------------------------------+-------------------------------------------+------+------+---------+----+-----------------------------------+
|movieId|title                                    |genres                                     |userId|rating|timestamp|year|title_only                         |
+-------+-----------------------------------------+-------------------------------------------+------+------+---------+----+-----------------------------------+
|1      |Toy Story (1995)                         |Adventure|Animation|Children|Comedy|Fantasy|1     |4.0   |964982703|1995|Toy Story                          |
|3      |Grumpier Old Men (1995)                  |Comedy|Romance                             |1     |4.0   |964981247|1995|Grumpier Old Men                   |
|6      |Heat (1995)                              |Action|Crime|Thriller                      |1     |4.0   |964982224|1995|Heat                               |
|47     |Seven (a.k.a. Se7en) (199

In [12]:
#Loading Vanessa's model
from pyspark.ml.recommendation import ALS

model_loaded = ALS().load("./model/ALS_Movie_Rec_model/")

def title_rec_given_user(user_id : int, df, model_loaded):
    """
    Give the recommended movies'titles for a given user sorted from the best recommendations to the worst
    in our dataset (df) who has the format "pyspark.sql.dataframe.DataFrame"
    """
    #taking the movieId, userId and title for the given user_id
    user = df.filter(df['userId'] == user_id).select(['movieId','userId', 'title','title_only','year'])
    #Take the recommendations for the user defined the line above
    recommendations = model_loaded.fit(df).transform(user)
    #Show the recommended movies for that user sorted from the best recommendations to the worst
    title_rec = recommendations.orderBy('prediction',ascending=False).select('movieId','title','prediction','title_only','year')

    return title_rec

rec_movies = title_rec_given_user(user_id=1, df=df, model_loaded=model)
rec_movies.show(truncate=False)
    

+-------+------------------------------------------------------------------------------+----------+-----------------------------------------------+----+
|movieId|title                                                                         |prediction|title_only                                     |year|
+-------+------------------------------------------------------------------------------+----------+-----------------------------------------------+----+
|1198   |Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)|5.179106  |Raiders of the Lost Ark                        |1981|
|260    |Star Wars: Episode IV - A New Hope (1977)                                     |5.119077  |Star Wars: Episode IV - A New Hope             |1977|
|1197   |Princess Bride, The (1987)                                                    |5.1116304 |Princess Bride                                 |1987|
|1196   |Star Wars: Episode V - The Empire Strikes Back (1980)                    

23/04/18 19:12:51 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 913786 ms exceeds timeout 120000 ms
23/04/18 19:12:51 WARN SparkContext: Killing executors is not supported by current scheduler.
23/04/18 19:12:56 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:643)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1057)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
	at sc

In [None]:
df.select(df.userId).distinct().count()


610

In [None]:
from pyspark.sql.functions import sum
from pyspark.sql.functions import sumDistinct

#sum = df.userId.sum()

df.select(sumDistinct(df.userId)).show()



+--------------------+
|sum(DISTINCT userId)|
+--------------------+
|              186355|
+--------------------+



In [None]:
somme = 0
for i in range(611):
    somme += i
print(somme)

186355


23/04/18 11:33:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
