In [None]:
#!sudo apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf spark-3.2.4-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import findspark
findspark.init("/content/spark-3.2.4-bin-hadoop3.2")

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
from google.colab import drive
drive.mount('/gdrive')
gpath = '/gdrive/MyDrive/data/'

Mounted at /gdrive


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

colNames = ["movieId", "title", "genres"]

movies_schema = StructType()
for name in colNames:
    if name == "movieId":
        movies_schema.add(StructField(name, IntegerType(), True))
    else:
        movies_schema.add(StructField(name, StringType(), True))

movies = spark.read.csv(gpath+"movies.csv", header=True, schema=movies_schema)
movies.take(1)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')]

In [None]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
movies_pd = movies.toPandas().set_index("movieId")

In [None]:
movies_pd[movies_pd['title'].str.contains("Iron Man")]

Unnamed: 0_level_0,title,genres
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
59315,Iron Man (2008),Action|Adventure|Sci-Fi
77561,Iron Man 2 (2010),Action|Adventure|Sci-Fi|Thriller|IMAX
102007,"Invincible Iron Man, The (2007)",Animation
102125,Iron Man 3 (2013),Action|Sci-Fi|Thriller|IMAX
142056,Iron Man & Hulk: Heroes United (2013),Action|Adventure|Animation
167296,Iron Man (1931),Drama


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

colNames = ["userId", "movieId", "rating", "timestamp"]

ratings_schema = StructType()
for name in colNames:
    if name == "rating":
        ratings_schema.add(StructField(name, DoubleType(), True))
    else:
        ratings_schema.add(StructField(name, IntegerType(), True))

ratings = spark.read.csv(gpath+"ratings.csv", header=True, schema=ratings_schema)

ratings.take(1)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703)]

In [None]:
(trainData, testData) = ratings.randomSplit([0.8, 0.2])

### ALS(Alternate Least Squares) 모델

- `userCol`
- `itemCol`
- `ratingCol`
- `coldStartStrategy` = "drop" : 아직 평가되지 않은, 즉 결측치 값을 갖는 데이터를 제외하고 모델의 성능을 평가하는 것

In [None]:
from pyspark.ml.recommendation import ALS
# 01. ALS 모델 초기화
als = ALS(
    userCol="userId",      # 사용자 ID가 있는 컬럼의 이름. 사용자 정보를 나타내는 열을 지정합니다.
    itemCol="movieId",     # 영화 ID가 있는 컬럼의 이름. 아이템(영화) 정보를 나타내는 열을 지정합니다.
    ratingCol="rating",    # 평점이 있는 컬럼의 이름. 사용자가 아이템에 대한 평점을 나타내는 열을 지정합니다. Prediction 시 무시됩니다.
    coldStartStrategy="drop"  # "drop"으로 설정하면 추천 시스템이 평가 데이터에 존재하지 않는 새로운 사용자 또는 아이템을 무시합니다.
)

# 02. 모델 훈련
model = als.fit(trainData)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# 03. 모델의 성능 평가
predictions = model.transform(testData)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8758175907959856


In [None]:
# ALS 모델에서 학습한 사용자 잠재 요인(user latent factors)을 검색하고 ID에 따라 정렬하여 표시합니다.
model.userFactors.orderBy("id").show(truncate=False)



