<a href="https://colab.research.google.com/github/marholm/BigData_Project/blob/main/BigData_IMDB_Team19.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0. Set up environment

In [None]:
# Required installations
!pip install pyspark
!pip install findspark
# !pip install duckdb
# !pip install numpy
!pip install pandas
# !pip install fitter
!pip install chart-studio

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 13 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 42.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=e3aa8c111b763b502eb2cbf13f605c3b13d5876cc6ed8f91e264cb5f7c5529fb
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting chart-studio
  Downloading chart_studio-1.1.0-

In [None]:
# import os       #importing os to set environment variable
# def install_java():
#   !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      #install openjdk
#   os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
#   !java -version       #check java version
# install_java()

In [None]:
# when errors occur in above method: !apt-get update

In [None]:
%env SPARK_HOME=/usr/local/lib/python3.7/dist-packages/pyspark

env: SPARK_HOME=/usr/local/lib/python3.7/dist-packages/pyspark


In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

# Local-mode session. Each (key, value) pair below is handed to the builder
# unchanged, in this order, before the session is created or reused.
builder = SparkSession.builder.master("local")
for setting_name, setting_value in (
    ("spark.driver.bindAddress", "127.0.0.1"),
    ("spark.executor.memory", "70g"),
    ("spark.driver.memory", "50g"),
    ("spark.sql.analyzer.failAmbiguousSelfJoin", False),
    ("spark.memory.offHeap.enabled", "true"),
    ("spark.memory.offHeap.size", "10g"),
):
    builder = builder.config(setting_name, setting_value)

spark = builder.getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 1. Data Engineering

##  1.1 Data Ingestion

Collection of the original data from GitHub, persisted without any transformation.
* Training data (CSV)
* Testing and validation data (CSV)
* Writers and Directors (JSON)
* External data from: https://www.kaggle.com/rounakbanik/the-movies-dataset (CSV)

In [None]:
!git clone https://github.com/schelterlabs/big-data-course-2022-projects.git

