In [2]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pandas as pd
import numpy as np

In [3]:
# read the utility data (user, movie, rating, date)
utility_df = pd.read_csv('data/u.data', sep='\t',header=None)

In [4]:
# drop the date column
utility_df.drop(3, axis=1, inplace=True)

In [5]:
# Setup a SparkSession
spark = SparkSession.builder.getOrCreate()

# Convert a Pandas DF to a Spark DF
# also add column names (names used work with ALS model below)
utility_spark_df = spark.createDataFrame(utility_df, ["user", "movie", "rating"])

In [6]:
# check returned data
utility_spark_df.show()

+----+-----+------+
|user|movie|rating|
+----+-----+------+
| 196|  242|     3|
| 186|  302|     3|
|  22|  377|     1|
| 244|   51|     2|
| 166|  346|     1|
| 298|  474|     4|
| 115|  265|     2|
| 253|  465|     5|
| 305|  451|     3|
|   6|   86|     3|
|  62|  257|     2|
| 286| 1014|     5|
| 200|  222|     5|
| 210|   40|     3|
| 224|   29|     3|
| 303|  785|     3|
| 122|  387|     5|
| 194|  274|     2|
| 291| 1042|     4|
| 234| 1184|     2|
+----+-----+------+
only showing top 20 rows



In [7]:
# do a test/train split using the pyspark method
train, test = utility_spark_df.randomSplit([0.8, 0.2], seed=427471138)

In [8]:
als_model = ALS(
    itemCol='movie',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=10
    ) 

In [9]:
# fit the model
fit_als_model = als_model.fit(train)

In [10]:
# subselect data for user 1 only
utility_1 = utility_df[utility_df[0] == 1]
# subselect user 1 data for movie 100 only
one_row_pandas_df = utility_1[utility_1[1] == 100]
# convert to single-user, single-movie spark data frame
one_row_spark_df = spark.createDataFrame(one_row_pandas_df, ["user", "movie", "rating"])
# show result
one_row_spark_df.show()

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|   1|  100|     5|
+----+-----+------+



In [11]:
# use model to create predictions on our single-row df
fit_als_model.transform(one_row_spark_df).show()

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
|   1|  100|     5|  4.512069|
+----+-----+------+----------+



In [12]:
# use model to predict for our test data
fit_als_model.transform(test).show()

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
| 642|  148|     5|  3.730146|
|  27|  148|     3| 2.7040005|
| 330|  148|     4| 4.1282973|
| 416|  148|     5| 3.6591568|
| 935|  148|     4|   4.45987|
| 297|  148|     3| 2.5351734|
| 178|  148|     4| 3.4242537|
| 923|  148|     4| 3.4890065|
| 455|  148|     3|  2.942392|
| 891|  148|     5| 3.7899954|
| 930|  148|     1| 3.4828787|
| 434|  148|     3| 4.2464223|
| 438|  148|     5| 3.9753437|
| 293|  148|     1| 2.0894637|
| 793|  148|     4| 2.9233704|
| 320|  148|     4| 3.4986992|
| 893|  148|     3| 2.8671303|
| 396|  148|     4|  3.521694|
|  90|  148|     2| 3.1481078|
| 203|  148|     3| 3.2632225|
+----+-----+------+----------+
only showing top 20 rows



In [13]:
# retrieve item factor list from fitted model (for movie 100)
movie_100_f = np.array(fit_als_model.itemFactors.filter('id = 100').collect()[0][1])

In [14]:
# retrieve user factor list from fitted model (for user 1)
user_1_f = np.array(fit_als_model.userFactors.filter('id = 1').collect()[0][1])

In [15]:
# the dot product of these two lists is 
# the predicted rating for user 1 on movie 100
np.dot(movie_100_f,user_1_f)

4.5120691326121438

In [16]:
# get prediction for training data
recomendations_train = fit_als_model.transform(train)

In [17]:
# get prediction for test data
recomendations_test = fit_als_model.transform(test)

In [18]:
# this creates a table we can query with SQL
recomendations_train.createOrReplaceTempView("recomendations_train")

In [19]:
# use SQL query to demonstrate there are no nulls in our dataset
result = spark.sql("""SELECT count(*) 
                    FROM recomendations_train 
                    WHERE user is not null""")

result.show()

+--------+
|count(1)|
+--------+
|   79953|
+--------+



In [21]:
result = spark.sql("""SELECT distinct(prediction) 
                    FROM recomendations_train 
                    limit 10 """)

result.show()

+----------+
|prediction|
+----------+
|  3.963504|
|  3.440942|
|  3.197185|
| 3.8421497|
| 3.6884224|
| 4.0891385|
| 3.1011424|
| 3.6093917|
| 3.2182221|
| 3.6423123|
+----------+



In [23]:
result = spark.sql("""SELECT count(*) 
                    FROM recomendations_train 
                    WHERE prediction > 2
                    limit 10""")
result.show()

+--------+
|count(1)|
+--------+
|   76919|
+--------+