+---+----------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                    |
+---+----------------------------------------------------------------------------------------------------------------------------+
|1  |[-1.1248379, 0.54584867, 0.9606724, 0.47710204, -0.5774898, -0.5032624, 1.1317714, -0.28293285, -0.22718443, -0.62583226]   |
|2  |[-0.8566326, -0.41705447, 0.47114533, 0.96188194, 0.11470162, -0.34528956, 1.2581453, -0.39371917, 0.08109456, -0.51343125] |
|3  |[-0.04039247, -0.008030852, -0.65790707, 0.5550941, -1.5706716, 0.98675764, 0.9225341, -0.39208722, 0.3255331, -0.4515712]  |
|4  |[-1.2664526, -0.7700593, 0.7200424, 0.8954235, -0.25302544, -0.97437733, 0.3907305, -0.020199522, -0.32961154, 0.46209392]  |
|5  |[-1.0496464, 0.027840717, 0.72774065, 0.7010844, -0.9985403, -0.7464322, 0.346

In [None]:
# ALS 모델에서 학습한 아이템 잠재 요인(item latent factors)을 검색하고 ID에 따라 정렬하여 표시합니다.
model.itemFactors.orderBy("id").show(truncate=False)

+---+----------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                    |
+---+----------------------------------------------------------------------------------------------------------------------------+
|1  |[-1.0494039, 0.17547913, 0.8934499, 1.1032925, -0.9191482, -0.6195514, 0.7959617, -0.2877912, 0.15922393, -0.34671858]      |
|2  |[-0.85618657, 0.14650868, 0.8222707, 0.66359514, -0.59942496, -0.65007925, 0.69193494, -0.44325742, 0.28117636, -0.676525]  |
|3  |[-0.3015907, 0.0981968, 0.9181901, 0.6325812, -0.23113918, -0.76179945, 1.1464102, -0.82921475, 0.03859286, -0.48202437]    |
|4  |[-0.7009465, -0.044833373, 0.88383365, 0.017065119, -0.28327438, -0.301322, 0.4178733, -0.074868225, 0.23491703, -0.8827924]|
|5  |[-0.75879776, 0.47123194, 0.6627624, 0.5240069, -0.08934613, -0.66024154, 0.71

In [None]:
# 각각의 user가 영화를 얼마나 봤는지 추출
from collections import Counter
user_list = [int(row['userId']) for row in ratings.collect()]
c = Counter(user_list)
print(c.most_common(5))

[(414, 2698), (599, 2478), (474, 2108), (448, 1864), (274, 1346)]


In [None]:
# targetUserID를 설정: 가장 많이 본 영화를 기반으로 추천할 사용자를 선택합니다.
# c.most_common()은 사용자별 영화 시청 횟수를 기반으로 사용자를 정렬하고, 가장 많이 본 사용자를 선택합니다.
targetUserID = c.most_common()[0][0]

# targetUserID가 시청한 영화를 추출합니다.
# ratings 데이터프레임에서 해당 사용자의 평점 정보를 가져옵니다.
userWatched = ratings.where(ratings.userId == targetUserID)

# userWatched 데이터프레임을 Pandas 데이터프레임으로 변환합니다.
userWatchedPD = userWatched.toPandas()

# 각 영화 ID를 영화 제목으로 변환하여 'title' 열을 추가합니다.
userWatchedPD['title'] = userWatchedPD['movieId'].apply(lambda x: movies_pd.loc[x].title)

userWatchedPD

Unnamed: 0,userId,movieId,rating,timestamp,title
0,414,1,4.0,961438127,Toy Story (1995)
1,414,2,3.0,961594981,Jumanji (1995)
2,414,3,4.0,961439278,Grumpier Old Men (1995)
3,414,5,2.0,961437647,Father of the Bride Part II (1995)
4,414,6,3.0,961515642,Heat (1995)
...,...,...,...,...,...
2693,414,180045,4.0,1515207301,Molly's Game (2017)
2694,414,180497,4.0,1525548614,The Post (2017)
2695,414,180985,3.5,1527978072,The Greatest Showman (2017)
2696,414,184791,2.5,1519592410,Fred Armisen: Standup for Drummers (2018)


In [None]:
userWatched.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|   414|      1|   4.0| 961438127|
|   414|      2|   3.0| 961594981|
|   414|      3|   4.0| 961439278|
|   414|      5|   2.0| 961437647|
|   414|      6|   3.0| 961515642|
|   414|      7|   3.0| 961439170|
|   414|      8|   3.0| 961594849|
|   414|     10|   3.0| 961515863|
|   414|     11|   5.0|1052148205|
|   414|     15|   2.0| 961514611|
|   414|     16|   3.0| 961517557|
|   414|     17|   4.0| 961513829|
|   414|     18|   3.0| 961682128|
|   414|     21|   4.0| 961438199|
|   414|     22|   3.0| 961518227|
|   414|     23|   2.0| 961682276|
|   414|     24|   3.0| 961436964|
|   414|     25|   3.0| 961517140|
|   414|     27|   2.0| 961518812|
|   414|     31|   3.0| 961518520|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# 중복 사용자를 고려하여 'user' 데이터프레임을 생성합니다.
user = userWatched.distinct()

# 모델을 사용하여 사용자에게 영화를 추천합니다.
# 'userSubsetRecs'는 추천 결과를 포함하는 Pandas 데이터프레임입니다.
userSubsetRecs = model.recommendForUserSubset(user, 10).toPandas()

# 추천 결과에 영화 제목을 추가합니다.
userSubsetRecs['title'] = userSubsetRecs['recommendations'].apply(lambda x: [movies_pd.loc[i[0]].title for i in x])

# 각 사용자에게 추천된 영화 제목을 출력합니다.
for item in userSubsetRecs.title:
    print(item)



['Bad Boy Bubby (1993)', 'Autumn Sonata (Höstsonaten) (1978)', 'Saving Face (2004)', 'On the Beach (1959)', 'Discreet Charm of the Bourgeoisie, The (Charme discret de la bourgeoisie, Le) (1972)', 'Cherish (2002)', 'Belle époque (1992)', 'Rain (2001)', 'Last Tango in Paris (Ultimo tango a Parigi) (1972)', 'Yojimbo (1961)']


In [None]:
users = ratings.select(als.getUserCol()).distinct().limit(3)
users.show()

+------+
|userId|
+------+
|   148|
|   463|
|   471|
+------+



In [None]:
# 각각의 user가 볼만한 영화 상위 5개 추천하기
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 5)
userSubsetRecs.show(truncate=False)



+------+----------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                     |
+------+----------------------------------------------------------------------------------------------------+
|471   |[{26810, 5.0079393}, {3379, 5.0060716}, {33649, 4.872698}, {2324, 4.757104}, {117531, 4.7542944}]   |
|463   |[{51931, 4.83825}, {78836, 4.7497525}, {3030, 4.7482405}, {80906, 4.707691}, {6086, 4.699011}]      |
|148   |[{33649, 4.7066727}, {98491, 4.653718}, {183897, 4.5992017}, {7669, 4.4332714}, {160718, 4.3958735}]|
+------+----------------------------------------------------------------------------------------------------+



In [None]:
# 각각의 영화를 볼만한 user 5명 씩 추천하기
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 5)
movieSubSetRecs.show(truncate=False)

