In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from sparkmeasure import StageMetrics
# Build (or reuse) the session for this notebook, pointed at the Spark master
# at spark://ubuntu:7077.
spark = (
    SparkSession.builder
    .master("spark://ubuntu:7077")
    .appName("Top movies for every category")
    .getOrCreate()
)
# Turn on the REPL eager-evaluation setting for DataFrames in this session.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [2]:
# Create the sparkmeasure stage-metrics collector for this session; the
# begin()/end()/print_report() calls in the following cells use it.
stagemetrics = StageMetrics(spark)

In [3]:
# Most-rated movie per genre, measured end-to-end with sparkmeasure.
stagemetrics.begin()

# Load the raw CSV exports (first line is the header).
# NOTE(review): per the Spark CSV docs, inferSchema=True costs an extra pass
# over each file; supplying an explicit schema for rating.csv would avoid it.
Movies_Dataframe = spark.read.csv(
    "/home/meli/Documents/Kaggle/movie.csv", header=True, inferSchema=True
)
Ratings_Dataframe = spark.read.csv(
    "/home/meli/Documents/Kaggle/rating.csv", header=True, inferSchema=True
)

# Ratings: number of ratings per movie. groupBy().count() only yields movieId
# and the count, so the other rating columns need not be dropped beforehand.
Rdf = (
    Ratings_Dataframe
    .groupBy("movieId")
    .count()
    .withColumnRenamed("count", "NumberOfRatings")
)

# Movies: turn the pipe-delimited `genres` string into an array column.
# The pattern is a raw string so the backslash reaches the regex unchanged, and
# the new column is named directly instead of renaming Spark's auto-generated
# column name (renaming a non-existent column is a silent no-op).
Mdf = (
    Movies_Dataframe
    .withColumn("genresar", split(col("genres"), r"\|"))
    .drop("genres")
)

# One row per (movie, genre). No sort here: ordering at this point would not
# carry through the join and window below; the result is ordered once at the end.
Moviedf = (
    Mdf
    .select("*", explode(col("genresar")).alias("Genre"))
    .drop("genresar")
)

# Inner join on movieId, ignoring the placeholder genre.
mrdf1 = Moviedf.filter("Genre !='(no genres listed)'").join(Rdf, ["movieId"])

# Rank movies within each genre by NumberOfRatings (descending). movieId is a
# tie-breaker so that the row picked by row_number() is deterministic when two
# movies in the same genre have the same number of ratings.
windowDept = Window.partitionBy("Genre").orderBy(
    col("NumberOfRatings").desc(), col("movieId").asc()
)
FullQuery = (
    mrdf1
    .withColumn("row", row_number().over(windowDept))
    .filter(col("row") == 1)  # keep only the top-ranked movie of each genre
    .drop("row")
)

# Display the winners alphabetically by genre, without truncating titles.
FullQuery.orderBy("Genre", ascending=True).show(truncate=False)
stagemetrics.end()

+-------+--------------------------------+-----------+---------------+
|movieId|title                           |Genre      |NumberOfRatings|
+-------+--------------------------------+-----------+---------------+
|480    |Jurassic Park (1993)            |Action     |59715          |
|480    |Jurassic Park (1993)            |Adventure  |59715          |
|1      |Toy Story (1995)                |Animation  |49695          |
|1      |Toy Story (1995)                |Children   |49695          |
|296    |Pulp Fiction (1994)             |Comedy     |67310          |
|296    |Pulp Fiction (1994)             |Crime      |67310          |
|5669   |Bowling for Columbine (2002)    |Documentary|12280          |
|296    |Pulp Fiction (1994)             |Drama      |67310          |
|1      |Toy Story (1995)                |Fantasy    |49695          |
|1617   |L.A. Confidential (1997)        |Film-Noir  |26836          |
|593    |Silence of the Lambs, The (1991)|Horror     |63299          |
|150  

In [4]:
# Print the aggregated stage metrics gathered between the begin() and end()
# calls of the previous cell.
stagemetrics.print_report()


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 8
numTasks => 416
elapsedTime => 59336 (59 s)
stageDuration => 56376 (56 s)
executorRunTime => 156342 (2.6 min)
executorCpuTime => 63217 (1.1 min)
executorDeserializeTime => 15062 (15 s)
executorDeserializeCpuTime => 4152 (4 s)
resultSerializationTime => 280 (0.3 s)
jvmGCTime => 2327 (2 s)
shuffleFetchWaitTime => 2 (2 ms)
shuffleWriteTime => 6405 (6 s)
resultSize => 1600503 (1562.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 2810035200
recordsRead => 40055086
bytesRead => 1384480482 (1320.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 166945
shuffleTotalBlocksFetched => 4875
shuffleLocalBlocksFetched => 4875
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 4339302 (4.0 MB)
shuffleLocalBytesRead => 4339302 (4.0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (