In [35]:
from pyspark.sql import SparkSession

In [36]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("movie-recommendation")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

In [37]:
ratings_file = "/home/ithingvv34/data-engineering/spark/data/ml-25m/ratings.csv"
ratings_df = spark.read.csv(f"file:///{ratings_file}", inferSchema=True, header=True)

                                                                                

In [38]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [39]:
ratings_df = ratings_df.select(["userId", "movieId", "rating"])

In [40]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [41]:
ratings_df.select("rating").describe().show()



+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          25000095|
|   mean| 3.533854451353085|
| stddev|1.0607439611423535|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



                                                                                

In [42]:
train_df, test_df = ratings_df.randomSplit([0.8, 0.2])

In [43]:
from pyspark.ml.recommendation import ALS

In [44]:
als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"
)

In [45]:
model = als.fit(train_df)

                                                                                

In [46]:
predictions = model.transform(test_df)

In [47]:
predictions.show()



+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|     32|   4.0|  4.021391|
|   148|     47|   4.0| 4.1730494|
|   148|    912|   4.0| 4.1919765|
|   148|    919|   3.5| 3.8573008|
|   148|   1222|   4.0| 4.0958395|
|   148|   1247|   3.5|   4.02036|
|   148|   1250|   4.0| 4.0922856|
|   148|   2019|   5.0| 4.2554216|
|   148|   2329|   4.0|   4.13705|
|   148|   2571|   5.0| 4.1222534|
|   148|   3000|   4.0| 4.1257954|
|   148|   3462|   4.5|  4.071162|
|   148|   3949|   4.0|  3.983326|
|   148|   4886|   4.0| 3.7664251|
|   148|   4993|   4.5| 4.0552874|
|   148|   8228|   4.5| 3.9888904|
|   148|  79132|   4.0| 4.0232186|
|   148| 109487|   4.0| 3.9219072|
|   148| 112556|   4.5| 3.8461192|
|   148| 134853|   4.0| 3.8228917|
+------+-------+------+----------+
only showing top 20 rows



                                                                                

In [48]:
predictions.select('rating', 'prediction').describe().show()



+-------+-----------------+------------------+
|summary|           rating|        prediction|
+-------+-----------------+------------------+
|  count|          4998423|           4998423|
|   mean|3.534334829205131|3.4284369788107427|
| stddev|1.060661417303487|0.6417545960421398|
|    min|              0.5|        -1.4957137|
|    max|              5.0|         7.4791794|
+-------+-----------------+------------------+



                                                                                

In [49]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol='rating', predictionCol='prediction')

In [50]:
rmse = evaluator.evaluate(predictions)

                                                                                

In [51]:
print(rmse)

0.8082022417321826


### 유저 id별로 3개 추천 

