#### Задание
Имеются таблицы с данными фильмов.

1 таблица - basics. \
Структура полей и описание: \
tconst (string) - alphanumeric unique identifier of the title \
titleType (string) – the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc) \
primaryTitle (string) – the more popular title / the title used by the filmmakers on promotional materials at the point of release \
originalTitle (string) - original title, in the original language \
isAdult (boolean) - 0: non-adult title; 1: adult title \
startYear (YYYY) – represents the release year of a title. In the case of TV Series, it is the series start year \
endYear (YYYY) – TV Series end year. '\N' for all other title types \
runtimeMinutes – primary runtime of the title, in minutes \
genres (string array) – includes up to three genres associated with the title

2 таблица - ratings. \
Структура полей и описание: \
tconst (string) - alphanumeric unique identifier of the title \
averageRating – weighted average of all the individual user ratings \
numVotes - number of votes the title has received

Нужно показать топ 5 самых высокооцененных жанров фильмов за последние 10, 20, 30 лет.

In [2]:
# импорт
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
# инициализация spark

spark = SparkSession \
    .builder \
    .appName("pyspark_imdb_top5") \
    .master("local[1]") \
    .config("spark.driver.memory", "10G") \
    .config("spark.jars", "postgresql-42.7.4.jar") \
    .getOrCreate()

In [4]:
# загрузка данных в датафреймы из таблиц БД

df_basics = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/imdb") \
    .option("dbtable", "basics") \
    .option("user", "postgres") \
    .option("password", "123") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df_ratings = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/imdb") \
    .option("dbtable", "ratings") \
    .option("user", "postgres") \
    .option("password", "123") \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [5]:
# фильтруем тип 'movie'
df_basics = df_basics.where(df_basics.titleType == 'movie')

# join датафреймов и выбор нужных столбцов
df_join = df_basics \
    .join(df_ratings, df_basics.tconst == df_ratings.tconst). \
    select(df_basics.startYear, df_basics.genres, df_ratings.averageRating, df_ratings.numVotes)

# находим максимальный год выпуска фильмов в датафрейме
max_year = df_join.agg(F.max(F.col("startYear"))).collect()[0][0]

# сплит и разбиение на строки колонки genres
df_join = df_join.withColumn("genres", F.explode(F.split("genres", ",")))

In [6]:
# выбор нужных колонок, группировка по жанрам и агрегация количества оценок и средней оценки для каждого диапазона (10, 20, 30 лет)
df_join = df_join.select("startYear", "genres", "averageRating", "numVotes")
df_top5_10years = df_join. \
    where((max_year - df_join.startYear) <= 10). \
    groupby("genres"). \
    agg({"numVotes": "sum", "averageRating": "mean"}). \
    drop("startYear")

df_top5_20years = df_join. \
    where((max_year - df_join.startYear) <= 20). \
    groupby("genres"). \
    agg({"numVotes": "sum", "averageRating": "mean"}). \
    drop("startYear")

df_top5_30years = df_join. \
    where((max_year - df_join.startYear) <= 30). \
    groupby("genres"). \
    agg({"numVotes": "sum", "averageRating": "mean"}). \
    drop("startYear")

# нормализация и получение конечной метрики
min = df_top5_10years.agg({"sum(numVotes)": "min"}).collect()[0][0]
max = df_top5_10years.agg({"sum(numVotes)": "max"}).collect()[0][0]
df_top5_10years = df_top5_10years.withColumn('norm_numVotes', (F.col("sum(numVotes)") - min)/(max - min))
df_top5_10years = df_top5_10years.withColumn("final_metric", F.round((F.col("norm_numVotes") * F.col("avg(averageRating)")), 2))
df_top5_10years = df_top5_10years.select("genres").sort(F.desc("final_metric")).limit(5)

min = df_top5_20years.agg({"sum(numVotes)": "min"}).collect()[0][0]
max = df_top5_20years.agg({"sum(numVotes)": "max"}).collect()[0][0]
df_top5_20years = df_top5_20years.withColumn('norm_numVotes', (F.col("sum(numVotes)") - min)/(max - min))
df_top5_20years = df_top5_20years.withColumn("final_metric", F.round((F.col("norm_numVotes") * F.col("avg(averageRating)")), 2))
df_top5_20years = df_top5_20years.select("genres").sort(F.desc("final_metric")).limit(5)

min = df_top5_30years.agg({"sum(numVotes)": "min"}).collect()[0][0]
max = df_top5_30years.agg({"sum(numVotes)": "max"}).collect()[0][0]
df_top5_30years = df_top5_30years.withColumn('norm_numVotes', (F.col("sum(numVotes)") - min)/(max - min))
df_top5_30years = df_top5_30years.withColumn("final_metric", F.round((F.col("norm_numVotes") * F.col("avg(averageRating)")), 2))
df_top5_30years = df_top5_30years.select("genres").sort(F.desc("final_metric")).limit(5)

# сохранение в файл
df_top5_10years.write.parquet("top5_10years.parquet")
df_top5_20years.write.parquet("top5_20years.parquet")
df_top5_30years.write.parquet("top5_30years.parquet")

In [9]:
df_top5_10years.show()

+---------+
|   genres|
+---------+
|    Drama|
|   Action|
|Adventure|
|   Comedy|
|    Crime|
+---------+



In [10]:
df_top5_20years.show()

+---------+
|   genres|
+---------+
|    Drama|
|   Action|
|   Comedy|
|Adventure|
| Thriller|
+---------+



In [11]:
df_top5_30years.show()

+---------+
|   genres|
+---------+
|    Drama|
|   Action|
|   Comedy|
|Adventure|
|    Crime|
+---------+

