In [None]:
pip install pyspark

In [None]:
pip install findspark

In [None]:
# Import all dependencies

import findspark
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, max, min, col, row_number
from pyspark.sql.window import Window

In [None]:
# Initialise findspark. SPARK_HOME is used when it is set; when it is not,
# None is passed so findspark performs its own lookup instead of this cell
# failing with a KeyError on the missing environment variable.
findspark.init(os.environ.get("SPARK_HOME"))

In [None]:
# Obtain the Spark session used by every cell below. getOrCreate() hands back
# the already-active session when there is one and builds a new one otherwise.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
# Read movies.dat into a Spark DataFrame

# The dataset directory can be overridden through the ML1M_DIR environment
# variable; the original location is kept as the default so existing setups
# keep working unchanged.
data_dir = os.environ.get("ML1M_DIR", "/Users/wadirmalik/Downloads/ml-1m")
file_path = os.path.join(data_dir, "movies.dat")

file_options = {
    "header": False,    # the file has no header row
    "delimiter": "::",  # fields are separated by a double colon
}

# An explicit schema names and types the columns at read time, which replaces
# both the renaming of _c0.._c2 and the extra pass over the file that
# inferSchema needs.
# NOTE(review): if titles show garbled accented characters, the file is
# probably not UTF-8 -- confirm its encoding and add an "encoding" option.
movies_schema = "MovieID INT, Title STRING, Genres STRING"

movies_df = (
    spark.read
    .schema(movies_schema)
    .options(**file_options)
    .csv(file_path)
)

movies_df.show()

In [None]:
# Read ratings.dat into a Spark DataFrame

# The dataset directory can be overridden through the ML1M_DIR environment
# variable; the original location is kept as the default so existing setups
# keep working unchanged.
data_dir = os.environ.get("ML1M_DIR", "/Users/wadirmalik/Downloads/ml-1m")
file_path = os.path.join(data_dir, "ratings.dat")

file_options = {
    "header": False,    # the file has no header row
    "delimiter": "::",  # fields are separated by a double colon
}

# An explicit schema names and types the columns at read time, which replaces
# both the renaming of _c0.._c3 and the extra pass over the file that
# inferSchema needs.
ratings_schema = "UserID INT, MovieID INT, Rating INT, Timestamp INT"

ratings_df = (
    spark.read
    .schema(ratings_schema)
    .options(**file_options)
    .csv(file_path)
)

ratings_df.show()

In [None]:
# Build a DataFrame with the highest, lowest and mean rating of every movie.
# max/min below are the pyspark.sql.functions imports, not the Python builtins.

movie_ratings_df = (
    ratings_df
    .groupBy("MovieID")
    .agg(
        max("Rating").alias("Max_Rating"),
        min("Rating").alias("Min_Rating"),
        avg("Rating").alias("Avg_Rating"),
    )
)
movie_ratings_df.show()

In [None]:
# Join the per-movie max, min and avg ratings onto the movies DataFrame.
# The key is spelled exactly like the column ("MovieID") so the join does not
# depend on Spark's case-insensitive column resolution. The inner join keeps
# only movies that have at least one rating.

movies_with_ratings_df = movies_df.join(movie_ratings_df, "MovieID", "inner")

movies_with_ratings_df.show()

In [None]:
# Create a new DataFrame holding each user's top 3 rated movies.

# Rank each user's ratings from highest to lowest. The ordering has to be
# descending: an ascending sort would put the user's lowest ratings at
# ranks 1-3. Timestamp (most recent first) and MovieID act as tie-breakers so
# row_number() assigns ranks deterministically among equal ratings.
window_spec = Window.partitionBy("UserID").orderBy(
    col("Rating").desc(),
    col("Timestamp").desc(),
    col("MovieID"),
)
top_movies_df = (
    ratings_df
    .withColumn("Rank", row_number().over(window_spec))
    .filter(col("Rank") <= 3)
)
top_movies_df.show()

In [86]:
# Persist the original and the derived DataFrames as CSV, replacing any
# previous output at the same location.
# NOTE(review): the CSV output is written without a header row, so column
# names are not stored in the files -- confirm that is intended.

csv_outputs = [
    (movies_df, "movies.csv"),
    (ratings_df, "ratings.csv"),
    (movies_with_ratings_df, "movies_with_ratings.csv"),
    (top_movies_df, "top_movies.csv"),
]
for frame, target in csv_outputs:
    frame.write.format("csv").mode("overwrite").save(target)

                                                                                