### Imports

In [None]:
# Install PySpark with conda if it is not already available (run once):
#conda install -c conda-forge pyspark
# For any other packages the install asks for, see: https://spark.apache.org/docs/latest/api/python/getting_started/install.html

In [1]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from surprise import Dataset

### Initialize PySpark

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

# Build (or reuse) the session first, then take the SparkContext it owns.
# The previous `sc = SparkContext` bound the *class* rather than a running
# context, so any `sc.<method>()` call (e.g. setCheckpointDir) would fail.
spark = SparkSession.builder.appName('Recommendations').getOrCreate()
sc = spark.sparkContext
# Optional: ALS's checkpointInterval is ignored unless a checkpoint dir is set.
# sc.setCheckpointDir('checkpoint')

### Load data

In [3]:
# Data source: https://grouplens.org/datasets/movielens/
# TODO: apply to our own data later.
# Alternative inputs kept for reference:
#   ratings = Dataset.load_builtin('ml-100k')               # surprise built-in dataset
#   ratings = spark.read.csv("ratings.csv", header=True)    # sampled data (OOSOO)
def _read_csv_with_header(path):
    """Read a CSV whose first line is the header, keeping every column as a string."""
    return spark.read.option("header", True).csv(path)

movies = _read_csv_with_header("movies.csv")
ratings = _read_csv_with_header("rating.csv")

In [4]:
ratings.__class__  # sanity check: the load produced a Spark DataFrame, not pandas

pyspark.sql.dataframe.DataFrame

In [5]:
# Convert the `surprise` built-in dataset into a Spark DataFrame.
# Currently disabled: only needed if the `Dataset.load_builtin('ml-100k')` line
# in the load cell is re-enabled.
#pd_data = pd.DataFrame(ratings.__dict__['raw_ratings'], columns=['user_id','movie_id','rating','timestamp'])
#pd_data.rename(columns = {'user_id':'userId', 'movie_id':'movieId'}, inplace = True)
#pd_data

#ratings = spark.createDataFrame(pd_data)


In [6]:
# Keep only the columns the pipeline uses. The CSV's leading column (presumably
# a pandas index saved without a header name; TODO confirm) is discarded by
# selecting the wanted columns by name. Unlike `ratings.drop(ratings[0])`, this
# is safe to re-run: a second positional drop would silently remove `user_id`.
ratings = ratings.select('user_id', 'content_id', 'rating')
ratings.show()

+-------+----------+------------------+
|user_id|content_id|            rating|
+-------+----------+------------------+
|      0|  m_286217|  8.35652661686786|
|      0|  m_398181| 6.065895390403058|
|      0|    m_1372| 7.785727443497953|
|      0|  m_127585| 7.603591877875605|
|      0|  m_363676| 6.663915179160493|
|      0|     m_640| 7.554297212111876|
|      0|  m_374720| 7.291435186859245|
|      0|   m_16869| 8.561720428183207|
|      0|   m_24428|  6.46182288139771|
|      0|  m_284054| 6.714157000059457|
|      0|   t_79242| 7.783442736814031|
|      0|   t_75006| 8.982251404112121|
|      0|     m_707|  5.48429382439606|
|      0|  m_293863|7.5892713766171855|
|      0|    m_1927| 5.603114486722044|
|      0|  m_259693| 6.644639058769108|
|      0|     m_451| 8.072781617158265|
|      0|  m_376570| 6.948982726753748|
|      0|   m_13939| 7.224361006669324|
|      0|  m_353486| 6.035672200020115|
+-------+----------+------------------+
only showing top 20 rows



In [7]:
# Rename to the userId / movieId names used by the rest of the notebook.
ratings = ratings.selectExpr(
    "user_id AS userId",
    "content_id AS movieId",
    "rating",
)

In [8]:
# The CSV reader loads every column as a string; cast to the types ALS expects.
# ALS needs numeric user ids. movieId stays a string and is indexed later.
target_types = {'userId': 'integer', 'movieId': 'string', 'rating': 'float'}
for column_name, spark_type in target_types.items():
    ratings = ratings.withColumn(column_name, col(column_name).cast(spark_type))
ratings.show()

