In [165]:
import pyarrow as pa
import pyarrow.csv
import pyarrow.json
import pyarrow.parquet as pq
import os
import glob
import pyarrow.fs as fs
import json
import pyarrow.compute

# 1. Convert to parquet for faster handling with DuckDb

In [166]:
project_dir = os.getcwd()
data_dir = project_dir + "/data"

In [167]:
train_paths = glob.glob(f'{data_dir}/train-*.csv')

In [168]:
parquet_folder = project_dir + "/data_parquet"

In [169]:
def convert_to_parquet(csv_path, output_destination, overwrite=True):
    print("Reading file ", csv_path)
    table = pa.csv.read_csv(csv_path)
    file_name = os.path.basename(csv_path)
    output_filename = os.path.splitext(file_name)[0] + '.parquet'
    output_path = output_destination + "/" + output_filename
    print(f'Writing {output_filename} to {output_path}...')
    local_fs = fs.LocalFileSystem()

    try:
        # Attempt to write the Arrow table to a Parquet file
        pq.write_table(table, output_path)
    
    except fs.FSFileAlreadyExists:
        if overwrite:
            # If the Parquet file already exists, delete it and then write the new file
            local_fs.delete_file(output_path)
            pq.write_table(table, output_path)
        else:
            print(f"File already exists, skipping {output_filename}")

In [170]:
for path in train_paths:
    convert_to_parquet(path, parquet_folder)

Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-6.csv
Writing train-6.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/train-6.parquet...
Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-7.csv
Writing train-7.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/train-7.parquet...
Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-5.csv
Writing train-5.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/train-5.parquet...
Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-4.csv
Writing train-4.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/train-4.parquet...
Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-1.csv
Writing train-1.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/train-1.parquet...
Reading file  /Users/martinarnold/uva/bigdata/big_data/data/train-3.csv
Writing train-3.parquet to /Users/martinarnold/uva/bigdata/big_data/

## Directors

In [171]:
directing_filepath = project_dir + "/data/directing.json"

In [172]:
with open(directing_filepath, 'r') as json_file:
    json_data = json.load(json_file)

In [173]:
movies = json_data['movie'].values()
directors = json_data['director'].values()

In [174]:
len(set(directors))

7189

In [175]:
movies = pa.array(movies)
directors = pa.array(directors)
names = ['movie', 'director']

In [176]:
table = pa.Table.from_arrays([movies, directors], names=names)

In [177]:
pq.write_table(table, project_dir+"/data_parquet/directing.parquet")

## Writers

In [178]:
directing_filepath = project_dir + "/data/writing.json"
with open(directing_filepath, 'r') as json_file:
    json_data = json.load(json_file)

In [179]:
movies = [entry['movie'] for entry in json_data]
writers = [entry['writer'] for entry in json_data]

In [180]:
movies = pa.array(movies)
writers = pa.array(writers)
names = ['movie', 'writer']
table = pa.Table.from_arrays([movies, writers], names=names)
pq.write_table(table, project_dir + "/data_parquet/writing.parquet")

In [267]:
convert_to_parquet(data_dir+"/validation_hidden.csv", parquet_folder)

Reading file  /Users/martinarnold/uva/bigdata/big_data/data/validation_hidden.csv
Writing validation_hidden.parquet to /Users/martinarnold/uva/bigdata/big_data/data_parquet/validation_hidden.parquet...


# 2. Cleaning

In [181]:
from unidecode import unidecode

In [182]:
import duckdb
from duckdb.typing import *

In [183]:
# some DuckDB setup
con = duckdb.connect()
# enable automatic query parallelization
con.execute("PRAGMA threads=2")
# enable caching of parquet metadata
con.execute("PRAGMA enable_object_cache")

<duckdb.duckdb.DuckDBPyConnection at 0x176d24670>

In [184]:
def convert_to_ascii(input_string):
    return unidecode(input_string)

In [185]:
con.create_function('to_ascii', convert_to_ascii, [VARCHAR], VARCHAR)

<duckdb.duckdb.DuckDBPyConnection at 0x176d24670>

In [226]:
train = "'data_parquet/train-*.parquet'"
directing = "'data_parquet/directing.parquet'"
writing = "'data_parquet/writing.parquet'"

