In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 71kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 34.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=d540481936db58c8425cf3d058cceb95cb83260c6f2f26e83502cb2ec7c380aa
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RankingEvaluator

spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
def download_dataset():
  print('Downloading movielens data...')
  from urllib.request import urlretrieve
  import zipfile

  url = 'http://files.grouplens.org/datasets/movielens/ml-100k.zip'
  dest_file = 'movielens.zip'

  urlretrieve(url, dest_file)
  zip_ref = zipfile.ZipFile(dest_file, 'r')
  zip_ref.extractall()
  print('Done. Dataset contains:')
  print(zip_ref.read('ml-100k/u.info'))


def read_ratings():
  print('Reading the ratings file...')
  ratings_schema = T.StructType(
      [T.StructField('user_id', T.DoubleType(), False),
       T.StructField('movie_id', T.DoubleType(), True),
       T.StructField('rating', T.DoubleType(), True),
       T.StructField('unix_timestamp', T.LongType(), True)]
       )

  ratings = (spark.read
             .load('ml-100k/u.data', format='csv', sep='\t',
                   header='false', schema=ratings_schema)
             .withColumn('unix_timestamp',
                         F.to_timestamp(F.col('unix_timestamp'))))

  print(f'Ingested {ratings.count()} ratings.')
  return ratings

In [4]:
download_dataset()

ratings = read_ratings()

Downloading movielens data...
Done. Dataset contains:
b'943 users\n1682 items\n100000 ratings\n'
Reading the ratings file...
Ingested 100000 ratings.


In [5]:
ratings.show()

+-------+--------+------+-------------------+
|user_id|movie_id|rating|     unix_timestamp|
+-------+--------+------+-------------------+
|  196.0|   242.0|   3.0|1997-12-04 15:55:49|
|  186.0|   302.0|   3.0|1998-04-04 19:22:22|
|   22.0|   377.0|   1.0|1997-11-07 07:18:36|
|  244.0|    51.0|   2.0|1997-11-27 05:02:03|
|  166.0|   346.0|   1.0|1998-02-02 05:33:16|
|  298.0|   474.0|   4.0|1998-01-07 14:20:06|
|  115.0|   265.0|   2.0|1997-12-03 17:51:28|
|  253.0|   465.0|   5.0|1998-04-03 18:34:27|
|  305.0|   451.0|   3.0|1998-02-01 09:20:17|
|    6.0|    86.0|   3.0|1997-12-31 21:16:53|
|   62.0|   257.0|   2.0|1997-11-12 22:07:14|
|  286.0|  1014.0|   5.0|1997-11-17 15:38:45|
|  200.0|   222.0|   5.0|1997-10-05 09:05:40|
|  210.0|    40.0|   3.0|1998-03-27 21:59:54|
|  224.0|    29.0|   3.0|1998-02-21 23:40:57|
|  303.0|   785.0|   3.0|1997-11-14 05:28:38|
|  122.0|   387.0|   5.0|1997-11-11 17:47:39|
|  194.0|   274.0|   2.0|1997-11-14 20:36:34|
|  291.0|  1042.0|   4.0|1997-09-2

In [6]:
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=42)

In [7]:
model = ALS(userCol='user_id',
            itemCol='movie_id',
            ratingCol='rating').fit(train)

In [8]:
k = 3
test_recomm = model.recommendForUserSubset(dataset=test, numItems=k)
test_recomm = (test_recomm
               .withColumn('recommended_movies',
                           F.col('recommendations')
                           .movie_id.cast(T.ArrayType(T.DoubleType()))))

test_recomm.show(truncate=False)

+-------+--------------------------------------------------------+------------------------+
|user_id|recommendations                                         |recommended_movies      |
+-------+--------------------------------------------------------+------------------------+
|471    |[{342, 4.8859982}, {1605, 4.8586993}, {349, 4.8451447}] |[342.0, 1605.0, 349.0]  |
|463    |[{1449, 4.3137207}, {318, 4.171251}, {1344, 4.1497726}] |[1449.0, 318.0, 1344.0] |
|833    |[{1368, 4.777424}, {1597, 4.737522}, {1643, 4.4611235}] |[1368.0, 1597.0, 1643.0]|
|496    |[{253, 4.6821923}, {1022, 4.625275}, {1449, 4.481355}]  |[253.0, 1022.0, 1449.0] |
|148    |[{1129, 5.6085124}, {919, 4.9986076}, {408, 4.943635}]  |[1129.0, 919.0, 408.0]  |
|540    |[{1449, 5.018511}, {1643, 4.7956796}, {1398, 4.7175856}]|[1449.0, 1643.0, 1398.0]|
|392    |[{1643, 5.790415}, {1398, 5.0850043}, {119, 5.034845}]  |[1643.0, 1398.0, 119.0] |
|243    |[{1449, 4.7110043}, {1512, 4.432862}, {1398, 4.387753}] |[1449.0, 1512.

In [9]:
test_pivot = (test
              .orderBy('rating', ascending=False)
              .withColumn('id_ratings', F.create_map('movie_id', 'rating'))
              .groupBy('user_id').agg(
                  F.collect_list('movie_id').alias('movie_list'),
                  F.collect_list('rating').alias('ratings_list'),
                  F.collect_list('id_ratings').alias('id_ratings_list')))

In [21]:
eval_set = (test_recomm
            .join(test_pivot.select(F.col('user_id'),
                                    F.slice('movie_list', start=1, length=3)
                                    .alias('movie_list')),
                  on='user_id', how='inner'))

In [22]:
eval_set.show()

+-------+--------------------+--------------------+--------------------+
|user_id|     recommendations|  recommended_movies|          movie_list|
+-------+--------------------+--------------------+--------------------+
|    471|[{342, 4.8859982}...|[342.0, 1605.0, 3...|[393.0, 477.0, 93...|
|    463|[{1449, 4.3137207...|[1449.0, 318.0, 1...|[124.0, 301.0, 50.0]|
|    833|[{1368, 4.777424}...|[1368.0, 1597.0, ...| [11.0, 47.0, 168.0]|
|    496|[{253, 4.6821923}...|[253.0, 1022.0, 1...|[133.0, 181.0, 43...|
|    148|[{1129, 5.6085124...|[1129.0, 919.0, 4...| [56.0, 71.0, 133.0]|
|    540|[{1449, 5.018511}...|[1449.0, 1643.0, ...|[515.0, 13.0, 222.0]|
|    392|[{1643, 5.790415}...|[1643.0, 1398.0, ...|[50.0, 172.0, 302.0]|
|    243|[{1449, 4.7110043...|[1449.0, 1512.0, ...|[157.0, 285.0, 12...|
|    623|[{1233, 4.60893},...|[1233.0, 1194.0, ...|[210.0, 483.0, 66.0]|
|    737|[{856, 4.9471164}...|[856.0, 1449.0, 5...|[127.0, 192.0, 47...|
|    897|[{313, 4.9228754}...|[313.0, 963.0, 11...|

In [23]:
eval = RankingEvaluator(predictionCol='recommended_movies',
                        labelCol='movie_list',
                        metricName='meanAveragePrecision', k=k)

Precision @ K: $p(k)=\frac{1}{M}\sum_{i=0}^{M-1}\frac{1}{k}\sum_{j=0}^{min(Q_i, k)^-1}rel_{D_i}(R_i(j))$

In [24]:
eval.evaluate(eval_set)

0.005478967833156589