In [19]:
# Import Statements

import csv
import os
import platform
import sys

# Spark imports
from pyspark.ml.feature import VectorAssembler, VectorSlicer, RobustScaler, CountVectorizer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, OneVsRest
from pyspark.rdd import RDD
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, LongType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, Normalizer, VectorSlicer, OneHotEncoder

import findspark
findspark.init()



In [2]:
#---Phase 1: Data Loading & Formatting

In [3]:

# Initialize a spark session.
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

def load_df_from_csv(filename):
    spark = init_spark()
    df = spark.read.csv(filename, header=True, multiLine=True, quote="\"", escape="\"")
    return df

# UDFs used for data preprocesssing

def json_list_num(all_cast):
    cleaned = all_cast.replace("/", "")
    converted_list = list(eval(cleaned))
    return len(converted_list)


def gender_of_first_cast(all_cast):
    cleaned = all_cast.replace("/", "")
    converted_list = list(eval(cleaned))
    if converted_list and converted_list[0] and converted_list[0]["gender"]:
        return converted_list[0]["gender"]
    else:
        return None


def gender_of_second_cast(all_cast):
    cleaned = all_cast.replace("/", "")
    converted_list = list(eval(cleaned))
    if converted_list and len(converted_list)>1 and converted_list[1] and converted_list[1]["gender"]:
        return converted_list[1]["gender"]
    else:
        return None

def release_year(date):
    if date:
        return int(date[:4])
    return date


def release_month(date):
    if date:
        return int(date[5:7])
    return date


def imdb_title(title):
    return title.split("\xa0")[0]


In [4]:

# IMDB database
imdb_dataset = load_df_from_csv("datasets" + ("\\" if platform.system() == "Windows" else "/") + "imdb_movie_metadata.csv")

# remove useless columns from the dataset
imdb_dataset = imdb_dataset.drop("color", "director_name", "director_facebook_likes", "num_critic_for_reviews",
                                 "actor_3_facebook_likes", "actor_2_name", "actor_1_name",
                                 "actor_3_name", "facenumber_in_poster", "plot_keywords", "movie_imdb_link",
                                 "language", "country", "title_year", "actor_2_facebook_likes",
                                 "gross", "aspect_ratio", "actor_1_facebook_likes","movie_facebook_likes")

imdb_dataset = imdb_dataset.withColumnRenamed("movie_title", "imdb_movie_title")

udf_imdb_title = udf(imdb_title, StringType())
imdb_dataset = imdb_dataset.withColumn("imdb_movie_title", udf_imdb_title("imdb_movie_title"))

# TMDB credits database
crew_dataset = load_df_from_csv("datasets/" + "tmdb_5000_credits.csv")

udf_cast_num = udf(json_list_num, IntegerType())
udf_first_cast_gender = udf(gender_of_first_cast, IntegerType())
udf_cast_num = udf(json_list_num, IntegerType())
udf_first_cast_gender = udf(gender_of_first_cast, IntegerType())
udf_second_cast_gender = udf(gender_of_second_cast, IntegerType())

crew_dataset = crew_dataset.withColumn("cast_number", udf_cast_num("cast")) \
    .withColumn("cast_number", udf_cast_num("crew")) \
    .withColumn("first_cast_gender", udf_first_cast_gender("cast")) \
    .withColumn("second_cast_gender", udf_second_cast_gender("cast"))

crew_dataset = crew_dataset.drop("cast", "crew")
crew_dataset = crew_dataset.withColumnRenamed("title", "tmdb_movie_title")

# TMDB Movie dataset
tmdb_dataset = load_df_from_csv("datasets/" + "tmdb_5000_movies.csv")

tmdb_dataset = tmdb_dataset.select("title", "release_date", "vote_average", "revenue", "id", "popularity")

udf_release_year = udf(release_year, IntegerType())
udf_release_month = udf(release_month, IntegerType())
tmdb_dataset = tmdb_dataset.withColumn("release_year", udf_release_year("release_date"))
tmdb_dataset = tmdb_dataset.withColumn("release_month", udf_release_month("release_date"))

tmdb_dataset = tmdb_dataset.drop("release_date")

# merge the 3 datasets

two_tmdb_joined = tmdb_dataset.join(crew_dataset, crew_dataset.movie_id == tmdb_dataset.id, "inner").drop("id").drop(
    "movie_id")
