In [2]:
from pyspark.sql import SparkSession

# One SparkSession backs every cell below; its SparkContext reads the raw
# MovieLens text files line by line into RDDs.
spark = (
    SparkSession.builder
    .appName('Movie-analytics')
    .getOrCreate()
)

sc = spark.sparkContext
movies_rdd = sc.textFile('./movie_lens/movies.dat')
ratings_rdd = sc.textFile('./movie_lens/ratings.dat')


In [3]:
!pip install pyspark



# **All RDD operations**

## **Distinct genres in the movies dataset**

In [4]:
# Each movies.dat line is "MovieID::Title (Year)::Genre1|Genre2|..."; keep the genre field.
# `genre` is reused by the "Number of movies in each genre" cell.
genre = movies_rdd.map(lambda line: line.split("::")[2])

# Explode the pipe-separated genre list into one genre name per element.
all_genres = genre.flatMap(lambda genres: genres.split('|'))

# Sort by the whole genre name. Sorting on g[0] ordered only by the first letter,
# leaving e.g. "Children's", "Comedy" and "Crime" in arbitrary relative order.
genres_distinct_sorted = all_genres.distinct().sortBy(lambda g: g)
genres_distinct_sorted.coalesce(1).saveAsTextFile('./results/spark_rdd_results/distinct_genre')

## **Most recently released movies**

In [5]:
def _year_of(title):
    """Return the text between the last '(' and the last ')' of a title."""
    open_paren = title.rfind("(")
    close_paren = title.rfind(")")
    return title[open_paren + 1:close_paren]


# Second "::" field of movies.dat is the title, which ends in "(Year)".
movie_name = movies_rdd.map(lambda record: record.split("::")[1])
year = movie_name.map(_year_of)

# max() compares strings; NOTE(review): correct only while every year is four digits.
latest = year.max()
latest_tag = "(" + latest + ")"

latest_movies = movie_name.filter(lambda title: latest_tag in title)
latest_movies.coalesce(1).saveAsTextFile('./results/spark_rdd_results/latest_movies')

## **Number of movies in each genre**

In [6]:
# `genre` (one "Genre1|Genre2|..." string per movie) comes from the distinct-genre cell.
# str.split is a literal split, not a regex: the former "\\|" never occurred in the
# data, so whole combinations like "Comedy|Romance" were counted as one "genre".
flat_genre = genre.flatMap(lambda genres: genres.split("|"))
genre_kv = flat_genre.map(lambda name: (name, 1))
genre_count = genre_kv.reduceByKey(lambda a, b: a + b)
genre_sort = genre_count.sortByKey()
genre_sort.coalesce(1).saveAsTextFile('./results/spark_rdd_results/no_of_movies_in_each_genre')

## **Top 10 most watched (most rated) movies**

In [7]:
# ratings.dat lines are "UserID::MovieID::Rating::Timestamp"; count ratings per MovieID.
movies = ratings_rdd.map(lambda line: int(line.split("::")[1]))
movies_pair = movies.map(lambda mv: (mv, 1))
movies_count = movies_pair.reduceByKey(lambda x, y: x + y)

# takeOrdered keeps only the 10 largest counts instead of globally sorting every movie.
mv_top10_list = movies_count.takeOrdered(10, key=lambda kv: -kv[1])
mv_top10_rdd = spark.sparkContext.parallelize(mv_top10_list)

# Reuse the movies RDD loaded at the top instead of re-reading movies.dat: (MovieID, Title).
movie_names = (
    movies_rdd
    .map(lambda line: line.split("::"))
    .map(lambda fields: (int(fields[0]), fields[1]))
)
join_output = movie_names.join(mv_top10_rdd)  # (MovieID, (Title, count))


def _csv_field(value):
    """Quote a CSV field (RFC 4180) when it contains a comma, quote or line break."""
    text = str(value)
    if any(ch in text for ch in ',"\r\n'):
        return '"' + text.replace('"', '""') + '"'
    return text


# Titles such as "Matrix, The (1999)" contain commas, so the title column must be quoted
# or each row parses as four columns. saveAsTextFile returns None, so nothing is assigned.
(
    join_output
    .sortBy(lambda x: x[1][1], ascending=False)
    .map(lambda x: f"{x[0]},{_csv_field(x[1][0])},{x[1][1]}")
    .coalesce(1)
    .saveAsTextFile("./results/spark_rdd_results/Top-10-CSV")
)

## **Number of movies starting with each letter or digit**

In [8]:
# Titles are the second "::" field; take the first space-separated token of each.
movies = movies_rdd.map(lambda lines: lines.split("::")[1])
string_flat = movies.map(lambda title: title.split(" ")[0])

# word[:1] is "" for an empty first token (e.g. a title starting with a space); "" fails
# both isalpha() and isdigit(), whereas word[0] raised IndexError. Cached: scanned twice.
first_chars = string_flat.map(lambda word: word[:1]).cache()

