In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [None]:
import json
import os
import re
import sys
from urllib.parse import quote_plus
sys.path.append('/opt/workspace/facial_database/python_scripts/')
import database_operations as db

In [None]:
master = "spark://spark-master:7077"
app_name = "Getting list of actors"

# Small footprint: one core and 512 MB each for the driver and the executor,
# and only two shuffle partitions for the joins in this notebook.
_spark_settings = {
    "spark.driver.memory": "512m",
    "spark.driver.cores": "1",
    "spark.executor.memory": "512m",
    "spark.executor.cores": "1",
    "spark.sql.shuffle.partitions": "2",
}

_builder = SparkSession.builder.appName(app_name).master(master)
for _setting, _value in _spark_settings.items():
    _builder = _builder.config(_setting, _value)
spark = _builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

print(f"Spark version: {sc.version}")

In [None]:
# Folder holding the gzipped IMDb dumps read below.
datasets_path = '/opt/workspace/facial_database/datasets/imdb_datasets/'
names_file, basics_file, crew_file, principals_file = (
    'name.basics.tsv.gz',
    'title.basics.tsv.gz',
    'title.crew.tsv.gz',
    'title.principals.tsv.gz',
)
# Director whose filmography is extracted; the birth year is used together with
# the name to pick the right person.
director_name, birth_year = 'Christopher Nolan', 1970

In [None]:
def _read_imdb_tsv(file_name):
    """Read one gzipped IMDb dump from ``datasets_path`` into a Spark DataFrame.

    IMDb dumps are tab-separated files with a header row; they are not quoted
    and mark missing values with the literal ``\\N``. Quote handling is turned
    off (``quote=''``) so a '"' inside a title is read as an ordinary character
    instead of opening a quoted field, and ``nullValue`` maps ``\\N`` to a real
    null rather than keeping the two-character string. No schema is inferred,
    so every column is read as a string.

    :param file_name: file name relative to ``datasets_path``.
    :returns: the raw Spark DataFrame.
    """
    return spark.read.csv(
        datasets_path + file_name,
        header=True,
        sep='\t',
        quote='',
        nullValue='\\N',
    )


raw_names_df = _read_imdb_tsv(names_file)
raw_basics_df = _read_imdb_tsv(basics_file)
raw_crew_df = _read_imdb_tsv(crew_file)
# The alias lets a later cell select only this frame's columns after a join
# ('raw_principals_df.*').
raw_principals_df = _read_imdb_tsv(principals_file).alias('raw_principals_df')

In [None]:
# Resolve the director's IMDb person id (nconst). Name and birth year are both
# matched so that people sharing the director's name are not picked up.
director_row = (
    raw_names_df
    .filter((F.col('primaryName') == director_name) & (F.col('birthYear') == birth_year))
    .select('nconst')
    .first()
)
if director_row is None:
    raise ValueError(
        f'No IMDb person named {director_name!r} born in {birth_year} was found'
    )
director_id = director_row['nconst']

# `directors` holds a comma-separated list of nconsts (per the IMDb dataset
# documentation). Split it and test membership exactly: a substring LIKE match
# would also accept any longer id that happens to contain `director_id`.
movies_ids_df = (
    raw_crew_df
    .filter(F.array_contains(F.split(F.col('directors'), ','), director_id))
    .select('tconst')
)
movies_df = raw_basics_df.join(movies_ids_df, ['tconst'], 'inner')

In [None]:
# Keep only the title.principals rows that belong to the director's films; the
# 'raw_principals_df' alias restricts the output to that side's columns.
principals_df = (
    raw_principals_df
    .join(movies_df, on=['tconst'], how='inner')
    .select('raw_principals_df.*')
)

In [None]:
# Each person credited on one of the films, listed once, joined back to their
# name.basics record.
people_id_df = principals_df.select('nconst').distinct()
people_df = raw_names_df.join(people_id_df, on=['nconst'], how='inner')

In [None]:
# MySQL metadata (defined in docker-compose file). The credentials can be
# overridden with the WIOS_SQL_USER / WIOS_SQL_PWD environment variables so they
# need not live in the notebook; the fallbacks are the docker-compose values.
sql_user = os.environ.get('WIOS_SQL_USER', 'WIOS_User')
sql_pwd = os.environ.get('WIOS_SQL_PWD', 'Whoisonscreen!')
# One database per director, e.g. 'christopher_nolan'. Each non-word character
# (space, dot, apostrophe, hyphen, ...) becomes '_' so the name only contains
# word characters.
db_name = re.sub(r'\W', '_', director_name).lower()
# Percent-encode the credentials so characters such as '@', ':' or '/' cannot
# corrupt the URL. NOTE(review): assumes db.return_conn hands this URI to
# SQLAlchemy, which URL-decodes the username/password again - confirm.
uri = 'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'.format(
    user=quote_plus(sql_user),
    pwd=quote_plus(sql_pwd),
    host='mysql',
    port='3306',
    db=db_name,
)
imdb_metadata_folder = '/opt/workspace/facial_database/python_scripts/'
imdb_metadata_file = 'imdb_metadata.json'

In [None]:
# Create the per-director MySQL database before the tables are created in it.
db.create_database(
    sql_user,
    sql_pwd,
    db_name,
)

In [None]:
def create_tables():
    """Create every table described in the IMDb metadata JSON inside ``db_name``.

    The file ``imdb_metadata_file`` in ``imdb_metadata_folder`` must hold a
    top-level ``"datasets"`` list. Each entry provides a ``"name"`` (the table
    name) and ``"columns"`` (passed to ``db.create_table`` unchanged).

    :raises OSError: if the metadata file cannot be opened.
    :raises KeyError: if the JSON lacks the expected keys.
    """
    metadata_path = os.path.join(imdb_metadata_folder, imdb_metadata_file)
    # `with` closes the handle deterministically; json.load(open(...)) left it
    # open until garbage collection.
    with open(metadata_path, encoding='utf-8') as metadata_fh:
        imdb_datasets = json.load(metadata_fh)

    # Create one SQL table per dataset entry.
    for dataset in imdb_datasets['datasets']:
        db.create_table(sql_user, sql_pwd, db_name, dataset['name'], dataset['columns'])

In [None]:
create_tables()

In [None]:
def load_data(spark_df, table_name):
    """Copy a Spark DataFrame into the MySQL table ``table_name``.

    A connection is obtained from ``db.return_conn(uri)``. The DataFrame is then
    collected to the driver with ``toPandas()`` and handed, with that connection,
    to ``db.insert_pd_df_in_table``.

    :param spark_df: Spark DataFrame to load.
    :param table_name: name of the destination table.
    """
    # NOTE(review): the connection is never closed here - confirm whether
    # db.insert_pd_df_in_table releases it.
    connection = db.return_conn(uri)
    records_pdf = spark_df.toPandas()
    db.insert_pd_df_in_table(connection, records_pdf, table_name)

In [None]:
# Titles directed by the chosen director.
load_data(spark_df=movies_df, table_name='imdb_titles')

In [None]:
# title.principals rows for those titles.
load_data(spark_df=principals_df, table_name='imdb_actors_by_title')

In [None]:
# name.basics records of everyone referenced by those principals rows.
load_data(spark_df=people_df, table_name='imdb_peoples')

In [None]:
# Stop the Spark session (and its SparkContext) created at the top of the notebook.
spark.stop()