dataset = two_tmdb_joined.join(imdb_dataset, two_tmdb_joined.title == imdb_dataset.imdb_movie_title, "inner").drop(
    "imdb_movie_title").drop("tmdb_movie_title")

# remove all null values
cols = dataset.columns
for col in cols:
    dataset = dataset.filter(dataset[str(col)].isNotNull())


In [5]:
# Cast all columns from string to integer type
dataset = (dataset.withColumn("vote_average",(dataset["vote_average"].cast(IntegerType()))))
dataset = (dataset.withColumn("cast_total_facebook_likes",(dataset["cast_total_facebook_likes"].cast(IntegerType()))))
dataset = (dataset.withColumn("imdb_score",(dataset["imdb_score"].cast(IntegerType()))))
dataset = (dataset.withColumn("budget",(dataset["budget"].cast(LongType()))))
dataset = (dataset.withColumn("num_user_for_reviews",(dataset["num_user_for_reviews"].cast(IntegerType()))))
dataset = (dataset.withColumn("num_voted_users",(dataset["num_voted_users"].cast(IntegerType()))))
dataset = (dataset.withColumn("popularity",(dataset["popularity"].cast(IntegerType()))))
dataset = dataset.filter(dataset.budget > 1)

In [6]:
#---Phase 2: Data Transformation

In [7]:
# Single-value categorical variable transformations


#  Convert "content rating" variable using One Hot Encoder

# Step 1 String Indexer part
indexer = StringIndexer(inputCol='content_rating', outputCol='ContentIndex')
indexed = indexer.fit(dataset).transform(dataset)

# Step 2:   OneHotEncoding part
encoder = OneHotEncoder(inputCol='ContentIndex', outputCol='OHEContentIndex')
dataset = encoder.fit(indexed).transform(indexed)

dataset = dataset.drop("ContentIndex","content_rating")

In [8]:
# Multi-value categorical variable transformations

# This function takes the dataset df and split the genres column into an array of strings
def split_genres_string(dataset, column_to_split):
    dataset = dataset.select(
        ["*", split(column_to_split, '[|]').alias('{}_arr'.format(column_to_split))]).drop(column_to_split)

    return dataset


def apply_countvectorizer(dataset, column):
    '''
    Input: dataset (dataframe) and column name that contains an array of categories 
    Output: returns the transformed dataset (dataframe) and the list of categories
    - https://stackoverflow.com/questions/58010126/pyspark-string-array-of-dynamic-length-in-dataframe-column-to-onehot-encoded
    - https://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer
    '''
    cv = CountVectorizer(inputCol=column, outputCol="{}_to_vector".format(column), binary=True)
    model = cv.fit(dataset)
    model.setInputCol(column)
    set_of_categories = model.vocabulary

    # print(set_of_categories)
    
    dataset = model.transform(dataset)
    return dataset, set_of_categories


# testing genres and production_companies 
dataset = split_genres_string(dataset, "genres")
dataset, genres_set = apply_countvectorizer(dataset, "genres_arr")
dataset = dataset.drop("genres_arr")


In [9]:
print(dataset.count()) # we end up with 3810 movies
print(dataset.printSchema())

3810
root
 |-- title: string (nullable = true)
 |-- vote_average: integer (nullable = true)
 |-- revenue: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- release_month: integer (nullable = true)
 |-- cast_number: integer (nullable = true)
 |-- first_cast_gender: integer (nullable = true)
 |-- second_cast_gender: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- num_voted_users: integer (nullable = true)
 |-- cast_total_facebook_likes: integer (nullable = true)
 |-- num_user_for_reviews: integer (nullable = true)
 |-- budget: long (nullable = true)
 |-- imdb_score: integer (nullable = true)
 |-- OHEContentIndex: vector (nullable = true)
 |-- genres_arr_to_vector: vector (nullable = true)

None


In [10]:

# Converting profit to be represented on the scale: A-E

# Step 1:   Add a profit column. Remove revenue column.
#           Sort by profit (descending order).
dataset = (
    dataset.withColumn(
        "profit",
        (dataset["revenue"].cast(LongType()) - dataset["budget"].cast(LongType())),
    )
    .drop(*["revenue"])
    .sort("profit", ascending=False)
)
# Step 2:   Add an incremental ID in the "id" column to assign a grade.
#           Convert back to dataframe.
columns = dataset.columns
dataset = (
    dataset.rdd.zipWithIndex()
    .map(lambda x: (x[1],) + tuple(x[0]))
    .toDF(["id"] + columns)
)


