## 1. Spark Session 만들기

In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession 

In [2]:
# Create the Spark context and session.
# All settings live on one SparkConf, and both the context and the session are obtained
# with getOrCreate so that re-running this cell in a live kernel reuses the existing
# context instead of failing with "Cannot run multiple SparkContexts at once".
conf = (
    pyspark.SparkConf()
    .setMaster('local')
    .setAppName('recommender_system')
    .set('spark.driver.host', '127.0.0.1')
)
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## 2. 파일 로드

In [3]:
# Read the ratings CSV: the first row is the header and column types are inferred.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./movie_ratings_1m.csv')
)

# Preview the first three rows as a pandas DataFrame.
df.limit(3).toPandas()

Unnamed: 0,userId,rating,movieId,title,genres
0,1,5,1193,One Flew Over the Cuckoo's Nest (1975),Drama
1,2,5,1193,One Flew Over the Cuckoo's Nest (1975),Drama
2,12,4,1193,One Flew Over the Cuckoo's Nest (1975),Drama


In [4]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



### 문자열(String) 컬럼은 학습에 바로 사용할 수 없으므로 숫자 Index로 변환

In [5]:
from pyspark.ml.feature import StringIndexer

# Map each distinct `title` string to a numeric index stored in `title_new`.
stringIndexer = StringIndexer(inputCol='title', outputCol='title_new')

# Fit the indexer on the `title` column.
model = stringIndexer.fit(df)

# New DataFrame that carries the extra `title_new` column.
indexed = model.transform(df)

# Preview the first five indexed rows.
indexed.limit(5).toPandas()


# To convert the index back to the original string later (IndexToString would need
# to be imported from pyspark.ml.feature first):
# movie_title = IndexToString(inputCol = "title_new", outputCol = "title", labels = model.labels)

Unnamed: 0,userId,rating,movieId,title,genres,title_new
0,1,5,1193,One Flew Over the Cuckoo's Nest (1975),Drama,43.0
1,2,5,1193,One Flew Over the Cuckoo's Nest (1975),Drama,43.0
2,12,4,1193,One Flew Over the Cuckoo's Nest (1975),Drama,43.0
3,15,4,1193,One Flew Over the Cuckoo's Nest (1975),Drama,43.0
4,17,5,1193,One Flew Over the Cuckoo's Nest (1975),Drama,43.0


## 3. 모델 학습 - ALS

In [6]:
# Alternating Least Squares (ALS) is the collaborative-filtering algorithm used for the recommender.
from pyspark.ml.recommendation import ALS

# One fixed seed for both the split and the ALS factor initialisation, so that the
# train/test partition, the fitted model and the metrics reported below do not change
# on every Restart & Run All.
SEED = 42

# 80 % of the ratings for training, 20 % held out for evaluation.
train, test = df.randomSplit([0.8, 0.2], seed=SEED)

als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    nonnegative=True,
    # Drop rows whose prediction is NaN (users/items unseen during training) so that
    # the evaluation metric is not NaN.
    coldStartStrategy='drop',
    seed=SEED,
)

model_rec = als.fit(train)

### 모델 튜닝, CrossValidation

In [None]:
# NOTE(review): this whole cell is disabled (commented out) and has no execution count;
# it is kept only as a sketch of hyper-parameter tuning with cross-validation.
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# # Model tuning: grid over rank, maxIter and regParam
# param_grid = ParamGridBuilder().addGrid(als.rank, [12, 13, 14])\
#                                .addGrid(als.maxIter, [18, 19, 20])\
#                                .addGrid(als.regParam, [.17, .18, .19])\
#                                .build()
# # Build the cross-validator
# NOTE(review): `re` is not defined anywhere in the visible notebook; the RegressionEvaluator
# created in the evaluation cell is named `eva` and is defined after this cell - confirm
# and define the evaluator first before re-enabling.
# cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = re, numFolds = 3)

# # Fit the cross-validator
# NOTE(review): assigning to `model` would overwrite the fitted StringIndexer model of the same name.
# model = cv.fit(train)
# # Pick the best model found by the search
# best_model = model.bestModel

# print(f"""***Best Model***
# Rank: {best_model.rank}
# MaxIter: {best_model._java_obj.parent().getMaxIter()}
# RegParam: {best_model._java_obj.parent().getRegParam()}""")

## 4. 모델 예측

In [7]:
# test set에 prediction
predict = model_rec.transform(test)

predict.limit(5).toPandas()

Unnamed: 0,userId,rating,movieId,title,genres,prediction
0,4169,3,148,"Awfully Big Adventure, An (1995)",Drama,2.515118
1,4227,2,148,"Awfully Big Adventure, An (1995)",Drama,1.179191
2,5333,3,148,"Awfully Big Adventure, An (1995)",Drama,1.961666
3,3184,4,148,"Awfully Big Adventure, An (1995)",Drama,2.068614
4,2383,2,148,"Awfully Big Adventure, An (1995)",Drama,3.490967


