In [1]:
import pandas as pd
import numpy as np
import chardet
#Import PySpark libraries 
import pyspark
from pyspark import SparkContext, SparkConf
# Import functions/datatypes for timestamp, integer, and double
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import datetime
# Code based on https://www.kaggle.com/rtatman/data-cleaning-challenge-character-encodings
# Use chardet to detect character encoding: chardet shows Windows-1252 encoding
with open("/dbfs/FileStore/tables/ks_projects_201612-284ce.csv", 'rb') as rawdata:
    result = chardet.detect(rawdata.read(10000))
    
print(result)


In [2]:
# Read CSV file with encolding Windows-1252
ks16a = pd.read_csv("/dbfs/FileStore/tables/ks_projects_201612-284ce.csv", encoding='Windows-1252')
# Column names have a space, rename column names to have no spaces 
ks16a.columns = ['ID', 'name', 'category', 'main_category', 'currency', 'deadline', \
       'goal', 'launched', 'pledged', 'state', 'backers', 'country', \
       'usd_pledged', 'c_13', 'c_14', 'c_15', 'c_16']    
# Cast all columns to string
ks16a["ID"] = ks16a["ID"].astype(str)
ks16a["name"] = ks16a["name"].astype(str)
ks16a["category"] = ks16a["category"].astype(str)
ks16a["main_category"] = ks16a["main_category"].astype(str)
ks16a["currency"] = ks16a["currency"].astype(str)
ks16a["deadline"] = ks16a["deadline"].astype(str)
ks16a["goal"] = ks16a["goal"].astype(str)
ks16a["launched"] = ks16a["launched"].astype(str)
ks16a["pledged"] = ks16a["pledged"].astype(str)
ks16a["state"] = ks16a["state"].astype(str)
ks16a["backers"] = ks16a["backers"].astype(str) 
ks16a["country"] = ks16a["country"].astype(str)                                              
ks16a["usd_pledged"] = ks16a["usd_pledged"].astype(str)
ks16a["c_13"] = ks16a["c_13"].astype(str)
ks16a["c_14"] = ks16a["c_14"].astype(str)
ks16a["c_15"] = ks16a["c_15"].astype(str)
ks16a["c_16"] = ks16a["c_16"].astype(str)
# Create a dataframe in Spark
ks16 = spark.createDataFrame(ks16a)
# Drop extra columns
ks16 = ks16.drop('c_13','c_14','c_15','c_16')
# Cast from string to integer and double
ks16 = ks16.withColumn("ID", ks16['ID'].cast(IntegerType()))
ks16 = ks16.withColumn("goal", ks16['goal'].cast(IntegerType()))
ks16 = ks16.withColumn("pledged", ks16['pledged'].cast(DoubleType()))
ks16 = ks16.withColumn("backers", ks16['backers'].cast(IntegerType()))
ks16 = ks16.withColumn("usd_pledged", ks16['usd_pledged'].cast(DoubleType()))


In [3]:
from pyspark.sql.functions import isnan, when, count, col
ks16.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in ks16.columns]).show()

In [4]:
# Drop all nulls from the data frame
ks16 = ks16.dropna()

In [5]:
from pyspark.sql.functions import isnan, when, count, col
ks16.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in ks16.columns]).show()

In [6]:
# Convert from date/time to just date
ks16 = ks16.withColumn("deadline", to_date(unix_timestamp("deadline", "yyyy-MM-dd").cast("timestamp")))
ks16 = ks16.withColumn("launched", to_date(unix_timestamp("launched", "yyyy-MM-dd").cast("timestamp")))
# No issues with decoding errors
ks18a = pd.read_csv("/dbfs/FileStore/tables/ks_projects_201801-a566d.csv")
ks18a.columns = ['ID', 'name', 'category', 'main_category', 'currency', 'deadline', \
       'goal', 'launched', 'pledged', 'state', 'backers', 'country', \
       'usd_pledged', 'usd_pledged_real','usd_goal_real']
ks18 = spark.createDataFrame(ks18a)

In [7]:
#Reference code: Chapter 5 Big Data Analysis
from pyspark.sql.functions import isnan, when, count, col

ks18.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in ks18.columns]).show()



In [8]:
# Drop all null rows
ks18 = ks18.dropna()
# Drop extra columns
ks18 = ks18.drop('usd_pledged_real','usd_goal_real')


In [9]:
# Verify there are no nulls
ks18.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in ks18.columns]).show()

In [10]:
# Convert from date/time to just date
ks18 = ks18.withColumn("deadline", to_date(unix_timestamp("deadline", "yyyy-MM-dd").cast("timestamp")))
ks18 = ks18.withColumn("launched", to_date(unix_timestamp("launched", "yyyy-MM-dd").cast("timestamp")))
ks = ks16.union(ks18)
# Add the length of the project
ks = ks.withColumn('duration',datediff(ks.deadline,ks.launched))

In [11]:
pd.options.display.float_format = '{:,.2f}'.format

In [12]:
# Use a different dataframe to transform during analysis (create restore point)
ks1 = ks
ks1 = ks1.select('main_category', 'goal', 'state', 'backers', 'country', 'usd_pledged','duration')
# Analyze the successful and failed project data, live and cancelled project data are dropped
ks1 = ks1.filter("state == 'successful' or state == 'failed'")
display(ks1.take(10))

