In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

%matplotlib inline

plt.style.use('ggplot')

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import countDistinct, col

Testing py files

In [3]:
# %load_ext autoreload
# %autoreload 2

# from src.recommender import *
# from src.run import *

In [4]:
# movies = pd.read_csv('data/movies.dat', sep='|', names=['movie_id', 'title', 'genres'])

In [5]:
requests_df = pd.read_csv('data/requests.csv')

In [6]:
requests_df.head()

Unnamed: 0,user,movie
0,4958,1924
1,4958,3264
2,4958,2634
3,4958,1407
4,4958,2399


In [7]:
# for x, y in testing.iterrows():
#     print(x)
#     print(y.values)
# #     print(predicted_rating(x,y))
#     break

In [8]:
# testing.index.values

In [9]:
# test_rates

In [12]:
# requests_df.groupby('user').count().sort_values('movie')

In [13]:
training = pd.read_csv('data/training.csv')

In [14]:
training.head()

Unnamed: 0,user,movie,rating,timestamp
0,6040,858,4,956703932
1,6040,593,5,956703954
2,6040,2384,4,956703954
3,6040,1961,4,956703977
4,6040,2019,5,956703977


In [15]:
training

Unnamed: 0,user,movie,rating,timestamp
0,6040,858,4,956703932
1,6040,593,5,956703954
2,6040,2384,4,956703954
3,6040,1961,4,956703977
4,6040,2019,5,956703977
...,...,...,...,...
799995,1875,2940,5,975767255
799996,1875,589,4,975767289
799997,1875,110,4,975767289
799998,1875,2791,3,975767289


In [16]:
training.groupby('user').count().sort_values('movie')

Unnamed: 0_level_0,movie,rating,timestamp
user,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
3326,2,2,2
3038,2,2,2
4273,3,3,3
3288,3,3,3
1779,4,4,4
...,...,...,...
3618,1344,1344,1344
4277,1407,1407,1407
4169,1440,1440,1440
889,1518,1518,1518


In [17]:
training['movie'].nunique()

3662

In [18]:
# Setup a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [19]:
# Convert a Pandas DF to a Spark DF
spark_df = spark.createDataFrame(training) 

In [20]:
spark_df.printSchema()

root
 |-- user: long (nullable = true)
 |-- movie: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [21]:
spark_df = spark_df.drop('timestamp')

In [22]:
spark_df.show(5)

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|6040|  858|     4|
|6040|  593|     5|
|6040| 2384|     4|
|6040| 1961|     4|
|6040| 2019|     5|
+----+-----+------+
only showing top 5 rows



In [23]:
train, validation = spark_df.randomSplit([0.8, 0.2], seed=111)

In [24]:
train.show(5)

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|4615|    6|     4|
|4615|   34|     4|
|4615|   47|     5|
|4615|   50|     5|
|4615|  111|     5|
+----+-----+------+
only showing top 5 rows



In [25]:
num_users = train.select('user').distinct().count()
num_movies = train.select('movie').distinct().count()
density = train.count() / (num_users*num_movies)
density

0.032644998010331584

In [26]:
als_model = ALS(userCol='user',
                itemCol='movie',
                ratingCol='rating',
                nonnegative=True,
                regParam=0.1,
                rank=10
)

In [27]:
recommender = als_model.fit(train)

In [28]:
predictions = recommender.transform(train)

In [29]:
predictions.show(5)

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
| 673|  148|     5|  3.934546|
|4227|  148|     2|  2.125327|
|3184|  148|     4| 3.3994436|
|4784|  148|     3| 2.8922584|
|2383|  148|     2|  2.208449|
+----+-----+------+----------+
only showing top 5 rows



In [30]:
predictions.describe().show()