+------+--------+---------+
|userId| movieId|   rating|
+------+--------+---------+
|     0|m_286217| 8.356526|
|     0|m_398181|6.0658956|
|     0|  m_1372|7.7857275|
|     0|m_127585| 7.603592|
|     0|m_363676| 6.663915|
|     0|   m_640|7.5542974|
|     0|m_374720|7.2914352|
|     0| m_16869| 8.561721|
|     0| m_24428| 6.461823|
|     0|m_284054| 6.714157|
|     0| t_79242| 7.783443|
|     0| t_75006| 8.982251|
|     0|   m_707| 5.484294|
|     0|m_293863|7.5892715|
|     0|  m_1927|5.6031146|
|     0|m_259693| 6.644639|
|     0|   m_451| 8.072782|
|     0|m_376570|6.9489827|
|     0| m_13939| 7.224361|
|     0|m_353486| 6.035672|
+------+--------+---------+
only showing top 20 rows



In [9]:
# Confirm the casts took effect: userId integer, movieId string, rating float.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: float (nullable = true)



### Calculate sparsity

In [10]:
# Sparsity = share of the full user x movie matrix that has no rating.
numerator = ratings.count()  # number of observed ratings (rows)

# Number of distinct users and distinct movies.
num_users, num_movies = (
    ratings.select(column).distinct().count() for column in ("userId", "movieId")
)

for label, value in (("Rate", numerator), ("User", num_users), ("Movie", num_movies)):
    print("%s Data Count : %d" % (label, value))

# Every possible (user, movie) pair.
denominator = num_users * num_movies

# Percentage of the possible pairs that are missing.
sparsity = (1.0 - numerator / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

Rate Data Count : 100000
User Data Count : 100
Movie Data Count : 9898
The ratings dataframe is  89.90% empty.


### Interpret ratings

In [11]:
# Number of ratings written by each user, most active users first.
userId_ratings = (
    ratings
    .groupBy("userId")
    .count()
    .sort(col("count").desc())
)
userId_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|    31| 1000|
|    85| 1000|
|    65| 1000|
|    53| 1000|
|    78| 1000|
|    34| 1000|
|    81| 1000|
|    28| 1000|
|    76| 1000|
|    26| 1000|
|    27| 1000|
|    44| 1000|
|    12| 1000|
|    91| 1000|
|    22| 1000|
|    93| 1000|
|    47| 1000|
|     1| 1000|
|    52| 1000|
|    13| 1000|
+------+-----+
only showing top 20 rows



In [12]:
# Number of ratings received by each movie, most-rated movies first.
movieId_ratings = (
    ratings
    .groupBy("movieId")
    .count()
    .sort(col("count").desc())
)
movieId_ratings.show()

+--------+-----+
| movieId|count|
+--------+-----+
| m_27205|  237|
|m_157336|  210|
|m_293660|  210|
|m_299536|  202|
| m_24428|  194|
|   m_155|  193|
| m_19995|  184|
|    m_13|  177|
|   m_550|  170|
| m_76341|  163|
|   m_680|  160|
|m_118340|  159|
|m_475557|  156|
| m_68718|  155|
|   m_597|  155|
|  m_1726|  155|
|   m_671|  154|
|   m_603|  151|
| m_10138|  150|
|m_135397|  149|
+--------+-----+
only showing top 20 rows



In [13]:
# Map string item ids (e.g. "m_286217") to numeric indices for ALS.
# With the default ordering the most-rated movie gets index 0.0
# (for example, m_24428, the 5th most rated, becomes 4.0).
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movieId_index")

# Remove any index column left by a previous run of this cell. Otherwise
# transform() fails because the output column already exists. Dropping a
# column that is not present is a no-op.
ratings = ratings.drop("movieId_index")
indexer_model = movie_indexer.fit(ratings)
ratings = indexer_model.transform(ratings)
ratings.show()

+------+--------+---------+-------------+
|userId| movieId|   rating|movieId_index|
+------+--------+---------+-------------+
|     0|m_286217| 8.356526|         55.0|
|     0|m_398181|6.0658956|       1453.0|
|     0|  m_1372|7.7857275|        448.0|
|     0|m_127585| 7.603592|        145.0|
|     0|m_363676| 6.663915|        500.0|
|     0|   m_640|7.5542974|        132.0|
|     0|m_374720|7.2914352|         73.0|
|     0| m_16869| 8.561721|         67.0|
|     0| m_24428| 6.461823|          4.0|
|     0|m_284054| 6.714157|         51.0|
|     0| t_79242| 7.783443|       1579.0|
|     0| t_75006| 8.982251|        373.0|
|     0|   m_707| 5.484294|       1401.0|
|     0|m_293863|7.5892715|        513.0|
|     0|  m_1927|5.6031146|        511.0|
|     0|m_259693| 6.644639|        381.0|
|     0|   m_451| 8.072782|       2219.0|
|     0|m_376570|6.9489827|        944.0|
|     0| m_13939| 7.224361|       3571.0|
|     0|m_353486| 6.035672|        166.0|
+------+--------+---------+-------

## ALS 모델링 및 측정

In [14]:
# Import
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [15]:
# Hold out 20% of the ratings for the final test. The fixed seed keeps the split reproducible.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=2017152017)

# Explicit-feedback ALS: ratings are scores, not implicit interactions.
# nonnegative=True constrains latent factors to be >= 0.
# coldStartStrategy="drop" discards predictions for users/items unseen in training,
# so they cannot turn the RMSE into NaN.
als = ALS(
    userCol="userId",
    itemCol="movieId_index",
    ratingCol="rating",
    implicitPrefs=False,
    nonnegative=True,
    coldStartStrategy="drop",
)
als

ALS_034e67fd10a0

In [16]:
# Hyper-parameter grid (adjust freely and compare results).
# Every (rank, regParam) pair becomes one candidate model: 5 x 4 = 20.
rank_values = [10, 50, 100, 150, 200]
reg_values = [.01, .05, .1, .15]
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_values)
    .addGrid(als.regParam, reg_values)
    .build()
)

