# Домашнее задание № 1 (Apache Spark SQL)

**Цель**: Решить 5 задач на SQL с использованием базовых таблиц.

**Инструкции**:
0. Прочитать условия ДЗ
1. Выполнить задание на SQL;
2. Ознакомиться с документацией по SQL: [Ссылка](https://spark.apache.org/docs/latest/sql-ref.html);
3. Создать ноутбук с решениями задач (создайте копию на основе этого шаблона);
4. Каждый запрос должен содержать обязательные комментарии с объяснением логического смысла решения;
5. Скачать ноутбук в формате .ipynb и отправить на адрес электронной почты: ilya+hse@aniskovets.com.

**Оценка и дедлайн**:
- Максимальная оценка - 5 баллов;
- Дедлайн: 24.05.2023 23:59;
- Оценки будут опубликованы после сдачи **всех двух** домашних заданий.

**Таблицы**:
Для выполнения задания предполагается использование следующих таблиц, загруженных в систему в формате CSV с типами полей STRING:

- title_basics_csv
- title_principals_csv
- title_crew_csv
- title_episode_csv
- title_ratings_csv
- title_akas_csv
- name_basics_csv

**Преобразование типов**:
Для преобразования STRING в другой тип используйте конструкцию: 
```sql
CAST (column_name AS TYPE) AS column_name
```

**Пример**
```sql
CREATE TABLE test_table USING PARQUET AS 
  SELECT 
    CAST(averageRating AS decimal(2,1))  AS averageRating 
  FROM test_table_csv 
```

**Все подготовительные этапы по преобразованию типов нужно делать в разделе Initialization. После чего, рекомендуется использовать в запросах таблицы в формате PARQUET**

Перед этим не мешает проверить, что значения влезают в размерности этого типа
Список типов SQL: https://spark.apache.org/docs/latest/sql-ref-datatypes.html 

Домашнее задание необходимо сделать в виде ноутбука. Ноутбук должен запускаться без ошибок.

Во всех задачах ответом должно быть только ОДНО значение (число или строка).

Неправильный ответ - 0 баллов за задачу.
Правильный ответ на все тесты 1 балл за задачу, иначе пропорционально количеству пройденных тестов (количество тестов и какие именно заранее неизвестны).

**ВНИМАНИЕ! К каждому запросу необходимо писать комментарии. Комментарии должны обьяснять логический смысл решения. Отсутствие комментария к решению задачи штраф - оценка за задачу = оценка / 2!**

Всего 5 задач и 5 баллов

**В режиме проверки SQL запросы будут запускаться с любыми параметрами, отличными от тех, что вы выбрали.**

Скрипты не должны зависеть от выбранного вами параметра, а также от регистра строки. Все параметры должны быть изменяемыми (смотрите, пример в Вопросе № 0)

**Тестирование будет проводится с любым значением параметров, в том числе с отсутвующим в датасете, запрос должен выводить правильный результат!**

SQL запрос в ответе всегда должен быть один и параметризирован. Если необходимо пользуйтесь конструкцией WITH name AS () -  пример ниже

После завершения работы нажимайте в меню File/Download/Download ipnb, скачивайте файл и присылайте почтой на ilya+hse@aniskovets.com

Если заметили опечатки или появились вопросы пишите в телеграм: @aigmx



In [1]:
!pip install pyspark===3.4.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!curl -O https://mars.ru77.ru/data/title.basics.tsv.gz
!curl -O https://mars.ru77.ru/data/title.crew.tsv.gz
!curl -O https://mars.ru77.ru/data/title.episode.tsv.gz
!curl -O https://mars.ru77.ru/data/title.principals.tsv.gz
!curl -O https://mars.ru77.ru/data/title.ratings.tsv.gz
!curl -O https://mars.ru77.ru/data/title.akas.tsv.gz
!curl -O https://mars.ru77.ru/data/name.basics.tsv.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  132M  100  132M    0     0  5740k      0  0:00:23  0:00:23 --:--:-- 6982k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 51.7M  100 51.7M    0     0  3988k      0  0:00:13  0:00:13 --:--:-- 7240k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 29.6M  100 29.6M    0     0  2958k      0  0:00:10  0:00:10 --:--:-- 5881k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  342M  100  342M    0     0  6529k      0  0:00:53  0:00:53 --:--:-- 7250k
  % Total    % Received % Xferd  Average Speed   Tim

In [3]:
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext


spark = SparkSession.builder.master("local[2]").config("spark.driver.memory", "8g").appName("vse").enableHiveSupport().getOrCreate()
sql = spark.sql

In [4]:
title_basics_csv = spark.read.csv("title.basics.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_basics_csv.createOrReplaceTempView("title_basics_csv")


title_principals_csv = spark.read.csv("title.principals.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_principals_csv.createOrReplaceTempView("title_principals_csv")


title_crew_csv = spark.read.csv("title.crew.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_crew_csv.createOrReplaceTempView("title_crew_csv")


title_episode_csv = spark.read.csv("title.episode.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_episode_csv.createOrReplaceTempView("title_episode_csv")


title_ratings_csv = spark.read.csv("title.ratings.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_ratings_csv.createOrReplaceTempView("title_ratings_csv")


title_akas_csv = spark.read.csv("title.akas.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
title_akas_csv.createOrReplaceTempView("title_akas_csv")


name_basics_csv = spark.read.csv("name.basics.tsv.gz", sep='\\t', nullValue='\\N', header=True, quote="", escape="")
name_basics_csv.createOrReplaceTempView("name_basics_csv")

In [5]:
# Initialization
_ = sql("""DROP TABLE IF EXISTS title_basics""").collect()
q = """
    CREATE TABLE title_basics STORED AS PARQUET 
    SELECT tconst,
           titleType,
           primaryTitle,
           originalTitle,
           CAST (isAdult AS boolean) as isAdult,
           CAST (startYear AS integer) as startYear,
           CAST (endYear AS integer) as endYear,
           CAST (runtimeMinutes AS integer) as runtimeMinutes,
           SPLIT(genres, ',') as genres
    FROM title_basics_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_principals""").collect()
q = """
    CREATE TABLE title_principals STORED AS PARQUET 
    SELECT tconst,
           CAST (ordering AS integer) as ordering,
           nconst,
           category,
           job,
           characters
    FROM title_principals_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_crew""").collect()
q = """
    CREATE TABLE title_crew STORED AS PARQUET 
    SELECT tconst,
           SPLIT(directors, ',') as directors,
           SPLIT(writers, ',') as writers
    FROM title_crew_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_episode""").collect()
q = """
    CREATE TABLE title_episode STORED AS PARQUET 
    SELECT tconst,
           parentTconst,
           CAST (seasonNumber AS integer) as seasonNumber,
           CAST (episodeNumber AS integer) as episodeNumber
    FROM title_episode_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_ratings""").collect()
q = """
    CREATE TABLE title_ratings STORED AS PARQUET 
    SELECT tconst,
           CAST (averageRating AS decimal(2,1)) as averageRating,
           CAST (numVotes AS integer) as numVotes
    FROM title_ratings_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_akas""").collect()
q = """
    CREATE TABLE title_akas STORED AS PARQUET 
    SELECT titleId,
           CAST (ordering AS integer) as ordering,
           title,
           region,
           language,
           types,
           attributes,
           CAST (isOriginalTitle AS boolean) as isOriginalTitle
    FROM title_akas_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS name_basics""").collect()
q = """
    CREATE TABLE name_basics STORED AS PARQUET 
    SELECT nconst,
           primaryName,
           CAST (birthYear AS integer) as birthYear,
           CAST (deathYear AS integer) as deathYear,
           SPLIT(primaryProfession, ',') as primaryProfession,
           SPLIT(knownForTitles, ',') as knownForTitles
    FROM name_basics_csv
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_writers""").collect()
q = """
    /*
    Вспомогательная табличка. Разбивает содержимое массива writers
    на отдельные строки, чтобы по-человечески работать с данными а не как плебеи.
    Получается одна строка - это пара фильм-сценарист.
    */

    CREATE TABLE title_writers STORED AS PARQUET 
    SELECT tconst,
           posexplode(writers) AS (ordering, nconst)
    FROM title_crew
    WHERE writers is not null
    """
