In [None]:
# 1. Install the dependencies in the Colab environment:
#    - Java 8 (headless OpenJDK),
#    - Apache Spark 3.0.0 pre-built for Hadoop 3.2 (downloaded and unpacked into the working directory),
#    - findspark, which is used in a later cell to locate the Spark installation.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark


In [None]:
# 2. Point the process environment at the Java 8 and Spark 3.0.0 installations
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})


In [None]:
# 3. Start Spark Session
import findspark

# findspark.init() makes the pyspark package importable, so the pyspark
# imports below have to come after this call.
findspark.init()
# findspark.find()
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession

# getOrCreate() hands back the already-running context when this cell is
# executed again. Constructing SparkContext(...) directly fails on a re-run,
# because only one SparkContext may be active at a time.
sc = SparkContext.getOrCreate(
    SparkConf().setAppName('QUERYING THE DATASET USING “SPARK”')
)
sc

In [None]:
# Sanity check: confirm the movies CSV is present, then preview its first lines
# (header row plus the first few records).
!ls sample_data/movies.csv
!head sample_data/movies.csv

sample_data/movies.csv
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


In [None]:
# SQL entry points, both bound to the SparkContext `sc` created earlier
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

# Read each CSV (first line used as column names) and expose it as an RDD of Rows.
# No schema is supplied or inferred here, so every column arrives as a string.
movies, links, ratings, tags = (
    sqlContext.read.csv(csv_path, header=True).rdd
    for csv_path in (
        'sample_data/movies.csv',
        'sample_data/links.csv',
        'sample_data/ratings.csv',
        'sample_data/tags.csv',
    )
)

print(movies.count())

9742


**Question 1: How many “Drama” movies (movies with the "Drama" genre) are there?**

In [None]:
def hasElement(str, genre='Drama'):
  """Tell whether `genre` is one of the pipe-separated genres in `str`.

  Args:
    str: a genres field such as "Comedy|Drama|Romance". May be None or
      empty, in which case the result is False. (The name shadows the
      builtin `str`; it is kept so existing callers keep working.)
    genre: the genre to look for. Defaults to 'Drama'.

  Returns:
    True if `genre` matches one whole genre entry exactly, else False.
  """
  # A missing value has no genres at all; guard before calling .split().
  if not str:
    return False
  # Split first so that only whole entries match (e.g. "Melodrama" != "Drama").
  return genre in str.split("|")

# Reduce every movie to its raw genres string, then keep the ones hasElement accepts
movies_genres = movies.map(lambda movie: movie['genres'])
movies_genres_drama = movies_genres.filter(lambda genres_field: hasElement(genres_field))

drama_total = movies_genres_drama.count()
print(drama_total)

4361


**Question 2: How many unique movies have been rated, and how many have not been rated?**

In [None]:
# Every movie id present in the movies data, de-duplicated
distinct_movieIds = movies.map(lambda movie: movie['movieId']).distinct()

# Movie ids that also occur in the ratings data
rated_movieIds = ratings.map(lambda rating: rating['movieId']).distinct()
distinct_movieIds_with_ratings = distinct_movieIds.intersection(rated_movieIds)
print(distinct_movieIds_with_ratings.count())

# Movie ids left over once the rated ones are taken away
distinct_movieIds_with_no_ratings = distinct_movieIds.subtract(
    distinct_movieIds_with_ratings
)
print(distinct_movieIds_with_no_ratings.count())


9724
18


**Question 3: Which user gave the most ratings, and how many ratings did that user give?**