+-------+------------------+------------------+------------------+------------------+
|summary|              user|             movie|            rating|        prediction|
+-------+------------------+------------------+------------------+------------------+
|  count|            639965|            639965|            639965|            639965|
|   mean|3402.1216879048075|1849.7404170540576|3.5898228809388013| 3.434739266339282|
| stddev|1546.7888494485946| 1086.589104168672|1.1207273959770319|0.6743455277131295|
|    min|               636|                 1|                 1|        0.33886337|
|    max|              6040|              3952|                 5|         5.4484425|
+-------+------------------+------------------+------------------+------------------+



In [31]:
recommender.itemFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.0, 0.22776745,...|
| 20|[0.19267851, 0.37...|
| 30|[0.8351714, 0.535...|
| 40|[0.55064714, 0.38...|
| 50|[0.39124775, 0.05...|
| 60|[0.38131517, 0.76...|
| 70|[0.26579908, 0.19...|
| 80|[0.79369724, 0.17...|
| 90|[0.3440217, 0.491...|
|100|[0.46107748, 0.27...|
|110|[0.08235355, 0.34...|
|120|[0.26086277, 0.35...|
|130|[0.1459052, 0.420...|
|140|[0.0, 0.7777547, ...|
|150|[0.0, 0.33216935,...|
|160|[0.07419564, 0.63...|
|170|[0.0, 0.6904511, ...|
|180|[0.10563368, 0.22...|
|190|[1.0760685, 0.406...|
|200|[0.4716783, 0.872...|
+---+--------------------+
only showing top 20 rows



In [32]:
recommender.userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
|640|[0.78973275, 0.0,...|
|650|[0.2406644, 0.535...|
|660|[0.49819872, 0.28...|
|670|[0.5646101, 0.0, ...|
|680|[0.31658038, 0.19...|
|690|[0.4835288, 0.436...|
|700|[1.434522, 0.6220...|
|710|[0.6611437, 0.279...|
|720|[0.13049588, 0.34...|
|730|[0.7321822, 0.220...|
|740|[1.1104559, 0.438...|
|750|[0.5993074, 0.644...|
|760|[1.0360019, 0.636...|
|770|[0.0, 0.028388018...|
|780|[0.11940959, 0.73...|
|790|[0.41411778, 1.08...|
|800|[0.0, 1.0876493, ...|
|810|[0.4704072, 0.426...|
|820|[0.0, 0.65515345,...|
|830|[1.0138775, 0.880...|
+---+--------------------+
only showing top 20 rows



In [33]:
user1 = recommender.recommendForUserSubset(predictions, 10).where('user==700').select('recommendations').take(1)

In [34]:
user1

[Row(recommendations=[Row(movie=557, rating=5.4388933181762695), Row(movie=2981, rating=5.351528644561768), Row(movie=2964, rating=5.230461120605469), Row(movie=1421, rating=5.2199482917785645), Row(movie=559, rating=5.070537567138672), Row(movie=960, rating=5.021181106567383), Row(movie=1164, rating=5.006983757019043), Row(movie=3880, rating=4.903035640716553), Row(movie=1406, rating=4.898168563842773), Row(movie=97, rating=4.8909759521484375)])]

In [35]:
def predicted_rating(user_id, movie_id):
    try:
        user = recommender.userFactors.where(f'id == {user_id}').collect()[0]['features']
        item = recommender.itemFactors.where(f'id == {movie_id}').collect()[0]['features']
    except:
        user = 1
        item = -1
        
    return np.dot(np.array(user), np.array(item))

In [36]:
predicted_rating(4615, 34)

3.517977425045802

In [37]:
predicted_rating(4958, 1924)

2.5477658287300926

In [38]:
testing = requests_df.copy().set_index('user')

In [None]:
test_rates = []

for x, y in zip(testing.index.values, testing.values):
    test_rates.append(predicted_rating(x,y[0]))  

In [11]:
test_request = requests_df.copy()
test_request['rating'] = test_request.apply(lambda x: predicted_rating(test_request['user'], test_request['movie']))