In [10]:
import numpy as np
import pandas as pd
import os, glob
import pyspark
from pyspark.sql import SparkSession, types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/03/31 23:19:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
def read_data(file_name, schema=None):
    url = f'https://datasets.imdbws.com/{file_name}.tsv.gz'
    spark.sparkContext.addFile(url)
    df = spark.read \
            .option('header', 'true') \
            .csv('file://'+pyspark.SparkFiles.get(file_name+'.tsv.gz'), sep='\t', schema=schema)
    df.write.parquet('data/'+file_name, mode='overwrite')

In [7]:
title_schema = types.StructType([
    types.StructField('titleId',types.StringType(),True),
    types.StructField('ordering',types.IntegerType(),True),
    types.StructField('title',types.StringType(),True),
    types.StructField('region',types.StringType(),True),
    types.StructField('language',types.StringType(),True),
    types.StructField('types',types.StringType(),True),
    types.StructField('attributes',types.StringType(),True),
    types.StructField('isOriginalTitle',types.IntegerType(),True)
])

In [8]:
title_df = read_data('title.akas', 'data/title.akas', schema=title_schema)

22/03/24 18:30:48 WARN SparkContext: The path https://datasets.imdbws.com/title.akas.tsv.gz has been added already. Overwriting of added paths is not supported in the current version.
                                                                                

In [64]:
title_df.show()

+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|  titleId|ordering|               title|region|language|      types|          attributes|isOriginalTitle|
+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|tt0000001|       1|          Карменсіта|    UA|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       2|          Carmencita|    DE|      \N|         \N|       literal title|              0|
|tt0000001|       3|Carmencita - span...|    HU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       4|          Καρμενσίτα|    GR|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       5|          Карменсита|    RU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       6|          Carmencita|    US|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       7|          Carmenc

In [65]:
title_df.schema

StructType(List(StructField(titleId,StringType,true),StructField(ordering,IntegerType,true),StructField(title,StringType,true),StructField(region,StringType,true),StructField(language,StringType,true),StructField(types,StringType,true),StructField(attributes,StringType,true),StructField(isOriginalTitle,IntegerType,true)))

In [8]:
title_basics_schema = types.StructType([
    types.StructField('tconst',types.StringType(),True),
    types.StructField('titleType',types.StringType(),True),
    types.StructField('primaryTitle',types.StringType(),True),
    types.StructField('originalTitle',types.StringType(),True),
    types.StructField('isAdult',types.IntegerType(),True),
    types.StructField('startYear',types.IntegerType(),True),
    types.StructField('endYear',types.IntegerType(),True),
    types.StructField('runtimeMinutes',types.IntegerType(),True),
    types.StructField('genres',types.StringType(),True)
])

In [9]:
title_basics_df = read_data('title.basics', schema=title_basics_schema)

                                                                                

In [15]:
glob.glob('data/title.basics/*.parquet')[0]

'data/title.basics/part-00000-863fca20-65f7-40fe-88ff-a30b17f16c38-c000.snappy.parquet'

In [6]:
title_basics_df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|   null|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|   null|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|   null|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|   null|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|   null|             1|        Comedy

                                                                                

In [23]:
title_crew_schema = types.StructType([
    types.StructField('tconst',types.StringType(),True),
    types.StructField('directors',types.StringType(),True),
    types.StructField('writers',types.StringType(),True)
])

In [24]:
title_crew_df = read_data('title.crew', schema=title_crew_schema)

22/03/19 02:23:53 WARN SparkContext: The path https://datasets.imdbws.com/title.crew.tsv.gz has been added already. Overwriting of added paths is not supported in the current version.
[Stage 15:>                                                         (0 + 1) / 1]

(8778064, 3)


                                                                                

In [25]:
title_crew_df.show()

+---------+-------------------+---------+
|   tconst|          directors|  writers|
+---------+-------------------+---------+
|tt0000001|          nm0005690|       \N|
|tt0000002|          nm0721526|       \N|
|tt0000003|          nm0721526|       \N|
|tt0000004|          nm0721526|       \N|
|tt0000005|          nm0005690|       \N|
|tt0000006|          nm0005690|       \N|
|tt0000007|nm0005690,nm0374658|       \N|
|tt0000008|          nm0005690|       \N|
|tt0000009|          nm0085156|nm0085156|
|tt0000010|          nm0525910|       \N|
|tt0000011|          nm0804434|       \N|
|tt0000012|nm0525910,nm0525908|       \N|
|tt0000013|          nm0525910|       \N|
|tt0000014|          nm0525910|       \N|
|tt0000015|          nm0721526|       \N|
|tt0000016|          nm0525910|       \N|
|tt0000017|nm1587194,nm0804434|       \N|
|tt0000018|          nm0804434|       \N|
|tt0000019|          nm0932055|       \N|
|tt0000020|          nm0010291|       \N|
+---------+-------------------+---

In [26]:
title_crew_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- writers: string (nullable = true)



In [11]:
title_episode_schema = types.StructType([
    types.StructField('tconst',types.StringType(),True),
    types.StructField('parentTconst',types.StringType(),True),
    types.StructField('seasonNumber',types.IntegerType(),True),
    types.StructField('episodeNumber',types.IntegerType(),True)
])

In [12]:
pth = 'title.episode/'

In [15]:
read_data('title.episode', dest_file=pth, schema=title_episode_schema)

22/03/19 13:49:40 WARN SparkContext: The path https://datasets.imdbws.com/title.episode.tsv.gz has been added already. Overwriting of added paths is not supported in the current version.
                                                                                

