# Program który na podstawie ocen uzytkowników będzie polecał filmy
###

In [5]:

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName('użycie collaborative filtering dla rekomendacji filmów')\
        .getOrCreate()

rawData = spark.read.format('csv').\
option('header','true').\
load('../datasets/movielens/ratings.csv')

## wczytane dane konwertujemy do DataFrame

In [8]:
rawData.toPandas().head()


Unnamed: 0,userId,movieId,rating,timestamp
0,1,31,2.5,1260759144
1,1,1029,3.0,1260759179
2,1,1061,3.0,1260759182
3,1,1129,2.0,1260759185
4,1,1172,4.0,1260759205


### rzutujemy dane z kolumn na typ integer
### porzucamy także zmienna 'timestamp' jako nieużyteczną

In [9]:
from pyspark.sql.functions import col

dataset  = rawData.select(col('userId').cast('int'),
                          col('movieId').cast('int'),
                          col('rating').cast('int')
                         )

dataset.toPandas().head()

Unnamed: 0,userId,movieId,rating
0,1,31,2
1,1,1029,3
2,1,1061,3
3,1,1129,2
4,1,1172,4


In [10]:
dataset.select('rating').toPandas().describe()

Unnamed: 0,rating
count,100004.0
mean,3.416123
std,1.100971
min,0.0
25%,3.0
50%,4.0
75%,4.0
max,5.0


## dzielimy dane na zbiór testowy i treningowy, oraz inicjujemy model  a następnie uruchamiamy model na danych testowych

In [11]:
(trainingData, testData) = dataset.randomSplit([0.8,0.2])

In [12]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter = 5,
         regParam = 0.1,
         userCol = 'userId',
         itemCol =  'movieId',
         ratingCol = 'rating',
         coldStartStrategy = 'drop')

In [13]:
model  = als.fit(trainingData)

## wywołujemy wyniki

In [14]:
predictions = model.transform(testData)
predictions.toPandas().head()

Unnamed: 0,userId,movieId,rating,prediction
0,452,463,2,2.924944
1,85,471,3,3.279933
2,588,471,3,3.897369
3,86,471,4,4.231772
4,19,471,3,3.895056


In [15]:
predictions.select('rating', 'prediction').toPandas().describe()

Unnamed: 0,rating,prediction
count,19273.0,19273.0
mean,3.414881,3.262427
std,1.101214,0.782692
min,0.0,-0.056316
25%,3.0,2.798256
50%,4.0,3.341745
75%,4.0,3.810047
max,5.0,6.147857


## ponieważ do systemu rekomendacji użyliśmy oceny czyli wartości która oceną jest sama w sobie (wart. 'implicit') do określenia 'jakości' modelu możemy użyć wartości RMSE

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName = 'rmse',
                                labelCol = 'rating',
                                predictionCol = 'prediction'
                                )

rmse = evaluator.evaluate(predictions)
print('RMSE = ', rmse)

RMSE =  0.9583224124090036


## jak na razie zbudowliśmy model który przewiduje oceny jakie dostaną filmy od użytkownikow, stwórzmy więc kolejną część systemu która zajmie się rekomendacjami

### weźmy 3 najbardziej rekomendowane filmy dla użytkowników...

In [17]:
userRecsAll = model.recommendForAllUsers(3)
userRecsAll

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

In [18]:
userRecsAll.toPandas().head()

Unnamed: 0,userId,recommendations
0,471,"[(2938, 4.82033634185791), (108583, 4.81328535..."
1,463,"[(67504, 5.40000581741333), (83411, 5.40000581..."
2,496,"[(67504, 5.734083652496338), (83411, 5.7340836..."
3,148,"[(67504, 5.8726630210876465), (83411, 5.872663..."
4,540,"[(34072, 4.925570964813232), (3865, 4.91039752..."


### i na odwrót: które filmy będą wyświetlane z najwiekszym prawdopodobieństwem dla 3 użytkowników

In [19]:
movieRecsAll = model.recommendForAllItems(3)
movieRecsAll.toPandas().head()

Unnamed: 0,movieId,recommendations
0,1580,"[(113, 4.895040512084961), (46, 4.737482547760..."
1,5300,"[(257, 5.0248870849609375), (404, 4.9761595726..."
2,6620,"[(269, 4.621665000915527), (153, 4.56535005569..."
3,7340,"[(621, 3.9375312328338623), (70, 3.93183088302..."
4,32460,"[(46, 5.195440292358398), (515, 5.005160331726..."


