In [61]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [62]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [63]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.types as t
import pyspark.sql.functions as f

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge, RidgeCV, Lasso
from datetime import datetime, timezone, timedelta
from sklearn.model_selection import train_test_split

import seaborn as sns

from mpl_toolkits.mplot3d import Axes3D

import unittest
import pytest

In [64]:
spark_session = (SparkSession.builder
                             .master("local") 
                             .appName("diploma project app")
                             .config(conf=SparkConf()) 
                             .getOrCreate())

In [65]:
# print((filtered_title_basics_df.count(), len(filtered_title_basics_df.columns)))

In [66]:
def read_csv(path, work_schema, spark_session):
  """Read data from CSV"""
  work_df = spark_session.read.csv(path,
                                   sep=r'\t',
                                   header=True,
                                   nullValue=r'\N',
                                   schema=work_schema)
  return work_df

In [67]:
def write_csv(df, path_to_save):
  """Write dafaframe to CSV as 1 file"""
  df.coalesce(1).write.csv(path_to_save, header=True, mode="overwrite")

# Read all df

Read title.akas

In [68]:
path_title_akas = '/content/drive/MyDrive/GD/input_files/title.akas/data.tsv'
schema_title_akas = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                t.StructField('Ordering', t.IntegerType(), True),
                                t.StructField('Title', t.StringType(), True),
                                t.StructField('Region', t.StringType(), True),
                                t.StructField('Language', t.StringType(), True),
                                t.StructField('Types', t.StringType(), True),
                                t.StructField('Attributes', t.StringType(), True),
                                t.StructField('IsOriginalTitle ',t.IntegerType(),True),
])
title_akas_df = read_csv(path_title_akas, schema_title_akas, spark_session)
title_akas_df.show(40, truncate=False)

+---------+--------+-------------------------+------+--------+-----------+--------------------------+----------------+
|TitleId  |Ordering|Title                    |Region|Language|Types      |Attributes                |IsOriginalTitle |
+---------+--------+-------------------------+------+--------+-----------+--------------------------+----------------+
|tt0000001|1       |Карменсіта               |UA    |null    |imdbDisplay|null                      |0               |
|tt0000001|2       |Carmencita               |DE    |null    |null       |literal title             |0               |
|tt0000001|3       |Carmencita - spanyol tánc|HU    |null    |imdbDisplay|null                      |0               |
|tt0000001|4       |Καρμενσίτα               |GR    |null    |imdbDisplay|null                      |0               |
|tt0000001|5       |Карменсита               |RU    |null    |imdbDisplay|null                      |0               |
|tt0000001|6       |Carmencita               |US

Read name.basics

In [69]:
path_name_basics = '/content/drive/MyDrive/GD/input_files/name.basics/data.tsv'
schema_name_basics = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                  t.StructField('PrimaryName', t.StringType(), True),
                                  t.StructField('BirthYear', t.IntegerType(), True),
                                  t.StructField('DeathYear', t.IntegerType(), True),
                                  t.StructField('PrimaryProfession', t.StringType(), True),
                                  t.StructField('KnownForTitles', t.StringType(), True),
])

name_basics_df = read_csv(path_name_basics, schema_name_basics, spark_session)
name_basics_df.show(truncate=False)

+---------+-------------------+---------+---------+-------------------------------------+---------------------------------------+
|TitleId  |PrimaryName        |BirthYear|DeathYear|PrimaryProfession                    |KnownForTitles                         |
+---------+-------------------+---------+---------+-------------------------------------+---------------------------------------+
|nm0000001|Fred Astaire       |1899     |1987     |soundtrack,actor,miscellaneous       |tt0072308,tt0053137,tt0050419,tt0031983|
|nm0000002|Lauren Bacall      |1924     |2014     |actress,soundtrack                   |tt0038355,tt0117057,tt0071877,tt0037382|
|nm0000003|Brigitte Bardot    |1934     |null     |actress,soundtrack,music_department  |tt0057345,tt0049189,tt0054452,tt0056404|
|nm0000004|John Belushi       |1949     |1982     |actor,soundtrack,writer              |tt0078723,tt0077975,tt0072562,tt0080455|
|nm0000005|Ingmar Bergman     |1918     |2007     |writer,director,actor                |t

Read title.basics

In [70]:
path_title_basics = '/content/drive/MyDrive/GD/input_files/title.basics/data.tsv'
  
schema_title_basics = t.StructType([t.StructField('Tconst', t.StringType(), True),
                            t.StructField('TitleType', t.StringType(), True),
                            t.StructField('PrimaryTitle', t.StringType(), True),
                            t.StructField('OriginalTitle', t.StringType(), True),
                            t.StructField('IsAdult', t.IntegerType(), True),
                            t.StructField('StartYear', t.IntegerType(), True),
                            t.StructField('EndYear', t.IntegerType(), True),
                            t.StructField('RuntimeMinutes', t.IntegerType(), True),
                            t.StructField('Genres', t.StringType(), True),
])

title_basics_df = read_csv(path_title_basics, schema_title_basics, spark_session)
title_basics_df.show(truncate=False)

+---------+---------+-------------------------------------------+-------------------------------------------------+-------+---------+-------+--------------+------------------------+
|Tconst   |TitleType|PrimaryTitle                               |OriginalTitle                                    |IsAdult|StartYear|EndYear|RuntimeMinutes|Genres                  |
+---------+---------+-------------------------------------------+-------------------------------------------------+-------+---------+-------+--------------+------------------------+
|tt0000001|short    |Carmencita                                 |Carmencita                                       |0      |1894     |null   |1             |Documentary,Short       |
|tt0000002|short    |Le clown et ses chiens                     |Le clown et ses chiens                           |0      |1892     |null   |5             |Animation,Short         |
|tt0000003|short    |Pauvre Pierrot                             |Pauvre Pierrot           

Read title.principals

In [71]:
path_title_principals = '/content/drive/MyDrive/GD/input_files/title.principals/data.tsv'
  
schema_title_principals = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                        t.StructField('Ordering', t.IntegerType(), True),
                                        t.StructField('Nconst', t.StringType(), True),
                                        t.StructField('Category', t.StringType(), True),
                                        t.StructField('Job', t.StringType(), True),
                                        t.StructField('Characters', t.StringType(), True),
                                      ])

title_principals_df = read_csv(path_title_principals, schema_title_principals, spark_session)
title_principals_df.show(truncate=False)

+---------+--------+---------+---------------+-----------------------+--------------+
|Tconst   |Ordering|Nconst   |Category       |Job                    |Characters    |
+---------+--------+---------+---------------+-----------------------+--------------+
|tt0000001|1       |nm1588970|self           |null                   |["Self"]      |
|tt0000001|2       |nm0005690|director       |null                   |null          |
|tt0000001|3       |nm0374658|cinematographer|director of photography|null          |
|tt0000002|1       |nm0721526|director       |null                   |null          |
|tt0000002|2       |nm1335271|composer       |null                   |null          |
|tt0000003|1       |nm0721526|director       |null                   |null          |
|tt0000003|2       |nm1770680|producer       |producer               |null          |
|tt0000003|3       |nm1335271|composer       |null                   |null          |
|tt0000003|4       |nm5442200|editor         |null    

