In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("SimpleApp_movies").getOrCreate()

In [3]:
lines=spark.read.text("./ml-latest-small/ratings.csv").rdd

In [11]:
parts = lines.map(lambda row: row.value.split(","))

In [16]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))

In [17]:
ratings = spark.createDataFrame(ratingsRDD)

In [18]:
ratings.printSchema()

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)



In [19]:
ratings.show()

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|      1|   4.0|964982703|     1|
|      3|   4.0|964981247|     1|
|      6|   4.0|964982224|     1|
|     47|   5.0|964983815|     1|
|     50|   5.0|964982931|     1|
|     70|   3.0|964982400|     1|
|    101|   5.0|964980868|     1|
|    110|   4.0|964982176|     1|
|    151|   5.0|964984041|     1|
|    157|   5.0|964984100|     1|
|    163|   5.0|964983650|     1|
|    216|   5.0|964981208|     1|
|    223|   3.0|964980985|     1|
|    231|   5.0|964981179|     1|
|    235|   4.0|964980908|     1|
|    260|   5.0|964981680|     1|
|    296|   3.0|964982967|     1|
|    316|   3.0|964982310|     1|
|    333|   5.0|964981179|     1|
|    349|   4.0|964982563|     1|
+-------+------+---------+------+
only showing top 20 rows



In [20]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [22]:
model = als.fit(ratings)


In [24]:
userRecs=model.recommendForAllUsers(10)

In [26]:
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[4248, 9.54594],...|
|   463|[[6380, 6.7925863...|
|   496|[[5080, 10.06289]...|
|   148|[[72167, 7.054126...|
|   540|[[2239, 6.539907]...|
|   392|[[89118, 15.54864...|
|   243|[[48322, 8.790028...|
|    31|[[6380, 9.513541]...|
|   516|[[85367, 8.273881...|
|   580|[[54256, 6.555609...|
|   251|[[6461, 7.640572]...|
|   451|[[103341, 8.24939...|
|    85|[[4474, 8.76842],...|
|   137|[[535, 5.7677956]...|
|    65|[[2936, 6.2114697...|
|   458|[[3270, 8.891875]...|
|   481|[[2290, 8.551037]...|
|    53|[[2239, 7.867791]...|
|   255|[[4802, 12.387266...|
|   588|[[2290, 8.612237]...|
+------+--------------------+
only showing top 20 rows



In [27]:
itemRecs=model.recommendForAllItems(10)

In [28]:
itemRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[258, 6.720557],...|
|   4900|[[413, 7.922308],...|
|   5300|[[461, 6.730769],...|
|   6620|[[81, 8.26186], [...|
|   7340|[[494, 7.8861723]...|
|  32460|[[173, 7.4233856]...|
|  54190|[[112, 9.180499],...|
|    471|[[461, 8.891054],...|
|   1591|[[138, 8.779334],...|
| 140541|[[259, 7.3094997]...|
|   1342|[[77, 6.755507], ...|
|   2122|[[112, 9.486969],...|
|   2142|[[531, 6.4050665]...|
|   7982|[[497, 9.596617],...|
|  44022|[[258, 6.184769],...|
| 141422|[[461, 6.0717373]...|
| 144522|[[35, 4.1572585],...|
|    833|[[394, 7.965689],...|
|   5803|[[392, 8.323678],...|
|   7833|[[259, 8.228318],...|
+-------+--------------------+
only showing top 20 rows



In [41]:
users = ratings.select(als.getUserCol()).distinct().limit(3)


'data'

In [87]:
tmp=spark.createDataFrame([463],"int").withColumnRenamed("value","userId")

In [88]:
tmp.show()

+------+
|userId|
+------+
|   463|
+------+



In [89]:
model.recommendForUserSubset(tmp,10)

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]