# Spark The Definite Guide - 추천
- 사람들의 평점을 통한 명시적 선호 또는 관찰된 행동을 통한 암시적 선호도를 연구함으로써 특정 사용자와 다른 사용자 사이의 유사성이나 사용자가 선호나는 특정 제품과 다른 제품 간의 유사성을 도출하여 사용자가 좋아할 많난 것을 추천 할 수 있음
- 이러한 유사성을 기반으로 사용자에게 새로운 추천을 할수 있음

## 활용 사례
- 추천 엔진은 가장 성공적인 빅데이터 활용 사례 중  하나
- Spark는 대규모 추천을 위해 오픈소스 도구로 사용되고 있음
- ex) 
    - 영화 추천
    - 과목 추천
- Spark에서는 추천 위한 알고리즘으로 ALS(Alternating Least Square - 교차최소제곱)을 제공
    - Collaborative filtering 기술을 활용하여 사용자가 과거에 상호작용한 아이템들을 기반으로 추천을 함
    - 사용자 또는 아이템에 대한 추가적인 특징이 필요하지 않음
- ALS 외에 연관 규칙을 찾아내는 빈발 패턴 마이닝도 제공함

## ALS(교차최소제곱) 알고리즘을 사용하여 협업 필터링 구현하기

In [31]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [32]:
# 데이터셋을 사용하여 모델을 학습하기
ratings = spark.read.text("data/sample_movielens_ratings.txt")

ratings = (
    ratings.rdd.toDF()
    .selectExpr("split(value, '::') as col")
    .selectExpr(
    "cast(col[0] as int) as userId",
    "cast(col[1] as int) as movieId",
    "cast(col[2] as float) as rating",
    "cast(col[3] as long) as timestamp")
)

training, test = ratings.randomSplit([0.8,0.2])

als = (ALS()
       .setMaxIter(5)
       .setRegParam(0.01)
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
      )

alsModel = als.fit(training)

predictions = alsModel.transform(test)

In [33]:
#ratings = sc.textFile("data/sample_movielens_ratings.txt")
#ratings = ratings.map((lambda l: l.split("::")))
#ratings = spark.createDataFrame(ratings,['userid','movieid','rate','timeStamp'])

#ratings = (ratings
#           .withColumn("userId",(ratings['userid']).cast("integer"))
#           .withColumn("movieId",(ratings['movieid']).cast("integer"))
#           .withColumn("rating",(ratings['rate']).cast("float")).drop("rate")
#           .withColumn("timestamp",(ratings['timeStamp']).cast("long"))
#          )

In [34]:
# DataFrame 형태의 userId와 배열 형태의 추천 결과 및 각 영화에 대한 평점을 반환
alsModel.recommendForAllUsers(10).selectExpr("userId", "explode(recommendations)").show()

+------+---------------+
|userId|            col|
+------+---------------+
|    28|[30, 5.4756284]|
|    28| [92, 5.052089]|
|    28|[12, 4.8499913]|
|    28|[81, 4.6342797]|
|    28| [49, 4.126742]|
|    28| [69, 4.092031]|
|    28|[93, 4.0766826]|
|    28|[89, 4.0483093]|
|    28| [2, 3.9828472]|
|    28|[40, 3.6615667]|
|    26| [74, 5.641425]|
|    26| [30, 5.542386]|
|    26|[51, 5.4557705]|
|    26|[32, 5.2386336]|
|    26|[94, 5.1843386]|
|    26| [88, 5.167331]|
|    26|  [7, 5.012852]|
|    26|[98, 4.9463615]|
|    26|[22, 4.8742166]|
|    26|[75, 4.4384103]|
+------+---------------+
only showing top 20 rows



In [35]:
# movieId와 영화별 상위 사용자를 DataFrame 형태로 반환
alsModel.recommendForAllItems(10).selectExpr("movieId", "explode(recommendations)").show()

+-------+---------------+
|movieId|            col|
+-------+---------------+
|     31| [7, 2.9600422]|
|     31| [8, 2.8510668]|
|     31|[12, 2.7046494]|
|     31| [6, 2.3811903]|
|     31|[21, 2.3764756]|
|     31| [1, 2.1563532]|
|     31| [9, 2.0135367]|
|     31|[25, 1.8588212]|
|     31|[23, 1.7739711]|
|     31|[15, 1.7601122]|
|     85| [8, 5.0888376]|
|     85|[16, 4.6845336]|
|     85|  [7, 3.968389]|
|     85|[24, 3.7878375]|
|     85|[11, 3.7300146]|
|     85|[12, 3.4305716]|
|     85|[21, 3.1815712]|
|     85| [1, 3.1612375]|
|     85| [0, 2.9950883]|
|     85| [6, 2.8789682]|
+-------+---------------+
only showing top 20 rows