In [187]:
query = f"""
     with train as (
        select *
        from {train}
        ),
    directing as (
        select 
            movie
            , director
        from {directing}
        group by movie, director),
    writing as (
        select 
            movie
            , writer
        from {writing}
        group by movie, writer),
    clean_data as (
        select 
            try_cast(tconst as varchar) as tconst
            , try_cast(primaryTitle as varchar) as primaryTitle
            , try_cast(originalTitle as varchar) as originalTitle
            , try_cast(runtimeMinutes as integer) as runtimeMinutes
            , try_cast(numVotes as integer) as numVotes
            , to_ascii(primaryTitle) as title
            , try_cast(startYear as integer) as yearStart, 
            try_cast(endYear as integer) as yearEnd,
            case 
                when yearStart is not null and yearEnd is null then yearStart
                when yearStart is NULL and yearEnd is not NULL then yearEnd
                else yearStart
                end as year,
                label 
                , case
                    when director = '\\N' then null
                    else try_cast(director as varchar)
                    end as director
                , case
                    when writer = '\\N' then null
                    else try_cast(writer as varchar)
                    end as writer
            ,case
                when title != originalTitle and originalTitle != '' then True
                else False
            end as foreign_movie
            , case 
                when label = True then 1
                else 0
                end as label_int
        from train left outer join directing
        on train.tconst = directing.movie
        left outer join writing
        on train.tconst = writing.movie
    ),
    director_rating as (
        select 
            director
            , count(distinct tconst) as n_successes_dir
        from clean_data
        where label=True
        group by director
    ), 
    writer_rating as (
        select 
                writer
                , count(distinct tconst) as n_successes_wri
            from clean_data
            where label=True
            group by writer
    )
    select 
        tconst
        ,title 
        ,year
        , foreign_movie
        , runtimeMinutes
        , numVotes
        ,listagg(distinct clean_data.director, ', ') as directors
        ,listagg(distinct clean_data.writer, ', ') as writers
        ,len(directors) as n_directors
        ,len(writers) as n_writers
        , len(title) as title_length
        ,label
        , sum(n_successes_dir) as n_successes_dir
        , sum(n_successes_wri) as n_successes_wri
    from clean_data
    left outer join director_rating on clean_data.director = director_rating.director
    left outer join writer_rating on clean_data.writer = writer_rating.writer
    group by tconst, title, year, foreign_movie, label, runtimeMinutes, numVotes
        
    
"""

In [188]:
aggregated = con.execute(query).df()

In [189]:
aggregated

Unnamed: 0,tconst,title,year,foreign_movie,runtimeMinutes,numVotes,directors,writers,n_directors,n_writers,title_length,label,n_successes_dir,n_successes_wri
0,tt0010600,The Doll,1919,True,66.0,1898.0,nm0523932,"nm0473134, nm0932559, nm0523932, nm0006782",9.0,42.0,8,True,32.0,7.0
1,tt0028333,Swing Time,1936,False,103.0,,nm0828419,"nm0949130, nm0416861, nm0391750, nm0892044, nm...",9.0,75.0,10,True,35.0,16.0
2,tt0034862,Holiday Inn,1942,False,100.0,14436.0,"nm0020980, nm0762263","nm0616893, nm0000927, nm0723418, nm0083125, nm...",20.0,75.0,11,True,21.0,20.0
3,tt0036260,Phantom Lady,1944,False,87.0,4833.0,nm0802563,"nm0941280, nm0774441",9.0,20.0,12,True,6.0,6.0
4,tt0037055,The Mask of Dimitrios,1944,False,95.0,3530.0,nm0624535,"nm0001907, nm0344246",9.0,20.0,21,True,6.0,6.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7954,tt5913184,Spitfire,2018,False,99.0,1054.0,"nm1406508, nm1116645",,20.0,,8,True,3.0,
7955,tt0056930,A Child Is Waiting,1963,False,102.0,2842.0,nm0001023,nm0542631,9.0,9.0,18,True,4.0,3.0
7956,tt4126304,Cartel Land,2015,False,100.0,18265.0,nm1292648,,9.0,,11,True,2.0,
7957,tt0349467,Freedomland,2006,False,113.0,16091.0,nm0005387,nm0697115,9.0,9.0,11,False,,1.0


