In [3]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 50.1MB/s eta 0:00:01
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 41.6MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Found existing installation: py4j 0.10.8.1
    Uninstalling py4j-0.10.8.1:
      Successfully uninstalled py4j-0.10.8.1
Successfully installed py4j-0.10.7 pyspark-2.4.4
Not

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"
os.environ['JAVA_HOME'] = "/usr/local/java/jdk1.8.0_221"

In [4]:
pip install pandas

Collecting pandas
[?25l  Downloading https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl (10.4MB)
[K     |████████████████████████████████| 10.4MB 3.4MB/s eta 0:00:01
[?25hCollecting numpy>=1.13.3 (from pandas)
[?25l  Downloading https://files.pythonhosted.org/packages/ba/e0/46e2f0540370f2661b044647fa447fef2ecbcc8f7cdb4329ca2feb03fb23/numpy-1.17.2-cp37-cp37m-manylinux1_x86_64.whl (20.3MB)
[K     |████████████████████████████████| 20.3MB 30.5MB/s eta 0:00:01
Collecting pytz>=2017.2 (from pandas)
[?25l  Downloading https://files.pythonhosted.org/packages/87/76/46d697698a143e05f77bec5a526bf4e56a0be61d63425b68f4ba553b51f2/pytz-2019.2-py2.py3-none-any.whl (508kB)
[K     |████████████████████████████████| 512kB 35.4MB/s eta 0:00:01
Installing collected packages: numpy, pytz, pandas
Successfully installed numpy-1.17.2 pandas-0.25.1 pytz-2019.2
Note: you may need to restart the kernel t

In [2]:
from __future__ import print_function

import sys
if sys.version >= '3':
    long = int

from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


In [4]:
# pip install spark

In [3]:
spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()


In [52]:
books = spark.read.text("file:///root/infs3208/data/ratings.csv").rdd

In [53]:
print(books.take(5))

[Row(value='book_id,user_id,rating'), Row(value='1,314,5'), Row(value='1,439,3'), Row(value='1,588,5'), Row(value='1,1169,4')]


In [54]:
partitions = books.map(lambda row: row.value.split(","))

In [55]:
print(partitions.take(5))

[['book_id', 'user_id', 'rating'], ['1', '314', '5'], ['1', '439', '3'], ['1', '588', '5'], ['1', '1169', '4']]


In [56]:
header = partitions.first()
print(header)
booksRDD = partitions.filter(lambda line: line != header)

['book_id', 'user_id', 'rating']


In [57]:
ratingsRDD = booksRDD.map(lambda p: Row(bookId= int(p[0]), userId= int(p[1]), rating=int(p[2])))

In [58]:
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

In [59]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="bookId", ratingCol="rating",
              coldStartStrategy="drop")

In [60]:
model = als.fit(training)

In [61]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 1.8124273077648978


In [65]:
# Generate top 10 book recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Generate top 10 user recommendations for each book
bookRecs = model.recommendForAllItems(10)

In [68]:
userRecs.head()

Row(userId=148, recommendations=[Row(bookId=9479, rating=8.701570510864258), Row(bookId=9842, rating=8.336432456970215), Row(bookId=6027, rating=7.325491905212402), Row(bookId=2100, rating=6.825406551361084), Row(bookId=3235, rating=6.799763202667236), Row(bookId=5074, rating=6.651035308837891), Row(bookId=8233, rating=6.632462978363037), Row(bookId=8538, rating=6.495440483093262), Row(bookId=5626, rating=6.388382911682129), Row(bookId=8176, rating=6.352085590362549)])

In [69]:
bookRecs.head()

Row(bookId=1580, recommendations=[Row(userId=32928, rating=13.468367576599121), Row(userId=52088, rating=12.2013521194458), Row(userId=21048, rating=11.964018821716309), Row(userId=23756, rating=11.902909278869629), Row(userId=17314, rating=11.564733505249023), Row(userId=11520, rating=11.55067253112793), Row(userId=52750, rating=11.50261402130127), Row(userId=39000, rating=11.501254081726074), Row(userId=7539, rating=11.266420364379883), Row(userId=48973, rating=11.263007164001465)])

In [None]:
spark.stop()