In [33]:
import sys
import os


from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.types as t
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, rand, split, explode, regexp_replace, corr, desc, row_number, percent_rank, lead, \
    lag

from schemas.dataframes import get_episode_df, get_ratings_df, get_name_df, get_principals_df, get_crew_df, get_akas_df, get_basics_df

In [2]:
spark_session = (SparkSession.builder
                             .master('local')
                             .appName('IMDB')
                             .config(conf=SparkConf())
                             .getOrCreate())

In [3]:
episode_df = get_episode_df(spark_session)
ratings_df = get_ratings_df(spark_session)
name_df = get_name_df(spark_session)
principals_df = get_principals_df(spark_session)
crew_df = get_crew_df(spark_session)
akas_df = get_akas_df(spark_session)
basics_df = get_basics_df(spark_session)

## 1) 20 фільмів українською мовою з найвищим рейтингом

In [18]:
ukraine_akas_df = (akas_df
                   .filter((akas_df.language == 'uk') & (akas_df.types[0] == 'imdbDisplay'))
                   .join(ratings_df, ratings_df.tconst == akas_df.titleId)
                   .filter(ratings_df.numVotes > 100)
                   .sort(col('averageRating').desc())
                   .limit(20)
                   )

ukraine_akas_df.show(100, False)


+---------+--------+-------------------------------------------------------------+------+--------+-------------+----------+---------------+---------+-------------+--------+
|titleId  |ordering|title                                                        |region|language|types        |attributes|isOriginalTitle|tconst   |averageRating|numVotes|
+---------+--------+-------------------------------------------------------------+------+--------+-------------+----------+---------------+---------+-------------+--------+
|tt0069628|6       |Сімнадцять миттєвостей весни                                 |SUHH  |uk      |[imdbDisplay]|[\N]      |false          |tt0069628|8.8          |4833    |
|tt0096697|67      |Сімпсони                                                     |SUHH  |uk      |[imdbDisplay]|[\N]      |false          |tt0096697|8.7          |427311  |
|tt0078655|12      |Місце зустрічі змінити не можна                              |SUHH  |uk      |[imdbDisplay]|[\N]      |false       

## 2) Топ 20 фільмів із найбільшою кількістю локальних назв

In [5]:
top_20_akas_df = (akas_df
                  .dropDuplicates(['title'])
                  .groupBy('titleId').count()
                  .sort(desc(col('count')))
                  #.dropDuplicates(['titleId'])
                  .limit(20)
                  .join(basics_df, basics_df.tconst == col('titleId'))
                  .select('titleId', 'primaryTitle', 'startYear', 'count')
                  )

window = Window.orderBy(desc('count'))
res_df = top_20_akas_df.withColumn('rank', row_number().over(window))

res_df.show(20, False)

+---------+----------------------------------------------+---------+-----+----+
|titleId  |primaryTitle                                  |startYear|count|rank|
+---------+----------------------------------------------+---------+-----+----+
|tt0168366|Pokémon                                       |1997     |144  |1   |
|tt0076759|Star Wars: Episode IV - A New Hope            |1977     |93   |2   |
|tt0104431|Home Alone 2: Lost in New York                |1992     |93   |3   |
|tt0088814|The Black Cauldron                            |1985     |83   |4   |
|tt0086190|Star Wars: Episode VI - Return of the Jedi    |1983     |77   |5   |
|tt2418644|Testing Movie1                                |2015     |72   |6   |
|tt0080684|Star Wars: Episode V - The Empire Strikes Back|1980     |69   |7   |
|tt2872750|Shaun the Sheep Movie                         |2015     |67   |8   |
|tt0060196|The Good, the Bad and the Ugly                |1966     |65   |9   |
|tt0086779|La piovra                    

## 3) Всі унікальні локальні назви для фільму Home Alone 2: Lost in New York (1992)

In [6]:
homealone_df = (basics_df
              .filter((basics_df.primaryTitle == 'Home Alone 2: Lost in New York') & (basics_df.startYear == 1992))
              .join(akas_df, akas_df.titleId == basics_df.tconst)
              .dropDuplicates(['title'])
              .select('titleId', 'title', 'region')
              )
homealone_df.show(100, False)

+---------+-----------------------------------------------------------------+------+
|titleId  |title                                                            |region|
+---------+-----------------------------------------------------------------+------+
|tt0104431|Akela Ghar 2: New York Mein Haar Gaya                            |IN    |
|tt0104431|Aleinn Heima 2: Týndur í New York                                |IS    |
|tt0104431|Alene hjemme 2 - Forlatt i New York                              |NO    |
|tt0104431|Alene hjemme 2: Glemt i New York                                 |DK    |
|tt0104431|Alleen tuis 2                                                    |ZA    |
|tt0104431|Alone Again                                                      |US    |
|tt0104431|Alone Home 2: Lost in New York                                   |IS    |
|tt0104431|Alone at Home 2 - Lost in New York                               |SK    |
|tt0104431|Alone at Home 2: Lost in New York                     

## 4) Топ 5 найпопулярніших фільмів із раяном гослінгом за весь час, сортування по рейтингу

In [21]:
window = Window.orderBy(desc('averageRating'))

