In [None]:
import duckdb
import numpy as np
import pandas as pd
import pyspark
from pyspark.shell import spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import ArrayType, FloatType, DecimalType, StringType, IntegerType
from pyspark.sql.functions import size
from pyspark.storagelevel import StorageLevel
from pyspark import SparkContext
from pyspark.sql.functions import col,when
from pyspark.sql import SparkSession
from pyspark.pandas.spark import functions as SF

## Connect to duckDB and import data

In [None]:
# Open an in-memory DuckDB connection; all tables below are created through `con`.
con = duckdb.connect(database=':memory:')

In [None]:
# Create the table structures.
# All three splits share the same columns; only the train split also has `label`.
base_columns = (
    "num INT, tconst VARCHAR, primaryTitle VARCHAR, originalTitle VARCHAR, "
    "startYear VARCHAR, endYear VARCHAR, runtimeMinutes VARCHAR, numVotes FLOAT"
)

for table in ['train', 'test', 'validation']:
    # Drop the table if it already exists so the cell can be re-run.
    # IF EXISTS avoids a bare try/except that would also hide real errors.
    con.execute("DROP TABLE IF EXISTS " + table)

    # Create the table structure (with the label column for the train set)
    if table == 'train':
        columns = base_columns + ", label BOOL"
    else:
        columns = base_columns
    con.execute("CREATE TABLE " + table + "(" + columns + ");")

In [None]:
# Load every CSV found in ./data into the matching DuckDB table.
# COPY appends rows, so several files of the same split end up in one table.
import os
from os import listdir
from os.path import isfile, join

path = os.getcwd() + "/data/"
# Sorted so the load order (and therefore the row order) is the same on every run
files = sorted(f for f in listdir(path) if isfile(join(path, f)))

# File-name marker -> target table
table_by_marker = {'train-': 'train', 'test_': 'test', 'validation_': 'validation'}

for f in files:
    file = 'data/'+f
    for marker, table in table_by_marker.items():
        if marker in f:
            print('Reading ' + table + ' file...', f)
            con.execute("COPY " + table + " FROM '"+file+"' (AUTO_DETECT TRUE)")

In [None]:
# Show the loaded train table as a pandas DataFrame
con.execute(''' SELECT * FROM train''').fetch_df()

Get the JSONs into different tables

In [None]:
import os

jsonStr = 'writing.json'
# Use the file from the working directory when it is there, otherwise from ./data.
# An explicit check replaces the bare `except`, which also swallowed parse errors.
json_path = jsonStr if os.path.isfile(jsonStr) else 'data/'+jsonStr
# Convert JSON to DataFrame using read_json()
df = pd.read_json(json_path)
# The query refers to the pandas frame by its variable name (`df`).
# OR REPLACE keeps the cell re-runnable.
con.execute("CREATE OR REPLACE TABLE writing AS SELECT * FROM 'df'");

In [None]:

import os

jsonStr2 = 'directing.json'
# Use the file from the working directory when it is there, otherwise from ./data.
# An explicit check replaces the bare `except`, which also swallowed parse errors.
json_path2 = jsonStr2 if os.path.isfile(jsonStr2) else 'data/'+jsonStr2
# Convert JSON to DataFrame using read_json()
df2 = pd.read_json(json_path2)

# The query refers to the pandas frame by its variable name (`df2`).
# OR REPLACE keeps the cell re-runnable.
con.execute("CREATE OR REPLACE TABLE directing AS SELECT * FROM 'df2'");

## From DB to Spark

In [None]:
# Build (or reuse) the Spark session for this notebook.
# NOTE(review): `spark` is also imported from pyspark.shell in the import cell;
# getOrCreate() may hand back that existing session — confirm the master setting applies.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("IMDB")
    .getOrCreate()
)

In [None]:
# Get the DuckDB tables in Spark.
# Each table is fetched once (the previous version ran every query twice) and its
# missing values are replaced with None before the frame is handed to Spark.
train_df, test_df, validation_df = (
    spark.createDataFrame(pdf.where(pd.notnull(pdf), None))
    for pdf in (
        con.execute("SELECT * FROM " + table_name).fetchdf()
        for table_name in ("train", "test", "validation")
    )
)


## Preprocessing

