In [1]:
import os
from datetime import datetime, timezone

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.sql import Row, SparkSession

In [2]:
# Directory holding the MovieLens ml-25m CSV files (movies.csv, ratings.csv).
# Set the ML25M_DATA_DIR environment variable to point at another copy of the
# data set; when it is unset the original local path is used, so existing
# runs behave exactly as before.
path_prefix = os.environ.get(
    'ML25M_DATA_DIR',
    '/home/joeyresuento/Projects/data_training/data_sets/ml-25m',
)
# movies = pd.read_csv(f'{path_prefix}/movies.csv')
# ratings = pd.read_csv(f'{path_prefix}/ratings.csv')

In [18]:
# Create (or reuse) the Spark session used by every later cell.
# The builder chain is wrapped in parentheses instead of being joined with
# backslashes: a comment line inside a backslash continuation terminates the
# statement, which made the following `.appName(...)` line fail with
# "IndentationError: unexpected indent". Comments are legal inside parentheses.
spark = (
    SparkSession.builder
    # .master("spark://aljresuento1.arcanys.com:7077")  # doesn't seem to be working
    .appName("MyApp")
    .getOrCreate()
)

# Restrict Spark's log output to errors.
spark.sparkContext.setLogLevel('ERROR')

In [19]:
# Read the two CSV files, using the first line of each file as column names.
# No schema is supplied, so every column comes back as a string (see the
# printed schemas further down).
movies_df = spark.read.csv(f'{path_prefix}/movies.csv', header=True)
ratings_df = spark.read.csv(f'{path_prefix}/ratings.csv', header=True)

In [20]:
# Make both DataFrames queryable from Spark SQL under the table names
# 'movies' and 'ratings'.
movies_df.createOrReplaceTempView('movies')
ratings_df.createOrReplaceTempView('ratings')
# Optional row-count sanity checks:
# spark.sql("SELECT COUNT(1) FROM movies").show()
# spark.sql("SELECT COUNT(1) FROM ratings").show()

In [21]:
# top100_highest_rated_movies = spark.sql("SELECT \
#    movieId, rating, COUNT(1) as count \
#    FROM ratings \
#    GROUP BY movieId, rating \
#    ORDER BY rating, count DESC LIMIT 100")

# top100_highest_rated_movie_ids = top100_highest_rated_movies \
#    .rdd.map(lambda r: int(r.movieId)) \
#    .collect()

In [22]:
# Show the column names and types of the raw inputs, movies first.
for _frame in (movies_df, ratings_df):
    _frame.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [23]:
# Attach each rating to its movie's title and genres (inner join on movieId).
# Adjacent string literals are concatenated by Python, yielding one SQL string.
movie_ratings_df = spark.sql(
    "SELECT m.movieId, m.title, m.genres, r.userId, r.rating, r.timestamp "
    "FROM movies m INNER JOIN ratings r ON m.movieId = r.movieId"
)

In [24]:
# movie_ratings_df.take(10)[0]

In [25]:
def enrich_row(row):
    """Convert one joined movie/rating row into a typed Row with a `year` field.

    Parameters
    ----------
    row : Row
        A row exposing `movieId`, `title`, `genres`, `userId`, `rating` and
        `timestamp` attributes. The numeric fields may be strings (as read
        from CSV); they are parsed with `int()` / `float()`.

    Returns
    -------
    Row
        The same fields with `movieId`, `userId` and `timestamp` as int,
        `rating` as float, plus `year`: the calendar year of `timestamp`
        interpreted as seconds since the Unix epoch, in UTC.

    Raises
    ------
    ValueError, TypeError
        Propagated from `int()` / `float()` when a numeric field is missing
        or malformed.
    """
    # Parse the timestamp once and reuse it for both `year` and `timestamp`.
    timestamp = int(row.timestamp)
    # Pin the conversion to UTC: without `tz`, fromtimestamp() uses the local
    # time zone of whichever machine runs this function, so ratings made near
    # New Year could land in different years on different machines.
    year = datetime.fromtimestamp(timestamp, tz=timezone.utc).year
    return Row(
        movieId=int(row.movieId),
        title=row.title,
        genres=row.genres,
        userId=int(row.userId),
        rating=float(row.rating),
        timestamp=timestamp,
        year=year
    )

