In [71]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

In [72]:
spark = (pyspark.sql.SparkSession.builder 
  .master("local[*]")
  .getOrCreate())

In [73]:
!head ratings.json

{"user_id": 6040, "movie_id": 858, "rating": 4, "timestamp": 956678732.0}
{"user_id": 6040, "movie_id": 2384, "rating": 4, "timestamp": 956678754.0}
{"user_id": 6040, "movie_id": 593, "rating": 5, "timestamp": 956678754.0}
{"user_id": 6040, "movie_id": 1961, "rating": 4, "timestamp": 956678777.0}
{"user_id": 6040, "movie_id": 1419, "rating": 3, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 213, "rating": 5, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 3111, "rating": 5, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 573, "rating": 4, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 3505, "rating": 4, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 1734, "rating": 2, "timestamp": 956678881.0}


In [74]:
movie_ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [75]:
from pyspark.sql.types import (
    ArrayType,
    AtomicType,
    BinaryType,
    BooleanType,
    ByteType,
    CloudPickleSerializer,
    DataType,
    DataTypeSingleton,
    DateConverter,
    DateType,
    DatetimeConverter,
    DecimalType,
    DoubleType,
    FloatType,
    FractionalType,
    IntegerType,
    IntegralType,
    JavaClass,
    LongType,
    MapType,
    NullType,
    NumericType,
    Row,
    ShortType,
    SparkContext,
    StringType,
    StructField,
    StructType,
    TimestampType,
    UserDefinedType,
)

In [76]:
schema = StructType(
    [
        StructField('user_id', IntegerType()),
        StructField('movie_id', IntegerType()),
        StructField('rating', FloatType()),
        StructField('timestamp', LongType()),
    ]
)

In [77]:
movie_ratings = spark.read.json('ratings.json')

In [78]:
movie_ratings.show()

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
|     213|     5|9.56678856E8|   6040|
|    3111|     5|9.56678856E8|   6040|
|     573|     4|9.56678856E8|   6040|
|    3505|     4|9.56678856E8|   6040|
|    1734|     2|9.56678881E8|   6040|
|    2503|     5|9.56678991E8|   6040|
|     919|     5|9.56678991E8|   6040|
|     912|     5|9.56678991E8|   6040|
|     527|     5|9.56679019E8|   6040|
|    1252|     5|9.56679057E8|   6040|
|     649|     5|9.56679057E8|   6040|
|     318|     4|9.56679057E8|   6040|
|    3289|     5|9.56679105E8|   6040|
|     608|     4|9.56679275E8|   6040|
|    2396|     3|9.56679275E8|   6040|
+--------+------+------------+-------+
only showing top 20 rows



In [80]:
movie_ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

In [81]:
(trainingdata, testdata) = movie_ratings.randomSplit([0.7, 0.3], seed = 10)

In [83]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

als = ALS(
    rank=10,
    maxIter=10,
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
)

In [84]:
als_model = als.fit(trainingdata)

In [85]:
ALSModel.transform

<function pyspark.ml.base.Transformer.transform(self, dataset, params=None)>

In [86]:
predictions = als_model.transform(testdata)

In [87]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating')
evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})

nan

In [88]:
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint, prediction: float]

In [93]:
movie_ratings.show(1)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
+--------+------+------------+-------+
only showing top 1 row



In [89]:
predictions.show(1)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     148|     5|9.75592024E8|    673| 2.1567326|
+--------+------+------------+-------+----------+
only showing top 1 row



In [90]:
user_factors = als_model.userFactors

In [91]:
user_factors.limit(1).toPandas()

Unnamed: 0,id,features
0,640,"[-1.9456915855407715, -0.11777438223361969, -0..."


In [92]:
item_factors = als_model.itemFactors