In [34]:
count = dataset.count() // 5
profits = dataset.filter((dataset.id == 0) | (dataset.id == (count - 1)) | (dataset.id == count + 1) | (dataset.id == (count * 2 + 1))| (dataset.id == (count + 1) * 2) | (dataset.id == (count * 3 + 2))| (dataset.id == (count + 1) * 3) | (dataset.id == (count * 4 + 3))| (dataset.id == (count * 4 + 3)) | (dataset.id == (count*5)-1))
profits.select("profit").show()

+------------+
|      profit|
+------------+
|  2550965087|
|   103862963|
|   103412065|
|    28850000|
|    28848069|
|      321508|
|      303838|
|    -9800895|
|-12152172799|
+------------+



In [None]:
# Range A: [103862963, +inf)
# Range B: [28850000, 103412063)
# Range C: [321508, 28850000)
# Range D: [-98008, 321508)
# Range E: (-inf, -98008)

In [None]:
# Step 3:   Assign a grade in the "profit_grade" column, corresponding to the value.
#           Remove id and profit columns.
dataset = (
    dataset.withColumn(
        "profit_grade",
        when((dataset.id >= 0) & (dataset.id < count), "A")
        .when((dataset.id >= (count + 1)) & (dataset.id < count * 2 + 1), "B")
        .when((dataset.id >= (count + 1) * 2) & (dataset.id < count * 3 + 2), "C")
        .when((dataset.id >= (count + 1) * 3) & (dataset.id < count * 4 + 3), "D")
        .otherwise("E"),
    ).drop(*["id", "profit"])
    # Randomize to "unsort" dataset.
    .orderBy(rand())
)

In [47]:
dataset = dataset.drop("title","duration","content_rating")

In [12]:
# Get the final schema
print(dataset.printSchema())

root
 |-- vote_average: long (nullable = true)
 |-- popularity: long (nullable = true)
 |-- release_year: long (nullable = true)
 |-- release_month: long (nullable = true)
 |-- cast_number: long (nullable = true)
 |-- first_cast_gender: long (nullable = true)
 |-- second_cast_gender: long (nullable = true)
 |-- num_voted_users: long (nullable = true)
 |-- cast_total_facebook_likes: long (nullable = true)
 |-- num_user_for_reviews: long (nullable = true)
 |-- budget: long (nullable = true)
 |-- imdb_score: long (nullable = true)
 |-- OHEContentIndex: vector (nullable = true)
 |-- genres_arr_to_vector: vector (nullable = true)
 |-- profit_grade: string (nullable = false)

None


In [50]:

# Data prepration for the models

# Put all features in one vector
all_feature_cols = [item for item in dataset.columns if item != "profit_grade"]
assembler = VectorAssembler(inputCols=all_feature_cols, outputCol="userFeatures")
dataset = assembler.transform(dataset)