# RDD of typed rows; evaluated lazily, so nothing is computed here yet.
enriched_movie_ratings = movie_ratings_df.rdd.map(enrich_row)

In [26]:
# Turn the RDD of typed Rows back into a DataFrame; no schema is passed, so
# Spark derives the column types from the Row values.
enriched_movie_ratings_df = spark.createDataFrame(enriched_movie_ratings)
enriched_movie_ratings_df.printSchema()
# Expose the enriched data to Spark SQL as the 'movies_ratings' table.
enriched_movie_ratings_df.createOrReplaceTempView('movies_ratings')

root
 |-- movieId: long (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- userId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- year: long (nullable = true)



In [1]:
# ratings_count_by_year = enriched_movie_ratings_df.rdd \
#    .map(lambda r: (r.year, 1)) \
#    .reduceByKey(lambda a, b: a + b)

In [2]:
# ratings_count_by_year_list = ratings_count_by_year.collect()

In [3]:
# year_list = []
# year_rating_list = []
# for i in sorted(ratings_count_by_year_list, key=lambda x: x[0]):
#    year_list.append(i[0])
#    year_rating_list.append(i[1])

# year_rating_pd = pd.DataFrame({
#    'year': year_list,
#    'rating': year_rating_list
# })

In [4]:
# plt.figure(figsize=(20, 6))
# year_rating_pd[year_rating_pd['year'] > 2014]
# sns.barplot(year_rating_pd, x='year', y='rating')

In [5]:
# Per-movie aggregate over the 'movies_ratings' temp view: the average rating
# and the number of ratings for each (movieId, title), sorted by number of
# ratings (descending) and then by average rating (descending).
# This cell needs the `spark` session and the 'movies_ratings' view from the
# earlier cells; the NameError captured below it shows it was last run on a
# kernel where `spark` had not been defined, so re-run the notebook top to
# bottom.
movies_ratings_count = spark.sql("SELECT \
                                        movieId, title, avg(rating) as avg_rating, count(rating) as count \
                                        FROM movies_ratings \
                                        GROUP BY movieId, title \
                                        ORDER BY count DESC, avg_rating DESC")

NameError: name 'spark' is not defined

In [16]:
# Materialise the first 100 rows of the sorted aggregate on the driver as a
# list of Row objects.
top100_highest_rated_movies = movies_ratings_count.limit(100).collect()

                                                                                

In [17]:
# Display the collected rows.
# NOTE(review): despite the variable name, the query orders by number of
# ratings first and average rating second, so this is the 100 most-rated
# movies rather than the 100 highest-rated ones — confirm which is intended.
top100_highest_rated_movies

[Row(movieId=356, title='Forrest Gump (1994)', avg_rating=4.048011436845787, count=81491),
 Row(movieId=318, title='Shawshank Redemption, The (1994)', avg_rating=4.413576004516335, count=81482),
 Row(movieId=296, title='Pulp Fiction (1994)', avg_rating=4.188912039361382, count=79672),
 Row(movieId=593, title='Silence of the Lambs, The (1991)', avg_rating=4.151341616415071, count=74127),
 Row(movieId=2571, title='Matrix, The (1999)', avg_rating=4.154099127610975, count=72674),
 Row(movieId=260, title='Star Wars: Episode IV - A New Hope (1977)', avg_rating=4.120188599618726, count=68717),
 Row(movieId=480, title='Jurassic Park (1993)', avg_rating=3.6791749812920926, count=64144),
 Row(movieId=527, title="Schindler's List (1993)", avg_rating=4.247579083279535, count=60411),
 Row(movieId=110, title='Braveheart (1995)', avg_rating=4.002272573668559, count=59184),
 Row(movieId=2959, title='Fight Club (1999)', avg_rating=4.228310618821568, count=58773),
 Row(movieId=589, title='Terminator 2: 