<a href="https://colab.research.google.com/github/jun9729/pyspark_project/blob/main/movie_recommend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [47]:
# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [48]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("movie_recommend")
    .getOrCreate()
)

In [57]:
# Load the ratings CSV: the first row is treated as the header and the
# column types are inferred from the data.
df = spark.read.csv(
    "/content/movie_ratings.csv",
    header=True,
    inferSchema=True,
)
df.show()

+------+------+----------------+
|userId|rating|           title|
+------+------+----------------+
|     1|   4.0|Toy Story (1995)|
|     5|   4.0|Toy Story (1995)|
|     7|   4.5|Toy Story (1995)|
|    15|   2.5|Toy Story (1995)|
|    17|   4.5|Toy Story (1995)|
|    18|   3.5|Toy Story (1995)|
|    19|   4.0|Toy Story (1995)|
|    21|   3.5|Toy Story (1995)|
|    27|   3.0|Toy Story (1995)|
|    31|   5.0|Toy Story (1995)|
|    32|   3.0|Toy Story (1995)|
|    33|   3.0|Toy Story (1995)|
|    40|   5.0|Toy Story (1995)|
|    43|   5.0|Toy Story (1995)|
|    44|   3.0|Toy Story (1995)|
|    45|   4.0|Toy Story (1995)|
|    46|   5.0|Toy Story (1995)|
|    50|   3.0|Toy Story (1995)|
|    54|   3.0|Toy Story (1995)|
|    57|   5.0|Toy Story (1995)|
+------+------+----------------+
only showing top 20 rows



In [58]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString

# Encode every movie title as a numeric index stored in a new 'movieId' column.
stringIndexer = StringIndexer(inputCol='title', outputCol='movieId')

# Keep the fitted indexer around: its labels are needed later to turn
# indices back into titles.
model = stringIndexer.fit(df)
indexed = model.transform(df)

# Preview the first five indexed rows as a pandas DataFrame.
indexed.limit(5).toPandas()

Unnamed: 0,userId,rating,title,movieId
0,1,4.0,Toy Story (1995),11.0
1,5,4.0,Toy Story (1995),11.0
2,7,4.5,Toy Story (1995),11.0
3,15,2.5,Toy Story (1995),11.0
4,17,4.5,Toy Story (1995),11.0


In [59]:
# ALS recommender algorithm
from pyspark.ml.recommendation import ALS

# Fixed seed shared by the train/test split and the ALS factor initialisation,
# so that rerunning the notebook yields the same split, model and metrics.
SEED = 42

# Hold out 30% of the ratings for evaluation.
train, test = indexed.randomSplit([0.7, 0.3], seed=SEED)

rec = ALS(maxIter=10,
         regParam=0.01,
         userCol='userId',
         itemCol='movieId',
         ratingCol='rating',
         nonnegative=True,
         # Drop rows whose prediction would be NaN (users/items unseen in
         # training) so that the evaluation metrics stay defined.
         coldStartStrategy='drop',
         seed=SEED)
# Learn ALS model
rec_model = rec.fit(train)

# Predict using transform: adds a 'prediction' column to the test rows
pred_ratings = rec_model.transform(test)
pred_ratings.show()

+------+------+---------------+-------+----------+
|userId|rating|          title|movieId|prediction|
+------+------+---------------+-------+----------+
|   597|   5.0|Rain Man (1988)|  148.0|  4.013526|
|   332|   4.0|Rain Man (1988)|  148.0| 3.3470201|
|   577|   4.0|Rain Man (1988)|  148.0| 3.7954178|
|   606|   4.0|Rain Man (1988)|  148.0| 4.2757907|
|   103|   5.0|Rain Man (1988)|  148.0| 3.8720973|
|   330|   4.0|Rain Man (1988)|  148.0| 3.9671888|
|   230|   3.0|Rain Man (1988)|  148.0|  4.569448|
|   157|   4.0|Rain Man (1988)|  148.0| 3.8274999|
|   232|   4.0|Rain Man (1988)|  148.0| 3.4499242|
|    47|   4.0|Rain Man (1988)|  148.0|  3.325891|
|   305|   4.0|Rain Man (1988)|  148.0| 3.5082836|
|   363|   3.0|Rain Man (1988)|  148.0| 4.0165424|
|   274|   3.5|Rain Man (1988)|  148.0| 3.0592623|
|   474|   4.0|Rain Man (1988)|  148.0| 4.0432987|
|   169|   4.5|Rain Man (1988)|  148.0|  4.520741|
|   227|   4.5|Rain Man (1988)|  148.0|  4.048698|
|   531|   5.0|Rain Man (1988)|

In [60]:
from pyspark.ml.evaluation import RegressionEvaluator

# Two evaluators over the same columns: the true 'rating' versus the
# model's 'prediction'. Only the metric differs.
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='rating',
                                predictionCol='prediction')
mae_eval = RegressionEvaluator(metricName='mae',
                               labelCol='rating',
                               predictionCol='prediction')

# Score the held-out predictions with each metric.
rmse = evaluator.evaluate(pred_ratings)
mae = mae_eval.evaluate(pred_ratings)

print("RMSE:", rmse)
print("MAE:", mae)

RMSE: 1.0451856741422667
MAE: 0.7896248567737227


In [61]:
unique_movies = indexed.select("movieId").distinct()
def top_movies(user_id, n):
    """
    Print the n movies with the highest predicted rating for user_id,
    considering only movies that user has no rating for yet.

    Parameters
    ----------
    user_id : int (or any value accepted by int())
        Identifier compared against the 'userId' column of `indexed`.
    n : int
        Number of recommendations to display.

    Returns
    -------
    None
        The recommendations are printed with DataFrame.show(); no DataFrame
        is returned to the caller.
    """
    # Movies this user already has a rating for.
    watched_movies = indexed.filter(indexed['userId'] == user_id)\
                            .select('movieId')

    # A left anti join keeps exactly the movieIds from unique_movies that have
    # no match in watched_movies, i.e. the movies the user has not rated.
    # unique_movies is already distinct, so no further de-duplication is needed.
    remaining_movies = unique_movies.join(watched_movies,
                                          on='movieId',
                                          how='left_anti')

    # Attach the user id as a constant column so the ALS model can score
    # every (userId, movieId) candidate pair.
    remaining_movies = remaining_movies.withColumn('userId',
                                                   lit(int(user_id)))

    # Score the candidates and keep the n highest predictions.
    recommender = rec_model.transform(remaining_movies)\
                           .orderBy('prediction', ascending=False)\
                           .limit(n)

    # Reverse the StringIndexer encoding: numeric movieId -> original title.
    movie_title = IndexToString(inputCol='movieId',
                                outputCol='title',
                                labels=model.labels)
    final_recommendations = movie_title.transform(recommender)

    # show() prints the table and returns None, so this function returns None.
    return final_recommendations.show(n, truncate=False)

In [63]:
# Show the five highest-scoring unseen movies for user 100.
top_movies(user_id=100, n=5)

+-------+------+----------+------------------------------+
|movieId|userId|prediction|title                         |
+-------+------+----------+------------------------------+
|3614.0 |100   |8.286353  |Topo, El (1970)               |
|2701.0 |100   |7.611109  |Capturing the Friedmans (2003)|
|1750.0 |100   |7.3170657 |Barcelona (1994)              |
|3206.0 |100   |7.206392  |Snow Dogs (2002)              |
|3392.0 |100   |7.1937494 |Garfield: The Movie (2004)    |
+-------+------+----------+------------------------------+