# Candidates are scored by RMSE between `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
print("테스트 모델 : %d개" % len(param_grid))

테스트 모델 : 20개


In [17]:
# 5-fold cross-validation over every candidate in param_grid.
cv = CrossValidator(
    estimator=als,
    evaluator=evaluator,
    estimatorParamMaps=param_grid,
    numFolds=5,
)
print(cv)

CrossValidator_aca63935ce87


### 최적 모델 및 파라미터 찾기

In [18]:
# WARNING: slow cell. CrossValidator fits (grid size) x (numFolds) ALS models,
# here 20 x 5 = 100, and then refits the winning settings on all of `train`.
model = cv.fit(train)

# Best model found by cross-validation.
bestmodel = model.bestModel
print(bestmodel)

# Recover the winning hyper-parameters through the public API instead of the
# private py4j handle `bestmodel._java_obj.parent()`.
# avgMetrics[i] is the mean fold metric for the i-th estimator param map.
# RMSE is "smaller is better", so follow evaluator.isLargerBetter(). On ties,
# the first index wins, as in CrossValidator itself.
pick_best = max if evaluator.isLargerBetter() else min
best_idx = pick_best(range(len(model.avgMetrics)), key=model.avgMetrics.__getitem__)
best_params = model.getEstimatorParamMaps()[best_idx]

print("Rank : ", bestmodel.rank)
# maxIter is not in the grid, so every candidate used the estimator's own value.
print("MaxIter : ", best_params.get(als.maxIter, als.getMaxIter()))
print("RegParam : ", best_params[als.regParam])

ALSModel: uid=ALS_034e67fd10a0, rank=100
Rank :  100
MaxIter :  10
RegParam :  0.15


In [19]:
# Score the tuned model on the held-out test split using the same RMSE evaluator.
test_predictions = bestmodel.transform(test)
RMSE = evaluator.evaluate(dataset=test_predictions)
print(RMSE)

0.5324743369168851


In [20]:
test_predictions.show(n=20)  # actual rating next to the model's prediction

+------+--------+---------+-------------+----------+
|userId| movieId|   rating|movieId_index|prediction|
+------+--------+---------+-------------+----------+
|    31| m_10020| 8.318916|        396.0| 7.6256933|
|    31| m_10033|6.7017016|       2478.0| 6.1023073|
|    31| m_10083|5.9383483|       3515.0| 6.6587777|
|    31| m_10234|8.0762615|       2687.0|  6.917348|
|    31|   m_105| 8.708742|         60.0|  8.144167|
|    31|   m_107|7.8273644|        374.0|  7.624197|
|    31| m_11253|7.0236883|        848.0|  6.707112|
|    31| m_11321|7.4885607|        491.0|  7.553177|
|    31| m_11439|7.0154305|       1423.0|  6.489041|
|    31| m_11459|6.5143256|       1795.0|  5.816024|
|    31|m_116741|   6.4862|       1356.0|  6.279637|
|    31|m_118340| 8.379281|         11.0|  7.813232|
|    31| m_11888|6.4178314|       3215.0|  5.660842|
|    31|   m_120| 7.824252|         27.0|   8.31436|
|    31|m_120467|7.7925353|        153.0|  7.876638|
|    31|   m_122| 8.104738|         22.0|  8.3