Read title.episode

In [72]:
path_title_episode = '/content/drive/MyDrive/GD/input_files/title.episode/data.tsv'

schema_title_episode = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('ParentTconst', t.StringType(), True),
                                    t.StructField('SeasonNumber', t.IntegerType(), True),
                                    t.StructField('EpisodeNumber ', t.IntegerType(), True),
                                  ])


title_episode_df = read_csv(path_title_episode, schema_title_episode, spark_session)
title_episode_df.show(truncate=False)

+---------+------------+------------+--------------+
|Tconst   |ParentTconst|SeasonNumber|EpisodeNumber |
+---------+------------+------------+--------------+
|tt0020666|tt15180956  |1           |2             |
|tt0020829|tt15180956  |1           |1             |
|tt0021166|tt15180956  |1           |3             |
|tt0021612|tt15180956  |2           |2             |
|tt0021655|tt15180956  |2           |5             |
|tt0021663|tt15180956  |2           |6             |
|tt0021664|tt15180956  |2           |4             |
|tt0021701|tt15180956  |2           |1             |
|tt0021802|tt15180956  |2           |11            |
|tt0022009|tt15180956  |2           |10            |
|tt0022031|tt15180956  |2           |8             |
|tt0022127|tt15180956  |2           |9             |
|tt0022152|tt15180956  |2           |7             |
|tt0022385|tt15180956  |2           |3             |
|tt0022604|tt15180956  |3           |8             |
|tt0022610|tt15180956  |3           |10       

Read title.crew

In [73]:
path_title_crew = '/content/drive/MyDrive/GD/input_files/title.crew/data.tsv'

schema_title_crew = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('Directors', t.StringType(), True),
                                    t.StructField('Writers ', t.StringType(), True),
                                  ])


title_crew_df = read_csv(path_title_crew, schema_title_crew, spark_session)
title_crew_df.show(truncate=False)

+---------+-------------------+---------+
|Tconst   |Directors          |Writers  |
+---------+-------------------+---------+
|tt0000001|nm0005690          |null     |
|tt0000002|nm0721526          |null     |
|tt0000003|nm0721526          |null     |
|tt0000004|nm0721526          |null     |
|tt0000005|nm0005690          |null     |
|tt0000006|nm0005690          |null     |
|tt0000007|nm0005690,nm0374658|null     |
|tt0000008|nm0005690          |null     |
|tt0000009|nm0085156          |nm0085156|
|tt0000010|nm0525910          |null     |
|tt0000011|nm0804434          |null     |
|tt0000012|nm0525910,nm0525908|null     |
|tt0000013|nm0525910          |null     |
|tt0000014|nm0525910          |null     |
|tt0000015|nm0721526          |null     |
|tt0000016|nm0525910          |null     |
|tt0000017|nm1587194,nm0804434|null     |
|tt0000018|nm0804434          |null     |
|tt0000019|nm0932055          |null     |
|tt0000020|nm0010291          |null     |
+---------+-------------------+---

Read title.ratings

In [74]:
path_title_ratings = '/content/drive/MyDrive/GD/input_files/title.ratings/data.tsv'

schema_title_ratings = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                     t.StructField('AverageRating', t.FloatType(), True),
                                     t.StructField('NumVotes', t.IntegerType(), True),
                                   ])


title_ratings_df = read_csv(path_title_ratings, schema_title_ratings, spark_session)
title_ratings_df.show(truncate=False)

+---------+-------------+--------+
|Tconst   |AverageRating|NumVotes|
+---------+-------------+--------+
|tt0000001|5.7          |1899    |
|tt0000002|5.9          |254     |
|tt0000003|6.5          |1692    |
|tt0000004|5.7          |166     |
|tt0000005|6.2          |2509    |
|tt0000006|5.2          |172     |
|tt0000007|5.4          |784     |
|tt0000008|5.4          |2037    |
|tt0000009|5.3          |197     |
|tt0000010|6.9          |6863    |
|tt0000011|5.3          |352     |
|tt0000012|7.4          |11768   |
|tt0000013|5.7          |1817    |
|tt0000014|7.1          |5276    |
|tt0000015|6.2          |1015    |
|tt0000016|5.9          |1421    |
|tt0000017|4.6          |310     |
|tt0000018|5.3          |569     |
|tt0000019|5.2          |31      |
|tt0000020|4.8          |339     |
+---------+-------------+--------+
only showing top 20 rows



# 1. Get all titles of series/movies etc. that are available in Ukrainian.

In [75]:
def get_filter_df1(df, column, value,target_column):
  """Get all titles of series/movies etc. that are available in Ukrainian."""
  
  df = df.filter(f.col(column) == value)
  return df.na.drop(subset=[target_column])

col = "Region"
value = "UA"
target_column = "Title"

filtered_title_akas_df = get_filter_df1(title_akas_df, col, value,target_column)
filtered_title_akas_df.show()

+---------+--------+--------------------+------+--------+-----------+----------+----------------+
|  TitleId|Ordering|               Title|Region|Language|      Types|Attributes|IsOriginalTitle |
+---------+--------+--------------------+------+--------+-----------+----------+----------------+
|tt0000001|       1|          Карменсіта|    UA|    null|imdbDisplay|      null|               0|
|tt0000003|       4|        Бідний П'єро|    UA|    null|imdbDisplay|      null|               0|
|tt0000005|       2|    Ковальська сцена|    UA|    null|imdbDisplay|      null|               0|
|tt0000008|       9|   Чхання Фреда Отта|    UA|    null|imdbDisplay|      null|               0|
|tt0000010|      10|Вихід робітників ...|    UA|    null|imdbDisplay|      null|               0|
|tt0000012|      26|Прибуття потяга н...|    UA|    null|imdbDisplay|      null|               0|
|tt0000013|      13|Прибуття делегаті...|    UA|    null|imdbDisplay|      null|               0|
|tt0000014|      19|

# 2. Get the list of people’s names, who were born in the 19th century.

In [76]:
def get_filter_df2(df, column, values, target_column):
  """Get the list of people’s names, who were born in the 19th century."""
  
  df = df.filter((f.col(column) >= values[0]) & (f.col(column) <= values[1]))
  return df.na.drop(subset=[target_column])

col = "BirthYear"
values = [1801, 1900] # 19th century.
target_column = 'PrimaryName'

filtered_name_basics_df = get_filter_df2(name_basics_df, col, values, target_column)
filtered_name_basics_df.show()