Cloning into 'big-data-course-2022-projects'...
remote: Enumerating objects: 127, done.[K
remote: Counting objects: 100% (127/127), done.[K
remote: Compressing objects: 100% (108/108), done.[K
remote: Total 127 (delta 37), reused 49 (delta 5), pack-reused 0[K
Receiving objects: 100% (127/127), 5.88 MiB | 17.54 MiB/s, done.
Resolving deltas: 100% (37/37), done.


In [None]:
# Training data: every file matching the train-*.csv pattern. The first row is
# used as header and Spark infers the column types.
df_train = (
    spark.read
    .option("header", 'True')
    .option("inferSchema", 'True')
    .option("delimiter", ',')
    .csv("big-data-course-2022-projects/imdb/train-*.csv")
)

In [None]:
df_train.count()

7959

In [None]:
# Hidden test set, read with the same CSV options as the training data.
df_test = (
    spark.read
    .option("header", 'True')
    .option("inferSchema", 'True')
    .option("delimiter", ',')
    .csv("big-data-course-2022-projects/imdb/test_hidden.csv")
)

In [None]:
# Hidden validation set, read with the same CSV options as the training data.
df_validation = (
    spark.read
    .option("header", 'True')
    .option("inferSchema", 'True')
    .option("delimiter", ',')
    .csv("big-data-course-2022-projects/imdb/validation_hidden.csv")
)

In [None]:
# Writers are loaded straight into a Spark DataFrame from the JSON file.
df_writers = spark.read.format("json").load("big-data-course-2022-projects/imdb/writing.json")

In [None]:
df_writers.count()

22428

In [None]:
import pandas as pd

# Directors are parsed with pandas first; the resulting frame is then handed to Spark.
df_directors = spark.createDataFrame(
    pd.read_json("big-data-course-2022-projects/imdb/directing.json")
)

In [None]:
df_directors.count()

11162

### 1.1.1 Exploratory analysis

Exploring original data 

In [None]:
# from chart_studio import plotly
# import plotly.offline as py
# import plotly.graph_objs as go

# # Info on https://plotly.com/python/v3/apache-spark/
# data = [go.Histogram(x=df_train.toPandas()['numVotes'])]
# py.iplot(data)

# Find outliers startYear
# data = [go.Histogram(x=df_train.toPandas()['startYear'])]
# py.iplot(data)

In [None]:
# import pyspark.sql.functions as F

# plot_labels = df_train.groupBy('label')\
#               .agg(
#                   F.count('label').alias("label count"), \
#                   F.avg("numVotes").alias("avg vote per label")\
#               )

# plot_labels.toPandas()

## 1.2 Data Preparation
* Data Cleaning
* Join writers and directors on training df

### 1.2.1 Data Cleaning
* Replace empty fields in `numVotes` with the integer 0.
* Replace special characters (e.g., accented characters such as  à and é) with ASCII characters.
* Replace empty `originalTitle` fields with a copy of `primaryTitle` and the other way around. 
* Replace `endYear` cells with `\N` with the value from `startYear`.
* Check outliers. Look for movies before 1900 for example.
* Capitalization rules. Choose a specific format.

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
import unicodedata
import numpy as np

def replace_empty_with_nan(df, col=None, t=None):
    """Swap null cells of one column for NaN and cast the column to `t`.

    Args:
        df: Spark DataFrame to transform.
        col: Name of the column that is rewritten.
        t: Spark data type the rewritten column is cast to.

    Returns:
        The DataFrame produced by replacing column `col`.
    """
    source = df[col]
    # NaN where the cell is null, the original value everywhere else.
    filled = F.when(source.isNull(), np.nan).otherwise(source)
    return df.withColumn(col, filled.cast(t))


def replace_empty_with_zero(df, col=None, t=None):
    """Swap null cells of one column for 0 and cast the column to `t`.

    Args:
        df: Spark DataFrame to transform.
        col: Name of the column that is rewritten.
        t: Spark data type the rewritten column is cast to.

    Returns:
        The DataFrame produced by replacing column `col`.
    """
    source = df[col]
    # Literal 0 where the cell is null, the original value everywhere else.
    filled = F.when(source.isNull(), 0).otherwise(source)
    return df.withColumn(col, filled.cast(t))

def ascii_fold(text):
    """Return `text` reduced to plain ASCII, or None when `text` is None.

    The string is decomposed with Unicode NFD so that an accented letter becomes
    its base letter followed by combining marks. Every character that cannot be
    encoded as ASCII is then dropped.

    Args:
        text: The string to convert, or None.

    Returns:
        The ASCII-only string, or None for a None input.
    """
    if text is None:
        # Spark passes null cells to a Python UDF as None; normalize() would
        # raise TypeError on it, so nulls are passed through untouched.
        return None
    return unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")


def replace_special_characters(df, col=None):
    """Replace accented/special characters in a string column with ASCII.

    Null cells stay null instead of failing the UDF.

    Args:
        df: Spark DataFrame to transform.
        col: Name of the string column that is rewritten.

    Returns:
        The DataFrame produced by replacing column `col`.
    """
    to_ascii = F.udf(ascii_fold)
    return df.withColumn(col, to_ascii(col))

def replace_empty_with_col(df, col=None, col1=None, t=None):
    """Fill null cells of column `col` with the value of column `col1`.

    Args:
        df: Spark DataFrame to transform.
        col: Name of the column that is rewritten.
        col1: Name of the column supplying the replacement values.
        t: Spark data type the rewritten column is cast to.

    Returns:
        The DataFrame produced by replacing column `col`.
    """
    target = df[col]
    fallback = df[col1]
    # Take the fallback column only where the target cell is null.
    filled = F.when(target.isNull(), fallback).otherwise(target)
    return df.withColumn(col, filled.cast(t))

def replace_empty_years(df):
    r"""Fill `\N` year markers from the sibling year column and cast both to int.

    `endYear` is processed first, then `startYear`; the second step therefore
    reads the already rewritten `endYear`.

    Args:
        df: Spark DataFrame with `startYear` and `endYear` columns.

    Returns:
        The DataFrame with both year columns rewritten as integers.
    """
    year_pairs = (("endYear", "startYear"), ("startYear", "endYear"))
    for target, fallback in year_pairs:
        is_placeholder = df[target] == r"\N"
        resolved = F.when(is_placeholder, df[fallback]).otherwise(df[target])
        df = df.withColumn(target, resolved.cast(T.IntegerType()))
    return df

In [None]:
def clean_df(df):
    """Run the cleaning helpers over `df` in a fixed order and return the result.

    Order: NaN fill for the numeric columns, null fill from a fallback column,
    ASCII conversion of both titles, then the year clean-up.
    """
    int_type = T.IntegerType()
    string_type = T.StringType()

    for numeric_col in ("runtimeMinutes", "numVotes"):
        df = replace_empty_with_nan(df, numeric_col, int_type)

    # (target, fallback, type). The last entry fills numVotes from itself.
    fallback_fills = (
        ("originalTitle", "primaryTitle", string_type),
        ("primaryTitle", "originalTitle", string_type),
        ("numVotes", "numVotes", int_type),
    )
    for target, fallback, dtype in fallback_fills:
        df = replace_empty_with_col(df, target, fallback, dtype)

    for title_col in ("primaryTitle", 'originalTitle'):
        df = replace_special_characters(df, title_col)

    return replace_empty_years(df)

In [None]:
# Clean TRAIN data
df_train = clean_df(df_train)
# One aggregate per column: the number of cells that are still null.
train_null_counts = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in df_train.columns
]
df_train.select(train_null_counts).show()


+---+------+------------+-------------+---------+-------+--------------+--------+-----+
|_c0|tconst|primaryTitle|originalTitle|startYear|endYear|runtimeMinutes|numVotes|label|
+---+------+------------+-------------+---------+-------+--------------+--------+-----+
|  0|     0|           0|            0|        0|      0|            13|       0|    0|
+---+------+------------+-------------+---------+-------+--------------+--------+-----+



In [None]:
# Clean TEST
df_test = clean_df(df_test)
# One aggregate per column: the number of cells that are still null.
test_null_counts = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in df_test.columns
]
df_test.select(test_null_counts).show()
df_test.count()  # 1086 rows

+---+------+------------+-------------+---------+-------+--------------+--------+
|_c0|tconst|primaryTitle|originalTitle|startYear|endYear|runtimeMinutes|numVotes|
+---+------+------------+-------------+---------+-------+--------------+--------+
|  0|     0|           0|            0|        0|      0|             1|       0|
+---+------+------------+-------------+---------+-------+--------------+--------+



1086

In [None]:
# Clean VALID
df_validation = clean_df(df_validation)
# One aggregate per column: the number of cells that are still null.
validation_null_counts = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in df_validation.columns
]
df_validation.select(validation_null_counts).show()
df_validation.count()  # 955 rows

+---+------+------------+-------------+---------+-------+--------------+--------+
|_c0|tconst|primaryTitle|originalTitle|startYear|endYear|runtimeMinutes|numVotes|
+---+------+------------+-------------+---------+-------+--------------+--------+
|  0|     0|           0|            0|        0|      0|             2|       0|
+---+------+------------+-------------+---------+-------+--------------+--------+



955

### 1.2.2 Outliers

**TODO (@Marianne):** Let's go through this section together and decide what to keep and what to remove.

In [None]:
# Calculate important metrics
from pyspark.sql.functions import mean, stddev, col
from statsmodels import robust
import matplotlib.pyplot as plt

# Task: Outliers
numVotes_show = df_train.select("numVotes")

# Mean and standard deviation of numVotes. Both are kept as DataFrames here;
# no value is collected from them in this cell.
numVotes_mean = df_train.select(mean("numVotes"))
numVotes_std = df_train.select(stddev("numVotes"))

# Median Absolute Deviation (MAD)
# 1. Collect the numVotes values into a plain Python list.
numVotesList = [row[0] for row in numVotes_show.collect()]

# 2. Compute the MAD with statsmodels' robust.mad.
numVotes_mad = robust.mad(numVotesList)  # add c=1 to eliminate scaling factor
print('Median Absolute Deviation(numVotes): ', numVotes_mad)

  import pandas.util.testing as tm


Median Absolute Deviation(numVotes):  2865.8700883713286


In [None]:
from pyspark.sql.functions import lit, avg
from pyspark.sql.window import Window
# x = df_train.groupBy('startYear')\
#               .agg(F.avg("numVotes").alias("avg vote per year")\
#               )


# # # doesnt work with the function
# # # how to add a new column 
# # .alias('Mean_column'
# # Create variable -> Calculate average -> Use .lit and put variable in it
# z = F.avg('numVotes')
# value = z
# print(value)
# # i = F.lit(F.mean('numVotes'))
# # g = F.avg(df_train['numVotes'])
# # h = lit(F.mean(df_train['numVotes']))
# # y = df_train.withColumn(('numVotes'), lit(120))
# #df2 = df_train.select(col("numVotes"),lit(numVotes_mean).alias("New_Column"))

# #mean_ = df_train.groupBy().avg("numVotes").take(1)[0][0]
# #df_train.withColumn("test", lit(mean)).show()

# #df_train.withColumn("mean", lit(df_train.select(avg("numVotes") AS ("temp")).first().getAs("temp"))).show()

# #df2.show()
# An empty partitionBy() places every row in a single window, so the average
# below is the overall mean of numVotes, repeated on each row.
# The previous window ordered by "directors". That column is only joined onto
# df_train in section 1.2.3, and the recorded output of this cell was an
# AnalysisException.
windowSpec = Window.partitionBy()
# Exploratory only: the result is not assigned back, so df_train is unchanged.
df_train.withColumn("tryAvgCol", F.avg("numVotes").over(windowSpec))

# ok super basic: 1. ny kolonne -> 2. lit(mean) -> 3. when 0-> lit
# df_train = df_train.withColumn("newAvgCol", lit(1000))

# df_train = df_train.withColumn('numVotes', \
#                     F.when(df_train['numVotes'].isNull(), df_train["newAvgCol"])\
#                     .otherwise(df_train['numVotes'])\
#                     .cast(T.IntegerType())).show()




AnalysisException: ignored

In [None]:
# Filter Outliers
# Keep only rows whose endYear lies in the plausible range 1888..2022 (inclusive).
# The two previous filter() calls selected the out-of-range rows and threw the
# result away, so df_train was never changed.
# Rows with a null endYear are kept explicitly; a bare comparison would drop them.
df_train = df_train.filter(
    df_train['endYear'].isNull() | df_train['endYear'].between(1888, 2022)
)

# Replace numVotes==0 with the mean THE ERROR IS REPLACE PART!
# myudf = F.udf(lambda x: F.avg if x is None else x)

# df_train = df_train.withColumn('numVotes',\
#                                 F.when(df_train['numVotes'] == np.isnan(),\
#                                        df_train.agg(F.avg(c) for c in df_train.columns))\
#                                 .otherwise(df_train['numVotes'])).show()


# df_train = df_train.withColumn('numVotes',\
#                                 F.when(df_train['numVotes'].isnan(), myudf)\
#                                 .otherwise(df_train['numVotes'])).show()

#df_train.withColumn('numVotes',\
#                    F.when('numVotes').isNull(), mean('numVotes')\
#                    .otherwise(col('numVotes')))
# Filter outliers based on MAD
# df_train.filter((df_train['numVotes'] > (mad*10)) | (df_train['numVotes'] < (mad/10))).show()

# Filter outliers deviating more than 2 times from the mad (or something)
# df_train = df_train.withColumn('numVotes',\
#                    F.where(numVotes_mad/2 < df_train['numVotes'] < numVotes_mad*2))\
#                    .show()


# # MAD outlier filter
# MADdf = df.groupby('genre')
# .agg(F.expr('percentile(duration, array(0.5))')[0]
#      .alias('duration_median')).join(df, "genre", "left")
#      .withColumn("duration_difference_median", F.abs(F.col('duration')-F.col('duration_median')))
#      .groupby('genre', 'duration_median').agg(F.expr('percentile(duration_difference_median, array(0.5))')[0]
#                                               .alias('median_absolute_difference'))

# # source: https://toritompkins.co.uk/identifying-data-outliers-in-apache-spark-3-0/
# mad_df = df_train.groupBy('numVotes').agg(F.expr('percentile(duration, array(0.5))')[0]\
#      .alias('duration_median').join(df_train, 'numVotes', 'left')\
#      .withColumn('duration_difference_median', F.abs(F.col('duration')-F.col('duration_median')))\
#      .groupBy('numVotes', 'duration_median').agg(F.expr('percentile(duration_difference_median, array(0.5))')[0]\
#                                                  .alias('median_absolute_difference')))

# outliersremoved = df_train.join(mad_df, "numVotes", "left")\
# .filter(F.abs(F.col("duration")-F.col("duration_median")) <= (F.col("mean_absolute_difference")*3))




### 1.2.3 Writers and Directors

In [None]:
import numpy as np
import csv
import re
from itertools import chain

# movie id (string, as read from the CSV) -> list of names, or '' when none was found
movie_writers = {}
movie_directors = {}

# Compiled once; both patterns are applied to every row of the credits file.
crew_entry_pattern = re.compile(r"\{([^}]+)\}")          # one "{ ... }" crew entry
crew_name_pattern = re.compile(r"(?<='name': )(..*?),")  # text after 'name': up to the next comma

# Read external data
# newline='' is the csv module's documented way to open files, so that quoted
# fields containing line breaks are parsed correctly. The encoding is pinned so
# the result does not depend on the machine's locale.
with open("drive/MyDrive/Bigdata-Grp19/ext data/credits.csv", 'r', newline='', encoding='utf-8') as f:
    for row in csv.DictReader(f):  # every row contains crew of 1 movie
        crew = crew_entry_pattern.findall(row['crew'])  # find { crew }
        if not crew:
            # Same as before: a movie without any crew entry gets no record.
            continue

        writers = []
        directors = []

        for people in crew:
            name = crew_name_pattern.search(people)
            if name is None:
                # No parsable 'name' field. Skip the entry instead of failing
                # with AttributeError on name.group(1).
                continue
            person = name.group(1).replace("'", "")

            # NOTE(review): these are substring tests on the whole entry, so any
            # entry containing the word is counted. Confirm this is intended.
            if 'Director' in people:
                directors.append(person)
            if 'Writer' in people:
                writers.append(person)

        # Recorded once per movie, after all crew entries have been inspected.
        movie_writers[row['id']] = writers if writers else ''
        movie_directors[row['id']] = directors if directors else ''

# Both dicts receive their keys together, so their values line up with movie_ids.
writers = list(movie_writers.values())
directors = list(movie_directors.values())
movie_ids = list(movie_writers.keys())

ext_df = pd.DataFrame(data={'id': movie_ids, 'writers': writers, 'directors': directors})

# Flatten each list of names into one comma-separated string ('' stays '').
ext_df["writers"] = [','.join(map(str, x)) for x in ext_df['writers']]
ext_df["directors"] = [','.join(map(str, x)) for x in ext_df['directors']]
ext_df["id"] = ext_df["id"].astype(int)

# The schema is applied by position: id -> movie_id.
ext_df_spark = spark.createDataFrame(ext_df, "movie_id: int, writers: string, directors: string")

In [None]:
# ext_df_spark.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in ext_df_spark.columns]).show()

In [None]:
ext_meta = pd.read_csv("drive/MyDrive/Bigdata-Grp19/ext data/movies_metadata.csv", low_memory=False)
ext_meta = ext_meta.loc[:, ['id', 'imdb_id', 'vote_average', 'vote_count']]

# Some ids are not plain integers: the earlier version stripped '-' from them,
# which turned such values into made-up integer ids and failed outright on any
# non-string id. Unparseable ids are now coerced to NaN and their rows dropped.
ext_meta["id"] = pd.to_numeric(ext_meta["id"], errors="coerce")
ext_meta = ext_meta.dropna(subset=["id"]).copy()
ext_meta["id"] = ext_meta["id"].astype(int)

ext_meta["imdb_id"] = ext_meta["imdb_id"].astype(str)
ext_meta["vote_average"] = ext_meta["vote_average"].astype(float)

ext_meta["vote_count"] = ext_meta["vote_count"].astype(float)

# The schema is applied by position: id -> m_id.
ext_meta_spark = spark.createDataFrame(ext_meta, "m_id: int,  imdb_id: string, vote_average: float, vote_count: float")

In [None]:
# Writers, directors plus imdb_id from metadata
# Keep only movies present in both external frames, then drop the two join keys.
same_movie = ext_df_spark.movie_id == ext_meta_spark.m_id
ext_data_result = (
    ext_meta_spark
    .join(ext_df_spark, same_movie, "inner")
    .drop(ext_df_spark.movie_id)
    .drop(ext_meta_spark.m_id)
)

In [None]:
# ext_data_result.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in ext_data_result.columns]).show()

In [None]:
# Try: joining using pandas
# Convert df to pandas-df
df_pandas = df_train.toPandas()
#print(df_pandas)
# The Spark left join with ext_data_result is done once, in the following cell.
# Doing it here as well added a second copy of the external columns (writers,
# directors, vote_average, vote_count) to df_train, which makes later references
# to those columns ambiguous.

In [None]:
#Writers directors on the df_train (join left to use original df as base)
# Left join keeps every training row; the external key column is dropped afterwards.
same_title_id = df_train["tconst"] == ext_data_result["imdb_id"]
df_train = (
    df_train
    .join(ext_data_result, same_title_id, "left")
    .drop(ext_data_result.imdb_id)
)



In [None]:
def replace_empty_votes_with_average_director(df):
    """Replace `numVotes == 0` with the mean `numVotes` of the same `directors` value.

    The mean is computed with a window partitioned by `directors`, so no self-join
    is needed and no rows are lost. The previous inner join on `directors` dropped
    every row whose `directors` was null, because null never equals null in a
    join condition.

    Rows with null `directors` keep their `numVotes` unchanged.

    Args:
        df: Spark DataFrame with `directors` and `numVotes` columns.

    Returns:
        A DataFrame with the same columns and rows as `df`, with `numVotes`
        replaced where it was 0 and `directors` is known.
    """
    # Function-scope import so the function does not depend on another cell
    # having been run first.
    from pyspark.sql.window import Window

    avg_col = "avg num votes per director"
    per_director = Window.partitionBy("directors")
    with_avg = df.withColumn(avg_col, F.avg("numVotes").over(per_director))

    # Only fill when there is a director group to take the average from.
    needs_fill = (with_avg["numVotes"] == 0) & with_avg["directors"].isNotNull()
    filled = with_avg.withColumn(
        "numVotes",
        F.when(needs_fill, with_avg[avg_col]).otherwise(with_avg["numVotes"]),
    )
    # The helper column is internal to this function.
    return filled.drop(avg_col)

In [None]:
# ASCII-convert both crew columns, then fill zero vote counts per director.
for crew_col in ('directors', 'writers'):
    df_train = replace_special_characters(df_train, crew_col)
df_train = replace_empty_votes_with_average_director(df_train)

In [None]:
df_train.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_train.columns]).show() # Check if empty values


