# Globals

In [0]:
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

from pyspark.sql.functions import *

import os
# AWS credentials come from the environment; each value is None when its variable is unset.
ACCESS_KEY, SECRET_KEY = (
  os.environ.get(variable) for variable in ('AWS_ACCESS_KEY', 'AWS_SECRET_KEY')
)

# Import IMDB data

In [0]:
# Mount the AIDA bucket that holds the IMDB data

AWS_BUCKET_SOURCE = 'aida-project'
MOUNT_SOURCE = '/mnt/data_source'

# dbutils.fs.mount fails when the mount point is already in use, so only mount when it is
# missing. This keeps the cell re-runnable.
if MOUNT_SOURCE not in [mount.mountPoint for mount in dbutils.fs.mounts()]:
  if not ACCESS_KEY or not SECRET_KEY:
    # Fail with a clear message instead of an AttributeError on None.replace below.
    raise RuntimeError('AWS_ACCESS_KEY / AWS_SECRET_KEY environment variables are not set')
  # "/" inside the secret key is percent-encoded so it can be embedded in the s3a URI.
  dbutils.fs.mount(f's3a://{ACCESS_KEY}:{SECRET_KEY.replace("/", "%2F")}@{AWS_BUCKET_SOURCE}', MOUNT_SOURCE)
display(dbutils.fs.ls(f'dbfs:{MOUNT_SOURCE}/TSV'))

path,name,size
dbfs:/mnt/data_source/TSV/name.basics.tsv,name.basics.tsv,579976550
dbfs:/mnt/data_source/TSV/title.akas.tsv,title.akas.tsv,969441812
dbfs:/mnt/data_source/TSV/title.basics.tsv,title.basics.tsv,537519832
dbfs:/mnt/data_source/TSV/title.principals.tsv,title.principals.tsv,1622240736
dbfs:/mnt/data_source/TSV/title.ratings.tsv,title.ratings.tsv,16907124


In [0]:
# Print a header line and the first 1000 bytes of every file in the TSV folder
BYTES_PER_MB = 1024 * 1024
for file in dbutils.fs.ls(f'{MOUNT_SOURCE}/TSV'):
  size_mb = file.size // BYTES_PER_MB
  banner = '#' * 50
  print('###', file.name, f'(size: {size_mb:,} MB)', banner)
  preview = dbutils.fs.head(file.path, 1000)
  print(preview + '...\n')

In [0]:
# Short table name used throughout the notebook for each IMDB dump file.
TABLE_NAMES = {
  'name.basics.tsv': 'names',
  'title.basics.tsv': 'titles',
  'title.akas.tsv': 'akas',
  'title.principals.tsv': 'principals',
  'title.ratings.tsv': 'ratings',
}

tables = {}
for file in dbutils.fs.ls(f'{MOUNT_SOURCE}/TSV'):
  table = TABLE_NAMES.get(file.name)
  if table is None:
    # An unexpected file in the folder previously aborted the whole load with a KeyError.
    print('Skipping', file.path, '(no table name configured)')
    continue
  print('Loading', file.path, "-> tables['" + table + "']")
  # IMDB marks missing values with the literal \N; column types are inferred from the data.
  # NOTE(review): Spark's CSV reader treats '"' as a quote character by default, and the shifted
  # rows repaired under "Data cleaning" all have titles starting with '"'. Passing quote='' here
  # would presumably avoid the shift at the source - TODO confirm before changing.
  tables[table] = spark.read.csv(file.path, header=True, inferSchema=True, sep='\t', nullValue=r'\N')
  print(table, f'({tables[table].count():,} rows)', end=': '); tables[table].printSchema()

In [0]:
# Show summary statistics for every loaded table, each preceded by a header line
for table, frame in tables.items():
  print('###', table, '#'*50)
  frame.summary().show()

In [0]:
# Number of rated titles per distinct numVotes value, ordered by numVotes
votes_histogram = tables['ratings'].groupBy('numVotes').count().orderBy('numVotes')
votes_histogram.display()

numVotes,count
5,77086
6,68773
7,56060
8,46448
9,39107
10,33571
11,28910
12,25362
13,22613
14,20385