## 추천을 위한 평가기

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = (
    RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("rating")
    .setPredictionCol("prediction")
)

rsme = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rsme)

Root-mean-square error = 2.232471


## 성과 평가지표

### 회귀 평가지표
- 각 예측값이 해당 사용자 및 아이템의 실제 평가 결과와 얼마나 가까운지 간단히 볼 수 있음

In [38]:
from pyspark.mllib.evaluation import RegressionMetrics

regComparison = predictions.select("rating","prediction")\
.rdd.map(lambda x: (x(0),x(1)))

metrics = RegressionMetrics(regComparison)
print(type(metrics))

<class 'pyspark.mllib.evaluation.RegressionMetrics'>


### 순위 평가지표

In [39]:
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import col, expr

perUserActual = (
    predictions
    .where("rating > 2.5")
    .groupBy("userId")
    .agg(expr("collect_set(movieId) as movies"))
)

In [41]:
perUserActual.show()

+------+-------------------+
|userId|             movies|
+------+-------------------+
|    28|               [19]|
|    26|[81, 68, 24, 4, 23]|
|    27|           [66, 19]|
|    12|           [31, 91]|
|    22|   [70, 32, 68, 80]|
|     1|               [77]|
|    13|           [53, 72]|
|    16|           [51, 96]|
|     6|           [61, 25]|
|     3|            [18, 8]|
|    20|               [90]|
|     5|   [20, 56, 36, 55]|
|    15|                [1]|
|    17|   [46, 22, 94, 55]|
|     9|        [2, 32, 14]|
|     4|           [87, 41]|
|     8|       [96, 72, 29]|
|    23|   [48, 50, 87, 55]|
|     7|               [25]|
|    10|    [2, 89, 42, 25]|
+------+-------------------+
only showing top 20 rows



In [42]:
perUserPredictions = (
    predictions
    .orderBy(col("userId"), expr("prediction DESC"))
    .groupBy("userId")
    .agg(expr("collect_list(movieId) as movies"))
)

In [43]:
perUserPredictions.show()

