In [2]:
from pyspark.ml.recommendation import ALS
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator


# Create a SparkSession
spark = SparkSession.builder \
    .appName("TextFileReader") \
    .getOrCreate()
lines = spark.read.text("ratings.csv").rdd
parts = lines.map(lambda row: row.value.split(","))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/02 13:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
data = ratings.collect()
data

                                                                                

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247),
 Row(userId=1, movieId=6, rating=4.0, timestamp=964982224),
 Row(userId=1, movieId=47, rating=5.0, timestamp=964983815),
 Row(userId=1, movieId=50, rating=5.0, timestamp=964982931),
 Row(userId=1, movieId=70, rating=3.0, timestamp=964982400),
 Row(userId=1, movieId=101, rating=5.0, timestamp=964980868),
 Row(userId=1, movieId=110, rating=4.0, timestamp=964982176),
 Row(userId=1, movieId=151, rating=5.0, timestamp=964984041),
 Row(userId=1, movieId=157, rating=5.0, timestamp=964984100),
 Row(userId=1, movieId=163, rating=5.0, timestamp=964983650),
 Row(userId=1, movieId=216, rating=5.0, timestamp=964981208),
 Row(userId=1, movieId=223, rating=3.0, timestamp=964980985),
 Row(userId=1, movieId=231, rating=5.0, timestamp=964981179),
 Row(userId=1, movieId=235, rating=4.0, timestamp=964980908),
 Row(userId=1, movieId=260, rating=5.0, timestamp=964981680),
 Row(userId=1, mo

In [31]:
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10, rank=4, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)

                                                                                

In [28]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

[Stage 827:>                                                        (0 + 1) / 1]

Root-mean-square error = 1.01613214712455


                                                                                

In [30]:
predictions.collect()

                                                                                

[Row(userId=148, movieId=356, rating=4.0, timestamp=1482548476, prediction=3.7310914993286133),
 Row(userId=148, movieId=1197, rating=3.0, timestamp=1482548478, prediction=4.102832317352295),
 Row(userId=148, movieId=6377, rating=3.0, timestamp=1482548514, prediction=3.7924420833587646),
 Row(userId=148, movieId=30816, rating=5.0, timestamp=1482548570, prediction=3.3213391304016113),
 Row(userId=148, movieId=69844, rating=4.0, timestamp=1482548500, prediction=3.7151927947998047),
 Row(userId=148, movieId=99149, rating=3.0, timestamp=1482548617, prediction=3.5743420124053955),
 Row(userId=148, movieId=110102, rating=4.0, timestamp=1482548669, prediction=4.206722736358643),
 Row(userId=148, movieId=112852, rating=3.5, timestamp=1482548700, prediction=4.318251132965088),
 Row(userId=148, movieId=122882, rating=4.0, timestamp=1482548529, prediction=3.9765498638153076),
 Row(userId=148, movieId=122920, rating=3.5, timestamp=1482548707, prediction=4.65450382232666),
 Row(userId=148, movieId=

In [7]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [8]:
print(userRecs.collect()[0])



Row(userId=1, recommendations=[Row(movieId=1354, rating=6.579340934753418), Row(movieId=215, rating=6.541501522064209), Row(movieId=33649, rating=6.421547889709473), Row(movieId=167746, rating=6.292983055114746), Row(movieId=2732, rating=6.137629508972168), Row(movieId=1250, rating=6.129623889923096), Row(movieId=1733, rating=6.118701457977295), Row(movieId=2239, rating=6.093202114105225), Row(movieId=6385, rating=6.059037685394287), Row(movieId=1244, rating=6.027035713195801)])


[Stage 99:>                                                         (0 + 1) / 1]                                                                                

In [30]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [9]:
user_features = model.userFactors.collect()

In [13]:
user_features[1]

Row(id=20, features=[0.06796813011169434, -0.8614345192909241, 0.348619669675827, 0.4708808958530426, -0.5011972188949585, -0.07230988889932632, 0.11925026774406433, -0.5673407316207886, 0.520450234413147, -0.010830006562173367])

In [11]:
item_features = model.itemFactors.collect()

In [14]:
item_features[1]

Row(id=20, features=[0.1491539627313614, -0.4275745749473572, -0.49865686893463135, 2.7166950702667236, 0.5046796202659607, 1.2561553716659546, -1.1429063081741333, -0.48492076992988586, 1.1132147312164307, 0.8884957432746887])

In [15]:
user_features

[Row(id=10, features=[0.15448984503746033, -0.07973533868789673, 0.9632895588874817, 0.692747950553894, -0.32261309027671814, 0.31909653544425964, -0.1788538098335266, -0.23124602437019348, 0.13509352505207062, -0.3632819354534149]),
 Row(id=20, features=[0.06796813011169434, -0.8614345192909241, 0.348619669675827, 0.4708808958530426, -0.5011972188949585, -0.07230988889932632, 0.11925026774406433, -0.5673407316207886, 0.520450234413147, -0.010830006562173367]),
 Row(id=30, features=[-0.128927543759346, -0.4491872489452362, -0.023271214216947556, 1.0105412006378174, -0.03659968450665474, 0.011854132637381554, -0.784808337688446, -0.004157557617872953, 0.5019543766975403, -0.023662690073251724]),
 Row(id=40, features=[0.8628284335136414, 0.15035833418369293, 0.6276306509971619, 0.3462124168872833, -0.23873145878314972, 0.4462171494960785, 0.2900751233100891, 0.7158995866775513, 0.6382717490196228, -0.0005715176812373102]),
 Row(id=50, features=[0.3075927793979645, -0.15727593004703522, -

In [17]:
model.

'userId'