## Building Recommendation Engines with PySpark
PySpark를 활용한 추천 엔진 개발

### Import
---
필요한 라이브러리 Import

In [None]:
# !pip install pyspark findspark wget

In [None]:
import findspark

# findspark.init() has to run before the first pyspark import: it locates the
# Spark installation and makes its Python bindings importable.
findspark.init()

import pyspark  # noqa: E402
from pyspark.sql import SparkSession  # noqa: E402
# NOTE: min and max imported here shadow the Python builtins of the same name
# for the rest of the notebook.
from pyspark.sql.functions import min, max, avg, col  # noqa: E402

# Create the session named 'MovieLens', or reuse one that is already running.
spark = SparkSession.builder.appName('MovieLens').getOrCreate()

### Download Dataset
---
데이터셋 다운로드 받기

In [None]:
import os
import wget

# Fetch the MovieLens 20M archive unless a copy already sits in the working directory.
if os.path.exists("./ml-20m.zip"):
    print(".zip file already exists.")
else:
    url = "https://files.grouplens.org/datasets/movielens/ml-20m.zip"
    wget.download(url)

파일 압축 풀기

In [None]:
import zipfile

# Unpack the archive only when the ./ml-20m folder is not present yet.
if os.path.exists("./ml-20m"):
    print(".zip file already extracted.")
else:
    with zipfile.ZipFile("ml-20m.zip", mode='r') as archive:
        archive.extractall(path=".")

### Load Dataset
---
CSV 파일 로드하기

In [None]:
# Read both CSV files, treating the first line of each as the column header.
ratings = spark.read.csv("ml-20m/ratings.csv", header=True)
movies = spark.read.csv("ml-20m/movies.csv", header=True)

### 테이블 스키마

In [None]:
# Print a heading, the schema, and the first five rows of each table in turn.
for heading, table in (
    ("Schema of ratings table:", ratings),
    ("Schema of movies table:", movies),
):
    print(heading)
    table.printSchema()
    table.show(5)

### Data Preprocessing


Data sparsity 계산하기
- User - Movie 행렬이 있을 때, 전체 원소 중 $(u, m)$ element가 empty인 비율
- $\text{sparsity} = 1 - \frac{\text{Num\_ratings}}{\text{Num\_users}\times\text{Num\_movies}}$

In [None]:
# Distinct users and distinct movies that appear in the ratings table.
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

print(f"""
Number of users: {num_users},
Number of movies: {num_movies}
""")

# Observed ratings versus every possible (user, movie) pair.
numerator = ratings.count()
denominator = num_users * num_movies

# Sparsity is the share of the user-movie matrix that holds no rating.
sparsity = 1 - float(numerator) / denominator
print(f"Sparsity: {sparsity}")

### GroupBy와 Aggregate function

In [None]:
# ## Number of ratings given by each user
# ratings.groupBy("userId").count().show()
# ## Smallest number of ratings given by any single user
# ratings.groupBy("userId").count().select(min("count")).show()
# ## Largest number of ratings given by any single user
# ratings.groupBy("userId").count().select(max("count")).show()
# ## Average number of ratings given per user
# ratings.groupBy("userId").count().select(avg("count")).show()

## userId and count for the users who gave at least 20 ratings
# ratings.groupBy("userId").count().filter(col("count")>=20).show()

In [None]:
# Left-join ratings with movies on movieId and keep only the three columns ALS needs.
# NOTE(review): no movie column survives the select, so the join looks
# redundant unless movies.csv can contain duplicate movieIds; confirm before removing.
movie_ratings = ratings.join(movies, "movieId", "left").select(["movieId", "userId", "rating"])

# The CSVs were read without a schema, so cast the ids and the rating to
# numeric types. Columns are resolved by name on movie_ratings itself rather
# than through the separate `ratings` DataFrame.
movie_ratings = movie_ratings.select(
    col("userId").cast("integer").alias("userId"),
    col("movieId").cast("integer").alias("movieId"),
    col("rating").cast("double").alias("rating"),
)

# 80/20 train/test split; the fixed seed makes the split repeatable across runs.
(training_data, test_data) = movie_ratings.randomSplit([.8, .2], seed=42)

In [None]:
from pyspark.ml.recommendation import ALS

# ALS estimator settings: explicit ratings (implicitPrefs=False), non-negative
# factors, and the "drop" cold-start strategy. rank, maxIter and regParam are
# left at their defaults here.
als_settings = dict(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)
als = ALS(**als_settings)

In [None]:
# Candidate values for the ALS hyperparameter search.
# Together they span 4 x 4 x 3 = 48 combinations.
ranks = [5, 40, 80, 120]           # candidate rank values
maxIters = [5, 100, 250, 500]      # candidate maxIter values
regParams = [0.05, 0.1, 1.5]       # candidate regParam values

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Register each hyperparameter with its candidate values, then expand the grid
# into one param map per combination.
grid_builder = ParamGridBuilder()
for param, candidates in (
    (als.rank, ranks),
    (als.maxIter, maxIters),
    (als.regParam, regParams),
):
    grid_builder.addGrid(param, candidates)
param_grid = grid_builder.build()

# Score predictions by RMSE between the "rating" and "prediction" columns.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# 5-fold cross-validation of the ALS estimator over the whole grid.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

# Fit on the training split and keep the best model found by the search.
model = cv.fit(training_data)
best_model = model.bestModel

In [None]:
# Score the best model on the held-out test split.
predictions = best_model.transform(test_data)
rmse = evaluator.evaluate(predictions)

# maxIter and regParam are read from the parent of the model's underlying Java
# object, reached through the private _java_obj handle.
best_estimator = best_model._java_obj.parent()

print(f"""
    ***Best Model***
    RMSE = {rmse}
    Rank: {best_model.rank}
    MaxIter: {best_estimator.getMaxIter()}
    RegParam: {best_estimator.getRegParam()}
""")