# Data cleaning

## titles.isAdult

In [0]:
# The table summaries above hinted at corrupted rows in the titles table.
# Collect the tconst of every title whose isAdult value is neither 0 nor 1.

suspicious_titles = tables['titles'].filter('isAdult NOT IN (0, 1)')
bad_rows = []
for row in suspicious_titles.collect():
  bad_rows.append(row['tconst'])
print(bad_rows)

In [0]:
# Show the full rows behind the suspicious ids collected above
is_bad_row = col('tconst').isin(bad_rows)
tables['titles'].where(is_bad_row).display()

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt10233364,tvEpisode,"""Rolling in the Deep Dish	""Rolling in the Deep Dish",0,2019,,,Reality-TV,
tt10925142,tvEpisode,"""The IMDb Show on Location: Star Wars Galaxy's Edge	""The IMDb Show on Location: Star Wars Galaxy's Edge",0,2019,,,Talk-Show,
tt10970874,tvEpisode,"""Die Bauhaus-Stadt Tel Aviv - Vorbild für die Metropolen der Moderne?	""Die Bauhaus-Stadt Tel Aviv - Vorbild für die Metropolen der Moderne?",0,2019,,,,
tt2347742,tvEpisode,"""No sufras por la alergia esta primavera	""No sufras por la alergia esta primavera",0,2004,,,,
tt3984412,tvEpisode,"""I'm Not Going to Come Last, I'm Just Going to Die on The Amazing Race	""I'm Not Going to Come Last, I'm Just Going to Die on The Amazing Race",0,2014,,,Reality-TV,
tt7841930,tvEpisode,"""Stop and Hear the Cicadas/Cold-Blooded	""Stop and Hear the Cicadas/Cold-Blooded",0,2018,,24.0,"Animation,Family",
tt8740950,tvEpisode,"""Weight Loss Resolution Restart - Ins & Outs of Menopause, PeriMenopause & Hormones	""Weight Loss Resolution Restart - Ins & Outs of Menopause, PeriMenopause & Hormones",0,2015,,,Reality-TV,
tt9822816,tvEpisode,"""Zwischen Vertuschung und Aufklärung - Missbrauchsgipfel im Vatikan	""Zwischen Vertuschung und Aufklärung - Missbrauchsgipfel im Vatikan",0,2019,,,,
tt9900062,tvEpisode,"""The Direction of Yuu's Love: Hings Aren't Going As Planned	""The Direction of Yuu's Love: Hings Aren't Going As Planned",0,1994,,,"Animation,Comedy,Drama",
tt9909210,tvEpisode,"""Politik und/oder Moral - Wie weit geht das Vertrauen der Bürger?	""Politik und/oder Moral - Wie weit geht das Vertrauen der Bürger?",0,2005,,,,


In [0]:
# Repair the rows in which primaryTitle swallowed the tab and the originalTitle, so that every
# later column ended up one position too far to the left.
titles = tables['titles']
titles_schema = titles.schema

# A row needs repair only while its primaryTitle still contains the embedded tab. This makes the
# cell safe to re-run: already repaired rows are left alone instead of being shifted again.
# coalesce(..., False) keeps the condition non-NULL, so no row is dropped from both halves below.
is_shifted = coalesce(col('tconst').isin(bad_rows) & col('primaryTitle').contains('\t'), lit(False))
title_parts = split(col('primaryTitle'), '\t')

def _moved(source, target):
  """Take the values of column `source`, cast to the type `target` has in the titles schema."""
  return col(source).cast(titles_schema[target].dataType).alias(target)

# Build all repaired columns in one select so no expression depends on statement order.
# Casting to the existing column types keeps the union from widening them (without the cast,
# isAdult would take over the type of originalTitle).
repaired = titles.filter(is_shifted).select(
  'tconst',
  'titleType',
  array_join(slice(title_parts, 1, 1), '').alias('primaryTitle'),
  array_join(slice(title_parts, 2, 1), '').alias('originalTitle'),
  _moved('originalTitle', 'isAdult'),
  _moved('isAdult', 'startYear'),
  _moved('startYear', 'endYear'),
  _moved('endYear', 'runtimeMinutes'),
  _moved('runtimeMinutes', 'genres'),
).select(titles.columns)  # union matches columns by position, so restore the original order