+---------+------------------+---------+---------+--------------------+--------------------+
|  TitleId|       PrimaryName|BirthYear|DeathYear|   PrimaryProfession|      KnownForTitles|
+---------+------------------+---------+---------+--------------------+--------------------+
|nm0000001|      Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0072308,tt00531...|
|nm0000007|   Humphrey Bogart|     1899|     1957|actor,soundtrack,...|tt0043265,tt00373...|
|nm0000010|      James Cagney|     1899|     1986|actor,soundtrack,...|tt0042041,tt00355...|
|nm0000033|  Alfred Hitchcock|     1899|     1980|director,producer...|tt0053125,tt00542...|
|nm0000036|     Buster Keaton|     1895|     1966|actor,writer,dire...|tt0016332,tt00153...|
|nm0000050|      Groucho Marx|     1890|     1977|soundtrack,actor,...|tt0028772,tt00197...|
|nm0000055|     Alfred Newman|     1900|     1970|music_department,...|tt0065377,tt00560...|
|nm0000064|Edward G. Robinson|     1893|     1973|actor,soundtrack,...

# 3. Get titles of all movies that last more than 2 hours.

In [77]:
def get_filter_df3(df, column1, value1, column2, value2, target_column):
  """Get titles of all movies that last more than 2 hours."""

  df = df.filter((f.col(column1) > value1) & (f.col(column2).isin(value2)))
  return df.na.drop(subset=target_column)

col1 = "RuntimeMinutes"
value1 = 120 # 2 hours in minutes
col2 = "TitleType"
value2 = ['movie', 'tvMovie']
target_column = ['PrimaryTitle', 'OriginalTitle']

filtered_title_basics_df = get_filter_df3(title_basics_df, col1, value1, col2, value2, target_column)
filtered_title_basics_df.show(truncate=False)

+---------+---------+---------------------------------------+---------------------------------------+-------+---------+-------+--------------+-----------------------+
|Tconst   |TitleType|PrimaryTitle                           |OriginalTitle                          |IsAdult|StartYear|EndYear|RuntimeMinutes|Genres                 |
+---------+---------+---------------------------------------+---------------------------------------+-------+---------+-------+--------------+-----------------------+
|tt0002574|movie    |What Happened to Mary                  |What Happened to Mary                  |0      |1912     |null   |150           |Action,Drama,Thriller  |
|tt0002605|movie    |The Adventures of Kathlyn              |The Adventures of Kathlyn              |0      |1913     |null   |300           |Adventure              |
|tt0002646|movie    |Atlantis                               |Atlantis                               |0      |1913     |null   |121           |Drama                  

# 4. Get names of people, corresponding movies/series and characters they played in those films. 

In [78]:
def get_filter_df4(name_basics_df, title_basics_df, title_principals_df, 
                   col_id, col_id_name, col_name, col_film, col_char):
  """Get names of people, corresponding movies/series and characters they played in those films."""

  new_df1 = name_basics_df.select([col_id_name, col_name])
  new_df2 = title_basics_df.select([col_id, col_film[0], col_film[1]])
  new_df3 = title_principals_df.select([col_id, col_char])
  
  new_df1 = new_df1.select([f.split(f.col(col_id_name),",").alias(col_id_name), col_name])
  new_df1 = new_df1.select([f.explode(new_df1[col_id_name]).alias(col_id_name), new_df1[col_name]])
  new_df3 = new_df3.withColumn( col_char, f.translate(f.col(col_char), '["]', ""))
  # return new_df3

  df4 = new_df1.join(new_df2, new_df1[col_id_name] ==  new_df2[col_id], "inner")
  df4 = df4.join(new_df3, df4[col_id] ==  new_df3[col_id], "inner").drop(col_id)
  df4 = df4.withColumnRenamed(col_id_name, col_id)
  
  return df4

col_id = "Tconst"
col_id_name = 'KnownForTitles'
col_name = "PrimaryName"
col_film = ['PrimaryTitle', 'OriginalTitle']
col_char = 'Characters'


result_df4  = get_filter_df4(name_basics_df, title_basics_df, title_principals_df, 
                        col_id, col_id_name, col_name, col_film, col_char)
result_df4.show(truncate=False)

+---------+----------------+-------------------------------+-------------------------------+--------------+
|Tconst   |PrimaryName     |PrimaryTitle                   |OriginalTitle                  |Characters    |
+---------+----------------+-------------------------------+-------------------------------+--------------+
|tt0000174|Jan Krízenecký  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|Sausage Vendor|
|tt0000174|Jan Krízenecký  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|Sticker       |
|tt0000174|Jan Krízenecký  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|null          |
|tt0000174|Ferdinand Gýra  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|Sausage Vendor|
|tt0000174|Ferdinand Gýra  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|Sticker       |
|tt0000174|Ferdinand Gýra  |Výstavní párkar a lepic plakátù|Výstavní párkar a lepic plakátù|null          |
|tt0000305|Valentine Brouat|

# 5. Get information about how many adult movies/series etc. there are per region. Get the top 100 of them from the region with the biggest count to the region with the smallest one.

In [79]:
def get_filter_df5(title_basics_df, title_akas_df, col1, col2, col_id1, col_id2):
  """
  Get information about how many adult movies/series etc. there are per region. 
  Get the top 100 of them from the region with the biggest count to the region with the smallest one.
  """

  joined_df = title_basics_df.join(title_akas_df, title_basics_df[col_id1] ==  title_akas_df[col_id2], "inner")
  joined_df = joined_df.filter(f.col(col1) == 1)
  joined_df = joined_df.na.drop(subset=[col1, col2])
  return joined_df.groupBy(col2).count().orderBy('count', ascending=False).limit(100)

col1 = 'IsAdult'
col2 = 'Region'
col_id1 = 'Tconst'
col_id2 = 'TitleId'

result_df5 = get_filter_df5(title_basics_df, title_akas_df, col1, col2, col_id1, col_id2)

result_df5.show()

+------+-----+
|Region|count|
+------+-----+
|    US|93168|
|    JP|21073|
|    DE|12446|
|    FR| 8035|
|    ES| 6244|
|    IT| 5944|
|    CA| 5381|
|    GB| 4455|
|    VE| 3685|
|    PT| 3508|
|    IN| 3170|
|   XWW| 2693|
|    NL| 2025|
|    BR| 1939|
|    CZ| 1561|
|    SE| 1381|
|   XWG| 1170|
|    HU|  866|
|    GR|  860|
|    DK|  808|
+------+-----+
only showing top 20 rows



# 6. Get information about how many episodes in each TV Series. Get the top 50 of them starting from the TV Series with the biggest quantity of episodes.

In [80]:
def get_filter_df6(title_episode_df, col_id, col_count):
  """
  Get information about how many episodes in each TV Series. 
  Get the top 50 of them starting from the TV Series with the biggest quantity of episodes.
  """

  new_df = title_episode_df.select([col_id, col_count])
  return new_df.groupBy(col_id).count().orderBy('count', ascending=False).limit(50)

col_id = "ParentTconst"
col_count = "Tconst"

