In [1]:
# Importing all the necessary libaries for movie_data
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as f
from pyspark.sql.functions import col, split
from pyspark.sql.functions import to_timestamp, current_timestamp, unix_timestamp, from_unixtime
from pyspark.sql.types import *

In [2]:
## Setting path for three differernt tables
movies = '/jeet/hadoop/spark/movies.csv'
ratings = '/jeet/hadoop/spark/ratings.csv'
tags = '/jeet/hadoop/spark/tags.csv'

In [3]:
# Reading and setting up schema for all the three files
mvi = spark.read.format('csv').option('header', 'true').option('inferScheme', 'true').load(movies)
ratng = spark.read.format('csv').option('header', 'true').option('inferScheme', 'true').load(ratings)
tgs = spark.read.format('csv').option('header', 'true').option('inferScheme', 'true').load(tags)



                                                                                

In [4]:
# Changing data types of timestamp for RATING and TAGS tables
ratng = ratng.withColumn('timestamp', from_unixtime("timestamp").cast(TimestampType()))
tgs = tgs.withColumn('timestamp', from_unixtime("timestamp").cast(TimestampType()))

In [5]:
# Work with spark SQL
mvi.createOrReplaceTempView("MOVIES")
ratng.createOrReplaceTempView("RATINGS")
tgs.createOrReplaceTempView("TAGS")

In [6]:
mvi.show()
ratng.show()
tgs.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [7]:
# Show the aggregated number of ratings per year
query1 = """ select YEAR(timestamp) as year, count(rating) as rating_count
FROM RATINGS
group by 1
order by 1 desc """

output = spark.sql(query1)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_1.csv')
print("Write Successfull")

                                                                                

+----+------------+
|year|rating_count|
+----+------------+
|2018|        6418|
|2017|        8198|
|2016|        6703|
|2015|        6616|
|2014|        1439|
|2013|        1664|
|2012|        4656|
|2011|        1690|
|2010|        2301|
|2009|        4158|
|2008|        4351|
|2007|        7114|
|2006|        4059|
|2005|        5813|
|2004|        3279|
|2003|        4014|
|2002|        3478|
|2001|        3922|
|2000|       10061|
|1999|        2439|
+----+------------+
only showing top 20 rows



                                                                                

Write Successfull


In [8]:
# Average Monthly number of ratings
query2 = """ select left(timestamp, 7) as year_month, avg(rating)
from Ratings
group by 1
order by 1 desc """

output_2 = spark.sql(query2)
output_2.show()
output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_2.csv')
print("Write Successfull")

                                                                                

+----------+------------------+
|year_month|       avg(rating)|
+----------+------------------+
|   2018-09| 3.568708609271523|
|   2018-08|3.5577617328519855|
|   2018-07| 4.010238907849829|
|   2018-06| 3.979713603818616|
|   2018-05|2.9516298633017874|
|   2018-04|              3.75|
|   2018-03| 3.786817713697219|
|   2018-02|2.7386655260906756|
|   2018-01|3.4194736842105264|
|   2017-12|3.2611940298507465|
|   2017-11| 3.652173913043478|
|   2017-10|3.5244444444444443|
|   2017-09|3.6827830188679247|
|   2017-08| 4.076923076923077|
|   2017-07| 4.052941176470588|
|   2017-06|2.9594240837696337|
|   2017-05| 3.480183562786817|
|   2017-04| 3.626218851570964|
|   2017-03| 3.051001821493625|
|   2017-02|2.7547619047619047|
+----------+------------------+
only showing top 20 rows

Write Successfull


In [9]:
query3 = """ with t1 AS (
select rating, 
CASE WHEN rating BETWEEN 0 and 2 THEN '0.0 - 2.0'
WHEN rating BETWEEN 2.1 AND 4 THEN '2.1 - 4.0'
ELSE '>4' 
END as rating_bucket
from RATINGS),

t2 AS (select rating_bucket, count(rating) AS counts
from t1
group by 1
order by 1)

select rating_bucket, counts, counts*100/SUM(counts)OVER() AS percentage
from t2 """

output = spark.sql(query3)

output.show()
output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_3.csv')
print("Write Successfull")

25/01/22 00:20:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

+-------------+------+------------------+
|rating_bucket|counts|        percentage|
+-------------+------+------------------+
|    0.0 - 2.0| 19073|18.914871672815263|
|    2.1 - 4.0| 68552| 67.98365663056845|
|           >4| 13211|13.101471696616288|
+-------------+------+------------------+



25/01/22 00:20:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:20:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

Write Successfull


                                                                                

