In [1]:
import findspark

In [2]:
# Make the local Spark installation importable; this has to run before `import pyspark`.
findspark.init()

In [3]:
import pyspark
import random

## Calculate PI

In [4]:
# Monte Carlo estimate of pi: draw random points in the unit square and count
# the fraction that lands inside the quarter unit circle (whose area is pi/4).
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000

def inside(p):
    """Return True when a freshly drawn random point lies inside the unit circle.

    `p` is the RDD element (a sample index). It is ignored; it only drives one
    random draw per element.
    """
    x, y = random.random(), random.random()
    return x*x + y*y < 1

try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()

    pi = 4 * count / num_samples
    print(pi)
finally:
    # Always release the context, even when the job fails or is interrupted.
    # Otherwise re-running this cell fails because a SparkContext is still active.
    sc.stop()

3.14165584


## Recommendation System (ALS model) on PySpark

dataset: MovieLens 20M

https://www.kaggle.com/grouplens/movielens-20m-dataset
https://www.kaggle.com/jneupane12/movielens

In [5]:
# List the files available in the full MovieLens dataset directory.
! ls /data/datasets/movielens/

genome_scores.csv  genome_tags.csv  link.csv  movie.csv  rating.csv  tag.csv


In [6]:
# List the files in the "lite" dataset directory, which is the one loaded below.
! ls /data/datasets/movielens-lite/

links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

In [8]:
# Create (or reuse) the Spark session for the recommender, requesting 8 GB per executor.
spark = (
    SparkSession.builder
    .appName("movielens-reco")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

#### load data

In [9]:
# Both CSV files have a header row; let Spark infer the column types.
csv_options = {"inferSchema": True, "header": True}
ratings = spark.read.csv('/data/datasets/movielens-lite/ratings.csv', **csv_options)
movies = spark.read.csv('/data/datasets/movielens-lite/movies.csv', **csv_options)

# Sanity check: preview a few ratings joined with their movie metadata.
ratings.join(movies, on="movieId").show(3)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|      2|     1|   3.5|1112486027|      Jumanji (1995)|Adventure|Childre...|
|     29|     1|   3.5|1112484676|City of Lost Chil...|Adventure|Drama|F...|
|     32|     1|   3.5|1112484819|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 3 rows



#### prepare data

In [10]:
# Keep only the columns ALS needs, then split them 70/30 into train and test sets.
data = ratings.select("userId", "movieId", "rating")
# A fixed seed makes the split, and every number derived from it, reproducible across runs.
traindf, testdf = data.randomSplit([0.7, 0.3], seed=42)
# Use distinct rating column names so training labels and test ground truth cannot be mixed up.
traindf = traindf.withColumnRenamed("rating", "label")
testdf = testdf.withColumnRenamed("rating", "trueLabel")
# Count the rows in each split.
train_rows = traindf.count()
test_rows = testdf.count()
print(f"training data rows: {train_rows}, testing data row: {test_rows}")

training data rows: 733493, testing data row: 315082


In [11]:
# Preview the first rows of the training split.
traindf.show(3)

+------+-------+-----+
|userId|movieId|label|
+------+-------+-----+
|     1|      2|  3.5|
|     1|     29|  3.5|
|     1|    151|  4.0|
+------+-------+-----+
only showing top 3 rows



In [12]:
# Check the column types of the training split.
traindf.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- label: double (nullable = true)



#### training model

In [13]:
# Fit an ALS matrix-factorization model on the training ratings.
# `rank` and `coldStartStrategy` are left at their defaults. With the default
# cold-start strategy ("nan"), users/items unseen in training get NaN predictions.
# The seed is fixed so that the learned factors are reproducible across runs.
als = ALS(maxIter=20,
          regParam=0.01,
          userCol="userId",
          itemCol="movieId",
          ratingCol="label",
          seed=42)
model = als.fit(traindf)

In [14]:
# Print the documentation and current value of each parameter of the fitted model.
print(model.explainParams())

coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: nan,drop. (default: nan)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
predictionCol: prediction column name (default: prediction)
userCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)


In [15]:
# Rank (number of latent factors) of the fitted model; it was not set when building ALS.
model.rank

10

#### Predict

In [16]:
# Score the held-out ratings: adds a "prediction" column alongside "trueLabel".
prediction = model.transform(testdf)

In [18]:
# Collect all test-set predictions to the driver as a pandas DataFrame for local inspection.
df = prediction.toPandas()

In [20]:
# (rows, columns) of the collected predictions.
df.shape

(315082, 4)

In [42]:
from pyspark.sql.types import IntegerType

# Ids of the users to generate recommendations for.
usersList = [1, 7120]
# Single-column DataFrame of user ids, the input expected by recommendForUserSubset.
usersDF = spark.createDataFrame(usersList, IntegerType()).toDF('userId')

# Top-100 movie recommendations for each listed user.
ur = model.recommendForUserSubset(usersDF, numItems=100)

In [43]:
# Displaying the Spark DataFrame shows its schema: one array of (movieId, rating) structs per user.
ur

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]

In [39]:
# Collect the recommendations to pandas: one row per user, holding that user's list of
# (movieId, rating) entries.
df_u1 = ur.toPandas()
df_u1

Unnamed: 0,userId,recommendations
0,7120,"[(87234, 14.2313232421875), (58425, 13.8183975..."
1,1,"[(82240, 6.374285697937012), (6668, 5.92352819..."


In [40]:
# Unnest the list column so each recommendation gets its own row (displayed only, not stored).
df_u1.explode("recommendations")