result_df6  = get_filter_df6(title_episode_df, col_id, col_count)
result_df6.show(truncate=False)

+------------+-----+
|ParentTconst|count|
+------------+-----+
|tt12164062  |18045|
|tt0058796   |14559|
|tt0069658   |12532|
|tt0988827   |10674|
|tt0053494   |10505|
|tt0344642   |10015|
|tt0270116   |9885 |
|tt0055708   |9807 |
|tt0363402   |9580 |
|tt1985601   |9502 |
|tt0068120   |9320 |
|tt0380100   |9220 |
|tt0088580   |9199 |
|tt0283794   |9010 |
|tt0434733   |9004 |
|tt0092325   |8775 |
|tt0159881   |8632 |
|tt0283767   |8456 |
|tt0068069   |8368 |
|tt0439979   |8286 |
+------------+-----+
only showing top 20 rows



# 7.	Get 10 titles of the most popular movies/series etc. by each decade. 

In [81]:
def get_filter_df7(title_basics_df, title_ratings_df, col_id1, col_id2, spark_session):
  """Get 10 titles of the most popular movies/series etc. by each decade."""

  schema_result_df7 = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('TitleType', t.StringType(), True),
                                    t.StructField('PrimaryTitle', t.StringType(), True),
                                    t.StructField('OriginalTitle', t.StringType(), True),
                                    t.StructField('StartYear', t.IntegerType(), True),
                                    t.StructField('Decade', t.IntegerType(), True),
                                    t.StructField('AverageRating', t.FloatType(), True),
                                  ])
  data_df7 = []
  df7 = spark_session.createDataFrame(data = data_df7, schema = schema_result_df7)

  result_df7 = title_basics_df.join(title_ratings_df, title_basics_df[col_id1] ==  title_ratings_df[col_id2], "inner").drop(col_id2)
  result_df7 = result_df7.na.drop(subset=['PrimaryTitle', 'OriginalTitle', 'StartYear', 'AverageRating'])
  result_df7 = result_df7.withColumn('Decade', ((f.col('StartYear')-1) / 10).cast('int'))
  list_decades = result_df7.toPandas()['Decade'].unique().tolist()

  for dec in list_decades:
    df7 = (df7.union(result_df7.select([col_id1, 'TitleType', 'PrimaryTitle', 'OriginalTitle', 'StartYear', 'Decade', 'AverageRating'])
                               .filter(f.col('Decade') == dec)
                               .orderBy('AverageRating', ascending=False).limit(10)))
  return df7

col_id1 = 'Tconst'
col_id2 = 'Tconst2'
title_ratings_df = title_ratings_df.withColumnRenamed(col_id1, col_id2)

result_df7 = get_filter_df7(title_basics_df, title_ratings_df, col_id1, col_id2, spark_session)
result_df7.show(truncate=False)

+---------+---------+----------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+---------+------+-------------+
|Tconst   |TitleType|PrimaryTitle                                                                                                                                  |OriginalTitle                                                                                                                                 |StartYear|Decade|AverageRating|
+---------+---------+----------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+---------+-----

# 8.	Get 10 titles of the most popular movies/series etc. by each genre.

In [82]:
def get_filter_df8(title_basics_df, title_ratings_df, col_id1, col_id2, spark_session):
  """Get 10 titles of the most popular movies/series etc. by each genre."""

  schema_result_df8 = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('TitleType', t.StringType(), True),
                                    t.StructField('PrimaryTitle', t.StringType(), True),
                                    t.StructField('OriginalTitle', t.StringType(), True),
                                    t.StructField('Genres', t.StringType(), True),
                                    t.StructField('AverageRating', t.FloatType(), True),
                                  ])
  data_df8 = []
  df8 = spark_session.createDataFrame(data = data_df8, schema = schema_result_df8)

  result_df8 = title_basics_df.select([col_id1, 'TitleType', 'PrimaryTitle', 'OriginalTitle', f.split(f.col('Genres'),",").alias('Genres')])
  result_df8 = result_df8.select([col_id1, 'TitleType', 'PrimaryTitle', 'OriginalTitle', f.explode(f.col('Genres')).alias('Genres')])
  
  result_df8 = result_df8.join(title_ratings_df, result_df8[col_id1] ==  title_ratings_df[col_id2], "inner").drop(col_id2)
  result_df8 = result_df8.na.drop(subset=['PrimaryTitle', 'OriginalTitle', 'Genres', 'AverageRating'])
  list_genres = result_df8.toPandas()['Genres'].unique().tolist()

  for gen in list_genres:
    df8 = (df8.union(result_df8.select([col_id1, 'TitleType', 'PrimaryTitle', 'OriginalTitle', 'Genres', 'AverageRating'])
                               .filter(f.col('Genres') == gen)
                               .orderBy('AverageRating', ascending=False).limit(10)))

  return df8


col_id1 = 'Tconst'
col_id2 = 'Tconst2'
title_ratings_df = title_ratings_df.withColumnRenamed(col_id1, col_id2)

result_df8 = get_filter_df8(title_basics_df, title_ratings_df, col_id1, col_id2, spark_session)
result_df8.show(truncate=False)

+----------+---------+-----------------------------------------+-----------------------------------------+---------+-------------+
|Tconst    |TitleType|PrimaryTitle                             |OriginalTitle                            |Genres   |AverageRating|
+----------+---------+-----------------------------------------+-----------------------------------------+---------+-------------+
|tt1132924 |tvEpisode|Goth to Have Better Friends/Wax Attacks  |Goth to Have Better Friends/Wax Attacks  |Animation|10.0         |
|tt1306434 |tvEpisode|Creepie Friday/The Final Curtain         |Creepie Friday/The Final Curtain         |Animation|10.0         |
|tt1145947 |tvEpisode|Calling All Domos                        |Calling All Domos                        |Animation|10.0         |
|tt1147818 |tvEpisode|One Week Later                           |One Week Later                           |Animation|10.0         |
|tt0955930 |tvEpisode|Daishugeki! Team X vs. Imperial DG       |Daishugeki! Team X 

# Save data


In [83]:
path_to_save1 = f'/content/drive/MyDrive/GD/output_files/result_df1.csv'
write_csv(filtered_title_akas_df, path_to_save1)

In [84]:
path_to_save2 = f'/content/drive/MyDrive/GD/output_files/result_df2.csv'
write_csv(filtered_name_basics_df, path_to_save2)

In [85]:
path_to_save3 = f'/content/drive/MyDrive/GD/output_files/result_df3.csv'
write_csv(filtered_title_basics_df, path_to_save3)

In [86]:
path_to_save4 = f'/content/drive/MyDrive/GD/output_files/result_df4.csv'
write_csv(result_df4, path_to_save4)

In [87]:
path_to_save5 = f'/content/drive/MyDrive/GD/output_files/result_df5.csv'
write_csv(result_df5, path_to_save5)