_ = sql(q).collect()

_ = sql("""DROP TABLE IF EXISTS title_genres""").collect()
q = """
    /*
    Вспомогательная табличка. Разбивает содержимое массива genres
    на отдельные строки. Думаю, понятно зачем ))))
    */

    CREATE TABLE title_genres STORED AS PARQUET 
    SELECT tconst, ordering, lower(genre) as genre
    FROM (
      SELECT tconst,
            posexplode(genres) AS (ordering, genre)
      FROM title_basics
      WHERE genres is not null
    ) _
    """
_ = sql(q).collect()

In [6]:
sql("DESCRIBE FORMATTED title_basics").show(50, truncate=False)
sql("DESCRIBE FORMATTED title_principals").show(50, truncate=False)
sql("DESCRIBE FORMATTED title_crew").show(50, truncate=False)
sql("DESCRIBE FORMATTED title_episode").show(50, truncate=False)
sql("DESCRIBE FORMATTED title_ratings").show(50, truncate=False)
sql("DESCRIBE FORMATTED title_akas").show(50, truncate=False)
sql("DESCRIBE FORMATTED name_basics").show(50, truncate=False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|tconst                      |string                                                        |null   |
|titleType                   |string                                                        |null   |
|primaryTitle                |string                                                        |null   |
|originalTitle               |string                                                        |null   |
|isAdult                     |boolean                                                       |null   |
|startYear                   |int                                                           |null   |
|endYear                     |int                                                 

Вопрос №0 (не оценивается, показан, как пример). Сколько произведений имеют средний рейтинг выше заданного {threshold}?

In [7]:
query_0 = """
/*
  Чтобы посчитать количество произведений находим все рейтинги больше, чем заданный порог.
  Так как надо найти все произведения, а не только фильмы, нам не нужно делать фильтр по типу произведения titleType
*/
SELECT 
  COUNT(tconst) as cnt_above_threshold
FROM  title_ratings
WHERE averageRating > {threshold};
"""
sql(query_0, threshold=5.0).show(truncate=False)

+-------------------+
|cnt_above_threshold|
+-------------------+
|1025071            |
+-------------------+



Вопрос №1. Найдите среднюю продолжительность (title_basics:runtimeMinutes) фильмов (title_basics:titleType=movie) определенного жанра {genre}. 

Среднюю продолжительность надо рассчитать только для тех фильмов, где title_basics:runtimeMinutes не равен NULL. 

In [20]:
query_1 = """
/*
  С помощью вспомогательной таблички title_genres (создавалась на этапе initialization)
  фильтруем все фильмы (titleType = 'movie') из title_basics по genre = LOWER({genre}).
  В конце выводим среднее runtimeMinutes.
*/

SELECT AVG(runtimeMinutes) as answer
FROM title_basics b
JOIN title_genres g using(tconst)
WHERE TRUE
  and titleType = 'movie'
  and genre = LOWER({genre})
  and runtimeMinutes is not null
"""
sql(query_1, genre='Comedy').show(truncate=False)

+----------------+
|answer          |
+----------------+
|92.5809266708541|
+----------------+



Вопрос №2. Сколько фильмов (title_basics:titleType=movie) определенного жанра (title_basics:genres) {genre} было выпущено в определенный год (title_basics:startYear) {year}?


In [21]:
query_2 = """
/*
  Из title_basics отбираем фильмы (titleType = 'movie') и год выпуска (startYear = INT({year})).
  Жанр фильтруем привычным способом, с помощью title_genres.
  Поскольку на каждый фильм в запросе приходится только одна строка (т. к. в title_genres одна строка на фильм-жанр, а в title_basics одна строка на фильм),
  ответ получаем простым count'ом.
*/

SELECT count(1) as answer
FROM title_basics b
JOIN title_genres g using(tconst)
WHERE TRUE
  and titleType = 'movie'
  and genre = LOWER({genre})
  and startYear = INT({year})
"""
sql(query_2, genre='Comedy', year=2022).show(truncate=False)

+------+
|answer|
+------+
|203   |
+------+



Вопрос №3. Сколько фильмов определенного жанра (title_basics:genres) {genre}, в которых снялся конкретный актер (name_basics:primaryName) {actor}, имеют средний рейтинг (title_ratings:averageRating) выше заданного порога {threshold}?







In [22]:
query_3 = """
/*
  В подзапросе title_filter отбираем все фильмы, в которых искомый человек участвовал как актер.
  Затем джойним title_filter на title_basics (=> оставляем все фильмы с этим актером, т. к. JOIN = INNER JOIN),
  фильтруем жанр через title_genres, ну и достаем рейтинг из title_ratings.

  Тут снова на один фильм приходится одна строка, так что используем count(1).
*/

with title_filter as (
  SELECT tconst
  from title_principals
  join name_basics using(nconst)
  WHERE LOWER(primaryName) = LOWER({actor})
    and category = 'actor'
)
SELECT count(1) as answer
FROM title_basics b
JOIN title_filter f on f.tconst = b.tconst
JOIN title_genres g on g.tconst = b.tconst
JOIN title_ratings r on r.tconst = b.tconst
WHERE TRUE
  and titleType = 'movie'
  and genre = LOWER({genre})
  and averageRating > {threshold}
"""
sql(query_3, genre='Comedy', actor='James Cagney', threshold=5.3).show(truncate=False)

+------+
|answer|
+------+
|20    |
+------+



Вопрос №4. Сколько телесериалов (title_basics:titleType=tvSeries) в определенном жанре (title_basics:genres) {genre} имеют более определенного количества (title_episode:seasonNumber > {seasons})?


In [23]:
query_4 = """
/*
  Важно отфильтровать titleType = 'tvSeries', поскольку не только у этого типа бывают эпизоды (есть также tvMiniSeries).
  А в остальном все просто - главное джойнить title_basics через parentTconst, т. е. к каждому эпизоду получить данные о сериале,
  к которому этот эпизод относится.
*/

SELECT count(distinct e.parentTconst) as answer
from title_episode e
join title_basics  b on b.tconst = e.parentTconst
join title_genres g on g.tconst = b.tconst 
                   and g.genre = LOWER({genre})
where TRUE
  and e.seasonNumber > INT({seasons})
  and b.titleType = 'tvSeries'
"""
sql(query_4, genre='Comedy', seasons=7).show(truncate=False)

+------+
|answer|
+------+
|995   |
+------+



Вопрос № 5. Сколько фильмов (title_basics:titleType=movie)  создал конкретный сценарист (title_crew:writers, name_basics:primaryName) {writer} в определенном жанре (title_basics:genres) {genre}?

In [24]:
query_5  = """
/*
  В первом подзапросе отбираем айдишники людей, соответствующие параметру.
  Во втором - ищем все произведения, в которых отобранный человек участвовал в качестве сценариста 
  (для этого используем вспомогательную табличку title_writers, которая создавалась в initialization).
  Наконец, фильтруем записи в title_basics при помощи двух джойнов (на title_filter и title_genres + g.genre = LOWER({genre})),
  а также не забываем выбрать только фильмы (b.titleType = 'movie')
*/

with writer_filter as (
  SELECT nconst
  FROM name_basics
  WHERE LOWER(primaryName) = LOWER({writer})
),
title_filter as (
  SELECT tconst
  from title_writers
  join writer_filter using(nconst)
)
SELECT count(b.tconst) as answer
FROM title_basics b
join title_filter f on f.tconst = b.tconst
join title_genres g on g.tconst = b.tconst 
WHERE TRUE
  and b.titleType = 'movie'
  and g.genre = LOWER({genre})
"""
sql(query_5, genre='Drama', writer='Federico Fellini').show(truncate=False)

+------+
|answer|
+------+
|38    |
+------+



In [26]:
# PLEASE DO NOT REMOVE THIS CELL

# Generate answers
for n in range(1, 6):
  print(f"-- answer {n}\n " + globals()[f"query_{n}"] + ";\n")

-- answer 1
 
/*
  С помощью вспомогательной таблички title_genres (создавалась на этапе initialization)
  фильтруем все фильмы (titleType = 'movie') из title_basics по genre = LOWER({genre}).
  В конце выводим среднее runtimeMinutes.
*/

SELECT AVG(runtimeMinutes) as answer
FROM title_basics b
JOIN title_genres g using(tconst)
WHERE TRUE
  and titleType = 'movie'
  and genre = LOWER({genre})
  and runtimeMinutes is not null
;

-- answer 2
 
/*
  Из title_basics отбираем фильмы (titleType = 'movie') и год выпуска (startYear = INT({year})).
  Жанр фильтруем привычным способом, с помощью title_genres.
  Поскольку на каждый фильм в запросе приходится только одна строка (т. к. в title_genres одна строка на фильм-жанр, а в title_basics одна строка на фильм),
  ответ получаем простым count'ом.
*/

SELECT count(1) as answer
FROM title_basics b
JOIN title_genres g using(tconst)
WHERE TRUE
  and titleType = 'movie'
  and genre = LOWER({genre})
  and startYear = INT({year})
;

-- answer 3
 
/*
 