In [52]:
model.recommendForAllUsers(3).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{183947, 5.67025...|
|     3|[{127252, 5.85581...|
|     5|[{203086, 6.33379...|
|     6|[{203086, 6.17574...|
|     9|[{127252, 6.27498...|
|    12|[{203882, 5.36120...|
|    13|[{203086, 5.88755...|
|    15|[{203882, 6.82466...|
|    16|[{203086, 6.18654...|
|    17|[{127252, 6.09247...|
|    19|[{200872, 5.29795...|
|    20|[{203086, 6.32537...|
|    22|[{127252, 6.25013...|
|    26|[{127252, 5.63907...|
|    27|[{203882, 6.09061...|
|    28|[{203882, 6.87935...|
|    31|[{203882, 4.00257...|
|    34|[{127252, 5.70343...|
|    35|[{203882, 6.00346...|
|    37|[{183947, 6.19046...|
+------+--------------------+
only showing top 20 rows



                                                                                

### 영화 별로 3개 추천

In [53]:
model.recommendForAllItems(3).show()



+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{18230, 5.595490...|
|      3|[{143282, 5.62795...|
|      5|[{143282, 6.00836...|
|      6|[{156252, 5.58854...|
|      9|[{87426, 5.355251...|
|     12|[{87426, 5.368873...|
|     13|[{108346, 5.3152}...|
|     15|[{87426, 5.173529...|
|     16|[{156252, 5.46176...|
|     17|[{143282, 5.61735...|
|     19|[{87426, 5.067372...|
|     20|[{87426, 5.207658...|
|     22|[{18230, 5.295377...|
|     26|[{18230, 5.191001...|
|     27|[{143282, 5.83510...|
|     28|[{105801, 5.54564...|
|     31|[{143282, 5.66622...|
|     34|[{93907, 5.750576...|
|     35|[{93907, 5.113336...|
|     37|[{143282, 4.77822...|
+-------+--------------------+
only showing top 20 rows



                                                                                

In [54]:
from pyspark.sql.types import IntegerType

user_list = [65, 78, 81]
users_df = spark.createDataFrame(user_list, IntegerType()).toDF('userId')

users_df.show()

+------+
|userId|
+------+
|    65|
|    78|
|    81|
+------+



In [72]:
user_recs = model.recommendForUserSubset(users_df, 10)

In [73]:
movies_list = user_recs.collect()[0].recommendations

In [74]:
recs_df = spark.createDataFrame(movies_list)
recs_df.show()

+-------+------------------+
|movieId|            rating|
+-------+------------------+
| 144202| 6.121954917907715|
| 169606| 5.945925712585449|
| 126397| 5.922198295593262|
| 127252|5.9189653396606445|
| 196717| 5.861759185791016|
| 149484| 5.694983005523682|
| 205277| 5.675905704498291|
| 198535|5.5750885009765625|
| 164751| 5.517662048339844|
| 156773|5.4184489250183105|
+-------+------------------+



In [75]:
movies_file = "/home/ithingvv34/data-engineering/spark/data/ml-25m/movies.csv"
movies_df = spark.read.csv(f"file:///{movies_file}", inferSchema=True, header=True)

In [76]:
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [77]:
recs_df.createOrReplaceTempView("recommendations")
movies_df.createOrReplaceTempView("movies")

In [78]:
query = """
SELECT *
FROM
    movies JOIN recommendations
    ON movies.movieId = recommendations.movieId
ORDER BY
    rating desc
"""
recommended_movies = spark.sql(query)
recommended_movies.show()

+-------+--------------------+--------------------+-------+------------------+
|movieId|               title|              genres|movieId|            rating|
+-------+--------------------+--------------------+-------+------------------+
| 144202|Catch That Girl (...|     Action|Children| 144202| 6.121954917907715|
| 169606|Dara O'Briain Cro...|              Comedy| 169606| 5.945925712585449|
| 126397|The Encounter (2010)|      Children|Drama| 126397| 5.922198295593262|
| 127252|The Veil of Twili...|Crime|Fantasy|Mys...| 127252|5.9189653396606445|
| 196717|Bernard and the G...|Comedy|Drama|Fantasy| 196717| 5.861759185791016|
| 149484|All About My Wife...|      Comedy|Romance| 149484| 5.694983005523682|
| 205277|   Inside Out (1991)|Comedy|Drama|Romance| 205277| 5.675905704498291|
| 198535|Trick: The Movie ...|Comedy|Crime|Mystery| 198535|5.5750885009765625|
| 164751|Love Never Dies (...|       Drama|Romance| 164751| 5.517662048339844|
| 156773|Jimmy Carr: Live ...|              Comedy| 

### 추천 결과 API

In [79]:
def get_recommendations(user_id, num_recs):
    users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
    user_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_list = user_recs_df.collect()[0].recommendations
    recs_df = spark.createDataFrame(recs_list)
    recommended_movies = spark.sql(query)
    return recommended_movies

### id가 456번 유저에게 10개의 영화 추천

In [80]:
recs = get_recommendations(456, 10)

In [81]:
result_df = recs.toPandas()

In [82]:
result_df

Unnamed: 0,movieId,title,genres,movieId.1,rating
0,144202,Catch That Girl (2002),Action|Children,144202,6.121955
1,169606,Dara O'Briain Crowd Tickler (2015),Comedy,169606,5.945926
2,126397,The Encounter (2010),Children|Drama,126397,5.922198
3,127252,The Veil of Twilight (2014),Crime|Fantasy|Mystery,127252,5.918965
4,196717,Bernard and the Genie (1991),Comedy|Drama|Fantasy,196717,5.861759
5,149484,All About My Wife (2012),Comedy|Romance,149484,5.694983
6,205277,Inside Out (1991),Comedy|Drama|Romance,205277,5.675906
7,198535,Trick: The Movie (2002),Comedy|Crime|Mystery,198535,5.575089
8,164751,Love Never Dies (2012),Drama|Romance,164751,5.517662
9,156773,Jimmy Carr: Live (2004),Comedy,156773,5.418449