In [88]:
path_to_save6 = f'/content/drive/MyDrive/GD/output_files/result_df6.csv'
write_csv(result_df6, path_to_save6)

In [89]:
path_to_save7 = f'/content/drive/MyDrive/GD/output_files/result_df7.csv'
write_csv(result_df7, path_to_save7)

In [90]:
path_to_save8 = f'/content/drive/MyDrive/GD/output_files/result_df8.csv'
write_csv(result_df8, path_to_save8)

# Unittests


In [91]:
class TestFilters(unittest.TestCase):
  
  @classmethod
  def setUpClass(cls):
    cls.spark = (SparkSession
                     .builder
                     .master("local[*]")
                     .appName("Unit-tests")
                     .getOrCreate())

  @classmethod
  def tearDownClass(cls):
    cls.spark.stop()

  def test_get_filter_df1(self):
    '''Test case function for 1 task'''
    input_schema_task1 = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                 t.StructField('Ordering', t.IntegerType(), True),
                                 t.StructField('Title', t.StringType(), True),
                                 t.StructField('Region', t.StringType(), True),
                                 t.StructField('Language', t.StringType(), True),
                                 t.StructField('Types', t.StringType(), True),
                                 t.StructField('Attributes', t.StringType(), True),
                                 t.StructField('IsOriginalTitle',t.IntegerType(),True),
                               ])
    input_data_task1 = [
          ('tt0000001',1, 'Карменсіта', 'UA', None, 'imdbDisplay', None, 0),
          ('tt0000002',2, 'Καρμενσίτα', 'GR', None, 'imdbDisplay', 'literal title', 0),
          ('tt0000003',3, 'Carmencita', 'DE', None, 'imdbDisplay', None, 0),
          ('tt0000004',4, 'Carmencita', 'HU', None, 'imdbDisplay', None, 0),
          ('tt0000005',5, 'Carmencita', 'US', None, 'imdbDisplay', None, 0),
    ]
    input_df1 = self.spark.createDataFrame(data = input_data_task1, schema = input_schema_task1)
    
    expected_data_task1 = [('tt0000001',1, 'Карменсіта', 'UA', None, 'imdbDisplay', None, 0),]
    
    col = "Region"
    value = "UA"
    target_column = "Title"

    transformed_df = get_filter_df1(input_df1, col, value,target_column)
    expected_df = self.spark.createDataFrame(data = expected_data_task1, schema = input_schema_task1)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)

    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))
    

  def test_get_filter_df2(self):
    '''Test case function for 2 task'''
    input_schema_task2 = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                       t.StructField('PrimaryName', t.StringType(), True),
                                       t.StructField('BirthYear', t.IntegerType(), True),
                                       t.StructField('DeathYear', t.IntegerType(), True),
                                       t.StructField('PrimaryProfession', t.StringType(), True),
                                       t.StructField('KnownForTitles', t.StringType(), True),
    ])

    input_data_task2 = [
          ('nm0000001','Fred Astaire', 1899, 1987, 'soundtrack,actor,miscellaneous', 'tt0072308,tt0053137,tt0050419,tt0031983'),
          ('nm0000002','Lauren Bacall', 1924, 2014, 'actress,soundtrack', 'tt0038355,tt0117057,tt0071877,tt0037382'),
          ('nm0000003','Brigitte Bardot', 1934, None, 'actress,soundtrack,music_department', 'tt0057345,tt0049189,tt0054452,tt0056404'),
          ('nm0000004','John Belushi', 1949, 1982, 'actor,soundtrack,writer', 'tt0078723,tt0077975,tt0072562,tt0080455'),
          ('nm0000005','Ingmar Bergman', 1918, 2007, 'writer,director,actor', 'tt0083922,tt0050986,tt0060827,tt0050976'),
    ]

    input_df2 = self.spark.createDataFrame(data = input_data_task2, schema = input_schema_task2)

    expected_data_task2 = [('nm0000001','Fred Astaire', 1899, 1987, 'soundtrack,actor,miscellaneous', 'tt0072308,tt0053137,tt0050419,tt0031983')]
    
    col = "BirthYear"
    values = [1801, 1900] # 19th century.
    target_column = 'PrimaryName'

    transformed_df = get_filter_df2(input_df2, col, values,target_column)
    expected_df = self.spark.createDataFrame(data = expected_data_task2, schema = input_schema_task2)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)

    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

  def test_get_filter_df3(self):
    '''Test case function for 3 task'''
    input_schema_task3 = t.StructType([t.StructField('Tconst', t.StringType(), True),
                            t.StructField('TitleType', t.StringType(), True),
                            t.StructField('PrimaryTitle', t.StringType(), True),
                            t.StructField('OriginalTitle', t.StringType(), True),
                            t.StructField('IsAdult', t.IntegerType(), True),
                            t.StructField('StartYear', t.IntegerType(), True),
                            t.StructField('EndYear', t.IntegerType(), True),
                            t.StructField('RuntimeMinutes', t.IntegerType(), True),
                            t.StructField('Genres', t.StringType(), True),
    ])

    input_data_task3 = [
          ('tt0002574','movie', 'What Happened to Mary', 'What Happened to Mary', 0, 1912, None, 110, 'Action,Drama,Thriller'),
          ('tt0002605','movie', 'The Adventures of Kathlyn', 'The Adventures of Kathlyn', 0, 1915, None, 300, 'Adventure'),
          ('tt0002646','short', 'Atlantis', 'Atlantis', 0, 1944, None, 140, 'Drama'),

    ]

    input_df3 = self.spark.createDataFrame(data = input_data_task3, schema = input_schema_task3)

    expected_data_task3 = [('tt0002605','movie', 'The Adventures of Kathlyn', 'The Adventures of Kathlyn', 0, 1915, None, 300, 'Adventure'),]
    
    col1 = "RuntimeMinutes"
    value1 = 120 # 2 hours in minutes
    col2 = "TitleType"
    value2 = ['movie', 'tvMovie']
    target_column = ['PrimaryTitle', 'OriginalTitle']

    transformed_df = get_filter_df3(input_df3, col1, value1, col2, value2, target_column)
    expected_df = self.spark.createDataFrame(data = expected_data_task3, schema = input_schema_task3)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)

    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

  def test_get_filter_df4(self):
    '''Test case function for 4 task'''
    schema_name_basics = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                      t.StructField('PrimaryName', t.StringType(), True),
                                      t.StructField('BirthYear', t.IntegerType(), True),
                                      t.StructField('DeathYear', t.IntegerType(), True),
                                      t.StructField('PrimaryProfession', t.StringType(), True),
                                      t.StructField('KnownForTitles', t.StringType(), True),
                                      ])
    schema_title_basics = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                        t.StructField('TitleType', t.StringType(), True),
                                        t.StructField('PrimaryTitle', t.StringType(), True),
                                        t.StructField('OriginalTitle', t.StringType(), True),
                                        t.StructField('IsAdult', t.IntegerType(), True),
                                        t.StructField('StartYear', t.IntegerType(), True),
                                        t.StructField('EndYear', t.IntegerType(), True),
                                        t.StructField('RuntimeMinutes', t.IntegerType(), True),
                                        t.StructField('Genres', t.StringType(), True),
                                      ])
    schema_title_principals = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                        t.StructField('Ordering', t.IntegerType(), True),
                                        t.StructField('Nconst', t.StringType(), True),
                                        t.StructField('Category', t.StringType(), True),
                                        t.StructField('Job', t.StringType(), True),
                                        t.StructField('Characters', t.StringType(), True),
                                      ])
    schema_expected4 = t.StructType([t.StructField('Tconst', t.StringType(), False),
                                    t.StructField('PrimaryName', t.StringType(), True),
                                    t.StructField('PrimaryTitle', t.StringType(), True),
                                    t.StructField('OriginalTitle', t.StringType(), True),
                                    t.StructField('Characters', t.StringType(), True),
                                  ])
    
    input_name_basics = [
          ('nm0000001','Fred Astaire', 1899, 1987, 'soundtrack,actor,miscellaneous', 'tt0072308,tt0053137,tt0050419,tt0031983'),
          ('nm0000002','Lauren Bacall', 1924, 2014, 'actress,soundtrack', 'tt0038355,tt0117057,tt0071877,tt0037382'),
          ('nm0000003','Brigitte Bardot', 1934, None, 'actress,soundtrack,music_department', 'tt0057345,tt0049189,tt0054452,tt0056404'),
          ('nm0000004','John Belushi', 1949, 1982, 'actor,soundtrack,writer', 'tt0078723,tt0077975,tt0072562,tt0080455'),
          ('nm0000005','Ingmar Bergman', 1918, 2007, 'writer,director,actor', 'tt0083922,tt0050986,tt0060827,tt0050976'),
    ]
    input_title_basics = [
          ('tt0072308','movie', 'What Happened to Mary', 'What Happened to Mary', 0, 1912, None, 110, 'Action,Drama,Thriller'),
          ('tt0071877','movie', 'The Adventures of Kathlyn', 'The Adventures of Kathlyn', 0, 1915, None, 300, 'Adventure'),
          ('tt0002646','short', 'Atlantis', 'Atlantis', 0, 1944, None, 140, 'Drama'),

    ]
    input_title_principals = [
        
          ('tt0072308',1, 'nm1588970', 'self', None, '["Self"]'),
          ('tt0071877',2, 'nm0005690', 'director', None, '["Blacksmith"]'),
          ('tt0000003',3, 'nm0374658', 'cinematographer', 'director of photography', None),

    ]
    expected_data_task4 = [
        
          ('tt0072308', 'Fred Astaire', 'What Happened to Mary', 'What Happened to Mary', 'Self'),
          ('tt0071877', 'Lauren Bacall', 'The Adventures of Kathlyn', 'The Adventures of Kathlyn', 'Blacksmith'),

    ]
    name_basics_df = self.spark.createDataFrame(data = input_name_basics, schema = schema_name_basics)
    title_basics_df = self.spark.createDataFrame(data = input_title_basics, schema = schema_title_basics)
    title_principals_df = self.spark.createDataFrame(data = input_title_principals, schema = schema_title_principals)

    col_id = "Tconst"
    col_id_name = 'KnownForTitles'
    col_name = "PrimaryName"
    col_film = ['PrimaryTitle', 'OriginalTitle']
    col_char = 'Characters'

    transformed_df = get_filter_df4(name_basics_df, title_basics_df, title_principals_df, col_id, col_id_name, col_name, col_film, col_char)
    expected_df = self.spark.createDataFrame(data = expected_data_task4, schema = schema_expected4)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)

    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

  def test_get_filter_df5(self):
    '''Test case function for 5 task'''

    schema_title_basics = t.StructType([t.StructField('Tconst', t.StringType(), True),
                            t.StructField('TitleType', t.StringType(), True),
                            t.StructField('PrimaryTitle', t.StringType(), True),
                            t.StructField('OriginalTitle', t.StringType(), True),
                            t.StructField('IsAdult', t.IntegerType(), True),
                            t.StructField('StartYear', t.IntegerType(), True),
                            t.StructField('EndYear', t.IntegerType(), True),
                            t.StructField('RuntimeMinutes', t.IntegerType(), True),
                            t.StructField('Genres', t.StringType(), True),
    ])
    schema_title_akas = t.StructType([t.StructField('TitleId', t.StringType(), True),
                                t.StructField('Ordering', t.IntegerType(), True),
                                t.StructField('Title', t.StringType(), True),
                                t.StructField('Region', t.StringType(), True),
                                t.StructField('Language', t.StringType(), True),
                                t.StructField('Types', t.StringType(), True),
                                t.StructField('Attributes', t.StringType(), True),
                                t.StructField('IsOriginalTitle ',t.IntegerType(),True),
    ])
    
    schema_expected5 = t.StructType([t.StructField('Region', t.StringType(), True),
                                    t.StructField('count', t.LongType(), False),
                                  ])
    

    input_title_basics = [
          ('tt0000001','short', 'Carmencita', 'Carmencita', 1, 1912, None, 110, 'Action,Drama,Thriller'),
          ('tt0000002','short', 'Der Clown und seine Hunde', 'Der Clown und seine Hunde', 1, 1915, None, 300, 'Adventure'),
          ('tt0000003','short', 'Poor Pierrot', 'Poor Pierrot', 0, 1944, None, 140, 'Drama'),

    ]
    input_title_akas = [
          ('tt0000001',1, 'Карменсіта', 'UA', None, 'imdbDisplay', None, 0),
          ('tt0000001',2, 'Καρμενσίτα', 'GR', None, 'imdbDisplay', 'literal title', 0),
          ('tt0000001',3, 'Carmencita', 'DE', None, 'imdbDisplay', None, 0),
          ('tt0000002',1, "Клоун та його собаки", 'UA', None, 'imdbDisplay', None, 0),
          ('tt0000002',2, 'Der Clown und seine Hunde', 'DE', None, 'imdbDisplay', None, 0),
          ('tt0000003',2, "Бідний П'єро", 'UA', None, 'imdbDisplay', None, 0),

    ]

    expected_data_task5 = [
        
          ('UA', 2),
          ('DE', 2),
          ('GR', 1),

    ]
    title_akas_df = self.spark.createDataFrame(data = input_title_akas, schema = schema_title_akas)
    title_basics_df = self.spark.createDataFrame(data = input_title_basics, schema = schema_title_basics)


    col1 = 'IsAdult'
    col2 = 'Region'
    col_id1 = 'Tconst'
    col_id2 = 'TitleId'

    transformed_df = get_filter_df5(title_basics_df, title_akas_df, col1, col2, col_id1, col_id2)
    expected_df = self.spark.createDataFrame(data = expected_data_task5, schema = schema_expected5)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)

    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

  def test_get_filter_df6(self):
    '''Test case function for 6 task'''

    schema_title_episode = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('ParentTconst', t.StringType(), True),
                                    t.StructField('SeasonNumber', t.IntegerType(), True),
                                    t.StructField('EpisodeNumber ', t.IntegerType(), True),
                                  ])
    
    schema_expected6 = t.StructType([t.StructField('ParentTconst', t.StringType(), True),
                                    t.StructField('count', t.LongType(), False),
                                  ])
    
    input_title_episode = [
          ('tt0020666','tt15180956', 1, 1),
          ('tt0020829','tt15180956', 1, 2),
          ('tt0020823','tt15180956', 1, 3),
          ('tt0020634','tt15180957', 1, 1),
          ('tt0020833','tt15180957', 1, 2),
          ('tt0020432','tt15180957', 1, 3),
          ('tt0020334','tt15180957', 1, 4),
          ('tt0020233','tt15180957', 1, 5),
          ('tt0020132','tt15180957', 1, 6),
          ('tt0120634','tt15180958', 1, 1),
          ('tt0120833','tt15180958', 1, 2),
          ('tt0120432','tt15180958', 1, 3),
          ('tt0120334','tt15180958', 1, 4),
          ('tt0120233','tt15180958', 1, 5),

    ]

    expected_data_task6 = [
          ('tt15180956', 3),
          ('tt15180957', 6),
          ('tt15180958', 5),

    ]
    title_episode_df = self.spark.createDataFrame(data = input_title_episode, schema = schema_title_episode)

    col_id = "ParentTconst"
    col_count = "Tconst"

    transformed_df = get_filter_df6(title_episode_df, col_id, col_count)
    expected_df = self.spark.createDataFrame(data = expected_data_task6, schema = schema_expected6)

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)
    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

  def test_get_filter_df7(self):
    '''Test case function for 7 task'''

    schema_title_ratings = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                         t.StructField('AverageRating', t.FloatType(), True),
                                         t.StructField('NumVotes', t.IntegerType(), True),
                                        ])
    
    schema_title_basics = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                        t.StructField('TitleType', t.StringType(), True),
                                        t.StructField('PrimaryTitle', t.StringType(), True),
                                        t.StructField('OriginalTitle', t.StringType(), True),
                                        t.StructField('IsAdult', t.IntegerType(), True),
                                        t.StructField('StartYear', t.IntegerType(), True),
                                        t.StructField('EndYear', t.IntegerType(), True),
                                        t.StructField('RuntimeMinutes', t.IntegerType(), True),
                                        t.StructField('Genres', t.StringType(), True),
                                      ])
    
    schema_expected7 = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                    t.StructField('TitleType', t.StringType(), True),
                                    t.StructField('PrimaryTitle', t.StringType(), True),
                                    t.StructField('OriginalTitle', t.StringType(), True),
                                    t.StructField('StartYear', t.IntegerType(), True),
                                    t.StructField('Decade', t.IntegerType(), True),
                                    t.StructField('AverageRating', t.FloatType(), True),
                                     
                                  ])

    input_title_ratings = [
        ('tt0000001', 5.9, 1899), 
        ('tt0000002', 5.8, 254),  
        ('tt0000003', 5.7, 1692), 
        ('tt0000004', 5.6, 166),  
        ('tt0000005', 5.5, 2509), 
        ('tt0000006', 5.4, 172),  
        ('tt0000007', 5.3, 784),  
        ('tt0000008', 5.2, 2037), 
        ('tt0000009', 5.1, 197),  
        ('tt0000010', 6.9, 6863), 
        ('tt0000011', 7.0, 352), 
        ('tt0000012', 7.4, 11768),
        ('tt0000013', 5.7, 1817),
        ('tt0000014', 5.1, 5276),
        ('tt0000015', 5.1, 5276),
        ('tt0000016', 8.1, 5476),
        ('tt0000017', 8.1, 5256),
    ]

    input_title_basics = [
        ('tt0000001','short', 'name1', 'name1', 1, 1815, None, 300, 'Adventure'),
        ('tt0000002','short', 'name2', 'name2', 0, 1816, None, 140, 'Drama'),
        ('tt0000003','movie', 'name3', 'name3', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000004','short', 'name4', 'name4', 1, 1815, None, 300, 'Adventure'),
        ('tt0000005','short', 'name5', 'name5', 0, 1811, None, 140, 'Drama'),
        ('tt0000006','short', 'name6', 'name6', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000007','movie', 'name7', 'name7', 1, 1815, None, 300, 'Adventure'),
        ('tt0000008','short', 'name8', 'name8', 0, 1811, None, 140, 'Drama'),
        ('tt0000009','movie', 'name9', 'name9', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000010','short', 'name10', 'name10', 1, 1815, None, 300, 'Adventure'),
        ('tt0000011','short', 'name11', 'name11', 0, 1811, None, 140, 'Drama'),
        ('tt0000012','short', 'Carmencita', 'Carmencita', 1, 1912, None, 110, 'Action,Drama,Thriller'),
        ('tt0000013','movie', 'Der Clown und seine Hunde', 'Der Clown und seine Hunde', 1, 1915, None, 300, 'Adventure'),
        ('tt0000014','short', 'Poor Pierrot', 'Poor Pierrot', 0, 1916, None, 140, 'Drama'),
        ('tt0000015','movie', 'Carmencita', 'Carmencita', 1, 1912, None, 110, 'Action,Drama,Thriller'),
    ]
    expected_data_task7 = [
        ('tt0000011','short', 'name11', 'name11', 1811, 181, 7.0,),
        ('tt0000010','short', 'name10', 'name10', 1815, 181, 6.9,),
        ('tt0000001','short', 'name1', 'name1', 1815, 181, 5.9,),
        ('tt0000002','short', 'name2', 'name2', 1816, 181, 5.8,),
        ('tt0000003','movie', 'name3', 'name3', 1812, 181, 5.7,),
        ('tt0000004','short', 'name4', 'name4', 1815, 181, 5.6,),
        ('tt0000005','short', 'name5', 'name5', 1811, 181, 5.5,),
        ('tt0000006','short', 'name6', 'name6', 1812, 181, 5.4,),
        ('tt0000007','movie', 'name7', 'name7', 1815, 181, 5.3,),
        ('tt0000008','short', 'name8', 'name8', 1811, 181, 5.2,),    
        ('tt0000012','short', 'Carmencita', 'Carmencita', 1912, 191, 7.4),
        ('tt0000013','movie', 'Der Clown und seine Hunde', 'Der Clown und seine Hunde', 1915, 191, 5.7),
        ('tt0000014','short', 'Poor Pierrot', 'Poor Pierrot', 1916, 191, 5.1),
        ('tt0000015','movie', 'Carmencita', 'Carmencita', 1912, 191, 5.1),
    ]

    title_ratings_df = self.spark.createDataFrame(data = input_title_ratings, schema = schema_title_ratings)
    title_basics_df = self.spark.createDataFrame(data = input_title_basics, schema = schema_title_basics)

    col_id1 = 'Tconst'
    col_id2 = 'Tconst2'
    title_ratings_df = title_ratings_df.withColumnRenamed(col_id1, col_id2)

    transformed_df = get_filter_df7(title_basics_df, title_ratings_df, col_id1, col_id2, self.spark)
    expected_df = self.spark.createDataFrame(data = expected_data_task7, schema = schema_expected7)


    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)
    # print('\n', set(fields1), '\n', set(fields2))
    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))
  
  
  def test_get_filter_df8(self):
    '''Test case function for 8 task'''

    schema_title_ratings = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                         t.StructField('AverageRating', t.FloatType(), True),
                                         t.StructField('NumVotes', t.IntegerType(), True),
                                        ])
    
    schema_title_basics = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                        t.StructField('TitleType', t.StringType(), True),
                                        t.StructField('PrimaryTitle', t.StringType(), True),
                                        t.StructField('OriginalTitle', t.StringType(), True),
                                        t.StructField('IsAdult', t.IntegerType(), True),
                                        t.StructField('StartYear', t.IntegerType(), True),
                                        t.StructField('EndYear', t.IntegerType(), True),
                                        t.StructField('RuntimeMinutes', t.IntegerType(), True),
                                        t.StructField('Genres', t.StringType(), True),
                                      ])
    
    schema_expected8 = t.StructType([t.StructField('Tconst', t.StringType(), True),
                                     t.StructField('TitleType', t.StringType(), True),
                                     t.StructField('PrimaryTitle', t.StringType(), True),
                                     t.StructField('OriginalTitle', t.StringType(), True),
                                     t.StructField('Genres', t.StringType(), True),
                                     t.StructField('AverageRating', t.FloatType(), True),
                                   ])

    input_title_ratings = [
        ('tt0000001', 5.9, 1899), 
        ('tt0000002', 6.8, 254),  
        ('tt0000003', 5.7, 1692), 
        ('tt0000004', 5.6, 166),  
        ('tt0000005', 5.5, 2509), 
        ('tt0000006', 5.4, 172),  
        ('tt0000007', 5.3, 784),  
        ('tt0000008', 5.2, 2037), 
        ('tt0000009', 5.1, 197),  
        ('tt0000010', 6.9, 6863), 
        ('tt0000011', 7.0, 352), 
        ('tt0000012', 7.4, 11768),
    ]

    input_title_basics = [
        ('tt0000001','short', 'name1', 'name1', 1, 1815, None, 300, 'Adventure'),
        ('tt0000002','short', 'name2', 'name2', 0, 1816, None, 140, 'Drama'),
        ('tt0000003','movie', 'name3', 'name3', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000004','short', 'name4', 'name4', 1, 1815, None, 300, 'Adventure'),
        ('tt0000005','short', 'name5', 'name5', 0, 1811, None, 140, 'Drama'),
        ('tt0000006','short', 'name6', 'name6', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000007','movie', 'name7', 'name7', 1, 1815, None, 300, 'Adventure'),
        ('tt0000008','short', 'name8', 'name8', 0, 1811, None, 140, 'Drama'),
        ('tt0000009','movie', 'name9', 'name9', 1, 1812, None, 110, 'Action,Drama,Thriller'),
        ('tt0000010','short', 'name10', 'name10', 1, 1815, None, 300, 'Adventure'),
    ]

    expected_data_task8 = [
        ('tt0000010', 'short', 'name10', 'name10', 'Adventure', 6.9),
        ('tt0000001', 'short', 'name1', 'name1', 'Adventure', 5.9),
        ('tt0000004', 'short', 'name4', 'name4', 'Adventure', 5.6),
        ('tt0000007', 'movie', 'name7', 'name7', 'Adventure', 5.3),
        ('tt0000002', 'short', 'name2', 'name2', 'Drama', 6.8),
        ('tt0000003', 'movie', 'name3', 'name3', 'Drama', 5.7),
        ('tt0000005', 'short', 'name5', 'name5', 'Drama', 5.5),
        ('tt0000006', 'short', 'name6', 'name6', 'Drama', 5.4),
        ('tt0000008', 'short', 'name8', 'name8', 'Drama', 5.2),
        ('tt0000009', 'movie', 'name9', 'name9', 'Drama', 5.1),
        ('tt0000003', 'movie', 'name3', 'name3', 'Action', 5.7),
        ('tt0000006', 'short', 'name6', 'name6', 'Action', 5.4),
        ('tt0000009', 'movie', 'name9', 'name9', 'Action', 5.1),
        ('tt0000003', 'movie', 'name3', 'name3', 'Thriller', 5.7),
        ('tt0000006', 'short', 'name6', 'name6', 'Thriller', 5.4),
        ('tt0000009', 'movie', 'name9', 'name9', 'Thriller', 5.1),
    ]

    title_ratings_df = self.spark.createDataFrame(data = input_title_ratings, schema = schema_title_ratings)
    title_basics_df = self.spark.createDataFrame(data = input_title_basics, schema = schema_title_basics)

    col_id1 = 'Tconst'
    col_id2 = 'Tconst2'
    title_ratings_df = title_ratings_df.withColumnRenamed(col_id1, col_id2)

    transformed_df = get_filter_df8(title_basics_df, title_ratings_df, col_id1, col_id2, self.spark)
    expected_df = self.spark.createDataFrame(data = expected_data_task8, schema = schema_expected8)
    # transformed_df.show()

    # Assert the output of the transformation to the expected data frame.
    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    fields1 = [*map(field_list, transformed_df.schema.fields)]
    fields2 = [*map(field_list, expected_df.schema.fields)]
    # Compare schema of transformed_df and expected_df
    res = set(fields1) == set(fields2)
    # assert
    self.assertTrue(res)
    # Compare data in transformed_df and expected_df
    self.assertEqual(sorted(expected_df.collect()), sorted(transformed_df.collect()))

unittest.main(argv=[''], defaultTest = "TestFilters", verbosity=2, exit=False)


test_get_filter_df1 (__main__.TestFilters)
Test case function for 1 task ... ok
test_get_filter_df2 (__main__.TestFilters)
Test case function for 2 task ... ok
test_get_filter_df3 (__main__.TestFilters)
Test case function for 3 task ... ok
test_get_filter_df4 (__main__.TestFilters)
Test case function for 4 task ... ok
test_get_filter_df5 (__main__.TestFilters)
Test case function for 5 task ... ok
test_get_filter_df6 (__main__.TestFilters)
Test case function for 6 task ... ok
test_get_filter_df7 (__main__.TestFilters)
Test case function for 7 task ... ok
test_get_filter_df8 (__main__.TestFilters)
Test case function for 8 task ... ok

----------------------------------------------------------------------
Ran 8 tests in 5.960s

OK


<unittest.main.TestProgram at 0x7fb15dbca990>