tables['titles'] = titles.filter(~is_shifted).union(repaired)

tables['titles'].filter(col('tconst').isin(bad_rows)).display()

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt10233364,tvEpisode,"""Rolling in the Deep Dish","""Rolling in the Deep Dish",0,2019,,,Reality-TV
tt10925142,tvEpisode,"""The IMDb Show on Location: Star Wars Galaxy's Edge","""The IMDb Show on Location: Star Wars Galaxy's Edge",0,2019,,,Talk-Show
tt10970874,tvEpisode,"""Die Bauhaus-Stadt Tel Aviv - Vorbild für die Metropolen der Moderne?","""Die Bauhaus-Stadt Tel Aviv - Vorbild für die Metropolen der Moderne?",0,2019,,,
tt2347742,tvEpisode,"""No sufras por la alergia esta primavera","""No sufras por la alergia esta primavera",0,2004,,,
tt3984412,tvEpisode,"""I'm Not Going to Come Last, I'm Just Going to Die on The Amazing Race","""I'm Not Going to Come Last, I'm Just Going to Die on The Amazing Race",0,2014,,,Reality-TV
tt7841930,tvEpisode,"""Stop and Hear the Cicadas/Cold-Blooded","""Stop and Hear the Cicadas/Cold-Blooded",0,2018,,24.0,"Animation,Family"
tt8740950,tvEpisode,"""Weight Loss Resolution Restart - Ins & Outs of Menopause, PeriMenopause & Hormones","""Weight Loss Resolution Restart - Ins & Outs of Menopause, PeriMenopause & Hormones",0,2015,,,Reality-TV
tt9822816,tvEpisode,"""Zwischen Vertuschung und Aufklärung - Missbrauchsgipfel im Vatikan","""Zwischen Vertuschung und Aufklärung - Missbrauchsgipfel im Vatikan",0,2019,,,
tt9900062,tvEpisode,"""The Direction of Yuu's Love: Hings Aren't Going As Planned","""The Direction of Yuu's Love: Hings Aren't Going As Planned",0,1994,,,"Animation,Comedy,Drama"
tt9909210,tvEpisode,"""Politik und/oder Moral - Wie weit geht das Vertrauen der Bürger?","""Politik und/oder Moral - Wie weit geht das Vertrauen der Bürger?",0,2005,,,


## titles.runtimeMinutes

In [0]:
# List the titles whose runtimeMinutes lies outside the range 0..50000
# NOTE(review): a value that cannot be read as a number presumably compares as NULL and would
# therefore not be listed here either - confirm before relying on this as a numeric check.

runtime_in_range = col('runtimeMinutes').between(0, 50000)
tables['titles'].filter(~runtime_in_range).display()

tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt10844584,movie,100,100,0,2019,,59460,Drama
tt8273150,movie,Logistics,Logistics,0,2012,,51420,Documentary


In [0]:
# The two outliers above are implausible, but that is a data-quality issue on the IMDB side.
# The values themselves are fine type-wise, so runtimeMinutes is cast to INTEGER here.

runtime_as_int = col('runtimeMinutes').astype('integer')
tables['titles'] = tables['titles'].withColumn('runtimeMinutes', runtime_as_int)
print('titles: ', end='')
tables['titles'].printSchema()

# Export data for ML-steps

In [0]:
# Mount our project bucket for exporting data

AWS_BUCKET_TARGET = 'sjf-project'
MOUNT_TARGET = '/mnt/data_target'

# dbutils.fs.mount fails when the mount point is already in use, so only mount when it is
# missing. This keeps the cell re-runnable.
if MOUNT_TARGET not in [mount.mountPoint for mount in dbutils.fs.mounts()]:
  if not ACCESS_KEY or not SECRET_KEY:
    # Fail with a clear message instead of an AttributeError on None.replace below.
    raise RuntimeError('AWS_ACCESS_KEY / AWS_SECRET_KEY environment variables are not set')
  # "/" inside the secret key is percent-encoded so it can be embedded in the s3a URI.
  dbutils.fs.mount(f's3a://{ACCESS_KEY}:{SECRET_KEY.replace("/", "%2F")}@{AWS_BUCKET_TARGET}', MOUNT_TARGET)
