## Data Preprocessing

### Project Description:

Using pyspark, due to the large size of the data sets, the following code seeks to join 3 movielens data sets consisting of movies, reviews, ratings, and trailer data.

January 09, 1995 and March 31, 2015.<br>
The data can be found at https://grouplens.org/datasets/movielens/ .

In [1]:
# Import packages
import findspark
findspark.init("/Users/joseppbenvenuto/spark-3.0.1-bin-hadoop2.7")
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark").getOrCreate()
import pandas as pd
import numpy as np

In [2]:
# View schema of data set
reviews = spark.read.csv('Data/ratings.csv', header = True)
reviews_schema = reviews.printSchema()
reviews_schema

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



The above schema describes the data types of each feature found in the data set.

In [3]:
# View data set
reviews.createOrReplaceTempView('reviews')
reviews_sql = spark.sql("""SELECT *
                           FROM reviews""")

reviews_sql.show(10)
print('\n' + 'record count: ' + str(reviews_sql.count()))

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
+------+-------+------+----------+
only showing top 10 rows


record count: 20000263


The above table represents all the movies under review, reviews, star ratings, and users.

In [4]:
# View schema of data set 
movies = spark.read.csv('Data/movies.csv', header = True)
movies_schema = movies.printSchema()
movies_schema

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



The above schema describes the data types of each feature found in the data set.

In [5]:
# View data set
movies.createOrReplaceTempView('movies')
movies_sql = spark.sql("""SELECT *
                          FROM movies""")

movies_sql.show(10)
print('\n' + 'record count: ' + str(movies_sql.count()))

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows


record count: 27278


The above table represents all the businesses and business names under review.

In [6]:
# Use SQL to join the two data sets on business id
full_movies_sql = spark.sql("""SELECT mov.movieID, mov.title, rev.userID, rev.rating, rev.timestamp
                               FROM reviews AS rev JOIN movies AS mov
                               ON rev.movieId = mov.movieId""")

full_movies_sql.show(10)
print('\n' + 'record count: ' + str(full_movies_sql.count()))