In [None]:
# Remove the rows that have a null in any of the given columns
def drop_nulls(df, cols):
    """Return `df` without the rows that contain a null in any column of `cols`.

    df   -- frame exposing `dropna(subset=...)`
    cols -- column names to check for nulls
    """
    return df.dropna(subset=cols)

In [None]:
# Swap start and end year where necessary and add the YearSinceRealease feature
from pyspark.sql.functions import when
def set_years(df, current_year="2022"):
    """Fill in missing years and add the `YearSinceRealease` column.

    Steps, in order:
      1. a missing end year ("\\N") becomes `current_year`;
      2. a missing start year ("\\N") takes the end year;
      3. an end year equal to the start year becomes `current_year`;
      4. `YearSinceRealease` = end year - start year.

    df           -- Spark DataFrame with string columns `startyear` and `endyear`
    current_year -- year (as a string) used for open-ended titles; defaults to "2022"

    Returns the updated DataFrame.
    """
    missing = "\\N"
    df = df.withColumn("endyear", when(df.endyear == missing, current_year)
                                 .otherwise(df.endyear))
    df = df.withColumn("startyear", when(df.startyear == missing, df.endyear)
                              .otherwise(df.startyear))
    df = df.withColumn("endyear", when(df.endyear == df.startyear, current_year)
                              .otherwise(df.endyear))
    # Both year columns hold strings, so cast explicitly instead of relying on
    # Spark's implicit string-to-number coercion for the subtraction.
    df = df.withColumn('YearSinceRealease',
                       df['endyear'].cast("double") - df['startyear'].cast("double"))
    return df

In [None]:
# Keep only the rows whose runtime is available
def runtime_nulls(df):
    """Return `df` restricted to the rows whose `runtimeminutes` is not '\\N'.

    The previous version computed the filtered frame but never returned it,
    so callers always received None.
    """
    return df.where(df.runtimeminutes != '\\N')

In [None]:
# Fill with mean in empty rows:
from pyspark.sql.functions import avg
def fill_with_mean(df, cols):
    """Replace missing values in `cols` with the column mean.

    df   -- Spark DataFrame
    cols -- names of the columns to impute; "\\N" entries count as missing

    Returns the DataFrame with the "\\N" placeholders turned into nulls and
    every null in `cols` filled with that column's mean. A column without any
    usable value has no mean and is left with its nulls.
    """
    cols = list(cols)
    if not cols:
        return df
    # First convert the non numeric placeholder to a real null.
    # (`name`, not `col`, so the imported pyspark `col` function is not shadowed.)
    for name in cols:
        df = df.withColumn(name, when(df[name] == "\\N", None)
                                 .otherwise(df[name]))
    # Then compute all the means in a single pass over the data ...
    means = df.agg(*[avg(name).alias(name) for name in cols]).first().asDict()
    # ... and skip columns whose mean is undefined, since None is not a valid fill value.
    fill_values = {name: mean for name, mean in means.items() if mean is not None}
    if fill_values:
        df = df.na.fill(fill_values)
    return df

In [None]:
# EXECUTE THE PREPROCESSING:

def preprocessing(df):
    """Apply the preprocessing steps to one split and return the result.

    Runs `set_years` first, then fills missing `runtimeminutes` with the mean.
    """
    # Dropping rows with nulls in runtimeminutes/numvotes is currently disabled:
    # df = drop_nulls(df, ["runtimeminutes","numvotes"])
    # Fix the start/end years and add the YearSinceRealease feature
    with_years = set_years(df)
    # Impute the runtime minutes
    return fill_with_mean(with_years, ["runtimeminutes"])
    
# Preprocess each split.
# NOTE(review): in the visible part of the notebook the later merge cells join the raw
# train_df / test_df / validation_df, not these *_proc frames — confirm which is intended.
train_proc = preprocessing(train_df)
test_proc = preprocessing(test_df)
val_proc = preprocessing(validation_df)

## Additional data

#### Save in DuckDB

In [None]:
import os

# Load each extra-features CSV from the working directory, or from ./data when
# it is not there. The location is resolved per file with an explicit check:
# the previous try/except swallowed every error and, after a partial failure,
# the fallback crashed on the tables that had already been created.
for extra_table, csv_name in [("additional_train", "movies_info_train.csv"),
                              ("additional_test", "movies_info_test.csv"),
                              ("additional_val", "movies_info_val.csv")]:
    csv_path = csv_name if os.path.isfile(csv_name) else 'data/' + csv_name
    # OR REPLACE keeps the cell re-runnable
    con.execute("CREATE OR REPLACE TABLE " + extra_table + " AS SELECT * FROM '" + csv_path + "';")