movies_letter_count = (
    first_chars
    .filter(lambda ch: ch.isalpha())
    .map(lambda ch: (ch.upper(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)
movies_digit_count = (
    first_chars
    .filter(lambda ch: ch.isdigit())
    .map(lambda ch: (ch, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)

# Digits first, then letters; coalesce(1) concatenates partitions in order.
result = movies_digit_count.union(movies_letter_count)
result.coalesce(1).saveAsTextFile("./results/spark_rdd_results/Each-letter")

# **All dataframe operations**

## **Movies DataFrame**

In [9]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ShortType
from pyspark.sql.functions import row_number,lit, col
from pyspark.sql.window import Window

# Explicit schema for parsed movies.dat records (matches the columns shown below).
movies_schema = StructType([
    StructField("MovieId", IntegerType(), True),
    StructField("Title", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Genre", StringType(), True),
])


def _to_int(text):
    """Parse an integer string, returning None (null) when it is not one, similar to cast("int")."""
    try:
        return int(text)
    except ValueError:
        return None


def _parse_movie(fields):
    """Map [MovieID, "Title (Year)", Genres] to a (MovieId, Title, Year, Genre) tuple."""
    movie_id, raw_title, genres = fields[0], fields[1], fields[2]
    # Title is everything before the last " ("; keep the whole string when there is none
    # (the unguarded slice [:-1] used to chop off the last character).
    title_end = raw_title.rfind(" (")
    title = raw_title[:title_end] if title_end != -1 else raw_title
    # Year is the text inside the last pair of parentheses, if both are present.
    open_paren, close_paren = raw_title.rfind("("), raw_title.rfind(")")
    year_text = raw_title[open_paren + 1:close_paren] if -1 < open_paren < close_paren else ""
    return (_to_int(movie_id), title, _to_int(year_text), genres)


# Build every column from the same split record. The previous approach joined four
# single-column DataFrames on row_number() over an ORDER BY constant, which gives no
# ordering guarantee, so ids, titles, years and genres could be paired with the wrong row.
movie_details_rdd = movies_rdd.map(lambda line: line.split("::"))
movies_df = spark.createDataFrame(movie_details_rdd.map(_parse_movie), movies_schema)
movies_df.show()


+-------+--------------------+----+--------------------+
|MovieId|               Title|Year|               Genre|
+-------+--------------------+----+--------------------+
|      1|           Toy Story|1995|Animation|Childre...|
|      2|             Jumanji|1995|Adventure|Childre...|
|      3|    Grumpier Old Men|1995|      Comedy|Romance|
|      4|   Waiting to Exhale|1995|        Comedy|Drama|
|      5|Father of the Bri...|1995|              Comedy|
|      6|                Heat|1995|Action|Crime|Thri...|
|      7|             Sabrina|1995|      Comedy|Romance|
|      8|        Tom and Huck|1995|Adventure|Children's|
|      9|        Sudden Death|1995|              Action|
|     10|           GoldenEye|1995|Action|Adventure|...|
|     11|American Presiden...|1995|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|1995|       Comedy|Horror|
|     13|               Balto|1995|Animation|Children's|
|     14|               Nixon|1995|               Drama|
|     15|    Cutthroat Island|1

## **Ratings DataFrame**

In [10]:
from pyspark.sql import Row

# ratings.dat fields in file order; read as strings, then Ratings is cast to int.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("UserId", "MovieId", "Ratings", "Timestamp")
])

rowRDD = (
    ratings_rdd
    .map(lambda raw: raw.split("::"))
    .map(lambda parts: Row(parts[0], parts[1], parts[2], parts[3]))
)

ratings_df = (
    spark.createDataFrame(rowRDD, schema)
    .withColumn("Ratings", col("Ratings").cast("int"))
)

ratings_df.show()

+------+-------+-------+---------+
|UserId|MovieId|Ratings|Timestamp|
+------+-------+-------+---------+
|     1|   1193|      5|978300760|
|     1|    661|      3|978302109|
|     1|    914|      3|978301968|
|     1|   3408|      4|978300275|
|     1|   2355|      5|978824291|
|     1|   1197|      3|978302268|
|     1|   1287|      5|978302039|
|     1|   2804|      5|978300719|
|     1|    594|      4|978302268|
|     1|    919|      4|978301368|
|     1|    595|      5|978824268|
|     1|    938|      4|978301752|
|     1|   2398|      4|978302281|
|     1|   2918|      4|978302124|
|     1|   1035|      5|978301753|
|     1|   2791|      4|978302188|
|     1|   2687|      3|978824268|
|     1|   2018|      4|978301777|
|     1|   3105|      5|978301713|
|     1|   2797|      4|978302039|
+------+-------+-------+---------+
only showing top 20 rows



## **Users DataFrame**

In [11]:
users_rdd = spark.sparkContext.textFile('./movie_lens/users.dat')

# users.dat fields in file order; every column is kept as a string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("UserId", "Gender", "Age", "Occupation", "ZipCode")
])

rowRDD2 = (
    users_rdd
    .map(lambda raw: raw.split("::"))
    .map(lambda parts: Row(parts[0], parts[1], parts[2], parts[3], parts[4]))
)

users_df = spark.createDataFrame(rowRDD2, schema)
users_df.show()

+------+------+---+----------+-------+
|UserId|Gender|Age|Occupation|ZipCode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|  02460|
|     5|     M| 25|        20|  55455|
|     6|     F| 50|         9|  55117|
|     7|     M| 35|         1|  06810|
|     8|     M| 25|        12|  11413|
|     9|     M| 25|        17|  61614|
|    10|     F| 35|         1|  95370|
|    11|     F| 25|         1|  04093|
|    12|     M| 25|        12|  32793|
|    13|     M| 45|         1|  93304|
|    14|     M| 35|         0|  60126|
|    15|     M| 25|         7|  22903|
|    16|     F| 35|         0|  20670|
|    17|     M| 50|         1|  95350|
|    18|     F| 18|         3|  95825|
|    19|     M|  1|        10|  48073|
|    20|     M| 25|        14|  55113|
+------+------+---+----------+-------+
only showing top 20 rows



## **Users detailed DataFrame**

In [12]:
# Occupation codes as stored in users.dat -> human-readable labels.
occupations = {
    "0": "other",
    "1": "academic/educator",
    "2": "artist",
    "3": "clerical/admin",
    "4": "college/grad student",
    "5": "customer service",
    "6": "doctor/health care",
    "7": "executive/managerial",
    "8": "farmer",
    "9": "homemaker",
    "10": "K-12 student",
    "11": "lawyer",
    "12": "programmer",
    "13": "retired",
    "14": "sales/marketing",
    "15": "scientist",
    "16": "self-employed",
    "17": "technician/engineer",
    "18": "tradesman/craftsman",
    "19": "unemployed",
    "20": "writer",
}

# DataFrame.replace maps the codes in the Occupation column in place, keeping the schema and
# column order, without a round-trip through Python RDD rows. Unlisted codes are left as-is
# (the dict lookup raised KeyError on them).
detailed_users_df = users_df.replace(occupations, subset=["Occupation"])
detailed_users_df.show()

# The default save mode errors if the directory exists; overwrite keeps Restart & Run All working.
detailed_users_df.write.format('csv').option('header', 'true').mode('overwrite') \
    .save('./results/dataframe_results/detailed_users')

+------+------+---+--------------------+-------+
|UserId|Gender|Age|          Occupation|ZipCode|
+------+------+---+--------------------+-------+
|     1|     F|  1|        K-12 student|  48067|
|     2|     M| 56|       self-employed|  70072|
|     3|     M| 25|           scientist|  55117|
|     4|     M| 45|executive/managerial|  02460|
|     5|     M| 25|              writer|  55455|
|     6|     F| 50|           homemaker|  55117|
|     7|     M| 35|   academic/educator|  06810|
|     8|     M| 25|          programmer|  11413|
|     9|     M| 25| technician/engineer|  61614|
|    10|     F| 35|   academic/educator|  95370|
|    11|     F| 25|   academic/educator|  04093|
|    12|     M| 25|          programmer|  32793|
|    13|     M| 45|   academic/educator|  93304|
|    14|     M| 35|               other|  60126|
|    15|     M| 25|executive/managerial|  22903|
|    16|     F| 35|               other|  20670|
|    17|     M| 50|   academic/educator|  95350|
|    18|     F| 18| 

# **Spark SQL**

## **Oldest Movies**

In [13]:
# Expose movies_df to Spark SQL for the queries below.
movies_df.createOrReplaceTempView("movies_table")

# The scalar subquery finds the earliest year; every movie from that year is returned.
oldest_movies = spark.sql("Select title, year from movies_table where year = (select min(year) from movies_table)")

# The default save mode errors if the directory exists; overwrite keeps the cell re-runnable.
oldest_movies.write.format("csv").option("header","true").mode("overwrite") \
    .save("./results/spark_sql_results/oldest_movies")

## **Number of movies released each year**

In [14]:
# GROUP BY already yields one row per year, so DISTINCT was a misleading no-op.
# count(*) counts every movie in the group; count(year) reported 0 for the null-year group
# (titles whose year could not be parsed).
movies_each_year = spark.sql("Select year, count(*) as no_of_movies from movies_table group by year order by year")

# The default save mode errors if the directory exists; overwrite keeps the cell re-runnable.
movies_each_year.write.format("csv").option("header","true").mode("overwrite") \
    .save("./results/spark_sql_results/movies_each_year")