In [10]:
# Movies Tagged but not Rated

query4 = """ with t1 AS (
Select DISTINCT TAGS.movieId from TAGS
left join RATINGS on TAGS.movieId = RATINGS.movieId
where RATINGS.movieId is NULL)

select t1.movieId, MOVIES.title from MOVIES
inner join t1 on MOVIES.movieId = t1.movieId
order by MOVIES.title"""

output = spark.sql(query4)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_4.csv')
print("Write Successfull")



+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|  34482|Browning Version,...|
|  32371|Call Northside 77...|
|  85565|  Chalet Girl (2011)|
|   5721|  Chosen, The (1981)|
|   3456|Color of Paradise...|
|   3338|For All Mankind (...|
|   4194|I Know Where I'm ...|
|  30892|In the Realms of ...|
|   1076|Innocents, The (1...|
|  26085|Mutiny on the Bou...|
|   2939|      Niagara (1953)|
|   7792|Parallax View, Th...|
|   7020|        Proof (1991)|
|   6668|Road Home, The (W...|
|  25855|Roaring Twenties,...|
|   6849|      Scrooge (1970)|
|   8765|This Gun for Hire...|
|  32160|Twentieth Century...|
+-------+--------------------+

Write Successfull


In [11]:
query5 = """ with t1 AS (
select distinct RATINGS.movieId from RATINGS
left join TAGS on TAGS.movieId = RATINGS.movieId
where TAGS.movieId is NULL)

select MOVIES.title, t1.movieId
from MOVIES
inner join t1 on MOVIES.movieId = t1.movieId
order by 1
"""

output = spark.sql(query5)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_5.csv')
print("Write Successfull")

+--------------------+-------+
|               title|movieId|
+--------------------+-------+
|          '71 (2014)| 117867|
|'Hellboy': The Se...|  97757|
|'Round Midnight (...|  26564|
| 'Salem's Lot (2004)|  27751|
|'Til There Was Yo...|    779|
|'Tis the Season f...| 149380|
|  'burbs, The (1989)|   2072|
|'night Mother (1986)|   3112|
|*batteries not in...|   8169|
|...All the Marble...|   5706|
|00 Schneider - Ja...| 157110|
|   1-900 (06) (1994)|    889|
|           10 (1979)|   6658|
|10 Cent Pistol (2...| 139717|
|10 Items or Less ...|  49284|
|     10 Years (2011)|  98230|
|    10,000 BC (2008)|  58293|
|    100 Girls (2000)|   7541|
|  100 Streets (2016)| 168350|
|101 Dalmatians II...| 121099|
+--------------------+-------+
only showing top 20 rows

Write Successfull


In [12]:
ratng.show()
tgs.show()
mvi.show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
|     1|     70|   3.0|2000-07-30 18:40:00|
|     1|    101|   5.0|2000-07-30 18:14:28|
|     1|    110|   4.0|2000-07-30 18:36:16|
|     1|    151|   5.0|2000-07-30 19:07:21|
|     1|    157|   5.0|2000-07-30 19:08:20|
|     1|    163|   5.0|2000-07-30 19:00:50|
|     1|    216|   5.0|2000-07-30 18:20:08|
|     1|    223|   3.0|2000-07-30 18:16:25|
|     1|    231|   5.0|2000-07-30 18:19:39|
|     1|    235|   4.0|2000-07-30 18:15:08|
|     1|    260|   5.0|2000-07-30 18:28:00|
|     1|    296|   3.0|2000-07-30 18:49:27|
|     1|    316|   3.0|2000-07-30 18:38:30|
|     1|    333|   5.0|2000-07-30 18:19:39|
|     1|    349|   4.0|2000-07-3

In [13]:
query6 = """ 
with t1 as 
            (Select movieid
                from ratings
                 group by 1
              having count(distinct userid)>30),

        t2 as (Select 
           t1.movieID from t1
           left join TAGS as t
           on t1.movieID=t.movieID
           where t.movieID IS NULL),
          
           t3 as (Select m.title,m.movieID 
           from MOVIES as m
          inner join t2
           on m.movieID=t2.movieID
           order by 1),

 t4 as (Select t3.title,avg(r.rating) as avg_rating,
           dense_rank()over(order by avg(r.rating) desc) as avg_rank
           from t3 left join RATINGS as r
           on t3.movieID=r.movieID
           group by 1),
           
 t5 as (Select t3.title,count(rating) as counts,
           dense_rank()over(order by count(rating) desc) as count_rank
           from t3 left join RATINGS as r
           on t3.movieID=r.movieID
           group by 1)
           
Select t4.title as Movie_title1,t4.avg_rank,Round(t4.avg_rating,4) as avg_rating,t5.title as Movie_title2,t5.count_rank,t5.counts
           from t4 inner join t5
           on t4.avg_rank=t5.count_rank
           where t4.avg_rank<=10 and t5.count_rank<=10
        
"""

output = spark.sql(query6)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_6.csv')
print("Write Successfull")

25/01/22 00:21:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

+--------------------+--------+----------+--------------------+----------+------+
|        Movie_title1|avg_rank|avg_rating|        Movie_title2|count_rank|counts|
+--------------------+--------+----------+--------------------+----------+------+
|Boondock Saints, ...|       1|    4.2209|American Beauty (...|         1|   204|
|       Brazil (1985)|       2|     4.178|Ace Ventura: Pet ...|         2|   161|
|Cinema Paradiso (...|       3|    4.1618|    Mask, The (1994)|         3|   157|
|       Snatch (2000)|       4|    4.1559|     Die Hard (1988)|         4|   145|
|For a Few Dollars...|       5|    4.1515|Die Hard: With a ...|         5|   144|
|Lives of Others, ...|       6|    4.1176|Groundhog Day (1993)|         6|   143|
|  Toy Story 3 (2010)|       7|    4.1091|Dumb & Dumber (Du...|         7|   133|
|Boogie Nights (1997)|       8|    4.0769|    GoldenEye (1995)|         8|   132|
|Boogie Nights (1997)|       8|    4.0769|Monsters, Inc. (2...|         8|   132|
|American Beauty

25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

Write Successfull


In [14]:
# Tags per movie vs Tags per User

query7= """with t1 as(
          Select '1' as key, round((sum(CASE when tag IS NOT NULL THEN 1 ELSE 0 END)/count(distinct movieid)),2) as tags_per_movie
          from TAGS),
          
          t2 as ( Select '1' as key, (sum(CASE WHEN tag IS NOT NULL THEN 1 ELSE 0 END)/count(distinct userid)) as tags_per_user
          from TAGS)
          
          Select t1.tags_per_movie,t2.tags_per_user,
          CASE WHEN tags_per_user>tags_per_movie THEN 'tags_per_user is higher'
          ELSE 'tags_per_movie is higher' END as Comparison
          from t1 inner join t2 on t1.key=t2.key"""
          

output = spark.sql(query7)
output.show()


output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_7.csv')
print("Write Successfull")


+--------------+-------------+--------------------+
|tags_per_movie|tags_per_user|          Comparison|
+--------------+-------------+--------------------+
|          2.34|         63.5|tags_per_user is ...|
+--------------+-------------+--------------------+

Write Successfull


In [15]:
# Users that tagged but did not Rate movies

query8= """
         
         Select distinct t.userid
         from TAGS as t
         left join RATINGS as r
         on t.movieID=r.movieID
         where r.userID is NULL"""
          

output = spark.sql(query8)
output.show()

# Write data in HDFS into single file

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_8.csv')
print("Write Successfull")



+------+
|userid|
+------+
|   288|
|   318|
|   474|
|   543|
+------+

Write Successfull


In [16]:
# Ratings per user versus Ratings per Movie

query9 = """with t1 as(
          Select '1' as key, round((SUM(CASE when rating IS NOT NULL THEN 1 ELSE 0 END)/count(distinct userid)),2) as ratings_per_user
          from RATINGS),
          
          t2 as ( Select '1' as key, round((sum(CASE WHEN rating IS NOT NULL THEN 1 ELSE 0 END)/count(distinct movieid)),2) as ratings_per_movie
          from RATINGS)
          
          Select t1.ratings_per_user,t2.ratings_per_movie
          from t1 inner join t2 on t1.key=t2.key"""


output = spark.sql(query9)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_9.csv')
print("Write Successfull")

+----------------+-----------------+
|ratings_per_user|ratings_per_movie|
+----------------+-----------------+
|           165.3|            10.37|
+----------------+-----------------+

Write Successfull


In [17]:
# Predominant Genre per rating level

query10= """with t1 as(
          Select r.rating,m.genres,count(*) as counts,
          dense_rank()over(partition by r.rating order by count(*) desc) as ranker
          from RATINGS AS r
          left join MOVIES as m
          on r.movieID=m.movieID
          group by 1,2)
          
          Select rating,genres as most_frequent_genre from t1 
          where ranker=1
          order by rating desc"""


          

output = spark.sql(query10)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_10.csv')
print("Write Successfull")

+------+-------------------+
|rating|most_frequent_genre|
+------+-------------------+
|   5.0|              Drama|
|   4.5|              Drama|
|   4.0|              Drama|
|   3.5|             Comedy|
|   3.0|             Comedy|
|   2.5|             Comedy|
|   2.0|             Comedy|
|   1.5|             Comedy|
|   1.0|             Comedy|
|   0.5|             Comedy|
+------+-------------------+

Write Successfull


In [18]:
# Predominant tag per genre

query11= """with t1 as(
          Select m.genres,t.tag,count(*) as counts,
          dense_rank()over(partition by m.genres order by count(*) desc) as ranker
          from MOVIES AS m
          left join TAGS as t
          on t.movieID=m.movieID
          group by 1,2)
          
          Select genres,tag as most_frequent_tag from t1 
          where ranker=1
          order by genres desc"""


          

output = spark.sql(query11)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_11.csv')
print("Write Successfull")

+--------------------+-----------------+
|              genres|most_frequent_tag|
+--------------------+-----------------+
|             Western|             NULL|
|                 War|             NULL|
|            Thriller|             NULL|
|Sci-Fi|Thriller|IMAX|             NULL|
|     Sci-Fi|Thriller|             NULL|
|         Sci-Fi|IMAX|           sci-fi|
|         Sci-Fi|IMAX|      time-travel|
|              Sci-Fi|             NULL|
|     Romance|Western|             NULL|
|         Romance|War|        Hemingway|
|    Romance|Thriller|             NULL|
|Romance|Sci-Fi|Th...|             NULL|
|Romance|Sci-Fi|Th...|            artsy|
|Romance|Sci-Fi|Th...|        Beautiful|
|Romance|Sci-Fi|Th...|      atmospheric|
|Romance|Sci-Fi|Th...|   existentialism|
|Romance|Sci-Fi|Th...|        dreamlike|
|Romance|Sci-Fi|Th...|         artistic|
|      Romance|Sci-Fi|             NULL|
|             Romance|             NULL|
+--------------------+-----------------+
only showing top

In [19]:
query12 = """with t1 as(
          Select r.movieID,m.title,count(distinct r.userID) as counts,
          dense_rank()over(order by count(distinct r.userid) desc) as ranker
          from RATINGS as r
          left join MOVIES as m
          on r.movieID=m.movieID
          group by 1,2)
          
          Select title,counts from t1 
          where ranker<=10
          """          

output = spark.sql(query12)
output.show()


output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_12.csv')
print("Write Successfull")

25/01/22 00:21:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

+--------------------+------+
|               title|counts|
+--------------------+------+
| Forrest Gump (1994)|   329|
|Shawshank Redempt...|   317|
| Pulp Fiction (1994)|   307|
|Silence of the La...|   279|
|  Matrix, The (1999)|   278|
|Star Wars: Episod...|   251|
|Jurassic Park (1993)|   238|
|   Braveheart (1995)|   237|
|Terminator 2: Jud...|   224|
|Schindler's List ...|   220|
+--------------------+------+



25/01/22 00:21:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

Write Successfull


In [20]:
# Top 10 movies in terms of avg rating (>30 users reviewed)

query13= """with t1 as(
          Select movieid,avg(rating) as avg_rating,
          dense_rank()over (order by avg(rating) desc) as ranker
          from RATINGS
          group by 1
          having count(distinct userID)>30)
          
          Select m.title,round(t1.avg_rating,9) as avg_rating,t1.ranker from t1
          left join MOVIES as m
          on t1.movieID=m.movieID
          where ranker<=10
          """
output = spark.sql(query13)
output.show()

output.coalesce(1).write.mode("overwrite").format('csv').option('header', 'true') .option('delimiter', ',').save('/tmp/spark_movie_analysis/output_13.csv')
print("Write Successfull")


25/01/22 00:21:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

+--------------------+-----------+------+
|               title| avg_rating|ranker|
+--------------------+-----------+------+
|Shawshank Redempt...|4.429022082|     1|
|Lawrence of Arabi...|        4.3|     2|
|Godfather, The (1...|  4.2890625|     3|
|   Fight Club (1999)| 4.27293578|     4|
|Cool Hand Luke (1...|4.271929825|     5|
|Dr. Strangelove o...|4.268041237|     6|
|  Rear Window (1954)|4.261904762|     7|
|Godfather: Part I...|4.259689922|     8|
|Departed, The (2006)|4.252336449|     9|
|   Goodfellas (1990)|       4.25|    10|
+--------------------+-----------+------+



25/01/22 00:21:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/22 0

Write Successfull