## 추천

In [21]:
# Top-k recommendations for every user; k is the argument passed below.
top_k = 10
nrecommendations = bestmodel.recommendForAllUsers(top_k)

# Preview the recommendation lists of a handful of users.
preview_users = 10
nrecommendations.limit(preview_users).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{8380, 9.897588}...|
|    40|[{8380, 9.874492}...|
|    10|[{8380, 9.830485}...|
|    50|[{8380, 9.892232}...|
|    80|[{8380, 9.875206}...|
|    70|[{8380, 9.906294}...|
|    60|[{8380, 9.906964}...|
|    90|[{8380, 9.907235}...|
|    30|[{8380, 9.854612}...|
|     0|[{8380, 9.889043}...|
+------+--------------------+



In [22]:
# Flatten each user's recommendation array into one row per (user, item, score).
nrecommendations = (
    nrecommendations
    .select("userId", explode("recommendations").alias("rec_exp"))
    .select("userId", "rec_exp.movieId_index", "rec_exp.rating")
)
nrecommendations.limit(20).show()

+------+-------------+---------+
|userId|movieId_index|   rating|
+------+-------------+---------+
|    20|         8380| 9.897588|
|    20|         9454| 9.881088|
|    20|         8814|  9.88074|
|    20|         7928|9.8785925|
|    20|         8267|   9.8681|
|    20|         9654| 9.864925|
|    20|         8793| 9.862417|
|    20|         8539| 9.860825|
|    20|         9448| 9.860794|
|    20|         8758| 9.858503|
|    40|         8380| 9.874492|
|    40|         9454| 9.859648|
|    40|         8814| 9.857499|
|    40|         7928| 9.856125|
|    40|         9654| 9.846782|
|    40|         8267|  9.84617|
|    40|         8793| 9.839868|
|    40|         9448| 9.838731|
|    40|         8539| 9.837894|
|    40|         8887| 9.836574|
+------+-------------+---------+



In [26]:
from pyspark.ml.feature import IndexToString

# Invert the StringIndexer mapping (movieId_index -> original movieId string)
# using the label list learned by indexer_model.
movie_id_to_label = (
    IndexToString()
    .setInputCol("movieId_index")
    .setOutputCol("movieId")
    .setLabels(indexer_model.labels)
)
de = movie_id_to_label.transform(nrecommendations)
de.limit(20).show()

+------+-------------+---------+--------+
|userId|movieId_index|   rating| movieId|
+------+-------------+---------+--------+
|    20|         8380| 9.897588|m_513530|
|    20|         9454| 9.881088| t_42981|
|    20|         8814|  9.88074|m_625334|
|    20|         7928|9.8785925|m_425366|
|    20|         8267|   9.8681|m_494107|
|    20|         9654| 9.864925| t_72479|
|    20|         8793| 9.862417|m_619270|
|    20|         8539| 9.860825|m_550957|
|    20|         9448| 9.860794| t_41891|
|    20|         8758| 9.858503|m_609097|
|    40|         8380| 9.874492|m_513530|
|    40|         9454| 9.859648| t_42981|
|    40|         8814| 9.857499|m_625334|
|    40|         7928| 9.856125|m_425366|
|    40|         9654| 9.846782| t_72479|
|    40|         8267|  9.84617|m_494107|
|    40|         8793| 9.839868|m_619270|
|    40|         9448| 9.838731| t_41891|
|    40|         8539| 9.837894|m_550957|
|    40|         8887| 9.836574|m_654852|
+------+-------------+---------+--

In [None]:
# Recommendations for, and rating history of, a single user.
# The dataset has 100 distinct users (ids seen so far range 0..93, presumably
# 0..99), so the previous id 123 matched nothing. 20 appears in the outputs above.
target_user = 20

print(f"** 유저 {target_user} 추천리스트 **")
# `nrecommendations` only carries movieId_index; `de` holds the decoded movieId
# needed to join with movies.csv. A left join keeps the recommendation rows even
# if movies.csv uses a different id scheme than this ratings file (e.g. MovieLens
# numeric ids vs. "m_286217"). TODO confirm the id formats match.
(
    de.join(movies, on='movieId', how='left')
    .filter(col('userId') == target_user)
    .orderBy(col('rating').desc())
    .show()
)

print(f"** 유저 {target_user} 평가기록 **")
(
    ratings.join(movies, on='movieId', how='left')
    .filter(col('userId') == target_user)
    .sort('rating', ascending=False)
    .limit(10)
    .show()
)