In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from sparkmeasure import StageMetrics
from pyspark.sql.functions import *
import pyspark.sql.functions as f
import numpy as np
import pandas as pd
import random


# Build (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    # Standalone-cluster master URLs have the form spark://HOST:PORT.
    .master('spark://192.168.1.254:7077')
    .appName('Database 2 project')
    # sparkMeasure jar; it must be on the classpath for StageMetrics below.
    # NOTE(review): absolute, machine-specific path - adjust per environment.
    .config('spark.jars', '/Users/kouts/OneDrive/Documents/spark-measure_2.12-0.17.jar')
    .config('spark.executor.cores', '2')
    .config('spark.executor.memory', '1G')
    # Driver properties are spelled "spark.driver.*" (singular); a misspelled
    # key is accepted silently and simply has no effect.
    .config('spark.driver.cores', '2')
    .config('spark.driver.memory', '1G')
    .getOrCreate()
)


# create spark metrics object
stagemetrics = StageMetrics(spark)

In [None]:
# Read movie.csv (comma-separated, first row is the header, column types
# inferred) and register it for Spark SQL as `movie_df_table`.
movie_df = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('movie.csv')
)
movie_df.createOrReplaceTempView('movie_df_table')

In [None]:
# Read tag.csv (comma-separated, first row is the header, column types
# inferred) and register it for Spark SQL as `tag_df_table`.
tag_df = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('tag.csv')
)
tag_df.createOrReplaceTempView('tag_df_table')

In [None]:
# Read genome_tags.csv (comma-separated, first row is the header, column
# types inferred) and register it for Spark SQL as `genome_df_table`.
genome_df = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('genome_tags.csv')
)
genome_df.createOrReplaceTempView('genome_df_table')

In [None]:
# Read rating.csv (comma-separated, first row is the header, column types
# inferred) and register it for Spark SQL as `rating_df_table`.
rating_df = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('rating.csv')
)
rating_df.createOrReplaceTempView('rating_df_table')

In [None]:
# Question 1

In [None]:
# start measuring performance
stagemetrics.begin()

# Inner-join ratings with movies on movieId, then keep only the rows whose
# movie title contains the keyword 'Jumanji'.
joined_dataframe = rating_df.join(
    movie_df,
    on=rating_df['movieId'] == movie_df['movieId'],
    how='inner',
)
joined_dataframe = joined_dataframe.filter(movie_df.title.contains('Jumanji'))

# Each remaining row is one rating of a matching movie, so counting the
# titles gives the number of reviews.
joined_dataframe.select(
    count(joined_dataframe.title).alias('Jumanji reviews')
).show()

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 2

In [None]:
# start measuring performance
stagemetrics.begin()

# Join movies with their tags and keep only the rows whose tag contains the
# word 'boring', ignoring case.
# The test is applied to lower(tag) directly: a filter written against
# `tag_df.tag` refers to the original column of tag_df, not to a lowercased
# column that replaces it later, so it would miss tags such as 'Boring'.
joined_dataframe = (
    movie_df.join(tag_df, movie_df['movieId'] == tag_df['movieId'], 'inner')
    .filter(lower(tag_df['tag']).contains('boring'))
    .withColumn('tag', lower(col('tag')))
)

# drop title duplicates
joined_dataframe = joined_dataframe.dropDuplicates(['title'])

# sort in alphabetical order by title
joined_dataframe = joined_dataframe.orderBy('title')

# show only the first 5
joined_dataframe.select('title').show(5, truncate=False)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 3

In [None]:
# start measuring performance
stagemetrics.begin()