+-------+--------------------+------+------+----------+
|movieID|               title|userID|rating| timestamp|
+-------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|     1|   3.5|1112486027|
|     29|City of Lost Chil...|     1|   3.5|1112484676|
|     32|Twelve Monkeys (a...|     1|   3.5|1112484819|
|     47|Seven (a.k.a. Se7...|     1|   3.5|1112484727|
|     50|Usual Suspects, T...|     1|   3.5|1112484580|
|    112|Rumble in the Bro...|     1|   3.5|1094785740|
|    151|      Rob Roy (1995)|     1|   4.0|1094785734|
|    223|       Clerks (1994)|     1|   4.0|1112485573|
|    253|Interview with th...|     1|   4.0|1112484940|
|    260|Star Wars: Episod...|     1|   4.0|1112484826|
+-------+--------------------+------+------+----------+
only showing top 10 rows


record count: 20000263


The above data set is what will be used in the analysis.

The data set consists of the business names, users, reviews, and star ratings of the business under review.

In [7]:
full_movies_sql.createOrReplaceTempView('full_movies_sql')
movieId_count = spark.sql("""SELECT movieID, COUNT(movieID) AS movieIDCount
                             FROM full_movies_sql
                             GROUP BY movieID
                             HAVING movieIDCount >= 10000
                             ORDER BY movieIDCount ASC""")

movieId_count.show(10)
print('\n' + 'record count: ' + str(movieId_count.count()))

+-------+------------+
|movieID|movieIDCount|
+-------+------------+
|   1339|       10022|
|   3255|       10026|
|   1293|       10028|
|   4720|       10055|
|   2080|       10062|
|    252|       10074|
|   2105|       10076|
|   2023|       10141|
|   2294|       10163|
|   2161|       10169|
+-------+------------+
only showing top 10 rows


record count: 462


In [8]:
userID_count = spark.sql("""SELECT userID, COUNT(userID) AS userIDCount
                            FROM full_movies_sql
                            GROUP BY userID
                            HAVING userIDCount >= 1000
                            ORDER BY userIDCount ASC""")

userID_count.show(10)
print('\n' + 'record count: ' + str(userID_count.count()))

+------+-----------+
|userID|userIDCount|
+------+-----------+
| 26640|       1000|
| 28239|       1000|
| 93555|       1000|
|101963|       1000|
| 97521|       1000|
|136301|       1000|
| 96523|       1000|
| 81212|       1000|
| 13244|       1000|
|102262|       1000|
+------+-----------+
only showing top 10 rows


record count: 1894


In [9]:
movieId_count.createOrReplaceTempView('movieId_count')
userID_count.createOrReplaceTempView('userID_count')

# Use SQL to join the two data sets on business id
full_movies_sql = spark.sql("""SELECT ful.movieID, ful.title, ful.userID, ful.rating, ful.timestamp,
                               mc.movieIDCount, uc.userIDCount
                               FROM full_movies_sql AS ful JOIN movieID_count AS mc
                               ON ful.movieID = mc.movieID
                               JOIN userID_count AS uc
                               ON ful.userID = uc.userID""")

full_movies_sql.show(10)
print('\n' + 'record count: ' + str(full_movies_sql.count()))

+-------+--------------------+------+------+----------+------------+-----------+
|movieID|               title|userID|rating| timestamp|movieIDCount|userIDCount|
+-------+--------------------+------+------+----------+------------+-----------+
|   1090|      Platoon (1986)|111982|   4.0|1390131211|       15808|       1225|
|   2294|         Antz (1998)|111982|   1.5|1215994731|       10163|       1225|
|    296| Pulp Fiction (1994)|111982|   5.0|1202678932|       67310|       1225|
|   1394|Raising Arizona (...|111982|   3.0|1338657149|       15483|       1225|
|  58559|Dark Knight, The ...|111982|   4.5|1217474947|       20438|       1225|
|   1265|Groundhog Day (1993)|111982|   4.0|1221511687|       32026|       1225|
|   2700|South Park: Bigge...|111982|   5.0|1202679295|       17371|       1225|
|   3949|Requiem for a Dre...|111982|   3.0|1241386796|       14515|       1225|
|    919|Wizard of Oz, The...|111982|   3.0|1215994478|       23445|       1225|
|   1500|Grosse Pointe Bla..

In [10]:
# View schema of data set
trailers = spark.read.csv('Data/ml-youtube.csv', header = True)
trailers_schema = trailers.printSchema()
trailers_schema

root
 |-- youtubeId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)



In [11]:
# View data set
trailers.createOrReplaceTempView('trailers')
trailers_sql = spark.sql("""SELECT *
                            FROM trailers""")

trailers.show(10)
print('\n' + 'record count: ' + str(trailers.count()))

+-----------+-------+--------------------+
|  youtubeId|movieId|               title|
+-----------+-------+--------------------+
|K26_sDKnvMU|      1|    Toy Story (1995)|
|3LPANjHlPxo|      2|      Jumanji (1995)|
|rEnOoWs3FuA|      3|Grumpier Old Men ...|
|j9xml1CxgXI|      4|Waiting to Exhale...|
|ltwvKLnj1B4|      5|Father of the Bri...|
|2GfZl4kuVNI|      6|         Heat (1995)|
|twTksx_lWB4|      7|      Sabrina (1995)|
|-C-xXZyX2zU|      8| Tom and Huck (1995)|
|SCOxEKkuWG4|      9| Sudden Death (1995)|
|lcOqUE0u1LM|     10|    GoldenEye (1995)|
+-----------+-------+--------------------+
only showing top 10 rows


record count: 25623


In [12]:
full_movies_sql.createOrReplaceTempView('full_movies_sql')

full_movies_sql = spark.sql("""SELECT ful.*, t.youtubeId
                               FROM full_movies_sql AS ful JOIN trailers AS t
                               ON ful.movieId = t.movieId""")

full_movies_sql.show(10)
print('\n' + 'record count: ' + str(full_movies_sql.count()))

+-------+--------------------+------+------+----------+------------+-----------+-----------+
|movieID|               title|userID|rating| timestamp|movieIDCount|userIDCount|  youtubeId|
+-------+--------------------+------+------+----------+------------+-----------+-----------+
|   1090|      Platoon (1986)|111982|   4.0|1390131211|       15808|       1225|pPi8EQzJ2Bg|
|   2294|         Antz (1998)|111982|   1.5|1215994731|       10163|       1225|6kqGO1c70ak|
|    296| Pulp Fiction (1994)|111982|   5.0|1202678932|       67310|       1225|s7EdQ4FqbhY|
|   1394|Raising Arizona (...|111982|   3.0|1338657149|       15483|       1225|2AIfVoGUs6c|
|  58559|Dark Knight, The ...|111982|   4.5|1217474947|       20438|       1225|GVx5K8WfFJY|
|   1265|Groundhog Day (1993)|111982|   4.0|1221511687|       32026|       1225|tSVeDx9fk60|
|   2700|South Park: Bigge...|111982|   5.0|1202679295|       17371|       1225|PbMl6DjhJ1I|
|   3949|Requiem for a Dre...|111982|   3.0|1241386796|       14515|  

In [14]:
# Export data set to project directory
full_movies = full_movies_sql.toPandas()
full_movies.columns = ['movieId','movie','userId','rating','timestamp','movieIDCount','userIDCount','youtubeId']
full_movies.to_csv('Data/Movie_Ratings.csv')