In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math

In [2]:
import os

# PySpark consults the PYSPARK_PYTHON environment variable to pick the Python
# executable it launches, so it is set here before the Spark session is built.
PYSPARK_WORKER_PYTHON = "python3"
os.environ["PYSPARK_PYTHON"] = PYSPARK_WORKER_PYTHON

## Part1: Data ETL and Data Exploration

In [4]:
from pyspark.sql import SparkSession

# Build the notebook's Spark session. getOrCreate() hands back an already
# running session when there is one instead of starting a second one.
# NOTE(review): the app name below is spelled "moive" in the original; it is
# kept verbatim here because it is a runtime string.
spark = (
    SparkSession.builder
    .appName("moive analysis")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [5]:
# Load the four CSV tables; header=True takes the column names from the first
# line of each file.
# NOTE(review): no schema / inferSchema is passed, so every column presumably
# comes back as a string — confirm before doing numeric work on these frames.
movies = spark.read.csv("/FileStore/tables/movies.csv", header=True)
ratings = spark.read.csv("/FileStore/tables/ratings.csv", header=True)
links = spark.read.csv("/FileStore/tables/links.csv", header=True)
tags = spark.read.csv("/FileStore/tables/tags.csv", header=True)

In [6]:
# Peek at the first five rows of the movies table.
movies.show(n=5)

In [7]:
# Peek at the first five rows of the ratings table.
ratings.show(n=5)

In [8]:
# Peek at the first five rows of the links table.
links.show(n=5)

In [9]:
# Peek at the first five rows of the tags table.
tags.show(n=5)

In [10]:
# Smallest number of ratings given by any user / received by any movie.
# The minimum is aggregated inside Spark, so only a single value per query is
# brought back to the driver (the earlier version collected the entire
# per-user and per-movie count tables into pandas just to call .min()).
ratings_per_user = ratings.groupBy("userID").count()
ratings_per_movie = ratings.groupBy("movieId").count()

# agg({"count": "min"}) yields a one-row, one-column frame; first()[0] unwraps it.
tmp1 = ratings_per_user.agg({"count": "min"}).first()[0]
tmp2 = ratings_per_movie.agg({"count": "min"}).first()[0]

print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [11]:
# How many movies were rated by exactly one user, out of all rated movies.
# Both numbers come from the same grouped frame and are counted in Spark,
# instead of collecting the counts to pandas and summing a boolean Series
# with the Python builtin.
ratings_per_movie = ratings.groupBy("movieId").count()

# Movies whose rating count is exactly one.
tmp1 = ratings_per_movie.filter(ratings_per_movie["count"] == 1).count()
# One group per distinct movieId, so the number of groups is the number of rated movies.
tmp2 = ratings_per_movie.count()

print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 1: Spark SQL and OLAP

### Q1: The number of Users

In [14]:
# Q1: number of distinct users that appear in the ratings table.
# Counted directly in Spark. The earlier version round-tripped through pandas
# and read the result with tmp1[1], i.e. an integer position on a Series that
# is indexed by column labels — a lookup pandas has deprecated.
tmp1 = ratings.select("userID").distinct().count()
print('The number of users is {}'.format(tmp1))

### Q2: The number of Movies

In [16]:
# Q2: total number of movies in the catalogue.
# This is taken from the movies table: counting movieIds in `ratings` only
# covers movies that received at least one rating, which is what Q3 reports.
# NOTE(review): assumes the movies table holds exactly one row per movie — confirm.
tmp2 = movies.count()
print('The number of movies is {}'.format(tmp2))

### Q3: How many movies have been rated by users? List the movies that have not been rated

In [18]:
# Q3: how many movies have at least one rating, and which movies have none.
# Every movieId present in `ratings` has, by construction, at least one rating.
rated_movie_ids = ratings.select("movieId").distinct()
tmp3 = rated_movie_ids.count()
print('The number of rated movies is {}'.format(tmp3))

# A grouped count over `ratings` can never be 0, so unrated movies have to be
# found from the movies table instead: keep the movies whose movieId has no
# match among the rated ids (left anti join).
# NOTE(review): assumes `movies` has a movieId column with the same values as
# `ratings.movieId` — confirm against the CSV header.
unrated_movies = movies.join(rated_movie_ids, on="movieId", how="left_anti")
tmp3_1 = unrated_movies.count()
print('The number of unrated movies is {}'.format(tmp3_1))

# List (up to 20 of) the movies that were never rated, without truncating titles.
unrated_movies.show(20, False)

### Q4: List Movie Genres

In [20]:
# Q4: list the individual genres and how many movies carry each one.
# Grouping on the raw `genres` column treats every combination string as its
# own value, so the column is first split on '|' and exploded to one row per
# (movie, genre) pair.
# NOTE(review): assumes genres are pipe-delimited; if a value contains no '|'
# it simply stays a single genre.
tmp4 = (
    movies
    .selectExpr("explode(split(genres, '[|]')) AS genres")
    .groupBy("genres")
    .count()
    .orderBy("genres")
    .toPandas()
)
print(tmp4)

### Q5: Movies for Each Category

In [22]:
# Q5: the movies that belong to each category, where a category is read as a
# single genre (grouping by title only counts repeated titles and says nothing
# about categories).
# NOTE(review): assumes genres are pipe-delimited — confirm.
tmp5 = (
    movies
    # One row per (genre, title) pair.
    .selectExpr("explode(split(genres, '[|]')) AS genre", "title")
    .groupBy("genre")
    # Gather every title of the genre into one array column.
    .agg({"title": "collect_list"})
    # Rename positionally: grouping column first, then the aggregate.
    .toDF("genre", "titles")
    .selectExpr("genre", "size(titles) AS num_movies", "titles")
    .orderBy("genre")
    .toPandas()
)
print(tmp5)

## Part 2: Spark ALS-based approach for training the model
We will use an RDD-based API from [pyspark.mllib](https://spark.apache.org/docs/2.1.1/mllib-collaborative-filtering.html) to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and then convert it to the form of (user, item, rating) tuples.

In [24]:
from pyspark.mllib.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

In [25]:
# Read ratings.csv as an RDD of raw text lines.
# `sc` is not defined in any cell shown in this notebook (it presumably only
# exists where the platform pre-populates it), so the SparkContext is taken
# from the `spark` session built earlier; this keeps the cell runnable from a
# fresh kernel.
movie_rating = spark.sparkContext.textFile("/FileStore/tables/ratings.csv")

In [26]:
def _parse_rating_line(line):
    """Split one comma-separated line and build a Row from its first three fields.

    Field 0 becomes the integer ``userId``, field 1 the integer ``movieId`` and
    field 2 the float ``rating``; any further fields are ignored.
    """
    fields = line.split(",")
    return Row(userId=int(fields[0]), movieId=int(fields[1]), rating=float(fields[2]))


# The first line of the file is the header; every line equal to it is dropped
# before parsing.
header = movie_rating.take(1)[0]
rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(_parse_rating_line)
    .cache()
)

# DataFrame view of the parsed ratings.
ratingdata = spark.createDataFrame(rating_data)

In [27]:
# Preview the first 20 parsed ratings without truncating cell contents.
ratingdata.show(n=20, truncate=False)

Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [29]:
# Split the ratings 60/20/20 into training, validation and test sets.
# A fixed seed makes the partition identical on every Restart & Run All; an
# unseeded randomSplit would hand out different rows on each run, so any
# metric computed on these splits could not be reproduced.
SPLIT_SEED = 7856
train, validation, test = ratingdata.randomSplit([0.6, 0.2, 0.2], seed=SPLIT_SEED)
train.show()

In [30]:
# Ask Spark to cache the training split.
# NOTE(review): presumably because it is scanned repeatedly by later model-fitting cells — confirm.
train.cache()

In [31]:
# Ask Spark to cache the validation split.
# NOTE(review): presumably because it is reused when comparing candidate models later — confirm.
validation.cache()

In [32]:
# Ask Spark to cache the test split.
# NOTE(review): presumably because it is reused by the final evaluation cells — confirm.
test.cache()