<a href="https://colab.research.google.com/github/staaason/imbd-spark-project/blob/main/Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 54 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 70.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=fd4644a51f8bd7a3feaca3f457515665a82e633c3e88a3b2f44425fb31acf378
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
from pyspark import SparkConf
from pyspark.sql import  SparkSession, Window
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark.sql.functions import explode, sequence, to_date
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
title_akas_schema = t.StructType([t.StructField('titleId', t.StringType(), True),
                                  t.StructField('ordering', t.IntegerType(), True),
                                  t.StructField('title', t.StringType(), True),
                                  t.StructField('region', t.StringType(), True),
                                  t.StructField('language', t.StringType(), True),
                                  t.StructField('types', t.StringType(), True),
                                  t.StructField('attributes',  t.StringType(), True),
                                  t.StructField('isOriginalTitle', t.StringType(), True)])

title_akas_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.akas.tsv.gz", sep=r'\t', header=True, nullValue='\\N', schema=title_akas_schema)
title_akas_df.show(truncate=False)

+---------+--------+-------------------------+------+--------+-----------+---------------------+---------------+
|titleId  |ordering|title                    |region|language|types      |attributes           |isOriginalTitle|
+---------+--------+-------------------------+------+--------+-----------+---------------------+---------------+
|tt0000001|1       |Карменсіта               |UA    |null    |imdbDisplay|null                 |0              |
|tt0000001|2       |Carmencita               |DE    |null    |null       |literal title        |0              |
|tt0000001|3       |Carmencita - spanyol tánc|HU    |null    |imdbDisplay|null                 |0              |
|tt0000001|4       |Καρμενσίτα               |GR    |null    |imdbDisplay|null                 |0              |
|tt0000001|5       |Карменсита               |RU    |null    |imdbDisplay|null                 |0              |
|tt0000001|6       |Carmencita               |US    |null    |imdbDisplay|null                 |

In [5]:
title_basics_schema =  t.StructType([t.StructField('tconst', t.StringType(), True),
                                  t.StructField('titleType', t.StringType(), True),
                                  t.StructField('primaryTitle', t.StringType(), True),
                                  t.StructField('originalTitle', t.StringType(), True),
                                  t.StructField('isAdult', t.IntegerType(), True),
                                  t.StructField('startYear', t.DateType(), True),
                                  t.StructField('endYear',  t.DateType(), True),
                                  t.StructField('runtimeMinutes', t.IntegerType(), True),
                                  t.StructField('genres', t.StringType(), True)])

title_basics_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.basics.tsv.gz", sep=r'\t', header=True, nullValue='\\N', dateFormat="MM/dd/yyyy", schema=title_basics_schema)
title_basics_df.show()

+---------+---------+--------------------+--------------------+-------+----------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult| startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+----------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|1894-01-01|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|1892-01-01|   null|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|1892-01-01|   null|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|1892-01-01|   null|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|1893-01-01|   null|             1|      

In [6]:
name_basics_schema =  t.StructType([t.StructField('nconst', t.StringType(), True),
                                  t.StructField('primaryName', t.StringType(), True),
                                  t.StructField('birthYear', t.DateType(), True),
                                  t.StructField('deathYear', t.DateType(), True),
                                  t.StructField('primaryProfession', t.StringType(), True),
                                  t.StructField('knownForTitles',  t.StringType(), True)])


name_basics_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/name.basics.tsv.gz", sep=r'\t', header=True, nullValue='\\N', dateFormat="MM/dd/yyyy", schema=name_basics_schema)
name_basics_df.show()