# Users who tagged a movie with something containing 'bollywood' (anywhere in
# the tag, any letter case) and who also rated that same movie above 3.
# First five user ids in ascending order.
# The blanks inside the literals are part of the SQL text and are harmless.
bollywood_query = (
    "SELECT DISTINCT rating_df_table.userId "
    "           FROM rating_df_table "
    "           INNER JOIN tag_df_table ON tag_df_table.movieId = rating_df_table.movieId"
    "           WHERE LOWER(tag_df_table.tag) LIKE '%bollywood%'"
    "           AND rating_df_table.rating > 3"
    "           AND tag_df_table.userId = rating_df_table.userId"
    "           ORDER BY rating_df_table.userId"
    "           LIMIT 5"
)
spark.sql(bollywood_query).show()

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 4

In [None]:
# list containing the years
years_list = list(range(1995, 2016))

# Query template: the ten movieIds with the highest average rating among the
# ratings given in one calendar year; `{}` is filled in with the year.
# The blanks inside the literals are part of the SQL text and are harmless.
top_rated_template = (
    "SELECT movie_df_table.movieId"
    "               FROM movie_df_table"
    "               INNER JOIN rating_df_table ON rating_df_table.movieId = movie_df_table.movieId"
    "               WHERE EXTRACT(year FROM rating_df_table.timestamp) = {}"
    "               GROUP BY movie_df_table.movieId"
    "               ORDER BY AVG(rating_df_table.rating) desc"
    "               LIMIT 10"
)

# start measuring performance
stagemetrics.begin()

for year in years_list:
    # remove this guard if you want to test all years
    if year != 2005:
        continue
    spark.sql(top_rated_template.format(year)).show()


# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 5

In [None]:
# start measuring performance
stagemetrics.begin()

# Keep every (movie, tag) row whose tag was assigned in 2015.
# The year is extracted from the timestamp rather than substring-matching
# '2015' against its text form.
# Rows are NOT de-duplicated by title here: doing so would leave a single
# row per movie, and there would be nothing left to concatenate.
joined_dataframe = (
    movie_df.join(tag_df, movie_df['movieId'] == tag_df['movieId'], 'inner')
    .filter(f.year(tag_df['timestamp']) == 2015)
)

# convert dataframe to sql table
joined_dataframe.createOrReplaceTempView('joined_dataframe_table')

# Concatenate all 2015 tags of each movie into one comma-separated string.
# ORDER BY is applied to the aggregated result, because a sort done before
# GROUP BY does not determine the order of the output rows.
# collect_list keeps repeated tags; switch to collect_set if each distinct
# tag should appear only once.
spark.sql('''
    SELECT title, concat_ws(",", collect_list(tag)) AS Concatenated_Tags
    FROM joined_dataframe_table
    GROUP BY title
    ORDER BY title
''').show(5, truncate=False)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 6

In [None]:
# start measuring performance
stagemetrics.begin()

# The five movie titles with the largest number of ratings.
# The blanks inside the literals are part of the SQL text and are harmless.
most_rated_query = (
    "SELECT movie_df_table.title, COUNT(rating_df_table.rating) "
    "           FROM movie_df_table "
    "           INNER JOIN rating_df_table ON rating_df_table.movieId = movie_df_table.movieId"
    "           GROUP BY(movie_df_table.title)"
    "           ORDER BY COUNT(rating_df_table.rating) DESC"
    "           LIMIT 5"
)
spark.sql(most_rated_query).show(truncate=False)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 7

In [None]:
years_list = list(range(1995, 2016))

# Query template: the ten users who submitted the most ratings in one
# calendar year; `{}` is filled in with the year.
# The blanks inside the literals are part of the SQL text and are harmless.
most_active_template = (
    "SELECT userId, COUNT(rating)"
    "                       FROM rating_df_table"
    "                       WHERE EXTRACT(year FROM rating_df_table.timestamp) = {}"
    "                       GROUP BY(userId)"
    "                       ORDER BY COUNT(rating) DESC"
    "                       LIMIT 10"
)

# start measuring performance
stagemetrics.begin()

for year in years_list:
    # remove this guard if you want to test all years
    if year != 1995:
        continue
    spark.sql(most_active_template.format(year)).show()

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 8

In [None]:
# start measuring performance
stagemetrics.begin()