+------+--------------------+
|userId|              movies|
+------+--------------------+
|    28|[85, 59, 63, 52, ...|
|    26|[81, 97, 16, 48, ...|
|    27|[43, 28, 31, 42, ...|
|    12|[74, 8, 84, 78, 1...|
|    22|[16, 70, 68, 26, ...|
|     1|[74, 43, 12, 13, ...|
|    13|[72, 53, 22, 98, ...|
|     6|[96, 39, 68, 22, ...|
|    16|[34, 99, 36, 96, ...|
|     3|[0, 2, 8, 15, 70,...|
|    20|[93, 39, 90, 78, ...|
|     5|[84, 15, 55, 99, ...|
|    19|[84, 58, 37, 61, ...|
|    15|[1, 97, 82, 32, 7...|
|     9|[54, 59, 22, 73, ...|
|    17|[46, 94, 22, 57, ...|
|     4|[89, 41, 15, 87, ...|
|     8|[29, 11, 18, 7, 2...|
|    23|[43, 29, 55, 77, ...|
|     7|[63, 11, 32, 55, ...|
+------+--------------------+
only showing top 20 rows



In [45]:
perUserActualPred = (perUserActual
                     .join(perUserPredictions, ["userId"]).rdd
                     .map(lambda row:(row[1], row[2][:15]))
                    )
ranks = RankingMetrics(perUserActualPred)

In [46]:
# 평균정밀도의 평균으로 알고리즘 얼마나 정확한지 확인
# 특정 순위 구간의 정확도를 도출하여 주로 어느 구간에서 추천이 실패하는지 파악 가능
ranks.meanAveragePrecision
ranks.precisionAt(5)

0.503448275862069

---------
# EXAMPLE
---------

## RDD

In [1]:
def csv2RDD(path):
    rdd = spark.sparkContext.textFile(path)
    rdd_header = rdd.take(1)[0]
    rdd = rdd\
        .filter(lambda line: line!=rdd_header) \
        .map(lambda line: line.split(",")) \
        .map(lambda tokens: (tokens[0],tokens[1],tokens[2])) \
        .cache()
    return rdd

### ratings

In [16]:
import os
small_ratings = os.path.join('../data/ml-latest-small/ratings.csv')

In [17]:
small_ratings_rdd = spark.sparkContext.textFile(small_ratings)

In [18]:
small_ratings_rdd_header = small_ratings_rdd.take(1)

In [19]:
small_ratings_rdd_header

['userId,movieId,rating,timestamp']

In [20]:
ratings = (small_ratings_rdd
           .filter(lambda line: line!=small_ratings_rdd_header[0])
           .map(lambda line: line.split(","))
           .map(lambda tokens: (tokens[0],tokens[1],tokens[2]))
           .cache()
          )

In [21]:
ratings.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [22]:
ratingspath = os.path.join('../data/ml-latest-small/ratings.csv')
ratings = csv2RDD(small_ratings)

In [23]:
ratings.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

### movies

In [25]:
moviespath = os.path.join('../data/ml-latest-small/movies.csv')
movies = csv2RDD(moviespath)

In [27]:
movies.take(3)

[('1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 ('2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 ('3', 'Grumpier Old Men (1995)', 'Comedy|Romance')]

### train, test 분리

In [28]:
from pyspark.mllib.recommendation import ALS
import math

train, org_validation, org_test = ratings.randomSplit([6,2,2], seed=0)

In [29]:
org_validation.take(3)

[('1', '6', '4.0'), ('1', '47', '5.0'), ('1', '163', '5.0')]

In [30]:
validation = org_validation.map(lambda x: (x[0], x[1]))

In [31]:
test = org_test.map(lambda x: (x[0], x[1]))

### ALS 모델링



```python
#----------------ml package(DataFrame)------------------
from pyspark.ml.recommendation import ALS
als = ALS(rank=10, maxIter=20, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(trainDf)
predictions = model.transform(testDf)

#----------------mllib package(RDD)----------------
from pyspark.mllib.recommendation import ALS
model = ALS.train(trainRdd, 10, seed=3, iterations=20)
predictions = model.predictAll(testRdd).map(lambda r: (r.user, r.product, r.rating))
```

ALS 모델에 필요한 설정이 있다.
* rank는 숨겨진 요인 latent factors의 수
* iterations 반복회수
* lambda는 regularization 계수 (높으면 regularizaton 높음...

In [32]:
seed = 5
iterations = 10
regularization_parameter = 0.1

errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

rank=4
# start for
# ranks = [4, 8, 12]
# for rank in ranks:
model = ALS.train(train,rank,seed=seed,iterations=iterations,lambda_=0.1)

### 예측

#### 전체

```predictAll()``` 함수는 ```ratings``` 평가점수를 예측한다. 즉 사용자의 영화에 대한 평가점수가 없는 경우, 몇 점인지 예측하게 된다.

In [33]:
predictions = model.predictAll(validation)

In [34]:
predictions.take(2)

[Rating(user=610, product=81132, rating=3.1807547132989855),
 Rating(user=480, product=6156, rating=3.462198788774775)]

In [35]:
predictions=predictions.map(lambda r: ((r[0], r[1]), r[2]))

In [36]:
rates_and_preds = org_validation\
    .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))\
    .join(predictions)

In [37]:
rates_and_preds.take(3)

[((1, 2193), (4.0, 4.214270983630589)),
 ((3, 26409), (4.5, 0.2902626066967151)),
 ((4, 914), (5.0, 4.125248624296204))]

### 사용자별 예측
- userId, movieId를 넣으면 평가점수 예측 할수 있음

In [38]:
# 1번 사용자가 1번 영화에 대한 예측 rating은 대략 4.79
model.predict(1,1)

4.790980724422168

In [39]:
# 1번 사용자의 상위 10개 추천
user1Top10 = model.recommendProducts(1,10)

In [40]:
user1Top10

[Rating(user=1, product=3379, rating=6.4158343229672745),
 Rating(user=1, product=6818, rating=6.253854681185788),
 Rating(user=1, product=7748, rating=6.180248513720873),
 Rating(user=1, product=33649, rating=6.1540872085397025),
 Rating(user=1, product=59018, rating=6.063044888164292),
 Rating(user=1, product=3200, rating=6.004648389795263),
 Rating(user=1, product=58301, rating=6.003154000537293),
 Rating(user=1, product=2239, rating=5.807160201242671),
 Rating(user=1, product=5485, rating=5.806598376177139),
 Rating(user=1, product=93988, rating=5.795515866260636)]

In [41]:
top10movies = user1Top10[1][1]
for i in range(0,10):
    print(movies.lookup(str(user1Top10[i][1])))
    #print(user1Top10[i][1])
#movies.lookup('93988')

['On the Beach (1959)']
['Come and See (Idi i smotri) (1985)']
['Pierrot le fou (1965)']
['Saving Face (2004)']
['"Visitor']
['"Last Detail']
['Funny Games U.S. (2007)']
["Swept Away (Travolti da un insolito destino nell'azzurro mare d'Agosto) (1975)"]
['Tadpole (2002)']
['North & South (2004)']


In [42]:
movies.lookup('93988')

['North & South (2004)']

## DataFrame

### rating

In [44]:
ratings_df = spark.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true').load('../data/ml-latest-small/ratings.csv')

In [45]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [46]:
ratings_df = ratings_df.drop('timestamp')

In [47]:
ratings_df.show(3)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
+------+-------+------+
only showing top 3 rows



### movies

In [48]:
movies_df = spark.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true').load('../data/ml-latest-small/movies.csv')

In [49]:
movies_df.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



## train, test

In [50]:
train_df, org_validation_df, org_test_df = ratings_df.randomSplit([0.6,0.2,0.2])

In [51]:
validation_df = org_validation_df

In [52]:
test_df = org_test_df.drop('rating')

### ALS 모델

In [53]:
from pyspark.ml.recommendation import ALS

als = (ALS()
       .setMaxIter(10)
       .setRegParam(0.1)
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
      )


alsModel = als.fit(train_df)

### 예측

In [54]:
validate_predictions = alsModel.transform(validation_df)

In [55]:
validate_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   385|    471|   4.0| 2.4761214|
|   500|    471|   1.0| 2.2541206|
|   176|    471|   5.0| 4.1756897|
|   216|    471|   3.0| 3.2429502|
|   411|    471|   4.0| 3.6109314|
|   260|    471|   4.5|  3.319919|
|   357|    471|   3.5| 3.8527763|
|   307|    833|   1.0| 1.8112105|
|   608|    833|   0.5| 2.5559247|
|   177|   1088|   3.5|  3.600533|
|   132|   1088|   4.0|   2.95846|
|    64|   1088|   4.0|  3.581046|
|   286|   1088|   3.5| 3.0356274|
|   387|   1088|   1.5| 2.5480225|
|   226|   1088|   1.0| 3.2058072|
|   188|   1088|   4.0|  4.021937|
|    68|   1088|   3.5| 3.1473124|
|   600|   1088|   3.5| 2.9869893|
|   517|   1088|   1.0| 3.1542857|
|   385|   1238|   3.0| 2.8106213|
+------+-------+------+----------+
only showing top 20 rows



In [56]:
# DataFrame 형태의 userId와 배열 형태의 추천 결과 및 각 영화에 대한 평점을 반환
alsModel.recommendForAllUsers(5).selectExpr("userId", "explode(recommendations)").orderBy("userId").show()

+------+-------------------+
|userId|                col|
+------+-------------------+
|     1| [33649, 5.7864656]|
|     1|    [3022, 5.81411]|
|     1|   [3379, 5.595037]|
|     1|[177593, 5.6490088]|
|     1|  [26258, 5.941104]|
|     2| [32892, 4.7452865]|
|     2|    [213, 4.748828]|
|     2|   [7121, 4.694185]|
|     2|[131724, 4.9052835]|
|     2|[177593, 5.1851015]|
|     3| [70946, 4.9118304]|
|     3|  [6835, 4.9118304]|
|     3|   [5181, 4.871731]|
|     3|  [5746, 4.9118304]|
|     3|  [5919, 4.9118304]|
|     4|   [456, 5.5200477]|
|     4|   [3030, 5.598499]|
|     4|[132333, 5.2960567]|
|     4|  [25850, 5.366564]|
|     4|   [1147, 5.617336]|
+------+-------------------+
only showing top 20 rows



In [57]:
# movieId와 영화별 상위 사용자를 DataFrame 형태로 반환
alsModel.recommendForAllItems(5).selectExpr("movieId", "explode(recommendations)").orderBy("movieId").show()

+-------+----------------+
|movieId|             col|
+-------+----------------+
|      1| [43, 4.9595304]|
|      1|  [77, 4.920113]|
|      1|[452, 4.8996873]|
|      1|  [171, 4.96419]|
|      1|[429, 4.9567018]|
|      2|[337, 4.3921747]|
|      2| [267, 4.413191]|
|      2|[594, 4.4981236]|
|      2| [498, 4.444351]|
|      2|[475, 4.3758307]|
|      3|  [43, 4.627588]|
|      3| [543, 4.320625]|
|      3| [77, 4.2711987]|
|      3| [523, 4.156305]|
|      3|[267, 4.1473327]|
|      4|[243, 3.4013493]|
|      4| [43, 3.5434914]|
|      4|[337, 3.4515815]|
|      4|[557, 3.4062204]|
|      4| [594, 3.582818]|
+-------+----------------+
only showing top 20 rows



### 순위 평가지표

In [58]:
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import col, expr
# 평점  4 이상
perUserActual = (
    validate_predictions
    .where("prediction > 4")
    .groupBy("userId")
    .agg(expr("collect_set(movieId) as movies"))
)

In [59]:
perUserActual.orderBy("userId").show()

+------+--------------------+
|userId|              movies|
+------+--------------------+
|     1|[2542, 1206, 590,...|
|     3|       [7899, 26409]|
|     4|               [215]|
|     5|               [608]|
|     6|[981, 27, 318, 37...|
|     7|       [4306, 31878]|
|     8|               [457]|
|     9|  [1987, 3328, 2300]|
|    10|    [137595, 112006]|
|    11|[150, 529, 2028, ...|
|    12|[6942, 920, 40629...|
|    15|              [1196]|
|    16|              [3741]|
|    17|[750, 1259, 81932...|
|    18|[4973, 114066, 18...|
|    19|[1355, 2227, 2263...|
|    20|[661, 3536, 2687,...|
|    21|[86068, 122886, 1...|
|    23|               [912]|
|    24|  [27773, 1197, 457]|
+------+--------------------+
only showing top 20 rows



In [60]:
org_validation_df.groupBy('userId').count().orderBy('userId').show()

+------+-----+
|userId|count|
+------+-----+
|     1|   35|
|     2|    3|
|     3|    6|
|     4|   44|
|     5|    9|
|     6|   53|
|     7|   36|
|     8|   11|
|     9|    6|
|    10|   21|
|    11|   17|
|    12|   12|
|    13|    3|
|    14|   12|
|    15|   30|
|    16|   27|
|    17|   23|
|    18|  101|
|    19|  125|
|    20|   45|
+------+-----+
only showing top 20 rows



In [61]:
org_validation_df.groupBy('userId').pivot('rating').agg({"rating":"count"}).fillna(0).orderBy("userId").show()
#bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()

+------+---+---+---+---+---+---+---+---+---+---+
|userId|0.5|1.0|1.5|2.0|2.5|3.0|3.5|4.0|4.5|5.0|
+------+---+---+---+---+---+---+---+---+---+---+
|     1|  0|  0|  0|  0|  0|  3|  0|  7|  0| 25|
|     2|  0|  0|  0|  1|  0|  0|  1|  1|  0|  0|
|     3|  3|  0|  0|  1|  0|  0|  0|  0|  2|  0|
|     4|  0|  4|  0|  7|  0| 12|  0| 13|  0|  8|
|     5|  0|  0|  0|  1|  0|  2|  0|  3|  0|  3|
|     6|  0|  1|  0|  1|  0| 27|  0| 18|  0|  6|
|     7|  2|  4|  4|  0|  1|  4|  4|  6|  7|  4|
|     8|  0|  0|  0|  0|  0|  6|  0|  2|  0|  3|
|     9|  0|  0|  0|  1|  0|  2|  0|  1|  0|  2|
|    10|  3|  2|  0|  0|  2|  2|  5|  4|  1|  2|
|    11|  0|  0|  0|  1|  0|  5|  0|  4|  0|  7|
|    12|  0|  0|  0|  0|  0|  0|  1|  0|  1| 10|
|    13|  0|  0|  0|  0|  0|  2|  0|  1|  0|  0|
|    14|  0|  1|  0|  2|  0|  3|  0|  3|  0|  3|
|    15|  0|  1|  0|  1|  2|  6|  5|  8|  3|  4|
|    16|  0|  0|  0|  0|  1|  3|  9| 12|  1|  1|
|    17|  0|  0|  0|  0|  0|  0|  3|  8| 10|  2|
|    18|  1|  1|  1|