In [32]:
title_episode_df.show(4)

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0020666|  tt15180956|           1|            2|
|tt0020829|  tt15180956|           1|            1|
|tt0021166|  tt15180956|           1|            3|
|tt0021612|  tt15180956|           2|            2|
+---------+------------+------------+-------------+
only showing top 4 rows



In [33]:
title_episode_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- parentTconst: string (nullable = true)
 |-- seasonNumber: integer (nullable = true)
 |-- episodeNumber: integer (nullable = true)



In [43]:
title_principals_schema = types.StructType([
    types.StructField('tconst',types.StringType(),True),
    types.StructField('ordering',types.IntegerType(),True),
    types.StructField('nconst',types.StringType(),True),
    types.StructField('category',types.StringType(),True),
    types.StructField('job',types.StringType(),True),
    types.StructField('characters',types.StringType(),True)
])

In [44]:
title_principals_df = read_data('title.principals', schema=title_principals_schema)

22/03/19 02:39:11 WARN SparkContext: The path https://datasets.imdbws.com/title.principals.tsv.gz has been added already. Overwriting of added paths is not supported in the current version.
[Stage 29:>                                                         (0 + 1) / 1]

(49414516, 6)


                                                                                

In [45]:
title_principals_df.show(4)

+---------+--------+---------+---------------+--------------------+----------+
|   tconst|ordering|   nconst|       category|                 job|characters|
+---------+--------+---------+---------------+--------------------+----------+
|tt0000001|       1|nm1588970|           self|                  \N|  ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|        \N|
|tt0000001|       3|nm0374658|cinematographer|director of photo...|        \N|
|tt0000002|       1|nm0721526|       director|                  \N|        \N|
+---------+--------+---------+---------------+--------------------+----------+
only showing top 4 rows



In [46]:
title_principals_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [4]:
title_rating_schema = types.StructType([
    types.StructField('tconst',types.StringType(),True),
    types.StructField('averageRating',types.DoubleType(),True),
    types.StructField('numVotes',types.IntegerType(),True)
])

In [5]:
title_ratings_df = read_data('title.ratings', schema=title_rating_schema)

[Stage 0:>                                                          (0 + 1) / 1]

(1225773, 3)


                                                                                

In [6]:
title_ratings_df.show(5)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1868|
|tt0000002|          5.9|     247|
|tt0000003|          6.5|    1639|
|tt0000004|          5.8|     159|
|tt0000005|          6.2|    2463|
+---------+-------------+--------+
only showing top 5 rows



In [7]:
title_ratings_df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [10]:
print(title_basics_df.count(), len(title_basics_df.columns()))

                                                                                

8778064

In [3]:
spark.sparkContext.addFile('https://datasets.imdbws.com/name.basics.tsv.gz')

In [10]:
name_schema = types.StructType([
    types.StructField('nconst',types.StringType(), True),
    types.StructField('primaryName',types.StringType(),True),
    types.StructField('birthYear',types.IntegerType(),True),
    types.StructField('deathYear',types.IntegerType(),True),
    types.StructField('primaryProfession',types.StringType(),True),
    types.StructField('knownForTitles',types.StringType(),True)
])

In [42]:
df_1 = spark.read\
        .option("header", "true")\
        .csv('file://'+pyspark.SparkFiles.get('title.akas.tsv.gz'), sep='\t')

In [43]:
df_1.schema

StructType(List(StructField(titleId,StringType,true),StructField(ordering,StringType,true),StructField(title,StringType,true),StructField(region,StringType,true),StructField(language,StringType,true),StructField(types,StringType,true),StructField(attributes,StringType,true),StructField(isOriginalTitle,StringType,true)))

In [44]:
df_1.show()

+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|  titleId|ordering|               title|region|language|      types|          attributes|isOriginalTitle|
+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|tt0000001|       1|          Карменсіта|    UA|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       2|          Carmencita|    DE|      \N|         \N|       literal title|              0|
|tt0000001|       3|Carmencita - span...|    HU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       4|          Καρμενσίτα|    GR|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       5|          Карменсита|    RU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       6|          Carmencita|    US|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       7|          Carmenc

In [5]:
df_1.schema

StructType(List(StructField(nconst,StringType,true),StructField(primaryName,StringType,true),StructField(birthYear,StringType,true),StructField(deathYear,StringType,true),StructField(primaryProfession,StringType,true),StructField(knownForTitles,StringType,true)))

In [16]:
name_pth = f'data/name/'

In [26]:
df_1.repartition(7)\
    .write.parquet(name_pth, mode='overwrite')

                                                                                

In [5]:
name_basics_schema = types.StructType([
    types.StructField('nconst',types.StringType(),True),
    types.StructField('primaryName',types.StringType(),True),
    types.StructField('birthYear',types.IntegerType(),True),
    types.StructField('deathYear',types.IntegerType(),True),
    types.StructField('primaryProfession',types.StringType(),True),
    types.StructField('knownForTitles',types.StringType(),True)
])

In [27]:
!ls -lh 'data/name'

total 379M
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53   0 Mar 19 01:19 _SUCCESS
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00000-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00001-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00002-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00003-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00004-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00005-84fbbced-7ae2-42d0-a241-25b9a0bdeb8b-c000.snappy.parquet
-rw-r--r-- 1 oseghalepatrick53 oseghalepatrick53 55M Mar 19 01:19 part-00006-84fbbced-7ae2-42d0-a241-25b9a0bde