In [1]:
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window

# **Table of Contents**

<div class="alert alert-block alert-info" style="margin-top: 20px">

1. [Dataset description](#1)
2. [Configuration of Spark](#2)
3. [Data loading](#3)
4. [Data preprocessing](#4)
5. [Transformation Stage](#5)
    - [Get the general information about dataset. (Such as Schema, Columns, number of columns and rows etc.) Describe the dataset using this information](#5.1)
    - [Get statistics about numerical columns. Describe the dataset using this information. How could this be useful for further analysis?](#5.2)
    - [Come up with your own 6 or more business questions to the data](#5.3)
        - [How many Ukrainian titles are in the akas dataframe?](#5.3.1)
        - [Get the first 5 titles that have 'Short' as a genre](#5.3.2)
        - [Which Documentary title has the most numVotes?](#5.3.3)
        - [Which region is the most common?](#5.3.4)
        - [Get the title with the highest average rating](#5.3.5)
        - [Get the title with the highest number of episodes](#5.3.6)
        - [Get the title of the top TV series by average user rating and number of votes received](#5.3.7)
        - [Get the title of the TV series with the longest runtime per episode](#5.3.8)
    
</div>

# **1. Dataset description** <a id='1'></a>

## **Dataset Description (IMDb dataset)**

Each dataset is contained in a gzipped, tab-separated-values (TSV) formatted file in the UTF-8 character set. The first line in each file contains headers that describe what is in each column. A '\N' denotes that a particular field is missing or null for that title or name.
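
As a minimal sketch of this layout, the plain-Python snippet below parses a small made-up sample and maps `\N` to `None`. The `SAMPLE_TSV` string and the `parse_rows` helper are illustrative assumptions, not part of the notebook, and quoting is disabled on the assumption that quote characters inside titles are ordinary text.

```python
import csv
import io

# Illustrative sample in the IMDb layout: header line, tab-separated fields, \N for missing values.
SAMPLE_TSV = (
    "tconst\ttitleType\tprimaryTitle\tisAdult\tstartYear\tendYear\tgenres\n"
    "tt0000001\tshort\tCarmencita\t0\t1894\t\\N\tDocumentary,Short\n"
)


def parse_rows(text):
    """Yield one dict per data row, mapping the \\N placeholder to None."""
    # QUOTE_NONE keeps quote characters inside fields as ordinary text.
    reader = csv.DictReader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        yield {key: (None if value == r"\N" else value) for key, value in row.items()}


rows = list(parse_rows(SAMPLE_TSV))
print(rows[0]["endYear"])  # None
print(rows[0]["genres"])   # Documentary,Short
```

The Spark reader used later in the notebook does the same job at scale; this sketch only shows what the raw fields look like before any typing is applied.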

### **Files**
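* **name.basics.tsv** - people: names, birth and death years, professions, and known-for titles
* **title.akas.tsv** - localized and alternative titles by region and language
* **title.basics.tsv** - core title information: type, titles, years, runtime, and genres
* **title.crew.tsv** - directors and writers of each title
* **title.episode.tsv** - episodes and their parent TV series, with season and episode numbers
* **title.principals.tsv** - principal cast and crew of each title
* **title.ratings.tsv** - average rating and number of votes per title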

### **name.basics.tsv**

|      Name         |    Type          |   Description                                     |
| -------------     | ---------------- | ------------------------------------------------- |
|     nconst        |   string         | alphanumeric unique identifier of the name/person |
|   primaryName     | string           | name by which the person is most often credited   |
|     birthYear     | in YYYY format   | birth year                                        |
|     deathYear     | in YYYY format   | if applicable, else '\N'                          |
| primaryProfession | array of strings | the top-3 professions of the person               |
|   knownForTitles  | array of tconsts | titles the person is known for                    |

### **title.akas.tsv**

|      Name       |    Type   |   Description    |
| -------------   | --------- | ---------------- |
| titleId         | string    | a tconst, an alphanumeric unique identifier of the title |
| ordering        | integer   | a number to uniquely identify rows for a given titleId |
| title           | string    | the localized title |
| region          | string    | the region for this version of the title |
| language        | string    | the language of the title |
| types           | array     | Enumerated set of attributes for this alternative title. One or more of the following: "alternative", "dvd", "festival", "tv", "video", "working", "original", "imdbDisplay". New values may be added in the future without warning |
| attributes      | array     | Additional terms to describe this alternative title, not enumerated |
| isOriginalTitle | boolean   | 0: not original title; 1: original title |

### **title.basics.tsv**

|      Name      |    Type      |   Description   |
| -------------  | ------------ | ----------------|
| tconst         | string       | alphanumeric unique identifier of the title |
| titleType      | string       | the type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc) |
| primaryTitle   | string       | the more popular title / the title used by the filmmakers on promotional materials at the point of release |
| originalTitle  | string       | original title, in the original language |
| isAdult        | boolean      | 0: non-adult title; 1: adult title |
| startYear      | YYYY         | represents the release year of a title. In the case of TV Series, it is the series start year |
| endYear        | YYYY         | TV Series end year. ‘\N’ for all other title types |
| runtimeMinutes | integer      | primary runtime of the title, in minutes |
| genres         | string array | includes up to three genres associated with the title |

### **title.crew.tsv**

|      Name     |    Type          |   Description                               |
| ------------- | ---------------  | ------------------------------------------- |
| tconst        | string           | alphanumeric unique identifier of the title |
| directors     | array of nconsts | director(s) of the given title              |
| writers       | array of nconsts | writer(s) of the given title                |

### **title.episode.tsv**

|      Name     |    Type   |   Description                                   |
| ------------- | --------- | ----------------------------------------------- |
| tconst        | string    | alphanumeric identifier of episode              |
| parentTconst  | string    | alphanumeric identifier of the parent TV Series |
| seasonNumber  | integer   | season number the episode belongs to            |
| episodeNumber | integer   | episode number of the tconst in the TV series   |

### **title.principals.tsv**

|      Name     |    Type   |   Description                                             |
| ------------- | --------- | --------------------------------------------------------- |
| tconst        | string    | alphanumeric unique identifier of the title               |
| ordering      | integer   | a number to uniquely identify rows for a given titleId    |
| nconst        | string    | alphanumeric unique identifier of the name/person         |
| category      | string    | the category of job that person was in                    |
| job           | string    | the specific job title if applicable, else '\N'           |
| characters    | string    | the name of the character played if applicable, else '\N' |

### **title.ratings.tsv**

|      Name     |    Type   |   Description                                        |
| ------------- | --------- | ---------------------------------------------------- |
| tconst        | string    | alphanumeric unique identifier of the title          |
| averageRating | float     | weighted average of all the individual user ratings  |
| numVotes      |  integer  | number of votes the title has received               |


# **2. Configuration of Spark** <a id='2'></a>

In [2]:
path = 'C:/files/grid-dynamics'

In [3]:
spark_session = (
    SparkSession.builder
        .master('local')
        .appName('project')
        .config(conf=SparkConf())
        .getOrCreate()
)

spark_session.conf.set('spark.sql.repl.eagerEval.enabled', True)

Defining the schemas

In [4]:
name_basics_schema = t.StructType([
    t.StructField('nconst', t.StringType(), True),
    t.StructField('primaryName', t.StringType(), True),
    t.StructField('birthYear', t.DateType(), True),
    t.StructField('deathYear', t.DateType(), True),
    t.StructField('primaryProfession', t.StringType(), True),
    t.StructField('knownForTitles', t.StringType(), True)
])

akas_schema = t.StructType([
    t.StructField('titleId', t.StringType(), True),
    t.StructField('ordering', t.IntegerType(), True),
    t.StructField('title', t.StringType(), True),
    t.StructField('region', t.StringType(), True),
    t.StructField('language', t.StringType(), True),
    t.StructField('types', t.StringType(), True),
    t.StructField('attributes', t.StringType(), True),
    t.StructField('isOriginalTitle', t.BooleanType(), True)
])

title_basics_schema = t.StructType([
    t.StructField('tconst', t.StringType(), True),
    t.StructField('titleType', t.StringType(), True),
    t.StructField('primaryTitle', t.StringType(), True),
    t.StructField('originalTitle', t.StringType(), True),
    t.StructField('isAdult', t.BooleanType(), True),
    t.StructField('startYear', t.DateType(), True),
    t.StructField('endYear', t.DateType(), True),
    t.StructField('runtimeMinutes', t.IntegerType(), True),
    t.StructField('genres', t.StringType(), True),
])

crew_schema = t.StructType([
    t.StructField('tconst', t.StringType(), True),
    t.StructField('directors', t.StringType(), True),
    t.StructField('writers', t.StringType(), True),
])

episode_schema = t.StructType([
    t.StructField('tconst', t.StringType(), True),
    t.StructField('parentTconst', t.StringType(), True),
    t.StructField('seasonNumber', t.IntegerType(), True),
    t.StructField('episodeNumber', t.IntegerType(), True),
])

principals_schema = t.StructType([
    t.StructField('tconst', t.StringType(), True),
    t.StructField('ordering', t.IntegerType(), True),
    t.StructField('nconst', t.StringType(), True),
    t.StructField('category', t.StringType(), True),
    t.StructField('job', t.StringType(), True),
    t.StructField('characters', t.StringType(), True)
])

ratings_schema = t.StructType([
    t.StructField('tconst', t.StringType(), True),
    t.StructField('averageRating', t.FloatType(), True),
    t.StructField('numVotes', t.IntegerType(), True)
])

# **3. Data loading** <a id='3'></a>

Reading the dataset

In [5]:
name_basics = spark_session.read.csv(f'{path}/name.basics.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=name_basics_schema,
                                    dateFormat='yyyy')

akas = spark_session.read.csv(f'{path}/title.akas.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=akas_schema)

title_basics = spark_session.read.csv(f'{path}/title.basics.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=title_basics_schema,
                                    dateFormat='yyyy')

crew = spark_session.read.csv(f'{path}/title.crew.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=crew_schema)

episode = spark_session.read.csv(f'{path}/title.episode.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=episode_schema)

principals = spark_session.read.csv(f'{path}/title.principals.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=principals_schema)

ratings = spark_session.read.csv(f'{path}/title.ratings.tsv', 
                                    sep=r'\t', 
                                    header=True,
                                    nullValue='null',
                                    schema=ratings_schema)
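
The `dateFormat='yyyy'` option above parses year-only values into full dates, so `birthYear` and `startYear` show up later as January 1 of the given year (for example `1899-01-01`). A plain-Python sketch of the same effect, using a hypothetical helper `year_to_date` that is not part of the notebook:

```python
from datetime import datetime


def year_to_date(year_text):
    """Parse a four-digit year; month and day default to 1, as with a year-only date format."""
    return datetime.strptime(year_text, "%Y").date()


print(year_to_date("1899"))  # 1899-01-01
```

If only the year matters for an analysis, reading these columns as integers would be a reasonable alternative to dates.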

# **4. Data preprocessing** <a id='4'></a>

Replacing '\N' with null value

In [6]:
def null_if_placeholder(df, columns):
    # Replace the IMDb '\N' placeholder with a real null in each listed string column
    for column in columns:
        df = df.withColumn(column, f.when(f.col(column) == r'\N', None).otherwise(f.col(column)))
    return df

name_basics = null_if_placeholder(name_basics, ['primaryProfession', 'knownForTitles'])
title_basics = null_if_placeholder(title_basics, ['genres'])
crew = null_if_placeholder(crew, ['writers', 'directors'])
principals = null_if_placeholder(principals, ['job', 'characters'])
akas = null_if_placeholder(akas, ['region', 'language', 'types', 'attributes'])

Converting strings to array of strings

In [7]:
name_basics = name_basics.withColumn('primaryProfession', f.split('primaryProfession', ','))
name_basics = name_basics.withColumn('knownForTitles', f.split('knownForTitles', ','))

title_basics = title_basics.withColumn('genres', f.split('genres', ','))

crew = crew.withColumn('directors', f.split('directors', ','))
crew = crew.withColumn('writers', f.split('writers', ','))

Removing unnecessary symbols

In [8]:
principals = principals.withColumn('characters', f.regexp_replace('characters', r'[\[\]"]', ''))
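
The pattern `[\[\]"]` is a character class that matches an opening bracket, a closing bracket, or a double quote, so every such character is deleted. A plain-Python sketch of the same substitution follows; the sample value and the helper name `strip_brackets_and_quotes` are made up for illustration.

```python
import re

# Same pattern as the Spark call: a character class matching '[', ']' and '"'.
PATTERN = r'[\[\]"]'


def strip_brackets_and_quotes(value):
    """Remove square brackets and double quotes; pass None through unchanged."""
    return None if value is None else re.sub(PATTERN, '', value)


cleaned = strip_brackets_and_quotes('["Self","Narrator"]')
print(cleaned)  # Self,Narrator
```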

# **5. Transformation Stage** <a id='5'></a>

## **1. Get the general information about dataset. (Such as Schema, Columns, number of columns and rows etc.) Describe the dataset using this information** <a id='5.1'></a>

In [9]:
datasets = [(name_basics, 'name_basics'), (akas, 'akas'), (title_basics, 'title_basics'), (crew, 'crew'), 
            (episode, 'episode'), (principals, 'principals'), (ratings, 'ratings')]

In [10]:
print('Schemas\n')

for dataset, name in datasets:
    print(f'Dataset: {name}')
    dataset.printSchema()

Schemas

Dataset: name_basics
root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: date (nullable = true)
 |-- deathYear: date (nullable = true)
 |-- primaryProfession: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- knownForTitles: array (nullable = true)
 |    |-- element: string (containsNull = false)

Dataset: akas
root
 |-- titleId: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- types: string (nullable = true)
 |-- attributes: string (nullable = true)
 |-- isOriginalTitle: boolean (nullable = true)

Dataset: title_basics
root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: boolean (nullable = true)
 |-- startYear: date (nulla

In [11]:
print('Columns\n')

for dataset, name in datasets:
    print(f'Dataset: {name}: {dataset.columns}, number of columns: {len(dataset.columns)}, number of rows: {dataset.count()}\n')

Columns

Dataset: name_basics: ['nconst', 'primaryName', 'birthYear', 'deathYear', 'primaryProfession', 'knownForTitles'], number of columns: 6, number of rows: 12698644

Dataset: akas: ['titleId', 'ordering', 'title', 'region', 'language', 'types', 'attributes', 'isOriginalTitle'], number of columns: 8, number of rows: 36642385

Dataset: title_basics: ['tconst', 'titleType', 'primaryTitle', 'originalTitle', 'isAdult', 'startYear', 'endYear', 'runtimeMinutes', 'genres'], number of columns: 9, number of rows: 10017011

Dataset: crew: ['tconst', 'directors', 'writers'], number of columns: 3, number of rows: 10019208

Dataset: episode: ['tconst', 'parentTconst', 'seasonNumber', 'episodeNumber'], number of columns: 4, number of rows: 7620834

Dataset: principals: ['tconst', 'ordering', 'nconst', 'category', 'job', 'characters'], number of columns: 6, number of rows: 57243863

Dataset: ratings: ['tconst', 'averageRating', 'numVotes'], number of columns: 3, number of rows: 1331252



## **2. Get statistics about numerical columns. Describe the dataset using this information. How could this be useful for further analysis?** <a id='5.2'></a>

In [12]:
akas.describe(['ordering'])

summary,ordering
count,36642385.0
mean,4.1506784015287215
stddev,3.939129056143955
min,1.0
max,249.0


In [13]:
title_basics.describe(['runtimeMinutes'])

summary,runtimeMinutes
count,2970701.0
mean,43.50585333226064
stddev,81.48975978468617
min,0.0
max,59460.0


In [14]:
episode.describe(['seasonNumber', 'episodeNumber'])

summary,seasonNumber,episodeNumber
count,6057523.0,6057523.0
mean,3.739018902610853,402.8005742611295
stddev,24.44167563757548,1306.4123181308778
min,1.0,0.0
max,2021.0,97251.0


In [15]:
principals.describe(['ordering'])

summary,ordering
count,57243863.0
mean,4.610154489399152
stddev,2.7856930662973065
min,1.0
max,10.0


In [16]:
ratings.describe(['averageRating', 'numVotes'])

summary,averageRating,numVotes
count,1331252.0,1331252.0
mean,6.955515265382489,1038.735545937208
stddev,1.3822064108967602,17489.990386451816
min,1.0,5.0
max,10.0,2767112.0

These summaries help spot data-quality issues before further analysis. `runtimeMinutes` is populated for only 2,970,701 of 10,017,011 titles and ranges from 0 to 59,460 minutes, so runtime-based questions need null handling and outlier checks. `seasonNumber` peaks at 2021 and `episodeNumber` at 97,251, which suggests that some series use years or running counters instead of sequential numbers. `numVotes` is heavily skewed (mean about 1,039, standard deviation about 17,490, minimum 5), so a high `averageRating` may rest on very few votes.


## **3. Come up with your own 6 or more business questions to the data** <a id='5.3'></a>

In [17]:
for dataset, name in datasets:
    print(f'Dataset: {name}')
    dataset.show()

Dataset: name_basics
+---------+-------------------+----------+----------+--------------------+--------------------+
|   nconst|        primaryName| birthYear| deathYear|   primaryProfession|      knownForTitles|
+---------+-------------------+----------+----------+--------------------+--------------------+
|nm0000001|       Fred Astaire|1899-01-01|1987-01-01|[soundtrack, acto...|[tt0053137, tt007...|
|nm0000002|      Lauren Bacall|1924-01-01|2014-01-01|[actress, soundtr...|[tt0117057, tt007...|
|nm0000003|    Brigitte Bardot|1934-01-01|      null|[actress, soundtr...|[tt0054452, tt004...|
|nm0000004|       John Belushi|1949-01-01|1982-01-01|[actor, soundtrac...|[tt0078723, tt008...|
|nm0000005|     Ingmar Bergman|1918-01-01|2007-01-01|[writer, director...|[tt0069467, tt005...|
|nm0000006|     Ingrid Bergman|1915-01-01|1982-01-01|[actress, soundtr...|[tt0034583, tt003...|
|nm0000007|    Humphrey Bogart|1899-01-01|1957-01-01|[actor, soundtrac...|[tt0037382, tt004...|
|nm0000008|      Ma

How many Ukrainian titles are in the akas dataframe? <a id='5.3.1'></a>

In [18]:
akas.filter(akas.region == 'UA').count()

27365

Get the first 5 titles that have 'Short' as a genre <a id='5.3.2'></a>

In [19]:
title_basics.filter(f.array_contains(title_basics.genres, 'Short')).limit(5).show()

+---------+---------+--------------------+--------------------+-------+----------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult| startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+----------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|   null|1894-01-01|   null|             1|[Documentary, Short]|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|   null|1892-01-01|   null|             5|  [Animation, Short]|
|tt0000004|    short|         Un bon bock|         Un bon bock|   null|1892-01-01|   null|            12|  [Animation, Short]|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|   null|1893-01-01|   null|             1|     [Comedy, Short]|
|tt0000006|    short|   Chinese Opium Den|   Chinese Opium Den|   null|1894-01-01|   null|             1|      

Which Documentary title has the most numVotes? <a id='5.3.3'></a>

In [29]:
title_basics.filter(f.array_contains(title_basics.genres, 'Documentary')) \
    .select('tconst', 'primaryTitle') \
    .join(ratings, 'tconst', 'inner') \
    .sort(f.desc('numVotes')) \
    .limit(1) \
    .select('primaryTitle') \
    .collect()[0]['primaryTitle']

'Planet Earth'

Which region is the most common? <a id='5.3.4'></a>

In [9]:
akas.groupBy('region') \
    .count() \
    .sort(f.desc('count')) \
    .collect()[0]['region']

'DE'

Get the title with the highest average rating <a id='5.3.5'></a>

In [14]:
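# Several titles may share the maximum rating; in that case the one returned is arbitrary
ratings.join(title_basics, 'tconst', 'left') \
    .select('tconst', 'originalTitle', 'averageRating') \
    .sort(f.desc('averageRating')) \
    .limit(1) \
    .collect()[0]['originalTitle']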

'Die Fee'

Get the title with the highest number of episodes <a id='5.3.6'></a>

In [9]:
episode.groupBy('parentTconst') \
    .count() \
    .join(title_basics, episode.parentTconst == title_basics.tconst, 'left') \
    .sort(f.desc('count')) \
    .collect()[0]['originalTitle']

'NRK Nyheter'

Get the title of the top TV series by average user rating and number of votes received <a id='5.3.7'></a>

In [21]:
# Filter only TV Series from title.basics dataset
tv_series_df = title_basics.filter(f.col("titleType") == "tvSeries")

# Join with title.ratings dataset to get ratings and numVotes
tv_series_ratings_df = tv_series_df.join(ratings, "tconst")

# Create window specification to rank TV Series based on averageRating and numVotes
windowSpec = Window.orderBy(f.desc("averageRating"), f.desc("numVotes"))

# Add rank column based on windowSpec
tv_series_ranked_df = tv_series_ratings_df.withColumn("rank", f.rank().over(windowSpec))

# Keep the top 10 TV series, explicitly sorted by rank
top_10_tv_series = (
    tv_series_ranked_df.filter(f.col("rank") <= 10)
    .orderBy("rank")
    .select("tconst", "primaryTitle", "averageRating", "numVotes")
)

# Title of the top-ranked series
top_10_tv_series.first()['primaryTitle']

'Confessionário Online'
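
The window orders by `averageRating` first and uses `numVotes` only to break ties, so a series with a perfect score and very few votes outranks a widely voted series with a slightly lower score. The plain-Python sketch below illustrates that ordering on made-up rows. The `min_votes` threshold is a hypothetical refinement, not something the notebook applies.

```python
# Made-up rows for illustration only: (title, averageRating, numVotes).
rows = [
    ("Series A", 9.5, 2_000_000),
    ("Series B", 10.0, 7),
    ("Series C", 10.0, 5),
]


def top_title(rows, min_votes=0):
    """Return the title ranked first by rating, then by votes, among rows with enough votes."""
    eligible = [r for r in rows if r[2] >= min_votes]
    # Same ordering as the window: averageRating descending, numVotes descending as tie-breaker.
    ranked = sorted(eligible, key=lambda r: (-r[1], -r[2]))
    return ranked[0][0]


print(top_title(rows))                  # Series B
print(top_title(rows, min_votes=1000))  # Series A
```

Whether a vote threshold is appropriate depends on the business question; without one, the ranking favors titles with few but uniformly high votes.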

Get the title of the TV series with the longest runtime per episode <a id='5.3.8'></a>

In [22]:
# Filter only TV series from the title.basics dataset
tv_series_df = title_basics.filter(f.col("titleType") == "tvSeries")

# tconst is unique in title.basics, so runtimeMinutes is already a single value per series
# and no per-title aggregation is needed before ranking
windowSpecRank = Window.orderBy(f.desc("runtimeMinutes"))

# Add rank column based on windowSpecRank
tv_series_ranked_df = tv_series_df.withColumn("rank", f.rank().over(windowSpecRank))

# Keep the top 10 TV series by runtime, explicitly sorted by rank
top_10_tv_series_runtime = (
    tv_series_ranked_df.filter(f.col("rank") <= 10)
    .orderBy("rank")
    .select("tconst", "primaryTitle", "runtimeMinutes")
)

# Title of the top-ranked series
top_10_tv_series_runtime.first()['primaryTitle']

'The Sharing Circle'