In [190]:
query = f"""
    with train as (
        select *
        from {train}
        ),
    directing as (
        select 
            movie
            , director
        from {directing}
        group by movie, director),
    writing as (
        select 
            movie
            , writer
        from {writing}
        group by movie, writer),
   clean_data as (
        select 
            try_cast(tconst as varchar) as tconst
            , try_cast(primaryTitle as varchar) as primaryTitle
            , try_cast(originalTitle as varchar) as originalTitle
            , try_cast(runtimeMinutes as integer) as runtimeMinutes
            , try_cast(numVotes as integer) as numVotes
            , to_ascii(primaryTitle) as title
            , try_cast(startYear as integer) as yearStart, 
            try_cast(endYear as integer) as yearEnd,
            case 
                when yearStart is not null and yearEnd is null then yearStart
                when yearStart is NULL and yearEnd is not NULL then yearEnd
                else yearStart
                end as year,
                label 
                , case
                    when director = '\\N' then null
                    else try_cast(director as varchar)
                    end as director
                , case
                    when writer = '\\N' then null
                    else try_cast(writer as varchar)
                    end as writer
            ,case
                when title != originalTitle and originalTitle != '' then True
                else False
            end as foreign_movie
        from train left outer join directing
        on train.tconst = directing.movie
        left outer join writing
        on train.tconst = writing.movie
    )
    select 
        *
    from clean_data
        
    
"""

In [191]:
non_agg = con.execute(query).df()

# 3. Get train set

In [192]:
query = f"""
     with train as (
        select *
        from {train}
        ),
    directing as (
        select 
            movie
            , director
        from {directing}
        group by movie, director),
    writing as (
        select 
            movie
            , writer
        from {writing}
        group by movie, writer),
    clean_data as (
        select 
            try_cast(tconst as varchar) as tconst
            , try_cast(primaryTitle as varchar) as primaryTitle
            , try_cast(originalTitle as varchar) as originalTitle
            , try_cast(runtimeMinutes as integer) as runtimeMinutes
            , try_cast(numVotes as integer) as numVotes
            , to_ascii(primaryTitle) as title
            , try_cast(startYear as integer) as yearStart, 
            try_cast(endYear as integer) as yearEnd,
            case 
                when yearStart is not null and yearEnd is null then yearStart
                when yearStart is NULL and yearEnd is not NULL then yearEnd
                else yearStart
                end as year,
                label 
                , case
                    when director = '\\N' then null
                    else try_cast(director as varchar)
                    end as director
                , case
                    when writer = '\\N' then null
                    else try_cast(writer as varchar)
                    end as writer
            ,case
                when title != originalTitle and originalTitle != '' then True
                else False
            end as foreign_movie
            , case 
                when label = True then 1
                else 0
                end as label_int
        from train left outer join directing
        on train.tconst = directing.movie
        left outer join writing
        on train.tconst = writing.movie
    ),
    director_rating as (
        select 
            director
            , count(distinct tconst) as n_successes_dir
        from clean_data
        where label=True
        group by director
    ), 
    writer_rating as (
        select 
                writer
                , count(distinct tconst) as n_successes_wri
            from clean_data
            where label=True
            group by writer
    ),
    aggregated as (
        select 
            tconst
            ,title 
            ,year
            , foreign_movie
            , runtimeMinutes
            , numVotes
            ,listagg(distinct clean_data.director, ', ') as directors
            ,listagg(distinct clean_data.writer, ', ') as writers
            ,len(directors) as n_directors
            ,len(writers) as n_writers
            , len(title) as title_length
            ,label
            , sum(n_successes_dir) as n_successes_dir
            , sum(n_successes_wri) as n_successes_wri
        from clean_data
        left outer join director_rating on clean_data.director = director_rating.director
        left outer join writer_rating on clean_data.writer = writer_rating.writer
        group by tconst, title, year, foreign_movie, label, runtimeMinutes, numVotes
    )
    select 
        tconst
        , year
        , foreign_movie
        , runtimeMinutes
        , numVotes
        , title_length
        , label
        , case 
            when n_successes_dir is null then 0
            else n_successes_dir
            end as n_successes_dir
        , case 
            when n_successes_wri is null then 0
            else n_successes_wri
            end as n_successes_wri
    from aggregated
"""

In [193]:
train_data = con.execute(query).df()