## 1.3 Data Segregation 
 		
Split the data into subsets: one to train the model and one to validate how it performs on unseen data (training and evaluation subsets).

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.feature import FeatureHasher,Tokenizer,CountVectorizer,QuantileDiscretizer
from pyspark.ml.feature import StringIndexer,VectorIndexer,MaxAbsScaler,StopWordsRemover
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Fixed seed so the train/test split and the boosted trees give the same result
# on every run.
SEED = 42

# --- Title: tokenize, remove stop words, bag-of-words counts ---
# scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
tokenizerTitle = Tokenizer(inputCol="primaryTitle", outputCol="wordsPrimaryTitle")
stopRemover = StopWordsRemover(inputCol="wordsPrimaryTitle", outputCol="wordsPrimaryTitleRemovedStop")
titleCV = CountVectorizer(inputCol="wordsPrimaryTitleRemovedStop", outputCol="primaryTitleFeatures")

# --- Directors / writers: defined here but not part of the pipeline below ---
tokenizerDirectors = Tokenizer(inputCol="directors", outputCol="wordsDirectors")
directorsCV = CountVectorizer(inputCol="wordsDirectors", outputCol="directorsFeatures")

tokenizerWriters= Tokenizer(inputCol="writers", outputCol="wordsWriters")
writersCV = CountVectorizer(inputCol="wordsWriters", outputCol="writersFeatures")