#### From DB to Spark

In [None]:
# Get the additional DuckDB tables in Spark.
# Each table is fetched once (the previous version ran every query twice) and its
# missing values are replaced with None before the frame is handed to Spark.
train_extra_df, test_extra_df, validation_extra_df = (
    spark.createDataFrame(pdf.where(pd.notnull(pdf), None))
    for pdf in (
        con.execute("SELECT * FROM " + table_name).fetchdf()
        for table_name in ("additional_train", "additional_test", "additional_val")
    )
)


In [None]:
# Row counts of the train split and of its extra-features table
train_rows = train_df.count()
extra_rows = train_extra_df.count()
print(f'Train dataset size: {train_rows}')
print(f'Extra features size: {extra_rows}')

In [None]:
# Get the writing/directing DuckDB tables in Spark.
# Each table is fetched once (the previous version ran every query twice) and its
# missing values are replaced with None before the frame is handed to Spark.
writers_df, directors_df = (
    spark.createDataFrame(pdf.where(pd.notnull(pdf), None))
    for pdf in (
        con.execute("SELECT * FROM " + table_name).fetchdf()
        for table_name in ("writing", "directing")
    )
)

In [None]:
from pyspark.sql.functions import collect_list
# One row per movie, with all of its writers collected into the `writers` list
grouped_writers = (
    writers_df
    .groupby('movie')
    .agg(collect_list('writer').alias("writers"))
)

#### Merge all dataframes together (writers, directors, extra data)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

def merge_dfs(df, df_extra):
    """Inner-join `df` with `df_extra` where `df.tconst` equals `df_extra.imdb_id`.

    Prints the row counts of both inputs and of the joined frame, then
    returns the joined frame.
    """
    same_title = df.tconst == df_extra.imdb_id
    joined = df.join(df_extra, on=same_title, how='inner')
    # Sizes before and after the join, to see how many rows the inner join kept
    print(df.count(), ' + ', df_extra.count(), ' --> ', joined.count())
    return joined

# Attach the extra features to each split.
# NOTE(review): these calls use the raw frames rather than train_proc / test_proc / val_proc
# produced by preprocessing() — confirm that skipping the preprocessing is intended.
train_merge_df = merge_dfs(train_df, train_extra_df)
test_merge_df = merge_dfs(test_df, test_extra_df)
val_merge_df = merge_dfs(validation_df, validation_extra_df)

In [None]:
# Rename the directors' `movie` column to `movie_d`, then pair every movie's
# writers list with its directors.
# Note: `directors_df` is overwritten, so this cell cannot be run twice in a row.
directors_df = directors_df.selectExpr("movie as movie_d","director as director")
writers_directors = grouped_writers.join(
    directors_df,
    on=grouped_writers.movie == directors_df.movie_d,
    how='inner',
)

In [None]:

def final_merge(df, df_writ_dir):
    """Inner-join `df` with the writers/directors frame.

    Rows are matched where `df.tconst` equals `df_writ_dir.movie`; the joined
    frame is returned.
    """
    same_movie = df.tconst == df_writ_dir.movie
    return df.join(df_writ_dir, on=same_movie, how='inner')

# Add the writers/directors columns to every split (inner join on the movie id)
train_final_df = final_merge(train_merge_df, writers_directors)
test_final_df = final_merge(test_merge_df, writers_directors)
val_final_df = final_merge(val_merge_df, writers_directors)

## Prepare for ML algorithm

#### Keep only useful columns

In [None]:
def drop_cols(df, cols):
    """Return `df` without the columns listed in `cols`.

    df   -- frame whose `drop` accepts the column names as separate arguments
    cols -- iterable of column names to remove
    """
    return df.drop(*cols)

# Columns removed before modelling: identifiers, titles and free text from the
# base tables, unused fields from the extra-features table, and the join keys
# (`movie`, `movie_d`) left over from the writers/directors merge.
cols2drop = ('num', 'tconst', 'primarytitle', 'originaltitle', 'endyear', 'imdb_id', 'belongs_to_collection', 
        'budget', 'id', 'original_title', 'overview', 'production_companies',
         'release_date', 'revenue', 'runtime', 'tagline', 'title', 'video', 'vote_count', 'spoken_language_list',  'movie', 'movie_d')

