In [77]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StringType, FloatType, IntegerType

In [7]:
spark = SparkSession \
        .builder \
        .master('local[1]') \
        .appName('MovieLensMF') \
        .config('spark.executor.memory', '2g') \
        .config('spark.executor.instances', '2') \
        .getOrCreate()

## 데이터 로드

In [8]:
path = '/Users/yeomyungro/Documents/github/recommendation/data/ml-latest-small'
os.listdir(path)

['links.csv', 'tags.csv', 'ratings.csv', 'README.txt', 'movies.csv']

In [78]:
schema = StructType() \
      .add("userId",IntegerType(),True) \
      .add("movieId",IntegerType(),True) \
      .add("rating",FloatType(),True) \
      .add("timestamp",StringType(),True)

In [79]:
ratings = spark.read.option('header', True) \
    .format('csv') \
    .schema(schema) \
    .load(path+'/ratings.csv')

In [80]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)



In [81]:
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [41]:
ratings.count()

100836

In [42]:
ratings.agg(f.countDistinct('userId')).show()

+-------------+
|count(userId)|
+-------------+
|          610|
+-------------+



In [43]:
ratings.agg(f.countDistinct('movieId')).show()

+--------------+
|count(movieId)|
+--------------+
|          9724|
+--------------+



## Sparsity 확인

In [44]:
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select("rating").count()

    # Count the number of distinct userIds and distinct movieIds
    total_elements = ratings.select("userId").distinct().count() * ratings.select("movieId").distinct().count()

    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero *1.0)/total_elements)*100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    
get_mat_sparsity(ratings)

The ratings dataframe is  98.30% sparse.


## Train, Test 데이터 나누기

In [82]:
(train, test) = ratings.randomSplit([0.8, 0.2], seed=1990)

In [83]:
train.count()

80480

In [84]:
test.count()

20356

## Implict 데이터 만들기
영화 점수를 1(시청)/0(미시청)으로 변환

In [85]:
ratings = ratings.withColumn('isWatch', f.lit(1))
ratings.show(5)

+------+-------+------+---------+-------+
|userId|movieId|rating|timestamp|isWatch|
+------+-------+------+---------+-------+
|     1|      1|   4.0|964982703|      1|
|     1|      3|   4.0|964981247|      1|
|     1|      6|   4.0|964982224|      1|
|     1|     47|   5.0|964983815|      1|
|     1|     50|   5.0|964982931|      1|
+------+-------+------+---------+-------+
only showing top 5 rows



In [86]:
userId_df = ratings.select('userId').distinct()
movieId_df = ratings.select('movieId').distinct()

In [87]:
user_movie_df = userId_df.crossJoin(movieId_df).join(ratings, ['userId', 'movieId'], 'left') \
    .select(['userId', 'movieId', 'isWatch', 'rating']).fillna(0)

In [88]:
user_movie_df.show(5)

+------+-------+-------+------+
|userId|movieId|isWatch|rating|
+------+-------+-------+------+
|   148|   1580|      0|   0.0|
|   148|   2366|      0|   0.0|
|   148|   3175|      0|   0.0|
|   148|   1088|      0|   0.0|
|   148|  32460|      0|   0.0|
+------+-------+-------+------+
only showing top 5 rows



In [89]:
user_movie_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- isWatch: integer (nullable = true)
 |-- rating: float (nullable = false)



## 학습

In [72]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

* nonnegative : rating 값이 0 이상인지 여부
* implicitPrefs : Explicit or Implicit
* coldStartStrategy : user/item factor가 없는 경우

In [94]:
als = ALS(
    userCol='userId', 
    itemCol='movieId', 
    ratingCol='rating', 
    nonnegative=True,
    implicitPrefs=True,
    coldStartStrategy='drop')

하이퍼파라미터 grid search

In [95]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

평가 지표

In [96]:
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


cross validation 세팅

In [97]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

학습

In [98]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

In [99]:
#Extract best model from the cv model above
best_model = model.bestModel
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

3.223215614199257


In [100]:
print("**Best Model**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 10
  MaxIter: 10
  RegParam: 0.01


## 추천(예측)

In [101]:
# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{2571, 0.6160514...|
|   463|[{2571, 0.5005672...|
|   496|[{2571, 0.3983799...|
|   148|[{68954, 0.563131...|
|   540|[{2571, 0.6567966...|
|   392|[{2571, 0.3298897...|
|   243|[{590, 0.68851244...|
|    31|[{356, 0.5901356}...|
|   516|[{260, 0.24742907...|
|   580|[{2959, 1.148741}...|
|   251|[{318, 0.76826394...|
|   451|[{592, 0.6089305}...|
|    85|[{904, 0.22922753...|
|   137|[{318, 1.0380927}...|
|    65|[{2571, 0.5823167...|
|   458|[{150, 0.92451924...|
|   481|[{2916, 0.2085701...|
|    53|[{440, 0.11010618...|
|   255|[{260, 0.2168643}...|
|   588|[{318, 0.86326796...|
+------+--------------------+
only showing top 20 rows



In [105]:
recommendations = recommendations\
    .withColumn("rec_exp", f.explode("recommendations"))\
    .select('userId', f.col("rec_exp.movieId"), f.col("rec_exp.rating"))
recommendations.limit(10).show()

+------+-------+----------+
|userId|movieId|    rating|
+------+-------+----------+
|   471|   2571| 0.6160514|
|   471|    318| 0.5997103|
|   471|   2959| 0.5990828|
|   471|   7153| 0.5593015|
|   471|   4993|0.55717885|
|   463|   2571|0.50056726|
|   463|    318|0.49084532|
|   463|    296| 0.4677875|
|   463|   2959|0.46737164|
|   463|    356|0.45888856|
+------+-------+----------+