main_category,goal,state,backers,country,usd_pledged,duration
Publishing,1000.0,failed,0,GB,0.0,59
Film & Video,45000.0,failed,3,US,220.0,45
Music,5000.0,failed,1,US,1.0,30
Food,50000.0,successful,224,US,52375.0,35
Food,1000.0,successful,16,US,1205.0,20
Food,25000.0,failed,40,US,453.0,45
Publishing,2500.0,failed,0,CA,0.0,30
Music,12500.0,successful,100,US,12700.0,30
Crafts,5000.0,failed,0,US,0.0,30
Games,200000.0,failed,0,US,0.0,45


In [13]:
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import CountVectorizer,StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit


In [14]:
train_data,test_data=ks1.randomSplit([0.7,0.3], seed=123)
display(train_data.take(5))


main_category,goal,state,backers,country,usd_pledged,duration
Art,1.0,successful,1,US,1.0,33
Art,3.0,successful,7,US,79.0,30
Art,4.0,successful,12,US,205.0,30
Art,5.0,failed,2,US,2.0,30
Art,5.0,failed,2,US,2.0,30


In [15]:
display(test_data.take(5))

main_category,goal,state,backers,country,usd_pledged,duration
Art,1.0,successful,17,GB,228.26419409,30
Art,1.0,successful,37,US,796.0,30
Art,1.0,successful,78,US,834.0,18
Art,5.0,successful,3,US,31.0,30
Art,5.0,successful,50,US,121.0,3


In [16]:
# Convert the categorical columns to hold numerical data
category_indexer = StringIndexer(inputCol='main_category',outputCol='category_index',handleInvalid='keep')
state_indexer = StringIndexer(inputCol='state',outputCol='state_index',handleInvalid='keep')
country_indexer = StringIndexer(inputCol='country',outputCol='country_index',handleInvalid='keep')

# Vector assembler is used to create a vector of input features
vector_assembler = VectorAssembler(inputCols=['category_index','country_index','goal','backers','duration','usd_pledged'],
                            outputCol="features")

In [17]:
rf_model = RandomForestClassifier(labelCol="state_index", featuresCol="features", seed = 321,
                            numTrees=100, cacheNodeIds = True, subsamplingRate = 0.7)

In [18]:
pipe = Pipeline(stages=[category_indexer,state_indexer,country_indexer,vector_assembler, rf_model])
# Train the model with training data
fitted_pipe=pipe.fit(train_data)
# Transform the test data based on model from training data
rf1_results=fitted_pipe.transform(test_data)

In [19]:
display(rf1_results.select(['state_index','rawPrediction','prediction','probability']).take(5))

state_index,rawPrediction,prediction,probability
1.0,"List(1, 3, List(), List(16.422129161723497, 83.5778708382765, 0.0))",1.0,"List(1, 3, List(), List(0.16422129161723498, 0.8357787083827649, 0.0))"
1.0,"List(1, 3, List(), List(9.473562652464901, 90.52643734753511, 0.0))",1.0,"List(1, 3, List(), List(0.094735626524649, 0.9052643734753509, 0.0))"
1.0,"List(1, 3, List(), List(7.3609005582196465, 92.63909944178037, 0.0))",1.0,"List(1, 3, List(), List(0.07360900558219645, 0.9263909944178036, 0.0))"
1.0,"List(1, 3, List(), List(82.31375422830057, 17.68624577169945, 0.0))",0.0,"List(1, 3, List(), List(0.8231375422830055, 0.1768624577169945, 0.0))"
1.0,"List(1, 3, List(), List(10.750342012721335, 89.24965798727867, 0.0))",1.0,"List(1, 3, List(), List(0.10750342012721335, 0.8924965798727866, 0.0))"


In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [21]:
fit_model = rf_model.fit(rf1_results.select(['features','state_index']))
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="state_index", predictionCol="prediction", metricName="accuracy")

In [22]:
# Print the model
rfModel = fitted_pipe.stages[4]
print(rfModel,"\n") 
# Print the accuracy and the test data error
accuracy = ACC_evaluator.evaluate(rf1_results)*100
print("Accuracy = %g" % (accuracy)," %\n")
print("Test Error = %g" % (100 - accuracy)," %\n")

In [23]:
# Show feature importance vector
fitted_pipe.stages[-1].featureImportances

In [24]:
# function from https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [25]:
# Use defined function to extract the important features in the data 
ExtractFeatureImp(fitted_pipe.stages[-1].featureImportances, rf1_results , "features")

Unnamed: 0,idx,name,vals,score
1,3,backers,,0.52
3,5,usd_pledged,,0.29
0,2,goal,,0.17
4,0,category_index,"[Film & Video, Music, Publishing, Games, Techn...",0.01
2,4,duration,,0.0
5,1,country_index,"[US, GB, CA, AU, DE, NL, FR, IT, ES, SE, NZ, D...",0.0


In [26]:
# Display tree not implemented for Random Forest Classification
display(fitted_pipe.stages[-1])