In [194]:
train_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7959 entries, 0 to 7958
Data columns (total 9 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   tconst           7959 non-null   object 
 1   year             7959 non-null   int32  
 2   foreign_movie    7959 non-null   bool   
 3   runtimeMinutes   7946 non-null   float64
 4   numVotes         7169 non-null   float64
 5   title_length     7959 non-null   int64  
 6   label            7959 non-null   bool   
 7   n_successes_dir  7959 non-null   float64
 8   n_successes_wri  7959 non-null   float64
dtypes: bool(2), float64(4), int32(1), int64(1), object(1)
memory usage: 419.8+ KB


In [195]:
df_train_data = train_data.dropna()

In [196]:
X = df_train_data.drop(['tconst', 'label'], axis=1).values
y = df_train_data.loc[:, 'label'].values

In [197]:
from sklearn.model_selection import train_test_split

In [198]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [199]:
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier

In [200]:
names = [
    "Nearest Neighbors",
    "Linear SVM",
    "Decision Tree",
    "Random Forest",
    "AdaBoost",
    "Logistic Regression",
]

In [201]:
classifiers = [
    KNeighborsClassifier(3),
    SVC(kernel="linear", C=0.025, random_state=42),
    DecisionTreeClassifier(max_depth=5, random_state=42),
    RandomForestClassifier(
        max_depth=5, n_estimators=10, max_features=1, random_state=42
    ),
    AdaBoostClassifier(algorithm="SAMME", random_state=42),
    LogisticRegression(random_state=0),
]

In [202]:
datasets = [(X, y)]

In [203]:
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.inspection import DecisionBoundaryDisplay

In [204]:
# preprocess dataset, split into training and test part
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)

# iterate over classifiers
for name, clf in zip(names, classifiers):
    print("Training ", name)

    clf = make_pipeline(StandardScaler(), clf)
    clf.fit(X_train, y_train)
    score = clf.score(X_test, y_test)
    print(score)


Training  Nearest Neighbors
0.8152287809989521
Training  Linear SVM
0.8190709046454768
Training  Decision Tree
0.9381767376877401
Training  Random Forest
0.9280475026196298
Training  AdaBoost
0.9430667132378624
Training  Logistic Regression
0.8232623122598672


In [236]:
clf = AdaBoostClassifier(algorithm="SAMME", random_state=42)
clf = make_pipeline(StandardScaler(), clf)
clf.fit(X_train, y_train)

# Predictions

In [283]:
validation = "'data_parquet/validation_hidden.parquet'"
test = "'data_parquet/test_hidden.parquet'"