In [51]:
# Use RobusScaler to reduce outliers
scaler = RobustScaler(inputCol="userFeatures", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

scalerModel = scaler.fit(dataset)

# Transform each feature to have unit quantile range.
dataset = scalerModel.transform(dataset)

In [None]:
# Convert the label column to numeric format using StringIndexer
dataset = StringIndexer(inputCol="profit_grade", outputCol="indexedLabel").fit(dataset).transform(dataset)

In [52]:
normalizer = Normalizer(inputCol="scaledFeatures", outputCol="normalized_features", p=1.0)
dataset = normalizer.transform(dataset)

In [16]:
#---Phase 3.a: Random Forest Model & Evaluation

In [17]:
# Random Forest Model

# Split data to training and test set
trainingData, testData = dataset.randomSplit([0.8, 0.2])

# Create a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol = "normalized_features", numTrees=100,maxDepth=10)

# Train model using the training data
model = rf.fit(trainingData)


# Evaluation

predictions = model.transform(testData)

predictions.select("prediction", "indexedLabel", "normalized_features").show(5)

# Find the accuracy of the model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Random Forest Model accuracy = %g" % (accuracy))


+----------+------------+--------------------+
|prediction|indexedLabel| normalized_features|
+----------+------------+--------------------+
|       3.0|         3.0|(48,[2,3,4,5,6,7,...|
|       3.0|         3.0|(48,[2,3,4,5,6,7,...|
|       3.0|         3.0|(48,[2,3,4,5,6,7,...|
|       3.0|         3.0|(48,[0,2,3,4,5,6,...|
|       3.0|         3.0|(48,[0,1,2,3,4,5,...|
+----------+------------+--------------------+
only showing top 5 rows

Random Forest Model accuracy = 0.592497


In [18]:
#---Phase 3.b: Logistic Regression Model & Evaluation

In [19]:
# Logistic Regression Model using One Vs Rest

# Create the base Logistic Regression classifier.
lr = LogisticRegression(maxIter=20, tol=1E-6, fitIntercept=True,featuresCol='normalized_features', labelCol='indexedLabel')

# Create the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr,featuresCol='normalized_features', labelCol='indexedLabel')

# train the multiclass model.
ovrModel = ovr.fit(trainingData)

# Evaluation

predictions = ovrModel.transform(testData)

# Find the accuracy of the model
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",predictionCol="prediction", labelCol='indexedLabel')
accuracy = evaluator.evaluate(predictions)
print("Logistic Regression Model accuracy = %g" % (accuracy))

Logistic Regression Model accuracy = 0.517464


In [20]:
#---Phase 4: Feature Importance

In [21]:
# Use random forest's feature importance function to find the most discriminating features
feature_importances = model.featureImportances
feature_imp_array = feature_importances.toArray()

features_list = ['vote_average',
 'popularity',
 'release_year',
 'release_month',
 'cast_number',
 'first_cast_gender',
 'second_cast_gender',
 'num_voted_users',
 'cast_total_facebook_likes',
 'num_user_for_reviews',
 'budget',
 'imdb_score',
 'OHEContentIndex']

# map feature names to scores
feat_imp_list = []
for feature, importance in zip(features_list, feature_imp_array):
    feat_imp_list.append((feature, importance))

feat_imp_list = sorted(feat_imp_list, key=(lambda x: x[1]), reverse=True)

print(feat_imp_list)

[('budget', 0.17173164217678363), ('popularity', 0.1407144362884061), ('num_voted_users', 0.12717478260515194), ('num_user_for_reviews', 0.07319305216602837), ('release_year', 0.07164706324748484), ('OHEContentIndex', 0.04385593355082366), ('vote_average', 0.042959964843468305), ('imdb_score', 0.04032291337754476), ('first_cast_gender', 0.03962076068923135), ('cast_total_facebook_likes', 0.03915978502137939), ('cast_number', 0.03908297160829202), ('release_month', 0.03410615425972621), ('second_cast_gender', 0.032538910994601)]


In [None]:
#-- Extra
# Binary classification - Label movies based on whether they made profit or not
dataset = (
    dataset.withColumn(
        "profit_grade",
        (dataset["revenue"].cast(LongType()) > dataset["budget"].cast(LongType())),
    )
    .drop(*["revenue"])
)

dataset = (dataset.withColumn("profit_grade",(dataset["profit_grade"].cast(IntegerType()))))

print(dataset.filter(dataset.profit_grade == 1).count)

In [56]:
# Binary Random Forest Model

# Split data to training and test set
trainingData, testData = dataset.randomSplit([0.8, 0.2])

# Create a RandomForest model.
rf = RandomForestClassifier(labelCol="profit_grade", featuresCol = "normalized_features", numTrees=20,maxDepth=10)

# Train model using the training data
model = rf.fit(trainingData)


# Evaluation

predictions = model.transform(testData)

# Find the accuracy of the model
evaluator = BinaryClassificationEvaluator(labelCol="profit_grade", rawPredictionCol="prediction")
arroc = evaluator.evaluate(predictions)

print("Binary Random Forest Model ARROC = %g" % (arroc))

Binary Random Forest Model ARROC = 0.805384


In [59]:
# Binary Logistic Regression Model

# Create the base Logistic Regression classifier.
lr = LogisticRegression(maxIter=20, tol=1E-6, fitIntercept=True,featuresCol='normalized_features', labelCol='profit_grade')

# train the multiclass model.
ovrModel = ovr.fit(trainingData)

# Evaluation

predictions = ovrModel.transform(testData)

# Find the accuracy of the model
evaluator = BinaryClassificationEvaluator(labelCol="profit_grade", rawPredictionCol="prediction")
arroc = evaluator.evaluate(predictions)

print("Binary Logistic Regression Model ARROC = %g" % (arroc))

Binary Logistic Regression Model ARROC = 0.821577