# Apply the same column selection to every split
train_df_clean = drop_cols(train_final_df, cols2drop)
test_df_clean = drop_cols(test_final_df, cols2drop)
val_df_clean = drop_cols(val_final_df, cols2drop)


#### Encoding

Trying to encode with One Hot Encoder for spark dataframe

In [None]:
# Import the required libraries
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder


def hot_encode(df, col, fit_df=None):
    """One-hot encode the string column `col` of `df`.

    Adds two columns: `<col>_ind` (numeric index, starting at 0) and
    `<col>OHEVector` (one-hot vector).

    df     -- Spark DataFrame to transform
    col    -- name of the string column to encode
    fit_df -- optional DataFrame the indexer and encoder are fitted on.
              When omitted they are fitted on `df` itself (the original
              behaviour). Passing the training frame for every split keeps
              the index <-> category mapping identical across splits; values
              not seen in `fit_df` are then mapped to one extra "unknown"
              index instead of raising.

    Returns the transformed DataFrame.
    """
    reference = df if fit_df is None else fit_df
    # 'error' is StringIndexer's default; 'keep' is only needed when the data
    # being transformed may contain values the fitted indexer has never seen.
    handle_invalid = 'error' if fit_df is None else 'keep'

    ##  numeric indexing for the strings (indexing starts from 0)
    indexer = StringIndexer(inputCol=col, outputCol=col+'_ind', handleInvalid=handle_invalid)
    indexer_model = indexer.fit(reference)

    ohe = OneHotEncoder(inputCol=col+'_ind', outputCol=col+'OHEVector')
    ohe_model = ohe.fit(indexer_model.transform(reference))

    return ohe_model.transform(indexer_model.transform(df))


# Fit on the training split only and reuse that fit for every split; fitting
# each split separately assigns different indices to the same genre list.
train_df_encode = hot_encode(train_df_clean, 'genre_list', fit_df=train_df_clean)
test_df_encode = hot_encode(test_df_clean, 'genre_list', fit_df=train_df_clean)
val_df_encode = hot_encode(val_df_clean, 'genre_list', fit_df=train_df_clean)


#### Convert to pandas

In [None]:
# The model cells below read final_pandas['label'], and only the train table is
# created with a `label` column, so the training split is the one converted here
# (the test split was used before).
final_pandas = train_df_encode.toPandas()

In [None]:
# Display the converted pandas frame
final_pandas

#### ML Model 

In [None]:
# Number of missing values per column
final_pandas.isnull().sum()

In [None]:
pip install lightgbm

In [None]:
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.preprocessing import LabelEncoder

##### Create df to feed to the model (should be done in PySpark)

In [None]:
# Convert the Spark frames to pandas.
# NOTE(review): `moredata_df` is not defined anywhere in the visible notebook —
# confirm where it comes from (possibly a leftover from a deleted cell).
train_pd = train_df.toPandas()
writers_pd = writers_df.toPandas()
directors_pd = directors_df.toPandas()
more_pd = moredata_df.toPandas()

In [None]:
# NOTE(review): `test_hidden_df` and `validation_hidden_df` are not defined in the
# visible notebook — confirm where they come from.
test_pd = test_hidden_df.toPandas()
val_pd = validation_hidden_df.toPandas()

In [None]:
# Inner-join the train rows with the extra data (tconst on the left, imdb_id on the right)
merged1 = train_pd.merge(more_pd, left_on = 'tconst', right_on = 'imdb_id', how = 'inner')

In [None]:
# Display the merged train frame
merged1

In [None]:
# Same inner join with the extra data for the test and validation rows
merged_test = test_pd.merge(more_pd, left_on = 'tconst', right_on = 'imdb_id', how = 'inner')
merged_val = val_pd.merge(more_pd, left_on = 'tconst', right_on = 'imdb_id', how = 'inner')

In [None]:
# Display the merged validation frame
merged_val

##### One hot encoding for features

