# Initializing PySpark
We use the [findspark](https://pypi.org/project/findspark/) package to locate the local Spark installation and make `pyspark` importable, then we initialize the `SparkContext`. Since we're working with *[DataFrames](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)*, we also need to initialize a `SparkSession`. The PySpark version is **3.2.0**.<br>
The MovieLens dataset can be found at the following [link](https://files.grouplens.org/datasets/movielens/ml-1m.zip).

In [None]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("MovieLens Dataset").setMaster("local")
sc = SparkContext(conf=conf)
ss = SparkSession(sc)

# Loading the Dataset
## Importing .dat files
The first step is to load the dataset, which we can do with the pandas `read_table()` [function](https://pandas.pydata.org/docs/reference/api/pandas.read_table.html).<br>
This gives us two DataFrames:
* **Movies**: 3883 x 3
* **Ratings**: 1000209 x 4

In [None]:
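import pandas as pd

# The .dat files use "::" as separator; a multi-character separator requires the Python engine.
# movies.dat contains Latin-1 encoded titles, so reading it with the default UTF-8 codec fails.
movies_pd_dataset = pd.read_table("data/movies.dat", 
                                  delimiter="::", 
                                  names=["MovieID", "Title", "Genres"], 
                                  engine="python",
                                  encoding="latin-1")

ratings_pd_dataset = pd.read_table("data/ratings.dat", 
                                   delimiter="::", 
                                   names=["UserID", "MovieID", "Rating", "Timestamp"], 
                                   engine="python",
                                   encoding="latin-1")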
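
Each line of the two files is a `::`-separated record. The same splitting can be sketched with the standard library alone; the two byte strings below are illustrative lines in the `MovieID::Title::Genres` and `UserID::MovieID::Rating::Timestamp` layouts, the helper names `parse_movie` and `parse_rating` are hypothetical, and decoding as Latin-1 is an assumption about the files' encoding.

```python
from typing import Tuple

def parse_movie(line: bytes) -> Tuple[int, str, str]:
    """Split one movies.dat-style line into (MovieID, Title, Genres)."""
    movie_id, title, genres = line.decode("latin-1").rstrip("\r\n").split("::")
    return int(movie_id), title, genres  # Genres stays a single "|"-joined string

def parse_rating(line: bytes) -> Tuple[int, int, int, int]:
    """Split one ratings.dat-style line into (UserID, MovieID, Rating, Timestamp)."""
    user_id, movie_id, rating, timestamp = line.decode("latin-1").rstrip("\r\n").split("::")
    return int(user_id), int(movie_id), int(rating), int(timestamp)

# Illustrative lines in the "::"-separated layout
movie = parse_movie(b"1::Toy Story (1995)::Animation|Children's|Comedy\n")
rating = parse_rating(b"1::1193::5::978300760\n")
print(movie)   # (1, 'Toy Story (1995)', "Animation|Children's|Comedy")
print(rating)  # (1, 1193, 5, 978300760)
```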

## Creating a Schema
To simplify access to our DataFrames, we pass a custom [schema](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html) to the `createDataFrame()` [function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html#pyspark.sql.SparkSession.createDataFrame).<br>
This lets us choose the data [type](https://spark.apache.org/docs/latest/api/python/search.html?q=PySpark.sql.types#) of each column; using narrow integer types instead of the 64-bit integers Spark would otherwise infer from the pandas columns can reduce the space used by our DataFrames.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, ShortType, IntegerType

movies_schema = StructType([
    StructField("MovieID", ShortType(), False),
    StructField("Title", StringType(), False),
    StructField("Genres", StringType(), False)
])

ratings_schema = StructType([
    StructField("UserID", ShortType(), False),
    StructField("MovieID", ShortType(), False),
    StructField("Rating", ShortType(), False),
    StructField("Timestamp", IntegerType(), False)
])

movies_dataset = ss.createDataFrame(movies_pd_dataset, schema=movies_schema)
ratings_dataset = ss.createDataFrame(ratings_pd_dataset, schema=ratings_schema)
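
Using `ShortType` is only safe if every value fits in a 16-bit signed integer. The MovieLens 1M README states that UserIDs range from 1 to 6040, MovieIDs from 1 to 3952, ratings are whole stars from 1 to 5, and timestamps are seconds since the epoch. A quick plain-Python range check under those stated ranges; the `fits` helper and the `SPARK_INT_RANGES` table are hypothetical names for this sketch:

```python
from datetime import datetime, timezone

# Inclusive value ranges of the signed Spark SQL integer types used in the schema
SPARK_INT_RANGES = {
    "ShortType": (-2**15, 2**15 - 1),    # 2-byte signed integer
    "IntegerType": (-2**31, 2**31 - 1),  # 4-byte signed integer
}

def fits(type_name: str, lo: int, hi: int) -> bool:
    """Return True if every integer in [lo, hi] is representable by type_name."""
    t_min, t_max = SPARK_INT_RANGES[type_name]
    return t_min <= lo and hi <= t_max

# Ranges taken from the MovieLens 1M README
checks = {
    "UserID": fits("ShortType", 1, 6040),
    "MovieID": fits("ShortType", 1, 3952),
    "Rating": fits("ShortType", 1, 5),
}
print(checks)  # {'UserID': True, 'MovieID': True, 'Rating': True}

# IntegerType can hold Unix timestamps (in seconds) up to this instant
print(datetime.fromtimestamp(2**31 - 1, tz=timezone.utc))  # 2038-01-19 03:14:07+00:00
```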

# Exploratory Analysis

## Query 1
Find the *number of ratings* for each movie and the *distribution* of these counts across movies

In [None]:
import seaborn as sns

# Number of ratings
ratings_number_for_movie = ratings_dataset.groupBy("MovieID")\
                                          .count().orderBy("MovieID")

# Distribution
sns.displot(data=ratings_number_for_movie.toPandas()["count"])
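
Conceptually, `groupBy("MovieID").count()` is a per-key tally. A plain-Python sketch of the same aggregation on a few hypothetical `(UserID, MovieID, Rating)` tuples (not taken from the dataset):

```python
from collections import Counter

# Hypothetical (UserID, MovieID, Rating) tuples standing in for ratings_dataset
ratings = [(1, 10, 5), (1, 20, 3), (2, 10, 4), (3, 10, 2), (3, 30, 4)]

# Same result as ratings_dataset.groupBy("MovieID").count().orderBy("MovieID")
ratings_per_movie = sorted(Counter(movie_id for _, movie_id, _ in ratings).items())
print(ratings_per_movie)  # [(10, 3), (20, 1), (30, 1)]
```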

## Query 2
Find the *number of ratings* for each user and the *distribution* of these counts across users

In [None]:
# Number of ratings
ratings_number_for_user = ratings_dataset.groupBy("UserID")\
                                         .count().orderBy("UserID")

# Distribution
sns.displot(data=ratings_number_for_user.toPandas()["count"])
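
`displot` draws a histogram of the `count` column, i.e. how many users fall into each range of rating counts. Without the binning, that distribution is a second tally over the per-user counts. A sketch on hypothetical tuples:

```python
from collections import Counter

# Hypothetical (UserID, MovieID, Rating) tuples standing in for ratings_dataset
ratings = [(1, 10, 5), (1, 20, 3), (2, 10, 4), (3, 10, 2),
           (3, 30, 4), (3, 20, 1), (4, 30, 5), (4, 10, 3)]

# Step 1: number of ratings per user, like groupBy("UserID").count()
# (user 1 -> 2, user 2 -> 1, user 3 -> 3, user 4 -> 2)
per_user = Counter(user_id for user_id, _, _ in ratings)

# Step 2: how many users gave exactly n ratings (what the histogram bars summarize)
distribution = sorted(Counter(per_user.values()).items())
print(distribution)  # [(1, 1), (2, 2), (3, 1)]
```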

## Query 3
Find the *average score* received by each movie

In [None]:
from pyspark.sql.functions import format_number

ratings_dataset.groupBy("MovieID").agg({"Rating": "mean"})\
               .orderBy("MovieID")\
               .select(
                   "MovieID", 
                   format_number("avg(Rating)", 4).alias("Average")
               )\
               .show()
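
With a dictionary argument, `agg({"Rating": "mean"})` names its output column `avg(Rating)`, which is why the `select` refers to that name; `format_number` then renders the value as a string rounded to 4 decimal places. The underlying computation is a running sum and count per movie. A plain-Python sketch on hypothetical tuples:

```python
from collections import defaultdict

# Hypothetical (UserID, MovieID, Rating) tuples standing in for ratings_dataset
ratings = [(1, 10, 5), (2, 10, 4), (3, 10, 4), (1, 20, 3), (2, 30, 2), (3, 30, 5)]

# Accumulate [sum, count] per MovieID
totals = defaultdict(lambda: [0, 0])
for _, movie_id, rating in ratings:
    totals[movie_id][0] += rating
    totals[movie_id][1] += 1

# Mean per movie, ordered by MovieID and formatted as a 4-decimal string
averages = {m: f"{s / n:.4f}" for m, (s, n) in sorted(totals.items())}
print(averages)  # {10: '4.3333', 20: '3.0000', 30: '3.5000'}
```

Because the formatted column is a string, any later sorting on `Average` would compare text rather than numbers; keeping a numeric column for ordering avoids that.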

## Query 4
Find the *average score* given by each user 

In [None]:
ratings_dataset.groupBy("UserID").agg({"Rating": "mean"})\
               .orderBy("UserID")\
               .select(
                   "UserID", 
                   format_number("avg(Rating)", 4).alias("Average")
               )\
               .show()

## Query 5
Top **K** movies with at least **R** ratings

Corresponding SQL query (`K` and `R` are placeholders for the two parameters):
```sql
SELECT M.MovieID, M.Title, COUNT(RT.Rating) AS RatingsNumber
FROM Movies M JOIN Ratings RT ON M.MovieID = RT.MovieID
GROUP BY M.MovieID, M.Title
HAVING COUNT(RT.Rating) >= R
ORDER BY M.MovieID
LIMIT K;
```

In [None]:
from pyspark.sql.functions import col

K = 10
R = 20

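# Count ratings per movie (keeping the title), keep movies with at least R ratings,
# then show the first K of them in MovieID order
movies_dataset.join(ratings_dataset, "MovieID")\
              .groupBy("MovieID", "Title").count()\
              .withColumnRenamed("count", "Number of Ratings")\
              .filter(col("Number of Ratings") >= R)\
              .orderBy("MovieID").limit(K).show()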