+-------+---------------------------------------------------------------------------------------+
|movieId|recommendations                                                                        |
+-------+---------------------------------------------------------------------------------------+
|1580   |[{53, 5.034507}, {543, 4.7012224}, {584, 4.5791097}, {93, 4.578053}, {452, 4.546686}]  |
|3175   |[{53, 4.784447}, {236, 4.6418185}, {93, 4.6397085}, {543, 4.62903}, {452, 4.5821843}]  |
|2366   |[{90, 4.7034163}, {236, 4.6722536}, {319, 4.619934}, {597, 4.537985}, {250, 4.5343695}]|
+-------+---------------------------------------------------------------------------------------+



In [None]:
%%time
# 각각의 user가 볼만한 영화 상위 10개를 추천하기
userRecs = model.recommendForAllUsers(10)
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{5490, 5.697898}...|
|     3|[{74754, 6.382153...|
|     5|[{25825, 5.179233...|
|     6|[{32892, 5.291233...|
|     9|[{174053, 5.19038...|
|    12|[{32892, 6.026744...|
|    13|[{7842, 5.3404}, ...|
|    15|[{92259, 5.01236}...|
|    16|[{3379, 4.6498923...|
|    17|[{3030, 4.9018674...|
|    19|[{3379, 4.0316625...|
|    20|[{3022, 5.3883724...|
|    22|[{26171, 5.524589...|
|    26|[{26171, 4.381305...|
|    27|[{102903, 5.18258...|
|    28|[{7842, 4.550928}...|
|    31|[{51931, 5.481102...|
|    34|[{5034, 5.406087}...|
|    35|[{2843, 5.7377973...|
|    37|[{26171, 6.183514...|
+------+--------------------+
only showing top 20 rows

CPU times: user 39.6 ms, sys: 6.63 ms, total: 46.2 ms
Wall time: 6.63 s


In [None]:
%%time
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{53, 5.566167}, ...|
|      3|[{43, 4.699165}, ...|
|      5|[{43, 4.5287957},...|
|      6|[{53, 5.175746}, ...|
|      9|[{43, 4.6390963},...|
|     12|[{554, 4.723083},...|
|     13|[{543, 4.1858363}...|
|     15|[{543, 4.7105474}...|
|     16|[{371, 5.374192},...|
|     17|[{138, 5.4572945}...|
|     19|[{243, 4.119828},...|
|     20|[{154, 4.022012},...|
|     22|[{53, 4.5834928},...|
|     26|[{43, 4.9227858},...|
|     27|[{594, 5.2572956}...|
|     28|[{236, 5.7998238}...|
|     31|[{43, 4.5819845},...|
|     34|[{360, 5.358178},...|
|     40|[{413, 5.3194833}...|
|     41|[{53, 4.806}, {55...|
+-------+--------------------+
only showing top 20 rows

CPU times: user 62.4 ms, sys: 5.43 ms, total: 67.8 ms
Wall time: 11.2 s


### **도전**

In [None]:
from pyspark.sql import Row

# 사용자 지정 사용자 데이터를 생성합니다.
# Row를 사용하여 사용자 ID, 영화 ID, 평점 및 타임스탬프를 나타내는 행을 생성합니다.
customUserData = sc.parallelize(
    [Row(99999999, 59315, 5, 123456789),
     Row(99999999, 77561, 4, 123456789),
     Row(99999999, 102125, 4, 123456789),
     Row(99999999, 190297, 1, 123456789),
     Row(99999999, 155384, 2, 123456789),
     Row(99999999, 170763, 1, 123456789),
]
).toDF(["userId", "movieId", "rating", "timestamp"])

# ratings_with_userData 변수에 원래의 평점 데이터와 사용자 지정 데이터를 합칩니다.
ratings_with_userData = ratings.union(customUserData)

# PySpark의 ALS 모델을 임포트합니다.
from pyspark.ml.recommendation import ALS

# ALS 모델을 설정합니다.
als = ALS(
    rank=50,               # 잠재 요인(latent factor)의 개수. 모델의 복잡성을 결정합니다.
    regParam=0.05,         # 정규화(regularization) 파라미터. 과적합을 줄이기 위해 사용됩니다.
    maxIter=10,            # ALS 알고리즘의 반복 횟수. 모델 학습 과정의 반복 횟수를 설정합니다.
    userCol="userId",      # 사용자 ID가 있는 컬럼의 이름. 사용자 정보를 나타내는 열을 지정합니다.
    itemCol="movieId",     # 영화 ID가 있는 컬럼의 이름. 아이템(영화) 정보를 나타내는 열을 지정합니다.
    ratingCol="rating",    # 평점이 있는 컬럼의 이름. 사용자가 아이템에 대한 평점을 나타내는 열을 지정합니다.
    coldStartStrategy="drop"  # "drop"으로 설정하면 추천 시스템이 평가 데이터에 존재하지 않는 새로운 사용자 또는 아이템을 무시합니다.
)


In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

# 특정 사용자에 대한 영화 추천을 생성합니다.
# 사용자 ID를 선택하고 해당 사용자에게 10개의 영화를 추천합니다.
userSubsetRecs = model.recommendForUserSubset(customUserData.select("userId").distinct(), 10).toPandas()

# 추천 결과에 영화 제목을 추가합니다.
# movies_pd는 영화 데이터가 포함된 Pandas DataFrame으로 가정합니다.
userSubsetRecs['title'] = userSubsetRecs['recommendations'].apply(lambda x: [movies_pd.loc[i[0]].title for i in x])

# 결과를 출력합니다.
userSubsetRecs

Unnamed: 0,userId,recommendations,title
0,99999999,"[(59315, 4.878994941711426), (58559, 4.526727676391602), (1210, 4.474745273590088), (89745, 4.472021102905273), (2571, 4.42887544631958), (1196, 4.375718116760254), (60069, 4.357156753540039), (1198, 4.35041618347168), (57669, 4.320343971252441), (318, 4.303648471832275)]","[Iron Man (2008), Dark Knight, The (2008), Star Wars: Episode VI - Return of the Jedi (1983), Avengers, The (2012), Matrix, The (1999), Star Wars: Episode V - The Empire Strikes Back (1980), WALL·E (2008), Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981), In Bruges (2008), Shawshank Redemption, The (1994)]"