In [None]:
# Turn the textual genre lists into Python lists:
# strip the surrounding brackets, remove spaces and quotes, then split on commas.
one_hot_g = pd.DataFrame()
one_hot_g['genres'] = (
    final_pandas['genre_list']
    .str.strip('[]')
    .str.replace(' ','')
    .str.replace("'",'')
    .str.split(',')
)

In [None]:
# Turn the textual production lists into Python lists (same clean-up as for the genres).
# The result is stored in one_hot_pr: the frame was created here but the column
# used to be written into one_hot_g by mistake.
one_hot_pr = pd.DataFrame()
one_hot_pr['prods'] = final_pandas['production_list'].str.strip('[]').str.replace(' ','').str.replace("'",'').str.split(',')

In [None]:
# Convert every genre list to a set
one_hot_g['genres'] = one_hot_g['genres'].apply(set)

In [None]:
# Writers of every movie as a set
one_hot_w = pd.DataFrame()
one_hot_w['writers'] = final_pandas['writers'].apply(set)

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer

# One indicator column per genre
mlb = MultiLabelBinarizer()
genre_indicators = mlb.fit_transform(one_hot_g['genres'])
one_hot_df = pd.DataFrame(genre_indicators, columns=mlb.classes_)

In [None]:
# One indicator column per writer; `mlb` is re-fitted here, so its classes_
# now describe the writers instead of the genres
writer_indicators = mlb.fit_transform(one_hot_w['writers'])
one_hot_df1 = pd.DataFrame(writer_indicators, columns=mlb.classes_)

TODO: Include the directors

In [None]:
# One indicator column per director
one_hot_d = pd.get_dummies(final_pandas['director'])
# Not joined into final_pandas yet (see the TODO above):
#final_pandas = final_pandas.join(one_hot_d)


In [None]:
# Append the genre indicator columns (joined on the row index).
# final_pandas is overwritten, so running this cell a second time would try to add the same columns again.
final_pandas = final_pandas.join(one_hot_df)

In [None]:
# Append the writer indicator columns (joined on the row index).
# final_pandas is overwritten, so running this cell a second time would try to add the same columns again.
final_pandas = final_pandas.join(one_hot_df1)

In [None]:
#list(final_pandas.columns)

In [None]:
# creating instance of labelencoder
# labelencoder = LabelEncoder()
# # Assigning numerical values and storing in another column
# merged1['startyear_Cat'] = labelencoder.fit_transform(merged1['startyear'])
# merged1['production_countr_list_Cat'] = labelencoder.fit_transform(merged1['production_countr_list'])

In [None]:
        
# Define the input features (x) and the target (y).
# The raw list/string columns are removed together with the label.
raw_feature_cols = ["genre_list", "production_list", "original_language",
                    'production_countr_list', 'writers', 'director', "label"]
# '' and '\\N' are presumably indicator columns created by the one-hot joins for
# empty / placeholder values; they only exist for some data, so a missing one
# must not abort the cell with a KeyError.
optional_cols = ['', '\\N']

x = (
    final_pandas
    .drop(raw_feature_cols, axis=1)
    .drop(optional_cols, axis=1, errors="ignore")
)

x['runtimeminutes'] = x['runtimeminutes'].astype(float)
x['startyear'] = x['startyear'].astype(float)
x['adult'] = x['adult'].astype(int)
y = final_pandas['label']
# train and test split
x_train,x_test,y_train,y_test = train_test_split(x,y,test_size=0.33,random_state=42)

##### Make predictions for hidden data

In [None]:
# Build the feature frame for the hidden test data and predict with the model.
# NOTE(review): `clf` is only created in a later cell (lgb.train), so this cell
# fails on a fresh Restart & Run All — confirm the intended cell order.
# NOTE(review): this overwrites the `x` built in the previous cell.
x = merged_test.drop(["num","tconst","primarytitle","originaltitle", "endyear","imdb_id", "belongs_to_collection", "budget",
        "id", "original_title", "overview", "tagline", "title", "video", "production_companies", "release_date",
        "revenue", "runtime",'startyear',"adult", "original_language", "genre_list", "production_list", "production_countr_list",
                       "spoken_language_list","genres_bin","production_bin", "label"],axis=1)
x['runtimeminutes'] = x['runtimeminutes'].astype(float)
#prediction on the test set
y_pred=clf.predict(x)
#rounding the values
y_pred=y_pred.round(0)
#converting from float to integer
y_pred=y_pred.astype(int)