Unnamed: 0,userId,recommendations
0,7120,"(87234, 14.2313232421875)"
0,7120,"(58425, 13.818397521972656)"
0,7120,"(93287, 12.97952938079834)"
0,7120,"(69685, 12.815133094787598)"
0,7120,"(4282, 12.567197799682617)"
...,...,...
1,1,"(4267, 4.086800575256348)"
1,1,"(760, 4.086605072021484)"
1,1,"(3044, 4.086340427398682)"
1,1,"(2061, 4.085627555847168)"


In [58]:
from pyspark.sql.types import IntegerType

usersDF = spark.createDataFrame([1,7120], IntegerType()).toDF('userId')

# Request a very large number of items so that, presumably, every movie the model
# knows about is ranked for each user.
userRecs = model.recommendForUserSubset(usersDF, numItems=30000)

# Only the first returned row is used here, so fetch just that row instead of
# collecting every user's full recommendation list to the driver.
# NOTE: row order is not guaranteed, so which user comes first may vary between runs.
moviesList = userRecs.first().recommendations
moviesDF = spark.createDataFrame(moviesList)
moviesDF.show()

+-------+------------------+
|movieId|            rating|
+-------+------------------+
|  87234|  14.2313232421875|
|  58425|13.818397521972656|
|  93287| 12.97952938079834|
|  69685|12.815133094787598|
|   4282|12.567197799682617|
|   5172|11.966699600219727|
|   6485|11.771373748779297|
|   7456|11.678054809570312|
|  94018| 11.27645492553711|
|   4332|11.238056182861328|
|   2681| 11.05762004852295|
|   7259| 10.85806655883789|
|   5915|10.753786087036133|
|   5799|10.730061531066895|
|   3620|10.655664443969727|
|  67867|10.583354949951172|
|   4627|10.573225021362305|
|  73881|10.490897178649902|
|  67429| 10.48541259765625|
|   7792|10.435096740722656|
+-------+------------------+
only showing top 20 rows



In [59]:
# Bring the ranked movie list of the first returned user to pandas.
df_u2 = moviesDF.toPandas()

In [60]:
from pyspark.sql import functions as F

# Flatten the per-user recommendation arrays into one row per (user, movie) pair.
# Exploding inside Spark avoids collecting every row to the driver and rebuilding
# a DataFrame from Python dicts. The column order and casts are intended to mirror
# the schema of that dict-based construction (movieId, rating, userId; long/double).
df_r1 = (
    userRecs
    .select("userId", F.explode("recommendations").alias("rec"))
    .select(
        F.col("rec.movieId").cast("long").alias("movieId"),
        F.col("rec.rating").cast("double").alias("rating"),
        F.col("userId").cast("long").alias("userId"),
    )
)



In [61]:
# Collect the flattened recommendations to pandas for local analysis.
pdf_r1 = df_r1.toPandas()

In [73]:
# (rows, columns) of the flattened recommendations across all requested users.
pdf_r1.shape

(26256, 3)

In [76]:
# Per user: how many recommended movies have a strictly positive predicted rating.
pdf_r1[pdf_r1.rating>0].groupby('userId').size()

userId
1       13124
7120    12500
dtype: int64

In [64]:
# Per user: total number of movies that received a recommendation score.
pdf_r1.groupby('userId').size()

userId
1       13128
7120    13128
dtype: int64

In [70]:
# Collect the whole training split to pandas.
# NOTE(review): in the visible cells this is only used to count distinct movieIds,
# which could be done in Spark without transferring the data - confirm before changing.
tdf = traindf.toPandas()

In [72]:
# Number of distinct movies that appear in the training split.
tdf.movieId.unique().size

13128

In [57]:
# Number of rows in the movies table.
movies.count()

27278

In [47]:
# (rows, columns) of the single-user ranked movie list.
df_u2.shape

(1000, 2)

In [23]:
# Number of test-set rows (predictions) per user.
df.groupby('userId').size()

userId
1       58
2       15
3       48
4       10
5       22
        ..
7116    79
7117    46
7118    13
7119    15
7120     6
Length: 7120, dtype: int64

In [18]:
# Compare predicted and true ratings for user 1259, with movie titles attached.
(
    prediction
    .join(movies, on="movieId")
    .select("userId", "title", "prediction", "trueLabel")
    .filter("userId==1259")
    .show(n=10, truncate=False)
)

+------+---------------------------------------------------+----------+---------+
|userId|title                                              |prediction|trueLabel|
+------+---------------------------------------------------+----------+---------+
|1259  |Jerky Boys, The (1995)                             |1.8968399 |1.0      |
|1259  |Shallow Grave (1994)                               |4.554488  |5.0      |
|1259  |Lord of Illusions (1995)                           |3.3361583 |2.0      |
|1259  |Jade (1995)                                        |4.228554  |1.0      |
|1259  |Bullets Over Broadway (1994)                       |4.8040743 |4.0      |
|1259  |City Slickers II: The Legend of Curly's Gold (1994)|2.852591  |1.0      |
|1259  |Outbreak (1995)                                    |3.6788158 |2.0      |
|1259  |Desperado (1995)                                   |3.933239  |2.0      |
|1259  |Little Odessa (1994)                               |3.4827013 |4.0      |
|1259  |Tombston

#### Evaluate

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-square error between the predicted and the true test ratings.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="trueLabel",
    predictionCol="prediction",
)

# Rows without a usable prediction cannot be scored, so drop them before evaluating.
cleanPrediction = prediction.na.drop(how="any", subset=["prediction"])
rmse = evaluator.evaluate(cleanPrediction)
rmse

0.9041674530778843

In [20]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()