ryan_gosling_df = (principals_df
                   .filter(principals_df.nconst == 'nm0331516')
                   .join(name_df, name_df.nconst == principals_df.nconst)
                   .join(basics_df, basics_df.tconst == principals_df.tconst)
                   .join(ratings_df, ratings_df.tconst == basics_df.tconst)
                   .sort(desc(col('averageRating')))
                   .select('primaryName', 'primaryTitle', 'startYear', 'runtimeMinutes', 'genres', 'averageRating', 'numVotes')
                   .limit(20)
                   )

ryan_gosling_df = ryan_gosling_df.withColumn('rank', row_number().over(window))
ryan_gosling_df.show()

+------------+--------------------+---------+--------------+--------------------+-------------+--------+----+
| primaryName|        primaryTitle|startYear|runtimeMinutes|              genres|averageRating|numVotes|rank|
+------------+--------------------+---------+--------------+--------------------+-------------+--------+----+
|Ryan Gosling|Margot Robbie Tak...|     2023|             7|             [Short]|          8.6|      13|   1|
|Ryan Gosling|   Heartbreaker High|     1998|          NULL|            [Comedy]|          8.4|      10|   2|
|Ryan Gosling|Ryan Gosling/Russ...|     2016|          NULL|[Comedy, Music, T...|          8.3|      62|   3|
|Ryan Gosling|Harrison Ford/Rya...|     2017|            47|[Comedy, Music, T...|          8.2|      88|   4|
|Ryan Gosling|The Tale of Stati...|     1995|            22|[Drama, Fantasy, ...|          8.2|     899|   5|
|Ryan Gosling|What's Your First...|     2015|          NULL|         [Talk-Show]|          8.1|      27|   6|
|Ryan Gosl

## 5) Процентне співвідношення кількості фільмів по роках починаючи із 2000 року

In [35]:
window1 = Window.orderBy('startYear')

total_number_of_films = (basics_df
                         .filter((basics_df.startYear >= 2000) & (basics_df.startYear <= 2024) 
                                 & ((basics_df.titleType == 'movie') | (basics_df.titleType == 'tvMovie')))
                         .groupBy('startYear').count()
                         .agg({'count': 'sum'})
                         .collect()[0][0]
                        )       

films_by_year_df = (basics_df
                    .filter((basics_df.startYear >= 2000) & (basics_df.startYear <= 2024) 
                            & ((basics_df.titleType == 'movie') | (basics_df.titleType == 'tvMovie')))
                    .groupBy('startYear').count()
                    )
                
films_by_year_df = films_by_year_df.withColumn('next_year', lead('count').over(window1))
films_by_year_df = films_by_year_df.withColumn('prev_year', lag('count').over(window1))
films_by_year_df = films_by_year_df.withColumn('% of all', col('count') / total_number_of_films * 100)
res = films_by_year_df.withColumn('diff in %', (col('next_year') - col('prev_year')) / col('count') * 100)
res.show(200, False)

+---------+-----+---------+---------+------------------+-------------------+
|startYear|count|next_year|prev_year|% of all          |diff in %          |
+---------+-----+---------+---------+------------------+-------------------+
|2000     |7662 |8231     |NULL     |1.9723886187358899|NULL               |
|2001     |8231 |8537     |7662     |2.1188633151677254|10.630543068885919 |
|2002     |8537 |9045     |8231     |2.1976352960256187|9.534965444535551  |
|2003     |9045 |9747     |8537     |2.3284070812406843|13.377556661138751 |
|2004     |9747 |11009    |9045     |2.5091192726205582|20.14978967887555  |
|2005     |11009|11603    |9747     |2.833989337465859 |16.858933599782    |
|2006     |11603|12585    |11009    |2.9868996532488294|13.582694130828235 |
|2007     |12585|13630    |11603    |3.2396907813614164|16.10647596344855  |
|2008     |13630|15147    |12585    |3.5086996702388644|18.79677182685253  |
|2009     |15147|15754    |13630    |3.8992130524657433|14.022578728461083 |

## 6) Всі серії футурами по сезонах

In [9]:
futurama_df = (episode_df
                .filter(episode_df.parentTconst == 
                        basics_df
                           .filter((basics_df.primaryTitle == 'Futurama') & (basics_df.titleType == 'tvSeries') & (basics_df.startYear == 1999))
                           .select('tconst').collect()[0][0]
                        )
                .join(ratings_df.withColumnRenamed('tconst', 'rtconst'), 
                      col('rtconst') == episode_df.tconst)
                .join(basics_df.withColumnRenamed('tconst', 'btconst'),
                      col('btconst') == episode_df.tconst)
                .orderBy('seasonNumber', 'episodeNumber')
                .select('tconst', 'seasonNumber', 'episodeNumber', 'primaryTitle', 'startYear', 'averageRating', 'numVotes')
                )

futurama_df.show(1000, False)

+----------+------------+-------------+----------------------------------------+---------+-------------+--------+
|tconst    |seasonNumber|episodeNumber|primaryTitle                            |startYear|averageRating|numVotes|
+----------+------------+-------------+----------------------------------------+---------+-------------+--------+
|tt0584449 |1           |1            |Space Pilot 3000                        |1999     |8.6          |4816    |
|tt0756891 |1           |2            |The Series Has Landed                   |1999     |8.0          |3871    |
|tt0756882 |1           |3            |I, Roommate                             |1999     |8.2          |3765    |
|tt0756885 |1           |4            |Love's Labours Lost in Space            |1999     |8.1          |3593    |
|tt0584438 |1           |5            |Fear of a Bot Planet                    |1999     |7.8          |3385    |
|tt0584425 |1           |6            |A Fishful of Dollars                    |1999    