+---------+-------------------+----------+----------+--------------------+--------------------+
|   nconst|        primaryName| birthYear| deathYear|   primaryProfession|      knownForTitles|
+---------+-------------------+----------+----------+--------------------+--------------------+
|nm0000001|       Fred Astaire|1899-01-01|1987-01-01|soundtrack,actor,...|tt0072308,tt00504...|
|nm0000002|      Lauren Bacall|1924-01-01|2014-01-01|  actress,soundtrack|tt0037382,tt00383...|
|nm0000003|    Brigitte Bardot|1934-01-01|      null|actress,soundtrac...|tt0054452,tt00491...|
|nm0000004|       John Belushi|1949-01-01|1982-01-01|actor,soundtrack,...|tt0072562,tt00787...|
|nm0000005|     Ingmar Bergman|1918-01-01|2007-01-01|writer,director,a...|tt0050986,tt00509...|
|nm0000006|     Ingrid Bergman|1915-01-01|1982-01-01|actress,soundtrac...|tt0038109,tt00387...|
|nm0000007|    Humphrey Bogart|1899-01-01|1957-01-01|actor,soundtrack,...|tt0034583,tt00432...|
|nm0000008|      Marlon Brando|1924-01-0

In [7]:
title_crew_schema =  t.StructType([t.StructField('tconst', t.StringType(), True),
                                  t.StructField('directors', t.StringType(), True),
                                  t.StructField('writers ', t.StringType(), True)])

title_crew_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.crew.tsv.gz", sep=r'\t', header=True, nullValue='\\N', schema=title_crew_schema)
title_crew_df.show()

+---------+-------------------+---------+
|   tconst|          directors| writers |
+---------+-------------------+---------+
|tt0000001|          nm0005690|     null|
|tt0000002|          nm0721526|     null|
|tt0000003|          nm0721526|     null|
|tt0000004|          nm0721526|     null|
|tt0000005|          nm0005690|     null|
|tt0000006|          nm0005690|     null|
|tt0000007|nm0005690,nm0374658|     null|
|tt0000008|          nm0005690|     null|
|tt0000009|          nm0085156|nm0085156|
|tt0000010|          nm0525910|     null|
|tt0000011|          nm0804434|     null|
|tt0000012|nm0525908,nm0525910|     null|
|tt0000013|          nm0525910|     null|
|tt0000014|          nm0525910|     null|
|tt0000015|          nm0721526|     null|
|tt0000016|          nm0525910|     null|
|tt0000017|nm1587194,nm0804434|     null|
|tt0000018|          nm0804434|     null|
|tt0000019|          nm0932055|     null|
|tt0000020|          nm0010291|     null|
+---------+-------------------+---

In [8]:
title_episode_schema =  t.StructType([t.StructField('tconst', t.StringType(), True),
                                  t.StructField('parentTconst', t.StringType(), True),
                                  t.StructField('seasonNumber', t.IntegerType(), True),
                                  t.StructField('episodeNumber', t.IntegerType(), True)])

title_episode_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.episode.tsv.gz", sep=r'\t', header=True, nullValue='\\N', schema=title_episode_schema)
title_episode_df.show()

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0041951|   tt0041038|           1|            9|
|tt0042816|   tt0989125|           1|           17|
|tt0042889|   tt0989125|        null|         null|
|tt0043426|   tt0040051|           3|           42|
|tt0043631|   tt0989125|           2|           16|
|tt0043693|   tt0989125|           2|            8|
|tt0043710|   tt0989125|           3|            3|
|tt0044093|   tt0959862|           1|            6|
|tt0044668|   tt0044243|           2|           16|
|tt0044901|   tt0989125|           3|           46|
|tt0045519|   tt0989125|           4|           11|
|tt0045960|   tt0044284|           2|            3|
|tt0046135|   tt0989125|           4|            5|
|tt0046150|   tt0341798|        null|         null|
|tt0046855|   tt0046643|           1|            4|
|tt0046864|   tt0989125|           5|           20|
|tt0047810| 

In [9]:
title_principals_schema =  t.StructType([t.StructField('tconst', t.StringType(), True),
                                  t.StructField('ordering', t.IntegerType(), True),
                                  t.StructField('nconst', t.StringType(), True),
                                  t.StructField('category', t.StringType(), True),
                                  t.StructField('job', t.StringType(), True),
                                  t.StructField('characters', t.StringType(), True)])

