In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, row_number, count
from pyspark.sql.window import Window

In [23]:
# Get (or create) the SparkSession that the rest of the notebook uses as `spark`.
spark = (
    SparkSession.builder
    .appName("Databases II")
    .getOrCreate()
)

In [24]:
# Load rating.csv: the first line is a header row, fields are comma-separated,
# and column types are inferred by Spark instead of being declared here.
rating = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=",", inferSchema="true")
    .load("rating.csv")
)

In [25]:
# Cast 'timestamp' to Spark's timestamp type; the other three columns pass
# through unchanged and the column order stays the same.
rating = rating.select(
    'userId',
    'movieId',
    'rating',
    col('timestamp').cast('timestamp').alias('timestamp'),
)

In [26]:
# Keep the existing columns and append 'yearNum', the year extracted from 'timestamp'.
rating = rating.select(
    'userId', 'movieId', 'rating', 'timestamp',
    year(col('timestamp')).alias('yearNum'),
)

In [27]:
# Count the ratings each user gave in each year.
# Result columns: userId, yearNum, total_ratings (one row per user/year pair).
top_user = (
    rating
    .groupBy('userId', 'yearNum')
    .agg(count('rating').alias('total_ratings'))
)

In [28]:
# Window over each year's rows, ordered by rating count (highest first).
# userId is a secondary sort key so that users with equal total_ratings always
# receive the same row_number() across runs; without it, the order among ties
# (and therefore which user falls inside a top-N cut-off) is left to Spark and
# may vary between executions.
window = (
    Window
    .partitionBy(top_user["yearNum"])
    .orderBy(top_user["total_ratings"].desc(), top_user["userId"].asc())
)

In [29]:
# Number the rows inside each window partition (1, 2, 3, ...) as 'rank',
# then keep only the rows whose rank is 10 or lower.
top_users = (
    top_user
    .select('*', row_number().over(window).alias('rank'))
    .filter(col('rank') <= 10)
)

In [30]:
# Sort by ascending yearNum, then ascending rank, and keep the sorted DataFrame
# in query7. show() only prints rows and returns None, so it is called on its
# own line; chaining it onto the assignment would leave query7 set to None.
query7 = top_users.orderBy(top_users["yearNum"].asc(), top_users["rank"].asc())
query7.show(1000)

+------+-------+-------------+----+
|userId|yearNum|total_ratings|rank|
+------+-------+-------------+----+
|131160|   1995|            3|   1|
| 28507|   1995|            1|   2|
| 25878|   1996|          800|   1|
|  1931|   1996|          722|   2|
| 46663|   1996|          669|   3|
|107732|   1996|          657|   4|
| 24214|   1996|          624|   5|
| 19067|   1996|          605|   6|
| 41389|   1996|          605|   7|
|  4548|   1996|          589|   8|
| 46146|   1996|          570|   9|
| 81218|   1996|          510|  10|
|124052|   1997|         1352|   1|
|128653|   1997|         1141|   2|
|  5814|   1997|          849|   3|
| 64778|   1997|          655|   4|
| 83343|   1997|          583|   5|
|101971|   1997|          559|   6|
| 34962|   1997|          526|   7|
| 19954|   1997|          522|   8|
| 61432|   1997|          512|   9|
|  8962|   1997|          510|  10|
|   903|   1998|         1202|   1|
| 69082|   1998|          892|   2|
|125125|   1998|          88

In [31]:
# Shut down the Spark session now that the query results have been displayed.
spark.stop()