In [284]:
test_query = f"""
     with train as (
        select *
        from {train}
        ),
    test as (
        select * 
        from {test}
    )
    , directing as (
        select 
            movie
            , director
        from {directing}
        group by movie, director),
    writing as (
        select 
            movie
            , writer
        from {writing}
        group by movie, writer),
    clean_train_data as (
        select 
            try_cast(tconst as varchar) as tconst
            , try_cast(primaryTitle as varchar) as primaryTitle
            , try_cast(originalTitle as varchar) as originalTitle
            , try_cast(runtimeMinutes as integer) as runtimeMinutes
            , try_cast(numVotes as integer) as numVotes
            , to_ascii(primaryTitle) as title
            , try_cast(startYear as integer) as yearStart, 
            try_cast(endYear as integer) as yearEnd,
            case 
                when yearStart is not null and yearEnd is null then yearStart
                when yearStart is NULL and yearEnd is not NULL then yearEnd
                else yearStart
                end as year,
                label 
                , case
                    when director = '\\N' then null
                    else try_cast(director as varchar)
                    end as director
                , case
                    when writer = '\\N' then null
                    else try_cast(writer as varchar)
                    end as writer
            ,case
                when title != originalTitle and originalTitle != '' then True
                else False
            end as foreign_movie
            , case 
                when label = True then 1
                else 0
                end as label_int
        from train left outer join directing
        on train.tconst = directing.movie
        left outer join writing
        on train.tconst = writing.movie
    ),
    clean_test_data as (
        select 
            try_cast(tconst as varchar) as tconst
            , try_cast(primaryTitle as varchar) as primaryTitle
            , try_cast(originalTitle as varchar) as originalTitle
            , try_cast(runtimeMinutes as integer) as runtimeMinutes
            , try_cast(numVotes as integer) as numVotes
            , to_ascii(primaryTitle) as title
            , try_cast(startYear as integer) as yearStart, 
            try_cast(endYear as integer) as yearEnd,
            case 
                when yearStart is not null and yearEnd is null then yearStart
                when yearStart is NULL and yearEnd is not NULL then yearEnd
                else yearStart
                end as year
                --, label 
                , case
                    when director = '\\N' then null
                    else try_cast(director as varchar)
                    end as director
                , case
                    when writer = '\\N' then null
                    else try_cast(writer as varchar)
                    end as writer
            ,case
                when title != originalTitle and originalTitle != '' then True
                else False
            end as foreign_movie
        from test left outer join directing
        on test.tconst = directing.movie
        left outer join writing
        on test.tconst = writing.movie
    ),
    director_rating as (
        select 
            director
            , count(distinct tconst) as n_successes_dir
        from clean_train_data
        where label=True
        group by director
    ), 
    writer_rating as (
        select 
                writer
                , count(distinct tconst) as n_successes_wri
            from clean_train_data
            where label=True
            group by writer
    ),
    aggregated as (
        select 
            tconst
            ,title 
            ,year
            , foreign_movie
            , runtimeMinutes
            , numVotes
            ,listagg(distinct clean_test_data.director, ', ') as directors
            ,listagg(distinct clean_test_data.writer, ', ') as writers
            ,len(directors) as n_directors
            ,len(writers) as n_writers
            , len(title) as title_length
            --,label
            , sum(n_successes_dir) as n_successes_dir
            , sum(n_successes_wri) as n_successes_wri
        from clean_test_data
        left outer join director_rating on clean_test_data.director = director_rating.director
        left outer join writer_rating on clean_test_data.writer = writer_rating.writer
        group by tconst, title, year, foreign_movie, runtimeMinutes, numVotes
    )
    select 
        tconst
        , year
        , foreign_movie
        , runtimeMinutes
        , numVotes
        , title_length
        --, label
        , case 
            when n_successes_dir is null then 0
            else n_successes_dir
            end as n_successes_dir
        , case 
            when n_successes_wri is null then 0
            else n_successes_wri
            end as n_successes_wri
    from aggregated
"""

In [285]:
test_data = con.execute(test_query).df()

In [286]:
test_data

Unnamed: 0,tconst,year,foreign_movie,runtimeMinutes,numVotes,title_length,n_successes_dir,n_successes_wri
0,tt0069994,1973,True,89.0,3066.0,26,0.0,0.0
1,tt0072930,1975,True,96.0,1395.0,15,0.0,0.0
2,tt0075404,1977,True,111.0,7862.0,10,0.0,0.0
3,tt0079688,1979,False,95.0,6795.0,13,0.0,0.0
4,tt0082406,1981,False,83.0,89382.0,21,18.0,35.0
...,...,...,...,...,...,...,...,...
1081,tt0114746,1995,True,129.0,599944.0,10,6.0,1.0
1082,tt1124052,2009,True,132.0,10817.0,22,1.0,1.0
1083,tt7897478,2018,False,113.0,2233.0,19,1.0,1.0
1084,tt2193364,2013,False,130.0,4561.0,18,0.0,1.0


**imput missing values**

In [287]:
test_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1086 entries, 0 to 1085
Data columns (total 8 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   tconst           1086 non-null   object 
 1   year             1086 non-null   int32  
 2   foreign_movie    1086 non-null   bool   
 3   runtimeMinutes   1085 non-null   float64
 4   numVotes         967 non-null    float64
 5   title_length     1086 non-null   int64  
 6   n_successes_dir  1086 non-null   float64
 7   n_successes_wri  1086 non-null   float64
dtypes: bool(1), float64(4), int32(1), int64(1), object(1)
memory usage: 56.3+ KB


In [288]:
from sklearn.impute import SimpleImputer
import numpy as np

In [289]:
imp_mean = SimpleImputer(missing_values=np.nan, strategy='mean')

In [290]:
X_test = test_data.drop('tconst', axis=1).values

In [291]:
X_test = imp_mean.fit_transform(X_test)

In [292]:
y_pred = clf.predict(X_test)

In [293]:
y_pred

array([False, False, False, ...,  True, False,  True])

In [294]:
Y = pd.DataFrame(y_pred)
Y.to_csv("../predictions/test_hidden_martin.txt", sep="\t", index=False)

In [282]:
Y = pd.DataFrame(y_pred)
Y.to_csv("../predictions/validation_hidden_martin.txt", sep="\t", index=False)