title_principals_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.principals.tsv.gz", sep=r'\t', header=True, nullValue='\\N', schema=title_principals_schema)
title_principals_df.show()

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                null|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                null|          null|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|          null|
|tt0000002|       1|nm0721526|       director|                null|          null|
|tt0000002|       2|nm1335271|       composer|                null|          null|
|tt0000003|       1|nm0721526|       director|                null|          null|
|tt0000003|       2|nm1770680|       producer|            producer|          null|
|tt0000003|       3|nm1335271|       composer|                null|          null|
|tt0000003|       4|nm5442200|         editor|                null|          null|
|tt0

In [10]:
title_ratings_schema =  t.StructType([t.StructField('tconst', t.StringType(), True),
                                  t.StructField('averageRating', t.FloatType(), True),
                                  t.StructField('numVotes', t.IntegerType(), True)])

title_ratings_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/title.ratings.tsv.gz", sep=r'\t', header=True, nullValue='\\N', schema=title_ratings_schema)
title_ratings_df.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1925|
|tt0000002|          5.8|     261|
|tt0000003|          6.5|    1741|
|tt0000004|          5.6|     176|
|tt0000005|          6.2|    2554|
|tt0000006|          5.1|     175|
|tt0000007|          5.4|     797|
|tt0000008|          5.4|    2069|
|tt0000009|          5.3|     200|
|tt0000010|          6.9|    6992|
|tt0000011|          5.3|     357|
|tt0000012|          7.4|   12000|
|tt0000013|          5.7|    1850|
|tt0000014|          7.1|    5381|
|tt0000015|          6.2|    1035|
|tt0000016|          5.9|    1460|
|tt0000017|          4.6|     318|
|tt0000018|          5.3|     583|
|tt0000019|          5.1|      31|
|tt0000020|          4.8|     351|
+---------+-------------+--------+
only showing top 20 rows



In [None]:
def showAvailableInUA():
  titles_in_ua_df = title_akas_df.select(f.column('title')).where(f.col('region') == 'UA')
  titles_in_ua_df.show()
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/titles_available_in_ukraine'
  titles_in_ua_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)
showAvailableInUA()
  

+--------------------+
|               title|
+--------------------+
|          Карменсіта|
|        Бідний П'єро|
|    Ковальська сцена|
|   Чхання Фреда Отта|
|Вихід робітників ...|
|Прибуття потяга н...|
|Прибуття делегаті...|
| Политий поливальник|
|     Навколо кабінки|
|              Ковалі|
|     Морське купання|
|      Партія в карти|
|Площа Кордельє в ...|
|Виловлювання черв...|
|  Сніданок немовляти|
|Стрибок через бре...|
|       Вольтижування|
|        Гра в сніжки|
|    Зруйнування муру|
|       Замок диявола|
+--------------------+
only showing top 20 rows



In [13]:
def getPeopleBornIn19Century():
  people_born_df = name_basics_df.select(f.column('primaryName').alias('name_surname')).where(f.year(name_basics_df.birthYear) >= 1800).where(f.year(name_basics_df.birthYear) < 1900)
  people_born_df.show()
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/people_born_in_19_century'
  people_born_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getPeopleBornIn19Century()

+------------------+
|      name_surname|
+------------------+
|      Fred Astaire|
|   Humphrey Bogart|
|      James Cagney|
|  Alfred Hitchcock|
|     Buster Keaton|
|      Groucho Marx|
|Edward G. Robinson|
|    Randolph Scott|
|       Max Steiner|
|   Charles Chaplin|
|      Robert Ellis|
|      Robert Ellis|
|       Annie Rosar|
|         John Ford|
|     D.W. Griffith|
|     Boris Karloff|
|        Fritz Lang|
|       Bela Lugosi|
|   Edgar Allan Poe|
|   Judith Anderson|
+------------------+
only showing top 20 rows



In [None]:
def getMoviesLastTwoHours():
  movies_df = title_basics_df.select(f.column('originalTitle').alias('title')).where(f.col('runtimeMinutes') > 120).where(f.col('titleType') == 'movie')
  movies_df.show()
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_movies_last_two_hours'
  movies_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getMoviesLastTwoHours()

