# SparkML Models Notebook

## Imports

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Spark Session Initialization

In [3]:
spark = SparkSession.builder \
    .appName("ProjectSuccessPrediction") \
    .config("spark.executor.memory", "24g") \
    .config("spark.driver.memory", "24g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/20 20:41:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading the Data Using PySpark and General Preparation

In [4]:
# Load data
data = spark.read.csv("/kick_starter_cleaned.csv", header=True, inferSchema=True)

# Drop useless columns
columns_to_drop = ["category", "currency", "goal", "deadline", "launched", "pledged", "backers"]
data = data.drop(*columns_to_drop)

# Examine and transform the 'state' column to a binary label column
data = data.withColumn("label", when(col("state") == "successful", 1).otherwise(0))

# Drop rows where the state is 'live' as per the hint in the PDF
data = data.filter(col("state") != "live")

# Drop the 'state' column as it's now redundant
data = data.drop("state")

                                                                                

## Grouping Categorical Columns and Encoding 

In [5]:
# Columns to index and encode
categorical_columns = ["name", "main_category", "country"]

# Indexing and encoding pipelines
indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col+"_indexed", outputCol=col+"_encoded") for col in categorical_columns]

# Apply the indexers and encoders
pipeline = Pipeline(stages=indexers + encoders)
data = pipeline.fit(data).transform(data)

# Drop the original and indexed columns, keeping only encoded columns
data = data.drop(*categorical_columns)
for col in categorical_columns:
    data = data.drop(col+"_indexed")

                                                                                

## Fixing Numerical Columns (Conversion to Double)

In [6]:
data = data.withColumn("usd_pledged_real", data["usd_pledged_real"].cast(DoubleType()))
data = data.withColumn("usd_goal_real", data["usd_goal_real"].cast(DoubleType()))

## Dropping Null Values

In [7]:
data = data.dropna()

## Defining Feature Columns and Assembling Vector from Label + Features

In [8]:
feature_columns = [col for col in data.columns if col != "label"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

## Selecting the Relevant Columns for Modeling

In [None]:
data = data.select("features", "label")

## Splitting Data into Training and Testing Sets

In [9]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

## Model 1: Logistic Regression

In [10]:
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

24/05/20 20:41:54 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:08 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/20 20:42:10 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:20 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:25 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:30 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:36 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:42 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:47 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:53 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:42:57 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:43:02 WARN 

## Model 2: Decision Tree Classifier

In [11]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

24/05/20 20:51:15 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:51:29 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:51:34 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:51:43 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 20:51:50 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
24/05/20 20:51:53 WARN DAGScheduler: Broadcasting large task binary with size 66.5 MiB
24/05/20 20:52:20 WARN MemoryStore: Not enough space to cache rdd_200_1 in memory! (computed 3.0 GiB so far)
24/05/20 20:52:20 WARN BlockManager: Persisting block rdd_200_1 to disk instead.
24/05/20 20:52:20 WARN MemoryStore: Not enough space to cache rdd_200_2 in memory! (computed 3.0 GiB so far)
24/05/20 20:52:20 WARN BlockManager: Persisting block rdd_200_2 to disk instead.
24/05/20 20:52:20 WARN MemoryStore: Not enough space to cache rdd_200_0 in memory! (computed 3.0 Gi

## Model 3: Random Forest Classifier

In [12]:
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

24/05/20 21:48:43 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 21:48:55 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 21:48:59 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 21:49:08 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
24/05/20 21:49:13 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
24/05/20 21:49:17 WARN DAGScheduler: Broadcasting large task binary with size 66.5 MiB
24/05/20 21:49:39 WARN MemoryStore: Not enough space to cache rdd_244_0 in memory! (computed 3.0 GiB so far)
24/05/20 21:49:39 WARN BlockManager: Persisting block rdd_244_0 to disk instead.
24/05/20 21:49:39 WARN MemoryStore: Not enough space to cache rdd_244_2 in memory! (computed 3.0 GiB so far)
24/05/20 21:49:39 WARN BlockManager: Persisting block rdd_244_2 to disk instead.
24/05/20 21:49:39 WARN MemoryStore: Not enough space to cache rdd_244_1 in memory! (computed 3.0 Gi

## Initialising an Evaluator and Setting Accuracy as the Main Evaluation Metric

In [15]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

## Calculating Accuracies for the 3 Models Above

In [16]:
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy}")

dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy}")

rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy}")

24/05/20 22:32:37 WARN DAGScheduler: Broadcasting large task binary with size 59.4 MiB
                                                                                

Logistic Regression Accuracy: 0.6439538438645618


24/05/20 22:32:47 WARN DAGScheduler: Broadcasting large task binary with size 56.6 MiB
                                                                                

Decision Tree Accuracy: 0.9757179768137115


24/05/20 22:32:57 WARN DAGScheduler: Broadcasting large task binary with size 56.7 MiB

Random Forest Accuracy: 0.6385292804675134


                                                                                