In [None]:
# Count the ratings submitted by each user, then order users by that count,
# largest first.
ratings_users = (
    ratings
    .map(lambda row: (row['userId'], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)

# first() is an action: every call evaluates the sorted RDD again. Fetch the
# top (userId, count) pair once and unpack it instead of calling first() twice.
top_user_id, top_user_ratings = ratings_users.first()
print("UserId: ", top_user_id)
print("Total ratings: ", top_user_ratings)

UserId:  414
Total ratings:  2698


**Question 4: Compute min, average, max rating per movie.**

In [None]:
# The rating column is read from CSV as a string. Parse it to float once, so
# that min/max compare numerically (not lexicographically) and come out with
# the same numeric type as the average.
movie_ratings = ratings.map(lambda row: (row['movieId'], float(row['rating'])))
grouped_movie = movie_ratings.groupByKey()

# (movieId, min) and (movieId, max)
min_per_movie = grouped_movie.mapValues(min)
max_per_movie = grouped_movie.mapValues(max)

# (movieId, count) and (movieId, sum) -> (movieId, average)
per_movie_counts = movie_ratings.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)
per_movie_sums = movie_ratings.reduceByKey(lambda a, b: a + b)
avg_per_movie = per_movie_counts.join(per_movie_sums).mapValues(
    lambda count_sum: count_sum[1] / count_sum[0]
)

# After the two joins each record is (movieId, ((min, max), avg));
# flatten it to (movieId, min, avg, max) and order by numeric movie id.
ratings_stats = (
    min_per_movie
    .join(max_per_movie)
    .join(avg_per_movie)
    .map(lambda kv: (kv[0], kv[1][0][0], kv[1][1], kv[1][0][1]))
    .sortBy(lambda stats: int(stats[0]))
)
print(ratings_stats.take(5))

ratings_stats.toDF(['MovieID','min','average','max']).show()


[('1', '0.5', 3.9209302325581397, '5.0'), ('2', '0.5', 3.4318181818181817, '5.0'), ('3', '0.5', 3.2596153846153846, '5.0'), ('4', '1.0', 2.357142857142857, '3.0'), ('5', '0.5', 3.0714285714285716, '5.0')]
+-------+---+------------------+---+
|MovieID|min|           average|max|
+-------+---+------------------+---+
|      1|0.5|3.9209302325581397|5.0|
|      2|0.5|3.4318181818181817|5.0|
|      3|0.5|3.2596153846153846|5.0|
|      4|1.0| 2.357142857142857|3.0|
|      5|0.5|3.0714285714285716|5.0|
|      6|1.0| 3.946078431372549|5.0|
|      7|1.0| 3.185185185185185|5.0|
|      8|1.0|             2.875|5.0|
|      9|1.5|             3.125|5.0|
|     10|0.5| 3.496212121212121|5.0|
|     11|1.0|3.6714285714285713|5.0|
|     12|1.0|2.4210526315789473|5.0|
|     13|2.0|             3.125|4.0|
|     14|3.0|3.8333333333333335|5.0|
|     15|1.0|               3.0|5.0|
|     16|1.0| 3.926829268292683|5.0|
|     17|0.5|3.7761194029850746|5.0|
|     18|2.0|               3.7|5.0|
|     19|1.0| 2.72

**Question 5: Output dataset containing users that have rated a movie but not tagged it.**

In [None]:
# (userId, movieId) pairs taken from the ratings and from the tags
users_movies_rated = ratings.map(lambda rating: (rating['userId'], rating['movieId']))
users_movies_tagged = tags.map(lambda tag: (tag['userId'], tag['movieId'])).distinct()

# Pairs found on both sides: the user both rated and tagged that same movie
users_movies_rated_and_tagged = users_movies_rated.intersection(users_movies_tagged)

# Collapse each pair set down to its distinct user ids
users_only_movies_rated_and_tagged = users_movies_rated_and_tagged.keys().distinct()
user_only_movies_rated = users_movies_rated.keys().distinct()

# Users who rated something but never appear among the rated-and-tagged users,
# ordered by numeric user id
users_only_movies_rated_and_not_tagged = (
    user_only_movies_rated
    .subtract(users_only_movies_rated_and_tagged)
    .sortBy(int)
)

print(users_only_movies_rated_and_not_tagged.collect())
print(users_only_movies_rated_and_not_tagged.count())

['1', '3', '4', '5', '6', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '19', '20', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '100', '101', '102', '104', '105', '107', '108', '109', '110', '111', '113', '114', '115', '116', '117', '118', '120', '121', '122', '123', '124', '126', '127', '128', '129', '130', '131', '133', '134', '135', '136', '137', '139', '140', '141', '142', '143', '144', '145', '146', '147', '148', '149', '150', '151', '152', '153', '154', '155', '156', '157', '158', '159', '160', '162', '163', '164', '165', '168', '169', '170', '171', '172', '173', '174', '175'

**Question 6: Output dataset containing users that have rated AND tagged a movie.**

In [None]:
# Re-order the rated-and-tagged user ids (computed in the Question 5 cell) by
# numeric value, then print the ids followed by how many there are.
users_only_movies_rated_and_tagged = users_only_movies_rated_and_tagged.sortBy(int)

rated_and_tagged_ids = users_only_movies_rated_and_tagged.collect()
print(rated_and_tagged_ids)
print(users_only_movies_rated_and_tagged.count())

['2', '7', '18', '21', '49', '62', '63', '76', '103', '106', '112', '119', '125', '132', '138', '161', '166', '167', '177', '184', '193', '205', '226', '256', '274', '289', '291', '300', '305', '318', '319', '327', '336', '356', '357', '419', '424', '435', '439', '462', '474', '477', '487', '506', '509', '513', '520', '533', '537', '567', '573', '599', '606', '610']
54


**Question 7: Output dataset showing the number of movies per Genre per Year**

In [None]:
import re

# A four-digit year in parentheses at the very end of the title, allowing
# trailing whitespace, e.g. "Toy Story (1995)".
_YEAR_AT_END = re.compile(r'\((\d{4})\)\s*$')

def _extract_year(title):
  """Return the 'YYYY' suffix of a title, or 'unknown' if it has none.

  A fixed slice such as title[-5:-1] silently returns garbage for titles
  that have trailing whitespace or do not end in "(YYYY)"; the anchored
  pattern only accepts a real year suffix.
  """
  match = _YEAR_AT_END.search(title or '')
  return match.group(1) if match else 'unknown'

# (movieId, genre): one record per genre listed for the movie
movies_genres = movies.map(lambda row: (row['movieId'], row['genres']))
movies_genres = movies_genres.flatMapValues(lambda genres: genres.split('|'))

# (movieId, year)
movies_years = movies.map(lambda row: (row['movieId'], _extract_year(row['title'])))

# (movieId, (genre, year)). No sort here: the ordering would be discarded by
# the shuffle in reduceByKey below.
movies_genres_years = movies_genres.join(movies_years)

# ((genre, year), n) -> (genre, year, n), ordered by year then genre so the
# displayed table is stable across runs.
movies_genres_years_count = (
    movies_genres_years
    .map(lambda pair: (pair[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
    .sortBy(lambda record: (record[1], record[0]))
)

movies_genres_years_count.toDF(['Genre', 'Year','Number of Movies']).show()

+-----------+----+----------------+
|      Genre|Year|Number of Movies|
+-----------+----+----------------+
|   Children|1995|              29|
|    Fantasy|1995|              16|
|    Romance|1995|              49|
|      Drama|1995|             123|
|     Action|1995|              44|
|   Thriller|1995|              42|
|     Horror|1995|              19|
|     Sci-Fi|1995|              19|
|    Musical|1995|               4|
|      Drama|1994|             114|
|    Romance|1994|              41|
|     Comedy|1996|             108|
|      Crime|1996|              32|
|Documentary|1995|               6|
|  Adventure|1996|              35|
|    Mystery|1996|              10|
|      Crime|1976|               8|
|     Comedy|1992|              69|
|   Children|1994|              24|
|    Fantasy|1994|              16|
+-----------+----+----------------+
only showing top 20 rows



In [None]:
# Shut down the SparkContext. `sc` is created in the "Start Spark Session" cell,
# so this raises NameError if that cell has not been run in the current kernel.
sc.stop()

NameError: ignored