+--------------------+
|               title|
+--------------------+
|What Happened to ...|
|The Adventures of...|
|            Atlantis|
|            Germinal|
|Les misérables - ...|
|The Active Life o...|
|The Beloved Adven...|
|             Cabiria|
|   L'enfant de Paris|
|The Exploits of E...|
|The Hazards of Helen|
|Lucille Love: The...|
|      The Master Key|
|The Perils of Pau...|
|The Port of Missi...|
|El signo de la tribu|
|  The Trey o' Hearts|
|The Birth of a Na...|
|       The Black Box|
|     The Broken Coin|
+--------------------+
only showing top 20 rows



In [None]:
def getNamesTitlesCharacters():
  names_df = name_basics_df.join(title_principals_df, ['nconst']).where(name_basics_df.nconst ==  title_principals_df.nconst).join(title_basics_df).where(title_basics_df.tconst == title_principals_df.tconst)
  names_df = names_df.where((names_df.category == 'actor') | (names_df.category == 'actress')).where(f.column('characters').isNotNull()).select(f.column('primaryName'), f.column('originalTitle'), f.column('characters'))
  names_df.show()
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_names_titles_characters'
  names_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getNamesTitlesCharacters()

+--------------------+--------------------+--------------------+
|         primaryName|       originalTitle|          characters|
+--------------------+--------------------+--------------------+
|    Joseph Jefferson|Rip Leaving Sleep...|  ["Rip Van Winkle"]|
|Josef Sváb-Malost...|Výstavní párkar a...|  ["Sausage Vendor"]|
|      Ferdinand Gýra|Výstavní párkar a...|         ["Sticker"]|
|      Georges Méliès|   Évocation spirite|  ["L'illusioniste"]|
|       Emilia Márkus|             A táncz|          ["Salome"]|
|Ilona Hegedüsné B...|             A táncz|          ["Táncos"]|
|        Laura Bayley|  Mary Jane's Mishap|       ["Mary Jane"]|
|Gilbert M. 'Bronc...|The Messenger Boy...|   ["Messenger Boy"]|
|Robert Storm Pete...|En foræring til m...|        ["Petersen"]|
|       Gertie Potter|   That Fatal Sneeze|          ["Nephew"]|
|     Thurston Harris|   That Fatal Sneeze|           ["Uncle"]|
|  Anthony O'Sullivan|     A Famous Escape|        ["Prisoner"]|
|       D.W. Griffith|   

In [None]:
def getAdultTitlesPerRegion():
  titles_df = title_basics_df.join(title_akas_df, title_basics_df.tconst ==  title_akas_df.titleId, 'left').where(title_basics_df.isAdult == 1).groupBy('region').sum('isAdult').na.drop().sort(f.col('sum(isAdult)').desc())
  titles_df.show(100, truncate=False)
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_adult_titles_per_region'
  titles_df.limit(100).write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getAdultTitlesPerRegion()

+------+------------+
|region|sum(isAdult)|
+------+------------+
|US    |97092       |
|JP    |21498       |
|DE    |13351       |
|FR    |8169        |
|ES    |6403        |
|IT    |6047        |
|CA    |5506        |
|GB    |4836        |
|VE    |3686        |
|PT    |3571        |
|IN    |3246        |
|XWW   |2802        |
|NL    |2062        |
|BR    |1955        |
|CZ    |1600        |
|SE    |1445        |
|XWG   |1213        |
|HU    |878         |
|GR    |875         |
|DK    |821         |
|AU    |691         |
|FI    |428         |
|RU    |404         |
|TR    |376         |
|CH    |193         |
|AR    |177         |
|PL    |141         |
|MX    |141         |
|XEU   |139         |
|HK    |125         |
|SUHH  |110         |
|SK    |107         |
|BE    |97          |
|AT    |73          |
|GI    |72          |
|CY    |44          |
|KR    |39          |
|TW    |37          |
|PE    |36          |
|CO    |34          |
|PH    |30          |
|NO    |28          |
|CN    |25