In [None]:
# LightGBM hyper-parameters for the binary classifier
lgb_params = dict(
    boosting_type='gbdt',
    objective='binary',
    metric='binary_logloss',
    n_estimators=10000,
    learning_rate=0.3,
    num_leaves=2840,
    max_depth=10,
    min_data_in_leaf=300,
    lambda_l1=35,
    lambda_l2=65,
    min_gain_to_split=7.394615335964813,
    bagging_fraction=0.6,
    bagging_freq=1,
    feature_fraction=0.3,
)
d_train = lgb.Dataset(x_train, label=y_train)

# Train the model. No round count is passed here; `n_estimators` in lgb_params
# is presumably what sets the number of boosting rounds — confirm.
clf = lgb.train(lgb_params, d_train)


In [None]:
# Save the trained model under two file names.
# NOTE(review): both calls use save_model, so both files presumably have the same
# format regardless of extension — confirm whether a real JSON dump was intended.
clf.save_model('model_lgbm.txt')
clf.save_model("model_lgbm.json")

In [None]:
#model_lgb = lgb.Booster(model_file='model_lgbm.txt')

In [None]:
# Predict on the held-out split, round the predicted values to the nearest
# integer and cast them to int class labels
y_pred = clf.predict(x_test).round(0).astype(int)

In [None]:
from sklearn.metrics import mean_squared_error,roc_auc_score,precision_score, accuracy_score
# Accuracy on the held-out split (the comment used to say roc_auc_score).
# Arguments follow the scikit-learn convention: true labels first, predictions second.
accuracy_score(y_test.values.astype(int), y_pred)

In [None]:
# Read the hidden validation/test CSVs from the working directory.
# Note: this replaces the `test_pd` assigned in an earlier cell.
validation_pd = pd.read_csv('validation_hidden.csv')
test_pd = pd.read_csv('test_hidden.csv')

In [None]:
# Display the hidden validation frame
validation_pd

In [None]:
# Label-encode the start year into a new integer column `startyear_Cat`.
# creating instance of labelencoder
labelencoder = LabelEncoder()
# Assigning numerical values and storing in another column
final_pandas['startyear_Cat'] = labelencoder.fit_transform(final_pandas['startyear'])
# Writers/directors label encoding is currently disabled:
# final_pandas['writers_Cat'] = labelencoder.fit_transform(final_pandas['writer'])
# final_pandas['directors_Cat'] = labelencoder.fit_transform(final_pandas['director'])
final_pandas

In [None]:
# Define the input features (x) and the target (y) for the second feature set.
# Columns that must be present in final_pandas. The writers column is called
# 'writers' ('writer' does not exist and made the drop raise a KeyError).
cols_to_remove = ['startyear', "adult", "original_language",
                  "popularity", "vote_average", "genre_list", "production_list",
                  "production_countr_list", 'writers', 'director']
# Already removed earlier by cols2drop, so they may legitimately be absent here
cols_maybe_absent = ["vote_count", "spoken_language_list"]

x = (
    final_pandas
    .drop(cols_to_remove, axis=1)
    .drop(cols_maybe_absent, axis=1, errors="ignore")
)
x = x.dropna()
x['runtimeminutes'] = x['runtimeminutes'].astype(float)
y = x['label']
x = x.drop(['label'],axis=1)
# train and test split
x_train,x_test,y_train,y_test = train_test_split(x,y,test_size=0.33,random_state=42)

In [None]:
from sklearn.metrics import accuracy_score


def _booster_accuracy(model, features, labels):
    """Return the accuracy of a trained LightGBM booster on (features, labels).

    `lgb.train` returns a Booster, which has no scikit-learn style `score`
    method, so the predictions are rounded to 0/1 and compared with the labels.
    """
    predicted = model.predict(features).round(0).astype(int)
    return accuracy_score(labels.astype(int), predicted)


# NOTE(review): x_train / x_test were rebuilt in the previous cell with a different
# column set than the one `clf` was trained on — confirm the features match.
print('Training accuracy {:.4f}'.format(_booster_accuracy(clf, x_train, y_train)))
print('Testing accuracy {:.4f}'.format(_booster_accuracy(clf, x_test, y_test)))

In [None]:
# Plot the feature importances of the trained model
lgb.plot_importance(clf)