# One row per (movie, genre): split the pipe-delimited `genres` string and
# explode the resulting array. No .show() here - it returns None and the
# DataFrame is needed by the joins below.
new_dataframe = movie_df.select(
    'title',
    'movieId',
    f.explode(f.split('genres', '[|]')).alias('Genre'),
)

# dataframe containing the number of ratings per movie
ratings_count_df = rating_df.groupBy('movieId').agg(
    f.count('movieId').alias('numOfRatings')
)

# attach the rating count to every (movie, genre) row
joined_df = (
    new_dataframe.join(ratings_count_df, on='movieId', how='inner')
    .select('Genre', 'title', 'numOfRatings')
)

# highest rating count reached within each genre
max_per_genre_df = joined_df.groupBy('Genre').agg(
    f.max('numOfRatings').alias('maxNumOfRatings')
)

# Keep, for each genre, the movie(s) that reach that genre's maximum.
# The join is on Genre (not on the count alone) so that a movie is only
# matched against the maximum of its own genre. Sorting comes last because
# dropDuplicates does not preserve an earlier ordering.
final_df = (
    joined_df.join(max_per_genre_df, on='Genre', how='inner')
    .filter(f.col('numOfRatings') == f.col('maxNumOfRatings'))
    .select('Genre', 'title', 'numOfRatings')
    .dropDuplicates()
    .orderBy('Genre')
)

# '(no genres listed)' is not a real genre, therefore don't include it
final_df.filter(final_df['Genre'] != '(no genres listed)').show(5, False)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 9

In [None]:
# start measuring performance
stagemetrics.begin()

# Inner query: for every (movieId, timestamp) pair that was rated by more
# than one distinct user, the number of those users.
# Outer query: the sum of these per-group user counts.
# The blanks inside the literals are part of the SQL text and are harmless.
simultaneous_viewers_query = (
    "SELECT SUM(*) AS COUNT FROM"
    "          (SELECT COUNT(DISTINCT userId) NumberOfViewers FROM rating_df_table"
    "           GROUP BY movieId,timestamp"
    "           HAVING COUNT(DISTINCT userId) > 1)"
)
spark.sql(simultaneous_viewers_query).show(truncate=False)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()

In [None]:
# Question 10

In [None]:
# start measuring performance
stagemetrics.begin()

# One row per (movieId, genre): split the pipe-delimited `genres` string,
# explode it with its position, and use that position to index the array.
new_dataframe = (
    movie_df
    .select(
        'movieId',
        f.split('genres', '[|]').alias('genre'),
        f.posexplode(f.split('genres', '[|]')).alias('pos', 'val'),
    )
    .drop('val')
    .select('movieId', f.expr('genre[pos]').alias('genre'))
)

# tag rows whose tag is exactly 'funny'
funny_tags_df = (
    tag_df
    .where(tag_df['tag'] == 'funny')
    .select(tag_df['movieId'], tag_df['tag'])
)

# Ratings above 3.5 given to movies that carry a 'funny' tag, collapsed to
# one row per movieId.
joined_dataframe = (
    rating_df
    .join(funny_tags_df, rating_df.movieId == funny_tags_df.movieId, 'inner')
    .where(rating_df['rating'] > 3.5)
    .select(rating_df['movieId'])
    .groupBy(rating_df['movieId'])
    .count()
)

# pair every qualifying movie with each of its genres
final_df = (
    new_dataframe
    .join(joined_dataframe, new_dataframe.movieId == joined_dataframe.movieId, 'inner')
    .select(new_dataframe['genre'], joined_dataframe['movieId'])
    .orderBy(new_dataframe['genre'].asc())
)

# group by genre: number of qualifying movies in each genre
final_df = final_df.groupBy(final_df['genre']).count()

# sort alphabetically by genre column
final_df = final_df.orderBy(final_df['genre'])

final_df.show(5)

# stop measuring performance
stagemetrics.end()

# print performance metrics
stagemetrics.print_report()