## 5. 모델 평가

In [8]:
# evaluate
from pyspark.ml.evaluation import RegressionEvaluator # RMSE로 평가

eva = RegressionEvaluator(metricName = 'rmse', predictionCol = 'prediction', labelCol = 'rating')
rmse = eva.evaluate(predict)

print(rmse) # RMSE error

0.8767460345292515


## 6. 모델 활용 - 영화 추천

In [9]:
# Top-10 recommended movies for every user; each entry is a {movieId, predicted rating} pair.
# NOTE(review): the scores shown are raw model outputs and can exceed the rating scale seen
# in the data (e.g. values above 12 in the recorded output).
model_rec.recommendForAllUsers(10).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[{138, 7.2524557}...|
|  4900|[{526, 12.725696}...|
|  5300|[{138, 6.8905187}...|
|   471|[{729, 7.4960527}...|
|  1591|[{3303, 7.1923237...|
|  4101|[{1543, 8.326898}...|
|  1342|[{2129, 6.090068}...|
|  2122|[{2129, 7.3840494...|
|  2142|[{1743, 5.8823133...|
|   463|[{858, 4.875755},...|
|   833|[{1539, 8.99252},...|
|  5803|[{729, 9.003023},...|
|  3794|[{1038, 8.852898}...|
|  1645|[{526, 8.551011},...|
|  3175|[{2192, 5.6945477...|
|  4935|[{2063, 6.6553025...|
|   496|[{138, 10.358892}...|
|  2366|[{138, 10.632306}...|
|  2866|[{1930, 8.039445}...|
|  5156|[{687, 6.6893735}...|
+------+--------------------+
only showing top 20 rows



In [10]:
def get_recommendation(user_id, n):
    """Display the top-``n`` movie recommendations for a single user.

    Relies on the notebook-level names ``model_rec`` (fitted ALS model), ``spark``
    (active SparkSession), ``df`` (ratings DataFrame with ``movieId``, ``title`` and
    ``genres`` columns) and ``pd`` (pandas).

    Args:
        user_id: Id of the user to recommend for (matched against ``userId``).
        n: Number of movies to recommend and to display.

    Returns:
        None. The table (movieId, ratings, title, genres) is printed via ``show``.

    Raises:
        IndexError: If the model has no recommendations for ``user_id``, e.g. the
            user was not part of the training data. The type is kept for
            compatibility with the previous behaviour, which failed with an
            out-of-bounds ``IndexError`` in this case.
    """
    # Score only the requested user instead of computing the top-n for every user
    # in the model and filtering afterwards.
    user_subset = spark.createDataFrame([(int(user_id),)], 'userId int')
    # Collect once: the recommendation job runs a single time.
    rec_rows = model_rec.recommendForUserSubset(user_subset, n).collect()
    if not rec_rows:
        raise IndexError(f'no recommendations available for userId={user_id}')

    recommendations = rec_rows[0]['recommendations']
    ratings_matrix = pd.DataFrame({
        'movieId': [rec['movieId'] for rec in recommendations],
        'ratings': [rec['rating'] for rec in recommendations],
    })

    # Look up title/genres in Spark for the recommended ids only, de-duplicated there,
    # so the full ratings table is never pulled onto the driver.
    movie_ids = ratings_matrix['movieId'].tolist()
    movie_info = (
        df.filter(df['movieId'].isin(movie_ids))
        .select('movieId', 'title', 'genres')
        .dropDuplicates()
        .toPandas()
    )

    # An inner merge keeps the order of the left keys, i.e. the model's ranking.
    ratings_matrix = ratings_matrix.merge(movie_info, on='movieId').reset_index(drop=True)

    ratings_matrix_ps = spark.createDataFrame(ratings_matrix)
    return ratings_matrix_ps.show(n, False)

In [11]:
# Example: show the top 6 recommended movies for user 1580.
get_recommendation(1580, 6)

+-------+-----------------+-----------------------------------------------------------------+--------------+
|movieId|ratings          |title                                                            |genres        |
+-------+-----------------+-----------------------------------------------------------------+--------------+
|138    |7.252455711364746|Neon Bible, The (1995)                                           |Drama         |
|687    |6.092929840087891|Country Life (1994)                                              |Drama|Romance |
|1664   |6.053616523742676|N�nette et Boni (1996)                                           |Drama         |
|2933   |6.028502464294434|Fire Within, The (Le Feu Follet) (1963)                          |Drama         |
|729    |5.852004051208496|Institute Benjamenta, or This Dream People Call Human Life (1995)|Drama         |
|2962   |5.806504249572754|Fever Pitch (1997)                                               |Comedy|Romance|
+-------+----------

In [12]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()