# GRADTDA5622 - Big Data Computing Foundations 2
## Homework 5: PySpark Practice
- Semester: Spring 2023
- Instructor: Tom Bihari
- Section: N/A
- Student Name: Able Baker **(fill in)**
- Student Email: baker.12345@osu.edu **(fill in)**
- Student ID: 123456789 **(fill in)**
***

***
# Section: Overview
***

**The Objectives of This Assignment are:**
1. To practice using common Spark operations.
2. To practice using Spark to solve problems and answer questions.

**Overview:**
- I have provided a step-by-step approach you can follow.  Fill in the ... in each cell.
- I have filled in some cells for you, as examples.
- Refer to the **PySpark_DeepDive1** notebook covered in the **Deep Dive: Spark** module for examples of code that can be used in this assignment.

**Some Good Resources:**
- https://spark.apache.org/docs/latest/api/python/index.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html
- https://sparkbyexamples.com/pyspark-tutorial/

**Instructions:**
- **Follow the instructions** in each section.
- **Fill in** the **Conclusions** section.

***
# Section: Setup
- Add any needed imports, helper functions, etc., here.
***

In [1]:
try:
    import pyspark
except ImportError:  # only fall back to installing when pyspark is missing
    print('Installing pyspark')
    !pip install pyspark
    import pyspark

# try:
#     import pyspark_config
# except:
#     print('Installing pyspark_config')
#     !pip install pyspark_config
#     import pyspark_config

In [2]:
# NOTE: If any of these libraries are not already loaded on OSC Jupyter+Spark (e.g., seaborn),
#  go to the Launcher (New Launcher in the JupyterLab Files menu), open a Terminal, and type
#  "pip install seaborn" (or the needed library).
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
from time import time
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as SqlF

pd.set_option('display.max_columns', 50) #include to avoid ... in middle of display
pyspark.__version__

'3.5.4'

In [3]:
spark = SparkSession.builder.master("local[*]") \
                    .appName('MyApp') \
                    .getOrCreate()
sc = spark.sparkContext  # Get the context, so we have a short name for it if we need it.
#print(sc.appName)

In [4]:
# Identify the location of the shared data folder
# shared_data_directory = "../shared_Sp23/"
shared_data_directory = "/content/"

***
# Section: 1 - Problem Overview / Business Understanding
***

The three provided datasets contain records of users' ratings of movies (items).  The datasets are **data.csv**, **item.csv**, and **user.csv**.  See https://grouplens.org/datasets/movielens/100k/ for descriptions of the datasets.

The **goal** of the exercise is to **estimate a rating** for the movie **"Mission: Impossible (1996)"** by **User15**.

- We will use a trivial approach (barely sensible, but easy):
  - Find all "other users" who have rated the Mission Impossible movie already.
  - If the average age of those "other users" is within ±20 years of User15's age:
    - Average the ratings those "other users" gave to Mission Impossible to estimate a rating for User15 for Mission Impossible.
  - Otherwise, use the average rating User15 gave to other movies as the estimate for Mission Impossible.
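
The decision rule described above can be sketched in plain Python before any Spark code is written. This is only an illustration under stated assumptions: the function name `estimate_rating`, its parameter names, and the sample numbers are hypothetical and do not come from the datasets.

```python
def estimate_rating(user_age, avg_other_age, avg_other_rating,
                    user_avg_rating, max_age_gap=20):
    """Return the estimated rating according to the trivial age-gap rule."""
    # Use the absolute difference so the rule works in both directions (+/-).
    age_diff = abs(avg_other_age - user_age)
    if age_diff <= max_age_gap:
        # Other reviewers are close enough in age: use their average rating.
        return avg_other_rating
    # Otherwise fall back to the user's own average rating for other movies.
    return user_avg_rating


# Made-up numbers, only to exercise both branches.
print(estimate_rating(30, 35.5, 3.8, 2.9))  # age gap of 5.5 years
print(estimate_rating(30, 55.0, 3.8, 2.9))  # age gap of 25 years
```

Taking the absolute value is the design choice that matters here: without it, a group of much younger reviewers would produce a negative difference and always pass the `<= 20` check.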

***
# Section: 2 - Data Understanding
***

***
## Section: 2.1 - Describe the meaning and type of data for each attribute.
- This can be pulled from the original metadata documentation, if available, from other sources, or postulated based on values within the data.  Be explicit regarding the source, assumptions, etc., in particular if you are making educated guesses.
***

In [None]:
# Insert code and/or commentary here...  EXAMPLE CODE BELOW...

### Read the three datasets.

In [5]:
data_df = spark.read.csv(shared_data_directory + 'data.csv', header=True, inferSchema=True).orderBy('user_id','item_id')
print(data_df.count())
data_df.printSchema()
data_df.show(2,truncate=False)

100000
root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|1      |1      |5     |874965758|
|1      |2      |3     |876893171|
+-------+-------+------+---------+
only showing top 2 rows



In [None]:
# Insert code and/or commentary here...
item_df = ...

In [None]:
# Insert code and/or commentary here...
user_df = ...

***
## Section: 2.2 - Provide basic statistics for the attributes.
- For example: counts, percentiles, mean, median, standard deviation. The statistics should be relevant for the type of attribute.
***

In [6]:
data_df.describe().toPandas()

   summary  user_id            item_id             rating              timestamp
0  count    100000.0           100000.0            100000.0            100000.0
1  mean     462.48475          425.53013           3.52986             883528851.48862
2  stddev   266.6144201275064  330.79835632558417  1.1256735991443163  5343856.189502888
3  min      1.0                1.0                 1.0                 874724710.0
4  max      943.0              1682.0              5.0                 893286638.0


In [None]:
# Insert code and/or commentary here...
item_df...

In [None]:
# Insert code and/or commentary here...
user_df...

***
# Section: 3 - Data Pre-Processing
***

In [None]:
# Trim the data_df and item_df datasets down to only the necessary information.
# Consider using "select" to keep only the 'user_id','item_id','rating' columns for data_df,
#   and the 'movie_id' and 'movie_title' columns for item_df.

data_df = ...
item_df = ...
#user_df = no changes needed

In [None]:
# Calculate the user statistics (count, min, average, max ratings).
# Consider using "groupBy", "agg", "orderBy", etc.
# Create a Dataframe containing: |user_id|count_rating|min_rating|avg_rating|max_rating|

user_rating_df = ...

print(user_rating_df.count())
user_rating_df.show(5,truncate=False)

In [None]:
# Calculate the movie statistics (count, min, average, max ratings).
# Consider using "groupBy", "agg", "orderBy", etc.
# Create a Dataframe containing: |item_id|count_rating|min_rating|avg_rating|max_rating|

item_rating_df = ...

print(item_rating_df.count())
item_rating_df.show(5,truncate=False)

***
# Section: 4 - Recommendation System
- For each of the steps below, I have provided an outline of the computation to perform and the expected output structure.
- Please fill in the computations.
- You may choose to deviate from this structure, but if you do so, describe the steps you chose.
***

In [7]:
# Specify the user and movie of interest.

user_x_id = 15
movie_y_title = "Mission: Impossible (1996)"

In [None]:
# Get the demographics of this user, and save the age and gender.
# Consider using the user_df from above, and the "filter" and "collect" operations.

user_x_demographics = ...
user_x_demographics.show()

user_x_age = user_x_demographics.collect()[0]['age']
print("user_x_age:",user_x_age)

user_x_gender = ...
print("user_x_gender:",user_x_gender)

In [None]:
# Get user X's average rating for all movies they actually have rated.
# Consider using the user_rating_df from above, and the "filter" and "collect" operations.

user_x_avg_rating = ...

print("user_x_avg_rating:",user_x_avg_rating)

In [None]:
# Get the movie id for this movie title.
# Consider using the "filter" and "collect" operations.

movie_y_id = ...

print("movie_y_id:",movie_y_id)

In [None]:
# Get all of the other users who have rated movie Y.
# Consider using "filter", "select", "orderBy", "withColumnRenamed".
# Create a Dataframe containing: |other_user_id|movie_y_rating|

other_reviewers_df = ...

print("other_reviewers_df.count:",other_reviewers_df.count())
other_reviewers_df.show(5,truncate=False)

In [None]:
# For each of the other reviewers of movie_y, get the demographics.
# Consider using the other_reviewers_df and user_df from above, and the "join" operation.
# Create a Dataframe containing: |other_user_id|movie_y_rating|age|gender|occupation|zip_code|

other_reviewer_demo_df = ...

print("other_reviewer_demo_df.count:",other_reviewer_demo_df.count())
other_reviewer_demo_df.show(5,truncate=False)

In [None]:
# For the other reviewers, get the average movie_y_rating and average age.
# Consider using the other_reviewer_demo_df and the "agg", "SqlF.avg" and "collect" operations.
# Save each of the two averages as a scalar value (not a Dataframe).

avg_other_reviewer_movie_y_rating = ...
print("avg_other_reviewer_movie_y_rating:",avg_other_reviewer_movie_y_rating)

avg_other_reviewer_age = ...
print("avg_other_reviewer_age:",avg_other_reviewer_age)

In [None]:
# This is a trivial way to make a recommendation.  Normally we would do something much
# more sophisticated.  But we will keep it simple here.

# If the average age of the other reviewers is within +- 20 years of user_x age,
#  then assume user_x's rating of movie_y will be the average rating given by the other reviewers.
#  Otherwise, assume user_x's rating for movie_y will be the average rating user_x has given
#  to other movies they have rated.
# Print this rating, with the explanation.

age_diff = ...
print("age_diff:",age_diff)

if age_diff <= 20:
    print('''The average age of reviewers of movie_y is within 20 years
    of the age of user_x.  So we will use their average rating for movie_y:''',avg_other_reviewer_movie_y_rating)
else:
    print('''The average age of reviewers of movie_y is NOT within 20 years
    of the age of user_x.  So we will use the average rating of user_x for other movies :''',user_x_avg_rating)

***
# Section: 5 - Conclusions
- What are your overall conclusions about the assignment?
- What did you learn?
***

In [None]:
# Insert commentary here.