In [1]:
!pip install pyspark



In [90]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, ArrayType
from pyspark.sql.functions import split,count, explode,col,sum,dense_rank, first,rank,max,row_number
# C:\Users\manal\PycharmProject\yt-scrap\pySpark-Tut

In [91]:
spark = SparkSession.builder.appName("tut").getOrCreate()

In [92]:
path = "./files/ml-1m/"

In [93]:
users_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("occupation", IntegerType(), True),
    StructField("zip_code", IntegerType(), True),
])

movies_schema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)
])

ratings_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("movie_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True)
])

# Creating user dataframe

In [94]:
user_df = spark.read.option("delimiter", "::").csv(f"{path}users.dat", schema=users_schema)

In [95]:
user_df.show()

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
|      4|     M| 45|         7|    2460|
|      5|     M| 25|        20|   55455|
|      6|     F| 50|         9|   55117|
|      7|     M| 35|         1|    6810|
|      8|     M| 25|        12|   11413|
|      9|     M| 25|        17|   61614|
|     10|     F| 35|         1|   95370|
|     11|     F| 25|         1|    4093|
|     12|     M| 25|        12|   32793|
|     13|     M| 45|         1|   93304|
|     14|     M| 35|         0|   60126|
|     15|     M| 25|         7|   22903|
|     16|     F| 35|         0|   20670|
|     17|     M| 50|         1|   95350|
|     18|     F| 18|         3|   95825|
|     19|     M|  1|        10|   48073|
|     20|     M| 25|        14|   55113|
+-------+------+---+----------+--------+
only showing top

# Creating Movies dataframe

In [96]:
movie_df = spark.read.option("delimiter", "::").csv(f"{path}movies.dat", schema = movies_schema)

In [97]:
movie_df = movie_df.withColumn("genres", split(movie_df["genres"], "\\|").cast(ArrayType(StringType())))

