In [0]:
from pyspark.sql.functions import *

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, to_date, month, year, desc, asc

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
movies_data = [(1, "Avengers"), (2, "Frozen 2"), (3, "Joker")]
users_data = [(1, "Daniel"), (2, "Monica"), (3, "Maria"), (4, "James")]
ratings_data = [
    (1, 1, 3, "2020-01-12"),
    (1, 2, 4, "2020-02-11"),
    (1, 3, 2, "2020-02-12"),
    (1, 4, 1, "2020-01-01"),
    (2, 1, 5, "2020-02-17"),
    (2, 2, 2, "2020-02-01"),
    (2, 3, 2, "2020-03-01"),
    (3, 1, 3, "2020-02-22"),
    (3, 2, 4, "2020-02-25")
]

# Create DataFrames
movies_df = spark.createDataFrame(movies_data, ["movie_id", "title"])
users_df = spark.createDataFrame(users_data, ["user_id", "name"])
ratings_df = spark.createDataFrame(ratings_data, ["movie_id", "user_id", "rating", "created_at"])

In [0]:
/* Write your T-SQL query statement below */

with cte1 as 
(
select u.name, 
       u.user_id, 
	   count(mr.user_id) as count_user
from users u
inner join movierating mr
on u.user_id = mr.user_id
group by u.name,u.user_id
--order by u.name
),
cte2 as
(
SELECT
    mr.movie_id,
    m.title,
    AVG(CAST(mr.rating AS FLOAT)) AS 'average_rating'
from movies m
inner join movierating mr
on m.movie_id = mr.movie_id
where mr.created_at like '2020-02%'
group by mr.movie_id,m.title
)

select name as results
from (
select top 1 name from cte1
where count_user = (select max(count_user) from cte1)
) a
union all
select title as results
from (
select top 1 title from cte2
where average_rating = (select max(average_rating) from cte2)
) b

In [0]:
# Convert created_at to proper date
ratings_df = ratings_df.withColumn("created_at", to_date(col("created_at"), "yyyy-MM-dd"))

# ----------------------------
# Part 1: Most Active Rater
# ----------------------------
most_active_user = ratings_df.groupBy("user_id") \
    .agg(count("*").alias("num_ratings")) \
    .join(users_df, "user_id") \
    .orderBy(desc("num_ratings"), asc("name")) \
    .limit(1) \
    .select(col("name").alias("results"))

# ----------------------------
# Part 2: Top Rated Movie in Feb 2020
# ----------------------------
ratings_feb2020 = ratings_df.filter((month("created_at") == 2) & (year("created_at") == 2020))

top_rated_movie = ratings_feb2020.groupBy("movie_id") \
    .agg(avg("rating").alias("avg_rating")) \
    .join(movies_df, "movie_id") \
    .orderBy(desc("avg_rating"), asc("title")) \
    .limit(1) \
    .select(col("title").alias("results"))

# ----------------------------
# Combine both results
# ----------------------------
final_result = most_active_user.union(top_rated_movie)

# Show the output
final_result.show()