display(dbutils.fs.ls(f'dbfs:{MOUNT_TARGET}'))

path,name,size
dbfs:/mnt/data_target/julian_genres.tsv,julian_genres.tsv,245132145


In [0]:
def export_tsv(table, filepath):
  """Write `table` as a single tab-separated file with a header row at `filepath`.

  The data is first written to the directory `<filepath>.tmp` (coalesced to one partition),
  the `.csv` part file found there is moved to `filepath`, and the temporary directory is
  removed again. Finally the listing of `filepath` is displayed.

  Args:
    table: Object to export; it must support `.coalesce(1).write.csv(...)` (a Spark DataFrame).
    filepath: Destination path of the final TSV file.

  Raises:
    FileNotFoundError: If the temporary directory contains no `.csv` part file.
  """
  tmp_dir = f'{filepath}.tmp'
  print(f'Exporting data to {filepath}...')
  table.coalesce(1).write.csv(tmp_dir, header=True, sep='\t', mode='overwrite')
  print('Done exporting, moving to correct path...')
  try:
    part_files = [entry.path for entry in dbutils.fs.ls(tmp_dir) if entry.name.endswith('.csv')]
    if not part_files:
      # Without this check the function would report success and might list a stale file.
      raise FileNotFoundError(f'No .csv part file found in {tmp_dir}')
    for part_file in part_files:
      dbutils.fs.mv(part_file, filepath)
  finally:
    # Remove the temporary directory even when locating or moving the part file failed.
    dbutils.fs.rm(tmp_dir, recurse=True)
  print('Done. Here\'s the file:')
  display(dbutils.fs.ls(filepath))

# Work in progress
## (Please put your safety helmet on)

In [0]:
# Next steps
# Simon - Analyze and fix the shifted values ("Verrutscher") in titles? -> DONE
# Julian - Analyze genres
#   titles[tconst, titleType, startYear, endYear, genre] -> Simon exports this for Julian -> DONE
# Falk - Verify the given data schema <-> inferredSchema (e.g. titles.isAdult cannot be right)

In [0]:
# Columns Julian needs for the genre analysis
genre_columns = ['tconst', 'titleType', 'startYear', 'endYear', 'genres']
julian = tables['titles'].select(genre_columns)
julian.printSchema()

# Write Julian's extract to the project bucket as TSV.
export_tsv(julian, f'dbfs:{MOUNT_TARGET}/julian_genres.tsv')

path,name,size
dbfs:/mnt/data_target/julian_genres.tsv,julian_genres.tsv,245132145


In [0]:
# Julian's work goes here.
# For later reference, the exported file is located at s3://sjf-project/julian_genres.tsv;
# this cell works on the `julian` object defined before.

# Titles that have no genre assigned
has_no_genre = col('genres').isNull()
nogenre = julian.where(has_no_genre)
nogenre.show()

In [0]:
#######################
#    Falk's Space     #
#######################

In [0]:
# Names of all loaded tables, followed by the row count of the ratings table
print([*tables])
ratings_row_count = tables['ratings'].count()
print(ratings_row_count)

In [0]:
# Number of titles per distinct isAdult value
is_adult_counts = tables['titles'].groupby('isAdult').count()
is_adult_counts.show()

In [0]:
# Show, untruncated, every title whose isAdult value is greater than 1.
# The column is referenced through col(): the name `title_basics` used here before is not
# defined anywhere in this notebook and raised a NameError on a fresh kernel.
tables['titles'].filter(col('isAdult') > 1).show(truncate=False)

In [0]:
# NOTE(review): `title_basics_filtered` is not assigned anywhere in the visible notebook, so this
# cell raises a NameError on a fresh kernel. Presumably it was meant to hold a filtered version
# of tables['titles'] - TODO confirm and define it, or remove this cell.
title_basics_filtered.show(truncate=False)