### stwórzymy listę użytkowników dla których chcemy dokonać rekomendacji

In [20]:
from pyspark.sql.types import IntegerType

usersList = [148,463,267]
usersDF = spark.createDataFrame(usersList, IntegerType()).toDF('userId')

usersDF.take(3)

[Row(userId=148), Row(userId=463), Row(userId=267)]

In [21]:
userRecs = model.recommendForUserSubset(usersDF,5)
userRecs.toPandas()

Unnamed: 0,userId,recommendations
0,148,"[(83318, 5.8726630210876465), (67504, 5.872663..."
1,463,"[(83318, 5.40000581741333), (67504, 5.40000581..."
2,267,"[(59684, 5.399184226989746), (2068, 5.29083919..."


### skupmy się na rekomendacjach dla jednego użytkownika

In [26]:
userMoviesList = userRecs.filter(userRecs.userId == 148).select('recommendations')
print(type(userMoviesList))
userMoviesList.collect()

<class 'pyspark.sql.dataframe.DataFrame'>


[Row(recommendations=[Row(movieId=83318, rating=5.8726630210876465), Row(movieId=67504, rating=5.8726630210876465), Row(movieId=83411, rating=5.8726630210876465), Row(movieId=83359, rating=5.8726630210876465), Row(movieId=2938, rating=5.352392196655273)])]

In [27]:
moviesList = userMoviesList.collect()[0].recommendations
moviesList

[Row(movieId=83318, rating=5.8726630210876465),
 Row(movieId=67504, rating=5.8726630210876465),
 Row(movieId=83411, rating=5.8726630210876465),
 Row(movieId=83359, rating=5.8726630210876465),
 Row(movieId=2938, rating=5.352392196655273)]

In [28]:
moviesDF = spark.createDataFrame(moviesList)
moviesDF.toPandas()

Unnamed: 0,movieId,rating
0,83318,5.872663
1,67504,5.872663
2,83411,5.872663
3,83359,5.872663
4,2938,5.352392


### aby otrzymać nazwy filmów (dla danych ID) wczytujemy kolejny plik, konwertujemy go do DataFrame i łączymy oba nasze DF

In [29]:
movieData = spark.read.format('csv').\
option('header','true').\
load('../datasets/movielens/movies.csv')

movieData.toPandas().head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [31]:
recommendedMovies = movieData.join(moviesDF, on=['movieId'])\
                                .orderBy('rating',ascending =False)\
                                .select('title','genres', 'rating')

recommendedMovies.toPandas()

Unnamed: 0,title,genres,rating
0,"Goat, The (1921)",Comedy,5.872663
1,Land of Silence and Darkness (Land des Schweig...,Documentary,5.872663
2,Cops (1922),Comedy,5.872663
3,"Play House, The (1921)",Comedy,5.872663
4,Man Facing Southeast (1986),Drama|Sci-Fi,5.352392


### finalnie pakujemy wszystko w jedną funkcję która przyjmuje jako pramtetry Id uzytkownika i ilośc rekomendacji

In [32]:
from pyspark.sql.types import IntegerType

def getRecommendationsForUser(userId, numRecs):
    
    usersDF = spark.createDataFrame([userId], IntegerType()).toDF('userId')
    userRecs = model.recommendForUserSubset(usersDF, numRecs)
    
    moviesList = userRecs.collect()[0].recommendations
    moviesDF = spark.createDataFrame(moviesList)
    
    recommendedMovies = movieData.join(moviesDF, on='movieId').orderBy('rating', ascending = False).select('title', 'genres', 'rating')
    return recommendedMovies

In [33]:
recommendationsForUser = getRecommendationsForUser(219,10)
recommendationsForUser.toPandas()

Unnamed: 0,title,genres,rating
0,My Sassy Girl (Yeopgijeogin geunyeo) (2001),Comedy|Romance,5.483136
1,Rivers and Tides (2001),Documentary,5.42789
2,Pride and Prejudice (1995),Drama|Romance,5.384478
3,"Myth of the American Sleepover, The (2010)",Comedy|Drama|Romance,5.261257
4,Wish Upon a Star (1996),Comedy,5.261257
5,Lake of Fire (2006),Documentary,5.229225
6,Fawlty Towers (1975-1979),Comedy,5.212121
7,"Ruling Class, The (1972)",Comedy|Drama,5.211771
8,Fanny and Alexander (Fanny och Alexander) (1982),Drama|Fantasy|Mystery,5.209134
9,Midnight in Paris (2011),Comedy|Fantasy|Romance,5.168336