# --- Numeric columns bucketed into quantiles ---
discretizerStartYear = QuantileDiscretizer(numBuckets=20, inputCol="startYear", outputCol="discStartYear")

discretizerRunTimeMinutes = QuantileDiscretizer(numBuckets=3, inputCol="runtimeMinutes", outputCol="discRuntimeMinutes")

discretizerNumVotes = QuantileDiscretizer(numBuckets=4, inputCol="numVotes", outputCol="discNumVotes")

# Both the bucketed and the raw numeric columns go into the feature vector.
assembler = VectorAssembler( inputCols=["primaryTitleFeatures","discRuntimeMinutes","discNumVotes","discStartYear","runtimeMinutes","numVotes","startYear","endYear"],outputCol="features")

# Train a GBT model.
df_train = df_train.withColumn("label", df_train.label.cast(T.IntegerType()))
gbt = GBTClassifier(labelCol="label", featuresCol="features", seed=SEED)

#pipeline = Pipeline(stages=[assembler,featureIndexer, gbt])
#pipeline = Pipeline(stages=[  tokenizerDirectors, directorsCV, tokenizerWriters, writersCV, assembler, gbt])
pipeline = Pipeline(stages=[ tokenizerTitle, stopRemover, titleCV, discretizerStartYear,discretizerRunTimeMinutes ,discretizerNumVotes, assembler, gbt])