In [None]:
title_episode_df = title_episode_df.withColumnRenamed("tconst", "ttconst")
def getSeriesEpisodes():
  series_df = title_basics_df.join(title_episode_df, title_basics_df.tconst == title_episode_df.parentTconst, 'inner').na.drop().groupBy( \
    'primaryTitle').count().sort(f.col('count').desc())
  series_df.show(50)
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_count_series_episodes'
  series_df.limit(50).write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getSeriesEpisodes()

+--------------------+-----+
|        primaryTitle|count|
+--------------------+-----+
|          Neighbours| 9505|
|    Ohayou Tokushima| 9502|
|        Love of Life| 7288|
|   The Edge of Night| 6588|
|          Judge Judy| 6302|
|        Charlie Rose| 6183|
|     Larry King Live| 6116|
|        Barátok közt| 6102|
|             Donahue| 5474|
|    The Secret Storm| 5173|
| Search for Tomorrow| 5067|
|          Crossroads| 5032|
|The Oprah Winfrey...| 4926|
|The Tonight Show ...| 4782|
|     Verbotene Liebe| 4664|
|         The Doctors| 4657|
|The Tonight Show ...| 4641|
|       Another World| 4468|
|               Maury| 4219|
|    One Life to Live| 4209|
|   Plus belle la vie| 4162|
|The Merv Griffin ...| 4150|
|Late Show with Da...| 4065|
|         Mr. Dressup| 3998|
|           Marienhof| 3983|
|     All My Children| 3674|
|           Jackanory| 3580|
|The Huntley-Brink...| 3529|
|         Family Feud| 3467|
|            I lampsi| 3457|
|The Ellen DeGener...| 3370|
|The Jeremy Ky

In [11]:
def roundUpDecade(year):
  return ((int(year) + 9) // 10 * 10) - 1


def getTop10perDecade():
  titles_df = title_basics_df.join(title_ratings_df, title_ratings_df.tconst == title_basics_df.tconst).where(\
    f.col('averageRating').isNotNull()).agg(f.min("startYear"), f.max("startYear"))
  titles_df = titles_df.select(f.date_format(f.col('min(startYear)'),"yyyy").alias("min(year)"), \
    f.date_format(f.col('max(startYear)'),"yyyy").alias("max(year)"))
  dat = titles_df.collect()[0]
  min_date = dat.__getitem__('min(year)')
  max_date = dat.__getitem__('max(year)')
  max_decade = str(roundUpDecade(max_date))
  min_decade_start = str(round(int(min_date), -1))
  min_decade_end = str(roundUpDecade(min_date))
  sql_start_decade_df = spark.sql(f"select explode(sequence(to_date('{min_decade_start}'), to_date('{max_decade}'), interval 10 year)) as startDecade")
  window_start_decade = Window.orderBy(f.col('startDecade'))
  df_start_decade_with_id = sql_start_decade_df.withColumn('id', f.row_number().over(window_start_decade))
  sql_end_decade_df =  spark.sql(f"select explode(sequence(to_date('{min_decade_end}'), to_date('{max_decade}'), interval 10 year)) as endDecade")
  window_end_decade = Window.orderBy(f.col('endDecade'))
  df_end_decade_with_id = sql_end_decade_df.withColumn('id', f.row_number().over(window_end_decade))
  sql_decade_df = df_start_decade_with_id.join(df_end_decade_with_id, df_end_decade_with_id.id == df_start_decade_with_id.id).drop(df_start_decade_with_id.id)
  decade_title_df = title_basics_df.join(sql_decade_df, title_basics_df.startYear >= sql_decade_df.startDecade).where(\
    title_basics_df.startYear <= sql_decade_df.endDecade).join(title_ratings_df, title_ratings_df.tconst == title_basics_df.tconst \
    ).where(f.col('averageRating').isNotNull())
  windowstartDecade = Window.partitionBy("startDecade").orderBy(f.col("averageRating").desc())
  decade_title_df = decade_title_df.withColumn("row", f.row_number().over(windowstartDecade)).filter( \
    f.col("row") <= 10).drop("row").sort(f.col('startDecade').desc() \
    ).select(['startDecade', 'endDecade', 'originalTitle', 'startYear', 'averageRating'])
  decade_title_df.show(100)
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_top_10_per_decade'
  decade_title_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)

