In [88]:
import pandas as pd 
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, BooleanType, ArrayType, FloatType
import pyspark.sql.functions as func

In [2]:
#YEEE it work!
#!wget https://datasets.imdbws.com/name.basics.tsv.gz

In [3]:
title = '/home/yu_savchuk/petProject/create_datawarehouse/title_basics.tsv'
rating = '/home/yu_savchuk/petProject/create_datawarehouse/title_ratings.tsv'

In [79]:
### Check values in columns
df_title = pd.read_csv(title, sep='\t', low_memory=False, nrows=100)


In [78]:
df_rating = pd.read_csv(rating, sep='\t', low_memory=False, nrows=100)

In [6]:
df_title.head()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [9]:
df_rating

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1876
1,tt0000002,5.9,248
2,tt0000003,6.5,1648
3,tt0000004,5.8,160
4,tt0000005,6.2,2475
...,...,...,...
1239447,tt9916690,6.5,6
1239448,tt9916720,5.1,209
1239449,tt9916730,8.7,6
1239450,tt9916766,6.7,19


In [145]:
spark = SparkSession.builder.appName('Transformation').getOrCreate()

In [146]:
#Define schema for title file
title_schema = StructType([\
                           StructField('tconst', StringType(), True),\
                           StructField('titleType', StringType(), True),\
                           StructField('primaryTitle', StringType(), True),\
                           StructField('originalTitle', StringType(), True),\
                           StructField('isAdult', BooleanType(), True),\
                           StructField('startYear', IntegerType(), True),\
                           StructField('endYear', IntegerType(), True),\
                           StructField('runtimeMinutes', IntegerType(), True),\
                           StructField('genres', StringType(), True)
                           
])

In [147]:
### read title file
df_title = spark.read.csv(title, schema=title_schema, sep='\t', header=True)
df_title.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: boolean (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- endYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: string (nullable = true)



In [148]:
### Drop unnecassary columns
df_title = df_title.drop('originalTitle', 'endYear')
df_title.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- isAdult: boolean (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- runtimeMinutes: integer (nullable = true)
 |-- genres: string (nullable = true)



In [149]:
### Convert genres column to array type
df_title = df_title.withColumn('genres', func.split(df_title['genres'], ','))

In [150]:
df_title.take(40)

[Row(tconst='tt0000001', titleType='short', primaryTitle='Carmencita', isAdult=None, startYear=1894, runtimeMinutes=1, genres=['Documentary', 'Short']),
 Row(tconst='tt0000002', titleType='short', primaryTitle='Le clown et ses chiens', isAdult=None, startYear=1892, runtimeMinutes=5, genres=['Animation', 'Short']),
 Row(tconst='tt0000003', titleType='short', primaryTitle='Pauvre Pierrot', isAdult=None, startYear=1892, runtimeMinutes=4, genres=['Animation', 'Comedy', 'Romance']),
 Row(tconst='tt0000004', titleType='short', primaryTitle='Un bon bock', isAdult=None, startYear=1892, runtimeMinutes=12, genres=['Animation', 'Short']),
 Row(tconst='tt0000005', titleType='short', primaryTitle='Blacksmith Scene', isAdult=None, startYear=1893, runtimeMinutes=1, genres=['Comedy', 'Short']),
 Row(tconst='tt0000006', titleType='short', primaryTitle='Chinese Opium Den', isAdult=None, startYear=1894, runtimeMinutes=1, genres=['Short']),
 Row(tconst='tt0000007', titleType='short', primaryTitle='Corbett

In [151]:
### Using OneHotEncoding
my_genres = ['Comedy', 'Horror', 'Drama', 'Romance']

for genre in my_genres:
    df_title = df_title.withColumn(f'{genre}_OneHot',
                                   func.when(func.array_contains(func.col('genres'), genre) == True, 1).otherwise(0))

In [152]:
### Check created columns
df_title.select('Comedy_OneHot', 'Horror_OneHot').show()
### Everything OK

+-------------+-------------+
|Comedy_OneHot|Horror_OneHot|
+-------------+-------------+
|            0|            0|
|            0|            0|
|            1|            0|
|            0|            0|
|            1|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            1|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            0|            0|
|            1|            0|
|            0|            0|
+-------------+-------------+
only showing top 20 rows



In [153]:
### Define schema for ratings data
rating_schema = StructType([\
                           StructField('tconst', StringType(), True),\
                           StructField('averageRating', FloatType(), True),\
                           StructField('numVotes', IntegerType(), True)\
])

In [154]:
df_rating = spark.read.csv(rating, sep='\t', header=True, schema=rating_schema)

In [155]:
### Check schema
df_rating.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: float (nullable = true)
 |-- numVotes: integer (nullable = true)



In [156]:
df_rating.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1876|
|tt0000002|          5.9|     248|
|tt0000003|          6.5|    1648|
|tt0000004|          5.8|     160|
|tt0000005|          6.2|    2475|
|tt0000006|          5.2|     165|
|tt0000007|          5.4|     771|
|tt0000008|          5.4|    2016|
|tt0000009|          5.3|     193|
|tt0000010|          6.9|    6775|
|tt0000011|          5.3|     344|
|tt0000012|          7.4|   11635|
|tt0000013|          5.7|    1795|
|tt0000014|          7.1|    5214|
|tt0000015|          6.2|     991|
|tt0000016|          5.9|    1403|
|tt0000017|          4.6|     305|
|tt0000018|          5.3|     561|
|tt0000019|          5.2|      30|
|tt0000020|          4.8|     332|
+---------+-------------+--------+
only showing top 20 rows



In [157]:
### Create final dataset

df_main = df_title.join(func.broadcast(df_rating), ['tconst'], "inner")

In [158]:
df_main.take(6)

                                                                                

[Row(tconst='tt0000001', titleType='short', primaryTitle='Carmencita', isAdult=None, startYear=1894, runtimeMinutes=1, genres=['Documentary', 'Short'], Comedy_OneHot=0, Horror_OneHot=0, Drama_OneHot=0, Romance_OneHot=0, averageRating=5.699999809265137, numVotes=1876),
 Row(tconst='tt0000002', titleType='short', primaryTitle='Le clown et ses chiens', isAdult=None, startYear=1892, runtimeMinutes=5, genres=['Animation', 'Short'], Comedy_OneHot=0, Horror_OneHot=0, Drama_OneHot=0, Romance_OneHot=0, averageRating=5.900000095367432, numVotes=248),
 Row(tconst='tt0000003', titleType='short', primaryTitle='Pauvre Pierrot', isAdult=None, startYear=1892, runtimeMinutes=4, genres=['Animation', 'Comedy', 'Romance'], Comedy_OneHot=1, Horror_OneHot=0, Drama_OneHot=0, Romance_OneHot=1, averageRating=6.5, numVotes=1648),
 Row(tconst='tt0000004', titleType='short', primaryTitle='Un bon bock', isAdult=None, startYear=1892, runtimeMinutes=12, genres=['Animation', 'Short'], Comedy_OneHot=0, Horror_OneHot=0

In [159]:
### Write to parquet file
df_main.write.parquet('title_rating.parquet')

                                                                                