# 80/20 hold-out split of the training frame.
(trainingData, testData) = df_train.randomSplit([0.8, 0.2], seed=SEED)

# Fit the pipeline
model = pipeline.fit(trainingData)

## 1.4 Data Validation

In [None]:
 # Select (prediction, true label) and compute accuracy , test error
prediction = model.transform(testData)
#evaluator = BinaryClassificationEvaluator(labelCol="label")
# metricName is set explicitly: without it the evaluator reports its default
# metric (F1), which the prints below would mislabel as accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

accuracy = evaluator.evaluate(prediction)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g" % (1.0 - accuracy))

#gbtModel = model.stages[1]
#print(gbtModel)  # summary only

In [None]:
# This evaluator reports the area under the ROC curve, not accuracy, so the
# value is labelled accordingly. The "Test Error" line was dropped because
# 1 - AUC is not an error rate.
evaluator2 = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
accuracy2 = evaluator2.evaluate(prediction)  # name kept for later cells; holds the AUC
print("Area under ROC = %g" % (accuracy2))

# 2. Analytics/Machine Learning
*From Pramod's notebook*

In [None]:
# Fit the same pipeline on the whole of df_train (no hold-out split); this model
# produces the test and validation predictions further down.
model_all = pipeline.fit(df_train)

In [None]:
# Null counts per column before the fill.
df_test.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_test.columns]).show()
# Fill the remaining null runtimeMinutes with 0. The earlier call to
# `remove_empty` was removed: no function of that name is defined in the visible
# notebook, so the cell stopped with a NameError before reaching this fill.
df_test = df_test.withColumn("runtimeMinutes", \
                    F.when(df_test["runtimeMinutes"].isNull(), 0)\
                    .otherwise(df_test["runtimeMinutes"])) # REMOVE LAST RUNTIME MINUTE