getTop10perDecade()


+-----------+----------+--------------------+----------+-------------+
|startDecade| endDecade|       originalTitle| startYear|averageRating|
+-----------+----------+--------------------+----------+-------------+
| 2020-01-01|2029-01-01|Une journée ordin...|2021-01-01|         10.0|
| 2020-01-01|2029-01-01|            The Dump|2021-01-01|         10.0|
| 2020-01-01|2029-01-01|        Loose Thread|2020-01-01|         10.0|
| 2020-01-01|2029-01-01|  Entree Des Artists|2020-01-01|         10.0|
| 2020-01-01|2029-01-01|  A Postcard to Nina|2020-01-01|         10.0|
| 2020-01-01|2029-01-01|    Greg Steltenpohl|2020-01-01|         10.0|
| 2020-01-01|2029-01-01|           Petscop 2|2021-01-01|         10.0|
| 2020-01-01|2029-01-01|             Goliath|2020-01-01|         10.0|
| 2020-01-01|2029-01-01|    The Good Balloon|2021-01-01|         10.0|
| 2020-01-01|2029-01-01|          Casteletes|2020-01-01|         10.0|
| 2010-01-01|2019-01-01|  Hammas hamba vastu|2019-01-01|         10.0|
| 2010

In [16]:
def getTop10perGenre():
  data_genres_count  = title_basics_df.filter(f.col('genres').isNotNull()).groupBy("genres").count()
  data_genres_count_np = data_genres_count.toPandas().to_numpy()
  genres_list = []
  for k in range(len(data_genres_count_np)):
    if data_genres_count_np[k][0] != None:
      for i in range(len(data_genres_count_np[k][0].split(','))):
        genres_list.append(data_genres_count_np[k][0].split(',')[i])
  genres_list = list(set(genres_list))
  temp_list = []
  for i in genres_list:
    temp_list.append((i, 0))
  df = spark.createDataFrame(temp_list, ['genre','column']).drop('column')
  title_df = title_basics_df.join(df, title_basics_df.genres.contains(df.genre), 'left').join( \
    title_ratings_df, title_ratings_df.tconst == title_basics_df.tconst \
    ).where(f.col('averageRating').isNotNull())
  windowTitle = Window.partitionBy("genre").orderBy(f.col("averageRating").desc())
  title_df = title_df.withColumn("row", f.row_number().over(windowTitle)).filter( \
    f.col("row") <= 10).drop("row").select(['genre','originalTitle', 'averageRating']).na.drop()
  title_df.show(150)
  path_to_save = '/content/drive/MyDrive/Colab Notebooks/output/get_top_10_per_genre'
  title_df.write.options(header='True', delimiter=',').mode("overwrite").csv(path_to_save)


getTop10perGenre()




+-----------+--------------------+-------------+
|      genre|       originalTitle|averageRating|
+-----------+--------------------+-------------+
|      Adult|         Renegades 2|         10.0|
|      Adult|  Girls Loving Girls|         10.0|
|      Adult|Innocent Bi-Standers|         10.0|
|      Adult|Super Video Vixen...|         10.0|
|      Adult|Execu-Comp #169 -...|         10.0|
|      Adult|Las putas de New ...|         10.0|
|      Adult|Lana & Theresa Ha...|         10.0|
|      Adult|So You Wanna Be a...|         10.0|
|      Adult|  Strap-a-Dick-to-Me|         10.0|
|      Adult|        Goo Girls 16|         10.0|
|  Adventure|Orpheus in the Un...|         10.0|
|  Adventure|Four Fabulous Cha...|         10.0|
|  Adventure|            Birdboys|         10.0|
|  Adventure|         Off the Map|         10.0|
|  Adventure|   Pilgrim's Vantage|         10.0|
|  Adventure|          Tunnellers|         10.0|
|  Adventure|             Shadows|         10.0|
|  Adventure|Hono no