In [98]:
movie_df.show()

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|[Animation, Child...|
|       2|      Jumanji (1995)|[Adventure, Child...|
|       3|Grumpier Old Men ...|   [Comedy, Romance]|
|       4|Waiting to Exhale...|     [Comedy, Drama]|
|       5|Father of the Bri...|            [Comedy]|
|       6|         Heat (1995)|[Action, Crime, T...|
|       7|      Sabrina (1995)|   [Comedy, Romance]|
|       8| Tom and Huck (1995)|[Adventure, Child...|
|       9| Sudden Death (1995)|            [Action]|
|      10|    GoldenEye (1995)|[Action, Adventur...|
|      11|American Presiden...|[Comedy, Drama, R...|
|      12|Dracula: Dead and...|    [Comedy, Horror]|
|      13|        Balto (1995)|[Animation, Child...|
|      14|        Nixon (1995)|             [Drama]|
|      15|Cutthroat Island ...|[Action, Adventur...|
|      16|       Casino (1995)|   [Drama, Thri

# Creating Ratings Dataframe

In [99]:
rating_df = spark.read.option("delimiter", "::").csv(f"{path}ratings.dat", schema=ratings_schema)

In [100]:
rating_df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|    1193|     5|978300760|
|      1|     661|     3|978302109|
|      1|     914|     3|978301968|
|      1|    3408|     4|978300275|
|      1|    2355|     5|978824291|
|      1|    1197|     3|978302268|
|      1|    1287|     5|978302039|
|      1|    2804|     5|978300719|
|      1|     594|     4|978302268|
|      1|     919|     4|978301368|
|      1|     595|     5|978824268|
|      1|     938|     4|978301752|
|      1|    2398|     4|978302281|
|      1|    2918|     4|978302124|
|      1|    1035|     5|978301753|
|      1|    2791|     4|978302188|
|      1|    2687|     3|978824268|
|      1|    2018|     4|978301777|
|      1|    3105|     5|978301713|
|      1|    2797|     4|978302039|
+-------+--------+------+---------+
only showing top 20 rows




### Movie Analytics

#### distribution of each rating for moive

In [101]:
merge_df = rating_df.join(movie_df, "movie_id")

ratings_count_per_movie = merge_df.groupBy("title","rating").agg(count("user_id").alias("UserCount"))

pivoted_df = ratings_count_per_movie.groupBy("title").pivot("rating").sum("UserCount").na.fill(0)

pivoted_df.show(truncate=False)

+------------------------------------------------+---+---+---+---+---+
|title                                           |1  |2  |3  |4  |5  |
+------------------------------------------------+---+---+---+---+---+
|Night of the Living Dead (1968)                 |41 |72 |171|228|203|
|In the Heat of the Night (1967)                 |2  |7  |55 |164|120|
|If Lucy Fell (1996)                             |5  |18 |19 |19 |7  |
|Snow White and the Seven Dwarfs (1937)          |17 |39 |195|306|206|
|400 Blows, The (Les Quatre cents coups) (1959)  |3  |3  |23 |60 |98 |
|Paris, France (1993)                            |2  |0  |1  |0  |2  |
|Psycho (1960)                                   |17 |36 |182|447|581|
|Cosi (1996)                                     |1  |1  |5  |25 |4  |
|Gulliver's Travels (1939)                       |4  |10 |39 |46 |18 |
|Heavenly Creatures (1994)                       |9  |32 |104|201|131|
|Cry in the Dark, A (1988)                       |1  |5  |22 |35 |3  |
|7th V


#### unique genre in movie dataset



In [102]:
unique_genres = movie_df.select(explode("genres")).alias("Genre").distinct()
unique_genres.show(truncate=False)

+-----------+
|col        |
+-----------+
|Crime      |
|Romance    |
|Thriller   |
|Adventure  |
|Children's |
|Drama      |
|War        |
|Documentary|
|Fantasy    |
|Mystery    |
|Musical    |
|Animation  |
|Film-Noir  |
|Horror     |
|Western    |
|Comedy     |
|Action     |
|Sci-Fi     |
+-----------+



#### top 10 movies by view

In [103]:
merge_df = rating_df.join(movie_df, "movie_id")
merge_df.show()

+--------+-------+------+---------+--------------------+--------------------+
|movie_id|user_id|rating|timestamp|               title|              genres|
+--------+-------+------+---------+--------------------+--------------------+
|    1193|      1|     5|978300760|One Flew Over the...|             [Drama]|
|     661|      1|     3|978302109|James and the Gia...|[Animation, Child...|
|     914|      1|     3|978301968| My Fair Lady (1964)|  [Musical, Romance]|
|    3408|      1|     4|978300275|Erin Brockovich (...|             [Drama]|
|    2355|      1|     5|978824291|Bug's Life, A (1998)|[Animation, Child...|
|    1197|      1|     3|978302268|Princess Bride, T...|[Action, Adventur...|
|    1287|      1|     5|978302039|      Ben-Hur (1959)|[Action, Adventur...|
|    2804|      1|     5|978300719|Christmas Story, ...|     [Comedy, Drama]|
|     594|      1|     4|978302268|Snow White and th...|[Animation, Child...|
|     919|      1|     4|978301368|Wizard of Oz, The...|[Adventu

In [104]:
views_each_movie = merge_df.groupBy("title").agg(count("user_id").alias("views"))
views_each_movie.show(truncate=False)

+------------------------------------------------+-----+
|title                                           |views|
+------------------------------------------------+-----+
|Snow White and the Seven Dwarfs (1937)          |763  |
|Night of the Living Dead (1968)                 |715  |
|Elizabeth (1998)                                |938  |
|7th Voyage of Sinbad, The (1958)                |258  |
|Annie Hall (1977)                               |1334 |
|400 Blows, The (Les Quatre cents coups) (1959)  |187  |
|Heavenly Creatures (1994)                       |477  |
|Psycho (1960)                                   |1263 |
|Problem Child (1990)                            |160  |
|If Lucy Fell (1996)                             |68   |
|Seven Beauties (Pasqualino Settebellezze) (1976)|77   |
|When We Were Kings (1996)                       |277  |
|In the Heat of the Night (1967)                 |348  |
|Gulliver's Travels (1939)                       |117  |
|Fair Game (1995)              

In [105]:
top_viewed = views_each_movie.orderBy(col("views").desc()).limit(10)
top_viewed.show(truncate=False)

+-----------------------------------------------------+-----+
|title                                                |views|
+-----------------------------------------------------+-----+
|American Beauty (1999)                               |3428 |
|Star Wars: Episode IV - A New Hope (1977)            |2991 |
|Star Wars: Episode V - The Empire Strikes Back (1980)|2990 |
|Star Wars: Episode VI - Return of the Jedi (1983)    |2883 |
|Jurassic Park (1993)                                 |2672 |
|Saving Private Ryan (1998)                           |2653 |
|Terminator 2: Judgment Day (1991)                    |2649 |
|Matrix, The (1999)                                   |2590 |
|Back to the Future (1985)                            |2583 |
|Silence of the Lambs, The (1991)                     |2578 |
+-----------------------------------------------------+-----+



#### total number of movies viewed by each user

In [106]:
merge_df = rating_df.join(user_df, "user_id")
merge_df.show()

+-------+--------+------+---------+------+---+----------+--------+
|user_id|movie_id|rating|timestamp|gender|age|occupation|zip_code|
+-------+--------+------+---------+------+---+----------+--------+
|      1|    1193|     5|978300760|     F|  1|        10|   48067|
|      1|     661|     3|978302109|     F|  1|        10|   48067|
|      1|     914|     3|978301968|     F|  1|        10|   48067|
|      1|    3408|     4|978300275|     F|  1|        10|   48067|
|      1|    2355|     5|978824291|     F|  1|        10|   48067|
|      1|    1197|     3|978302268|     F|  1|        10|   48067|
|      1|    1287|     5|978302039|     F|  1|        10|   48067|
|      1|    2804|     5|978300719|     F|  1|        10|   48067|
|      1|     594|     4|978302268|     F|  1|        10|   48067|
|      1|     919|     4|978301368|     F|  1|        10|   48067|
|      1|     595|     5|978824268|     F|  1|        10|   48067|
|      1|     938|     4|978301752|     F|  1|        10|   48

In [107]:
view_per_user = merge_df.groupBy("user_id").agg(count("movie_id").alias("total_viewd_movies"))
view_per_user.show(truncate=False)

+-------+------------------+
|user_id|total_viewd_movies|
+-------+------------------+
|148    |624               |
|463    |123               |
|471    |105               |
|496    |119               |
|833    |21                |
|1088   |1176              |
|243    |33                |
|392    |487               |
|540    |39                |
|623    |172               |
|737    |217               |
|858    |190               |
|897    |203               |
|1025   |33                |
|1084   |180               |
|31     |119               |
|516    |294               |
|85     |39                |
|137    |201               |
|251    |73                |
+-------+------------------+
only showing top 20 rows



#### top users with most viewed movies count

In [108]:
view_per_user_des = view_per_user.orderBy(col("total_viewd_movies").desc()).limit(10)
view_per_user_des.show(truncate=False)

+-------+------------------+
|user_id|total_viewd_movies|
+-------+------------------+
|4169   |2314              |
|1680   |1850              |
|4277   |1743              |
|1941   |1595              |
|1181   |1521              |
|889    |1518              |
|3618   |1344              |
|2063   |1323              |
|1150   |1302              |
|1015   |1286              |
+-------+------------------+



#### total rating to each movie

In [109]:
merge_df = rating_df.join(movie_df,"movie_id")
total_rating_df = merge_df.groupBy("title").agg(sum("rating").alias("total_rating"))
total_rating_df.show(truncate=False)

+------------------------------------------------+------------+
|title                                           |total_rating|
+------------------------------------------------+------------+
|Snow White and the Seven Dwarfs (1937)          |2934        |
|Night of the Living Dead (1968)                 |2625        |
|Elizabeth (1998)                                |3780        |
|7th Voyage of Sinbad, The (1958)                |933         |
|Annie Hall (1977)                               |5525        |
|400 Blows, The (Les Quatre cents coups) (1959)  |808         |
|Heavenly Creatures (1994)                       |1844        |
|Psycho (1960)                                   |5328        |
|Problem Child (1990)                            |308         |
|If Lucy Fell (1996)                             |209         |
|Seven Beauties (Pasqualino Settebellezze) (1976)|295         |
|When We Were Kings (1996)                       |1197        |
|In the Heat of the Night (1967)        

#### Average rating of each movie

In [110]:
avg_rating_df = merge_df.groupBy("title").agg((sum("rating")/count("rating")).alias("avg_rating"))
avg_rating_df.show(truncate=False)

+------------------------------------------------+------------------+
|title                                           |avg_rating        |
+------------------------------------------------+------------------+
|Snow White and the Seven Dwarfs (1937)          |3.8453473132372213|
|Night of the Living Dead (1968)                 |3.6713286713286712|
|Elizabeth (1998)                                |4.029850746268656 |
|7th Voyage of Sinbad, The (1958)                |3.616279069767442 |
|Annie Hall (1977)                               |4.14167916041979  |
|400 Blows, The (Les Quatre cents coups) (1959)  |4.320855614973262 |
|Heavenly Creatures (1994)                       |3.8658280922431865|
|Psycho (1960)                                   |4.218527315914489 |
|Problem Child (1990)                            |1.925             |
|If Lucy Fell (1996)                             |3.073529411764706 |
|Seven Beauties (Pasqualino Settebellezze) (1976)|3.831168831168831 |
|When We Were Kings 

#### most viewed genres by age group

In [111]:
merge_df = rating_df.join(movie_df, "movie_id")
merge_df = merge_df.join(user_df, "user_id")
merge_df.show()

+-------+--------+------+---------+--------------------+--------------------+------+---+----------+--------+
|user_id|movie_id|rating|timestamp|               title|              genres|gender|age|occupation|zip_code|
+-------+--------+------+---------+--------------------+--------------------+------+---+----------+--------+
|      1|    1193|     5|978300760|One Flew Over the...|             [Drama]|     F|  1|        10|   48067|
|      1|     661|     3|978302109|James and the Gia...|[Animation, Child...|     F|  1|        10|   48067|
|      1|     914|     3|978301968| My Fair Lady (1964)|  [Musical, Romance]|     F|  1|        10|   48067|
|      1|    3408|     4|978300275|Erin Brockovich (...|             [Drama]|     F|  1|        10|   48067|
|      1|    2355|     5|978824291|Bug's Life, A (1998)|[Animation, Child...|     F|  1|        10|   48067|
|      1|    1197|     3|978302268|Princess Bride, T...|[Action, Adventur...|     F|  1|        10|   48067|
|      1|    1287| 

In [112]:
ex_df = merge_df.select("user_id", explode(col("genres")).alias("genres"),"age")
unique_genres_count = ex_df.groupBy("genres").agg(count("*").alias("count"))
unique_genres_count.show()

+-----------+------+
|     genres| count|
+-----------+------+
|      Crime| 79541|
|    Romance|147523|
|   Thriller|189680|
|  Adventure|133953|
|      Drama|354529|
| Children's| 72186|
|        War| 68527|
|Documentary|  7910|
|    Fantasy| 36301|
|    Mystery| 40178|
|    Musical| 41533|
|  Animation| 43293|
|  Film-Noir| 18261|
|     Horror| 76386|
|    Western| 20683|
|     Comedy|356580|
|     Action|257457|
|     Sci-Fi|157294|
+-----------+------+



In [114]:
age_genres = ex_df.groupBy("age", "genres").agg(count("genres").alias("CountOfGenre"))
age_genres.show()

+---+-----------+------------+
|age|     genres|CountOfGenre|
+---+-----------+------------+
| 56|     Comedy|       11961|
| 56|      Drama|       17269|
|  1|      Drama|        7483|
|  1|     Comedy|       11162|
| 45| Children's|        5400|
| 45|        War|        6642|
| 45|    Western|        2133|
| 50|    Western|        2420|
|  1|     Horror|        2211|
| 35|   Thriller|       36840|
| 45|      Crime|        6048|
|  1|  Adventure|        3998|
| 56|    Western|        1333|
| 25|     Comedy|      143210|
| 25|    Romance|       58003|
| 35|     Sci-Fi|       32333|
| 25|    Mystery|       15160|
| 35|      Crime|       14895|
| 56|        War|        3775|
| 25|Documentary|        3489|
+---+-----------+------------+
only showing top 20 rows



In [115]:
window_spec = Window.partitionBy("age").orderBy(col("CountOfGenre").desc())

top_genre_by_age = age_genres.withColumn("row_num", row_number().over(window_spec))

top_genre_by_age = top_genre_by_age.filter(col("row_num") == 1).drop("row_num")
top_genre_by_age.show()

+---+------+------------+
|age|genres|CountOfGenre|
+---+------+------------+
|  1|Comedy|       11162|
| 18|Comedy|       69980|
| 25|Comedy|      143210|
| 35| Drama|       71590|
| 45| Drama|       32141|
| 50| Drama|       29247|
| 56| Drama|       17269|
+---+------+------------+



#### Most viewed genres by gender

In [116]:
ex_df = merge_df.select("user_id", "gender", explode("genres").alias("genres"))
gender_genres = ex_df.groupBy("gender", "genres").agg(count("*").alias("CountOfGenre"))
gender_genres.show()

+------+-----------+------------+
|gender|     genres|CountOfGenre|
+------+-----------+------------+
|     F|     Comedy|       96271|
|     M|     Horror|       61751|
|     M|    Romance|       97226|
|     M|   Thriller|      149372|
|     F|    Romance|       50297|
|     F|    Fantasy|        8718|
|     M|Documentary|        5970|
|     M|    Fantasy|       27583|
|     F| Children's|       21317|
|     F|    Musical|       13505|
|     F|   Thriller|       40308|
|     M|    Western|       17206|
|     M|     Sci-Fi|      129894|
|     F|    Western|        3477|
|     F|  Film-Noir|        4202|
|     F|     Sci-Fi|       27400|
|     F|  Adventure|       27332|
|     F|        War|       14093|
|     F|    Mystery|        9976|
|     F|     Action|       45650|
+------+-----------+------------+
only showing top 20 rows



In [117]:
x = Window.partitionBy("gender").orderBy(col("CountOfGenre").desc())
top_genre_by_gender = ts.withColumn("row_num", row_number().over(x))
top_genre_by_gender =top_genre_by_gender.filter(col("row_num")==1).drop("row_num")
top_genre_by_gender.show()

+------+------+------------+
|gender|genres|CountOfGenre|
+------+------+------------+
|     F| Drama|       98153|
|     M|Comedy|      260309|
+------+------+------------+



### Movie Recommendation

In [118]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [119]:
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index")
movie_indexer = StringIndexer(inputCol="movie_id", outputCol="movie_index")
_rating_df = user_indexer.fit(rating_df).transform(rating_df)
_rating_df = movie_indexer.fit(_rating_df).transform(_rating_df)
(training, test) = _rating_df.randomSplit([0.7, 0.3])

als = ALS(maxIter=5, regParam=0.01, userCol="user_index", itemCol="movie_index", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)

In [120]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

Root Mean Squared Error (RMSE) = 0.9120549139285303


In [121]:
userRecs = model.recommendForAllUsers(5)
userId_to_recommend = 3

user_recommendations = userRecs.filter(userRecs.user_index == userId_to_recommend).select("recommendations.movie_index","user_index")
user_recommendations.show(truncate=False)

+------------------------------+----------+
|movie_index                   |user_index|
+------------------------------+----------+
|[3160, 3220, 2810, 3392, 3570]|3         |
+------------------------------+----------+



In [122]:
user_recommendations_exploded = user_recommendations.select(
    col("user_index"),
    explode("movie_index").alias("movie_index")
)
    
user_recommendations_with_movie_ids = user_recommendations_exploded.join(
    movie_df,
    user_recommendations_exploded.movie_index == movie_df.movie_id,
).select(
    "user_index",
    "title"
)
user_recommendations_with_movie_ids.show(truncate=False)

+----------+--------------------------+
|user_index|title                     |
+----------+--------------------------+
|3         |Magnolia (1999)           |
|3         |Night Tide (1961)         |
|3         |Perfect Blue (1997)       |
|3         |She-Devil (1989)          |
|3         |Last September, The (1999)|
+----------+--------------------------+