# Null counts per column after the fill.
df_test.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_test.columns]).show()


In [None]:
df_test.dtypes

In [None]:
# Make predictions on test set and create output file.
test_prediction = model_all.transform(df_test)
# Predictions are cast to booleans and only that column is kept.
prediction_as_bool = test_prediction.prediction.cast(T.BooleanType())
out_test_df = test_prediction.withColumn("prediction", prediction_as_bool).select('prediction')
out_test_df_pd = out_test_df.toPandas()


In [None]:
# Null counts per column before the fill.
df_validation.select(
    [F.count(F.when(F.isnull(name), name)).alias(name) for name in df_validation.columns]
).show()

# Null runtimes become 0; every other value is kept.
validation_runtime = df_validation["runTimeMinutes"]
df_validation = df_validation.withColumn(
    "runtimeMinutes",
    F.when(validation_runtime.isNull(), 0).otherwise(validation_runtime),
)  # REMOVE LAST RUNTIME MINUTE

# Null counts per column after the fill.
df_validation.select(
    [F.count(F.when(F.isnull(name), name)).alias(name) for name in df_validation.columns]
).show()


In [None]:
# Write the test-set predictions as one value per line: no index column, no header row.
out_test_df_pd.to_csv("drive/MyDrive/Big Data - Grp19 IMDB/output/team-19-test-set-output.txt", index=False,header=False)

In [None]:
# Make predictions on validation set and create output file.
valid_prediction = model_all.transform(df_validation)
# Predictions are cast to booleans and only that column is kept.
valid_prediction_as_bool = valid_prediction.prediction.cast(T.BooleanType())
out_valid_df = valid_prediction.withColumn("prediction", valid_prediction_as_bool).select('prediction')
out_valid_df_pd = out_valid_df.toPandas()

# One value per line: no index column, no header row.
out_valid_df_pd.to_csv(
    "drive/MyDrive/Big Data - Grp19 IMDB/output/team-19-valid-set-output.txt",
    index=False,
    header=False,
)