## Analysis Questions
##### Can you categorize the factors for a successful and an unsuccessful project? (Classification)
##### Can you predict if a project will succeed or fail? (Logistic Regression)
##### Can you predict how much a backer will pledge to a project based on various factors? (Linear Regression)
##### What are the most important factors for a successful project and an unsuccessful project?

In [2]:
import pandas as pd
import chardet

In [3]:
# Reading error with UTF-8 unable to decode the file
#ks16a = pd.read_csv("/dbfs/FileStore/tables/ks_projects_201612-284ce.csv")

In [4]:
# Adapted from https://www.kaggle.com/rtatman/data-cleaning-challenge-character-encodings
# Sniff the file encoding with chardet from the first 10,000 bytes
# (chardet reports Windows-1252 for this file).

with open("/dbfs/FileStore/tables/ks_projects_201612-284ce.csv", 'rb') as rawdata:
    sample_bytes = rawdata.read(10000)
result = chardet.detect(sample_bytes)

print(result)


In [5]:
# Read the CSV file using the Windows-1252 encoding
ks16a = pd.read_csv(
    "/dbfs/FileStore/tables/ks_projects_201612-284ce.csv",
    encoding='Windows-1252',
)


In [6]:
ks16a.head()

Unnamed: 0,ID,name,category,main_category,currency,deadline,goal,launched,pledged,state,backers,country,usd pledged,Unnamed: 13,Unnamed: 14,Unnamed: 15,Unnamed: 16
0,1000002330,The Songs of Adelaide & Abullah,Poetry,Publishing,GBP,2015-10-09 11:36:00,1000,2015-08-11 12:12:28,0,failed,0,GB,0,,,,
1,1000004038,Where is Hank?,Narrative Film,Film & Video,USD,2013-02-26 00:20:50,45000,2013-01-12 00:20:50,220,failed,3,US,220,,,,
2,1000007540,ToshiCapital Rekordz Needs Help to Complete Album,Music,Music,USD,2012-04-16 04:24:11,5000,2012-03-17 03:24:11,1,failed,1,US,1,,,,
3,1000011046,Community Film Project: The Art of Neighborhoo...,Film & Video,Film & Video,USD,2015-08-29 01:00:00,19500,2015-07-04 08:35:03,1283,canceled,14,US,1283,,,,
4,1000014025,Monarch Espresso Bar,Restaurants,Food,USD,2016-04-01 13:38:27,50000,2016-02-26 13:38:27,52375,successful,224,US,52375,,,,


In [7]:
ks16a.columns

In [8]:
# Replace the header: 'usd pledged' loses its space and the four trailing
# unnamed columns get the placeholder names c_13 .. c_16.
ks16a.columns = [
    'ID', 'name', 'category', 'main_category', 'currency', 'deadline',
    'goal', 'launched', 'pledged', 'state', 'backers', 'country',
    'usd_pledged', 'c_13', 'c_14', 'c_15', 'c_16',
]
    

In [9]:
# In order to create the PySpark dataframe, cast every column to string.
# NOTE(review): astype(str) renders missing values as the text 'nan' rather than
# a real null -- confirm the later null handling accounts for that.
string_columns = [
    'ID', 'name', 'category', 'main_category', 'currency', 'deadline',
    'goal', 'launched', 'pledged', 'state', 'backers', 'country',
    'usd_pledged', 'c_13', 'c_14', 'c_15', 'c_16',
]
for column_name in string_columns:
    ks16a[column_name] = ks16a[column_name].astype(str)

In [10]:
# Create a dataframe in Spark
ks16 = spark.createDataFrame(ks16a)

In [11]:
# Import functions/datatypes for timestamp, integer, and double
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Discard the four placeholder columns
ks16 = ks16.drop('c_13', 'c_14', 'c_15', 'c_16')


# Cast the stringified numeric columns back to integer / double
numeric_casts = [
    ("ID", IntegerType()),
    ("goal", IntegerType()),
    ("pledged", DoubleType()),
    ("backers", IntegerType()),
    ("usd_pledged", DoubleType()),
]
for column_name, spark_type in numeric_casts:
    ks16 = ks16.withColumn(column_name, ks16[column_name].cast(spark_type))


In [12]:
from pyspark.sql.functions import isnan, when, count, col
# Per column, count the entries that are NaN or NULL
ks16_missing_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in ks16.columns
]
ks16.select(ks16_missing_exprs).show()

In [13]:
# Drop all nulls from the data frame
ks16 = ks16.dropna()

In [14]:
# Verify if there are no nulls 
from pyspark.sql.functions import isnan, when, count, col
# Re-run the per-column NaN/NULL count after the rows were dropped
ks16_missing_after = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in ks16.columns
]
ks16.select(ks16_missing_after).show()

In [15]:
import time
import datetime

# Convert the timestamp strings to Date (yyyy-MM-dd).
# to_date() casts the string directly, which avoids a
# unix_timestamp(..., "yyyy-MM-dd") round-trip whose pattern does not cover the
# time-of-day part (Spark 3's datetime parser rejects such partial matches).
ks16 = ks16.withColumn("deadline", to_date(ks16["deadline"]))
ks16 = ks16.withColumn("launched", to_date(ks16["launched"]))

In [16]:
# This export reads without decoding errors, so no explicit encoding is passed
ks18a = pd.read_csv("/dbfs/FileStore/tables/ks_projects_201801-a566d.csv")
# Replace the header with underscore-style names
ks18a.columns = [
    'ID', 'name', 'category', 'main_category', 'currency', 'deadline',
    'goal', 'launched', 'pledged', 'state', 'backers', 'country',
    'usd_pledged', 'usd_pledged_real', 'usd_goal_real',
]
ks18 = spark.createDataFrame(ks18a)

In [17]:
#Reference code: Chapter 5 Big Data Analysis
from pyspark.sql.functions import isnan, when, count, col

# Per column, count the entries that are NaN or NULL
ks18_missing_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in ks18.columns
]
ks18.select(ks18_missing_exprs).show()



In [18]:
# Drop all null rows first, then remove the two extra columns
# (same order as before: the null check still sees the extra columns)
ks18 = (
    ks18
    .dropna()
    .drop('usd_pledged_real', 'usd_goal_real')
)


In [19]:
# Verify there are no nulls
# Re-run the per-column NaN/NULL count after the rows were dropped
ks18_missing_after = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in ks18.columns
]
ks18.select(ks18_missing_after).show()

In [20]:
# Convert the date/timestamp strings to Date (yyyy-MM-dd).
# to_date() casts the string directly, which avoids a
# unix_timestamp(..., 'yyyy-MM-dd') round-trip whose pattern does not cover a
# trailing time-of-day part (Spark 3's datetime parser rejects such partial matches).
ks18 = ks18.withColumn('deadline', to_date(ks18['deadline']))
ks18 = ks18.withColumn('launched', to_date(ks18['launched']))

In [21]:
display(ks18.take(5))

ID,name,category,main_category,currency,deadline,goal,launched,pledged,state,backers,country,usd_pledged
1000002330,The Songs of Adelaide & Abullah,Poetry,Publishing,GBP,2015-10-09,1000.0,2015-08-11,0.0,failed,0,GB,0.0
1000003930,Greeting From Earth: ZGAC Arts Capsule For ET,Narrative Film,Film & Video,USD,2017-11-01,30000.0,2017-09-02,2421.0,failed,15,US,100.0
1000004038,Where is Hank?,Narrative Film,Film & Video,USD,2013-02-26,45000.0,2013-01-12,220.0,failed,3,US,220.0
1000007540,ToshiCapital Rekordz Needs Help to Complete Album,Music,Music,USD,2012-04-16,5000.0,2012-03-17,1.0,failed,1,US,1.0
1000011046,Community Film Project: The Art of Neighborhood Filmmaking,Film & Video,Film & Video,USD,2015-08-29,19500.0,2015-07-04,1283.0,canceled,14,US,1283.0


In [22]:
# Stack the two frames. DataFrame.union matches columns by position, not by name,
# so both frames must list their columns in the same order.
# NOTE(review): the sample rows displayed for both sources share IDs (e.g. 1000002330),
# so the combined frame may hold the same project twice -- confirm whether
# de-duplicating on ID is wanted.
ks = ks16.union(ks18)

In [23]:
# Project length in days: deadline minus launch date
days_open = datediff(ks['deadline'], ks['launched'])
ks = ks.withColumn('duration', days_open)

In [24]:
display(ks.take(5))

ID,name,category,main_category,currency,deadline,goal,launched,pledged,state,backers,country,usd_pledged,duration
1000002330,The Songs of Adelaide & Abullah,Poetry,Publishing,GBP,2015-10-09,1000.0,2015-08-11,0.0,failed,0,GB,0.0,59
1000004038,Where is Hank?,Narrative Film,Film & Video,USD,2013-02-26,45000.0,2013-01-12,220.0,failed,3,US,220.0,45
1000007540,ToshiCapital Rekordz Needs Help to Complete Album,Music,Music,USD,2012-04-16,5000.0,2012-03-17,1.0,failed,1,US,1.0,30
1000011046,Community Film Project: The Art of Neighborhood Filmmaking,Film & Video,Film & Video,USD,2015-08-29,19500.0,2015-07-04,1283.0,canceled,14,US,1283.0,56
1000014025,Monarch Espresso Bar,Restaurants,Food,USD,2016-04-01,50000.0,2016-02-26,52375.0,successful,224,US,52375.0,35


In [25]:
ks.schema

In [26]:
# Importing libraries to visualize the data
import matplotlib.pyplot as plt
import seaborn as sns
ksp=ks.toPandas()


In [27]:
ksp.dtypes

In [28]:
# Scientific Notations formatting
pd.options.display.float_format = '{:,.2f}'.format

In [29]:
# Show statistics of pandas dataframe
ksp.describe()

Unnamed: 0,ID,goal,pledged,backers,usd_pledged,duration
count,694188.0,694188.0,694188.0,694188.0,694188.0,694188.0
mean,1074728555.73,48660.43,9296.76,104.92,7409.81,34.63
std,619246061.86,1169751.31,93286.87,925.01,81477.01,68.75
min,5971.0,0.0,0.0,0.0,0.0,1.0
25%,537722510.0,2000.0,30.0,2.0,20.26,30.0
50%,1075550449.5,5250.0,615.0,12.0,456.68,30.0
75%,1610536778.25,16000.0,4022.07,56.0,3262.08,38.0
max,2147476221.0,100000000.0,20338986.27,219382.0,20338986.27,16739.0


In [30]:
# Number of distinct values in each column (one count per column)
for column_name in ks.columns:
    distinct_total = ks.select(column_name).distinct().count()
    print(column_name, distinct_total)

In [31]:
# Explore the total amount of USD pledged per country, largest total first.
# Sorting on the aggregated column (not on the country code) is what puts the
# biggest contributors at the top of the table and the left of the bar chart.
ks_usd = (
    ks.select('country', 'usd_pledged')
      .groupby('country')
      .sum('usd_pledged')
      .sort('sum(usd_pledged)', ascending=False)
      .toPandas()
)

In [32]:
ks_usd.head()

Unnamed: 0,country,sum(usd_pledged)
0,US,4398992294.77
1,GB,356535857.28
2,CA,136365845.13
3,AU,58896499.85
4,DE,41134654.81


In [33]:
ks_usd.columns=['country','total usd pledged']

In [34]:
# Bar chart of the total USD pledged per country
fig, ax = plt.subplots()
sns.barplot(x='country', y='total usd pledged', data=ks_usd, ax=ax).set_title("Country and Total usd pledged distribution in billions")
ax.set_ylim(0, 4500000000)
# plt.show() returns None, so hand the figure object itself to display()
display(fig)
# The y-axis (total USD pledged) is in billions: the US has the highest amount pledged,
# around 4.4 billion, followed by GB at roughly 0.36 billion.

In [35]:
# Number of projects in each state, smallest group first
state_counts = ks.groupby('state').count()
ks_state = state_counts.sort('count', ascending=True).toPandas()

In [36]:
ks_state

Unnamed: 0,state,count
0,suspended,3320
1,live,7225
2,canceled,71089
3,successful,246827
4,failed,365727


In [37]:
# Bar chart of the number of projects per state
fig, ax = plt.subplots()
sns.barplot(x='state', y='count', data=ks_state, ax=ax).set_title("Project state distribution")
# plt.show() returns None, so hand the figure object itself to display()
display(fig)

In [38]:
from pyspark.sql.functions import *
# Per main category, count the non-null 'backers' entries, largest group first.
# NOTE(review): count('backers') counts rows; it does not add up the backers --
# confirm that a row count (rather than a total number of backers) is intended.
category_counts = ks.select('main_category', 'backers').groupby('main_category').agg(count('backers'))
ks_backers = category_counts.sort('count(backers)', ascending=False).toPandas()

In [39]:
ks_backers

Unnamed: 0,main_category,count(backers)
0,Film & Video,119473
1,Music,93864
2,Publishing,73105
3,Games,63225
4,Technology,58671
5,Design,53928
6,Art,52111
7,Food,45794
8,Fashion,41190
9,Theater,20873


In [40]:
# Horizontal bar chart of the per-category counts
fig, b = plt.subplots()
category_plot = sns.barplot(x='count(backers)', y='main_category', data=ks_backers)
b = category_plot.set_title("Project category distribution by no. of backers ")
plt.show()

In [41]:
# Replace the launch date with its four-digit year, kept as a string.
# The cast and the substring are chained on the same column so the cast is
# actually part of the result (two separate withColumn calls on `ks` would
# discard the first one).
ksyear = ks.withColumn("launched", ks['launched'].cast('string').substr(1, 4))

In [42]:
pd.options.display.float_format = '{:,.2f}'.format

In [43]:
# Total USD pledged per launch year, ordered by year
yearly_totals = ksyear.select('usd_pledged', 'launched').groupby('launched').sum('usd_pledged')
ks_year = yearly_totals.sort('launched').toPandas()

In [44]:
# Line chart of the total USD pledged per launch year
fig, c = plt.subplots()
yearly_plot = sns.lineplot(x='launched', y='sum(usd_pledged)', data=ks_year)
c = yearly_plot.set_title("USD distribution as per the year")
display(fig)

#We 

In [45]:
# Split the pandas frame by outcome so successful and failed projects
# can be compared per main category

failed = ksp[ksp['state'] == 'failed']
successful = ksp[ksp['state'] == 'successful']


In [46]:
# Side-by-side bar charts: projects per main category, successful vs. failed
fig, ax = plt.subplots(1, 2)
outcome_panels = [
    (successful, "Dist. for successful projects"),
    (failed, "Dist. for failed projects"),
]
for panel_axis, (outcome_frame, panel_title) in zip(ax, outcome_panels):
    category_counts = outcome_frame.main_category.value_counts()
    sns.barplot(x=category_counts.values,
                y=category_counts.index, data=outcome_frame, ax=panel_axis).set_title(panel_title)
plt.subplots_adjust(wspace=0.8)
plt.subplots_adjust(bottom=0.1, top=0.90)
display(fig)

In [47]:
#Distribution according to main category of Successful projects according to different columns
# Adds the project length in days, computed in pandas from the two date columns.
# NOTE(review): `successful` and `failed` are sliced from ksp in an earlier cell, so on a
# top-to-bottom run they would not contain this column -- confirm the intended cell order.

ksp['duration(days)'] = (ksp['deadline'] - ksp['launched']).dt.days


In [48]:
# Per-category averages of the numeric columns for successful projects.
# numeric_only=True keeps text/date columns out of the aggregation; without it
# pandas >= 2.0 raises a TypeError on those columns.
mean_ks = successful.groupby(['main_category']).mean(numeric_only=True)
mean_ks

Unnamed: 0_level_0,ID,goal,pledged,backers,usd_pledged,duration,duration(days)
main_category,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
Art,1080772661.9,4588.9,7438.9,87.69,5746.16,30.56,30.56
Comics,1059946725.41,5681.61,12014.88,235.04,10075.17,32.24,32.24
Crafts,1079588146.82,3290.66,6401.4,94.55,4418.12,29.12,29.12
Dance,1079644681.23,4727.84,5331.03,63.53,4651.4,31.86,31.86
Design,1078844382.4,17309.04,66792.01,619.87,48076.1,33.51,33.51
Fashion,1067020313.68,10347.23,22848.05,221.93,16672.02,31.43,31.43
Film & Video,1072617776.05,11297.72,14134.45,152.91,12921.87,32.41,32.41
Food,1077180200.66,11944.37,17567.45,180.0,15054.37,31.59,31.59
Games,1077355922.51,15694.1,55528.41,833.12,45585.35,30.21,30.21
Journalism,1088768563.35,9421.65,12273.79,152.53,9140.65,32.2,32.2


In [49]:
# Side-by-side bar charts per main category (successful projects):
# average number of backers and average USD pledged
fig, ax = plt.subplots(1, 2, figsize=(25, 10))
sns.barplot(x=mean_ks.backers.values,
            y=mean_ks.backers.index, data=mean_ks, ax=ax[0]).set_title("Dist. as per avg backers")
# The column is named 'usd_pledged' (underscore); 'usd pledged' raises a KeyError
sns.barplot(x=mean_ks['usd_pledged'].values,
            y=mean_ks['usd_pledged'].index, data=mean_ks, ax=ax[1]).set_title("Dist. as per avg pledged")
plt.subplots_adjust(wspace=0.8)
plt.subplots_adjust(bottom=0.2, top=0.90)
display(fig)

In [50]:
# Bar chart of the average project duration per main category (successful projects)
fig, d = plt.subplots()
avg_duration = mean_ks.duration
duration_plot = sns.barplot(x=avg_duration.values, y=avg_duration.index, data=mean_ks)
duration_plot.set_title("Dist. of avg. project duration")
plt.show()

In [51]:
# Per-category averages of the numeric columns for failed projects.
# numeric_only=True keeps text/date columns out of the aggregation; without it
# pandas >= 2.0 raises a TypeError on those columns.
mean_f = failed.groupby(['main_category']).mean(numeric_only=True)
mean_f

Unnamed: 0_level_0,ID,goal,pledged,backers,usd_pledged,duration,duration(days)
main_category,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
Art,1073367658.36,66405.07,644.58,8.94,588.05,34.1,34.1
Comics,1063390632.97,45115.75,1026.88,21.26,926.49,36.88,36.88
Crafts,1076045361.53,13627.92,386.18,5.99,280.75,32.38,32.38
Dance,1067779509.75,16120.86,644.29,9.65,584.98,34.73,34.73
Design,1078147053.87,61111.69,3172.78,32.27,2519.62,34.68,34.68
Fashion,1070673706.44,29741.76,910.76,9.92,732.17,33.22,33.22
Film & Video,1073626532.84,118038.52,1572.22,15.29,1430.1,37.05,37.05
Food,1083999764.99,62228.99,1113.4,12.75,985.0,34.79,34.79
Games,1071356906.32,63271.86,2470.48,45.17,2125.37,34.08,34.08
Journalism,1074044222.34,77538.36,591.9,7.8,484.26,34.96,34.96


In [52]:
# Import the required libraries

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline

In [53]:
# Keep the modelling columns and only the rows whose state is successful or failed
model_columns = ['main_category', 'goal', 'state', 'backers', 'country', 'usd_pledged', 'duration']
ks_dt = ks.select(model_columns).filter("state== 'successful' or state== 'failed'")
ks_dt.show()

In [54]:
# Create a 70-30 train/test split.
# A fixed seed is passed so that re-running the notebook does not draw a
# different random split (and therefore different metrics) each time.

train_data, test_data = ks_dt.randomSplit([0.7, 0.3], seed=42)

In [55]:
train_data.show(5)
test_data.show(5)

In [56]:
# StringIndexer maps each categorical string column to a numeric index column;
# handleInvalid='keep' is set on all three indexers.

main_category_indexer = StringIndexer(
    inputCol='main_category',
    outputCol='main_category_index',
    handleInvalid='keep',
)
state_indexer = StringIndexer(
    inputCol='state',
    outputCol='state_index',
    handleInvalid='keep',
)
country_indexer = StringIndexer(
    inputCol='country',
    outputCol='country_index',
    handleInvalid='keep',
)

In [57]:
# VectorAssembler packs the listed input columns into one 'features' vector column.
# NOTE(review): 'backers' and 'usd_pledged' describe how a campaign went -- confirm
# they are meant to be predictors of the campaign's state.

feature_columns = ['main_category_index', 'country_index', 'goal', 'backers', 'usd_pledged', 'duration']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [58]:
# Create an object for the Decision Tree model; the label is the indexed 'state' column.
# NOTE(review): maxBins=5000 looks far larger than the number of categories in the
# indexed columns -- confirm that such a high value is needed.
dt_model = DecisionTreeClassifier(labelCol='state_index',maxBins=5000)

In [59]:
# Pipeline chains the three indexers, the assembler and the decision tree so the data passes
# through them in the listed order as a single machine-learning workflow.

pipe = Pipeline(stages=[main_category_indexer,state_indexer,country_indexer,assembler,dt_model])

In [60]:
fit_model=pipe.fit(train_data)

In [61]:
results = fit_model.transform(test_data)

In [62]:
results.show(5)

In [63]:
display(results.select(['state_index','prediction']).take(5))

state_index,prediction
1.0,0.0
0.0,0.0
1.0,1.0
1.0,0.0
1.0,1.0


In [64]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [65]:
# Evaluator comparing the 'prediction' column with the 'state_index' label using accuracy
ACC_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="state_index",
    predictionCol="prediction",
)

In [66]:
# Stage index 4 of the fitted pipeline is printed as the decision-tree model
dtModel = fit_model.stages[4]
print(dtModel)
# Score the test-set predictions and report accuracy alongside its complement
accuracy = ACC_evaluator.evaluate(results)
test_error = 1.0 - accuracy
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % test_error)

In [67]:
fit_model.stages[-1].featureImportances

In [68]:
display(fit_model.stages[-1])

treeNode
"{""index"":19,""featureType"":""continuous"",""prediction"":null,""threshold"":17.5,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":13,""featureType"":""continuous"",""prediction"":null,""threshold"":850.5,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":195.025,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":199.5,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":18.00302172,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":8.5,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [69]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Pair each assembled feature with its importance score.

    Args:
        featureImp: importance values, indexable by feature position.
        dataset: object whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
            maps attribute groups to lists of attribute dicts (each with an ``idx``).
        featuresCol: name of the features column to read the metadata from.

    Returns:
        pandas DataFrame with one row per attribute plus a ``score`` column,
        sorted by ``score`` in descending order.
    """
    attr_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    # Flatten the per-group attribute lists into one list of records
    attribute_rows = []
    for group_key in attr_groups:
        attribute_rows.extend(attr_groups[group_key])
    varlist = pd.DataFrame(attribute_rows)
    # Look up each attribute's importance by its position in the feature vector
    varlist['score'] = varlist['idx'].apply(lambda position: featureImp[position])
    ranked = varlist.sort_values('score', ascending = False)
    return ranked

In [70]:
ExtractFeatureImp(fit_model.stages[-1].featureImportances, results , "features")

Unnamed: 0,idx,name,vals,score
1,3,backers,,0.6
2,4,usd_pledged,,0.21
0,2,goal,,0.19
5,1,country_index,"[US, GB, CA, AU, DE, NL, FR, IT, ES, SE, NZ, D...",0.0
3,5,duration,,0.0
4,0,main_category_index,"[Film & Video, Music, Publishing, Games, Techn...",0.0


In [71]:
display(ks.limit(5))

ID,name,category,main_category,currency,deadline,goal,launched,pledged,state,backers,country,usd_pledged,duration
1000002330,The Songs of Adelaide & Abullah,Poetry,Publishing,GBP,2015-10-09,1000.0,2015-08-11,0.0,failed,0,GB,0.0,59
1000004038,Where is Hank?,Narrative Film,Film & Video,USD,2013-02-26,45000.0,2013-01-12,220.0,failed,3,US,220.0,45
1000007540,ToshiCapital Rekordz Needs Help to Complete Album,Music,Music,USD,2012-04-16,5000.0,2012-03-17,1.0,failed,1,US,1.0,30
1000011046,Community Film Project: The Art of Neighborhood Filmmaking,Film & Video,Film & Video,USD,2015-08-29,19500.0,2015-07-04,1283.0,canceled,14,US,1283.0,56
1000014025,Monarch Espresso Bar,Restaurants,Food,USD,2016-04-01,50000.0,2016-02-26,52375.0,successful,224,US,52375.0,35


In [72]:

# Getting Numerical Columns from the Spark Dataframe
num_cols=['goal','backers','usd_pledged','duration']

In [73]:
# Every ks_dt column that is not listed in num_cols is treated as categorical.
# The loop variable is deliberately not called `col`, so it does not overwrite
# the `col` function imported from pyspark.sql.functions.
categoricalcols = [column_name for column_name in ks_dt.columns if column_name not in num_cols]

In [74]:
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer

In [75]:
# Converting categorical columns to numeric using string indexer.
# Each indexer is fitted on, and applied to, the frame produced by the previous
# one, so the *_categorical columns accumulate on a single DataFrame.
# The loop variable is deliberately not called `col`, so it does not overwrite
# the `col` function imported from pyspark.sql.functions.
indexed_df = ks_dt
for column_name in categoricalcols:
    indexer = StringIndexer(inputCol=column_name, outputCol=column_name + "_categorical").fit(indexed_df)
    indexed_df = indexer.transform(indexed_df)

In [76]:
display(indexed_df.take(5))

main_category,goal,state,backers,country,usd_pledged,duration,main_category_categorical,state_categorical,country_categorical
Publishing,1000.0,failed,0,GB,0.0,59,2.0,0.0,1.0
Film & Video,45000.0,failed,3,US,220.0,45,0.0,0.0,0.0
Music,5000.0,failed,1,US,1.0,30,1.0,0.0,0.0
Food,50000.0,successful,224,US,52375.0,35,7.0,1.0,0.0
Food,1000.0,successful,16,US,1205.0,20,7.0,1.0,0.0


In [77]:
indexed_df.printSchema()

In [78]:
indexed_df=indexed_df.drop('main_category','state','country')

In [79]:
pd.options.display.float_format = '{:,.2f}'.format

In [80]:
# Common statistics of all the columns
indexed_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
goal,612554,43156.51625628107,1085174.8589868369,0.0,1.0E8
backers,612554,114.44249486575877,980.4828413222216,0,219382
usd_pledged,612554,8098.233586188493,86037.88362806017,0.0,2.033898627E7
duration,612554,34.06963794212429,12.793770766391441,1,92
main_category_categorical,612554,4.184708613444692,3.654474808862858,0.0,14.0
state_categorical,612554,0.4029473319903225,0.4904907464806698,0.0,1.0
country_categorical,612554,0.6935943606604479,2.2036375703228135,0.0,21.0


In [81]:
# Correlation dictionary: every column of indexed_df mapped to its correlation with
# 'backers', used to pick out the most positively and negatively correlated columns.
# The loop variable is deliberately not called `col`, so it does not overwrite
# the `col` function imported from pyspark.sql.functions.
correlation_dict = {
    column_name: indexed_df.corr(col1='backers', col2=column_name)
    for column_name in indexed_df.columns
}

In [82]:
#https://stackoverflow.com/questions/613183/how-do-i-sort-a-dictionary-by-value
import operator
# (column, correlation) pairs ordered from the highest correlation to the lowest
sorted_dict = sorted(correlation_dict.items(), key=lambda pair: pair[1], reverse=True)

In [83]:
sorted_dict[0:1][0][0]

In [84]:
sorted_dict[-6:]

In [85]:
# new_col_list = the top-ranked pair followed by the six lowest-ranked pairs
top_pair = sorted_dict[:1]
bottom_pairs = sorted_dict[-6:]
new_col_list = top_pair + bottom_pairs

In [86]:
# Keep only the column names from the (name, correlation) pairs, skipping 'state'.
# NOTE(review): the exclusion targets the literal name 'state' -- verify that this
# matches the column names actually present in new_col_list.
collist = [pair[0] for pair in new_col_list if pair[0] != 'state']

In [87]:
# Pairwise correlations between the columns of collist, computed in pandas
selected_pdf = indexed_df.select(collist).toPandas()
f = selected_pdf.corr()

In [88]:
#Creating heatmap for visualizing the correlated columns so that we can eliminate auto-correlation.

import seaborn as sns
import matplotlib.pyplot as plt

# Create one figure at the wanted size and draw the heatmap on its axes, so the
# figure handed to display() is the one that actually holds the plot
# (a separate plt.figure() call would open a second, different figure).
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(data=f, ax=ax)
ax.tick_params(axis='both', labelsize=10)
display(fig)

In [89]:
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Small three-row, three-column example frame for Spark's Correlation API
columns = ['col1','col2','col3']
sample_rows = [
    (1.3,2.1,3.0),
    (2.5,4.6,3.1),
    (6.5,7.2,10.0),
]
myGraph = spark.createDataFrame(sample_rows, columns)

# Assemble the three columns into one vector column, which Correlation.corr consumes
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=['col1','col2','col3'], outputCol=vector_col)
assembled = assembler.transform(myGraph)
myGraph_vector = assembled.select(vector_col)
matrix = Correlation.corr(myGraph_vector, vector_col)

In [90]:
# Collect the result, take the first cell of the first row, and convert it to nested lists
first_row = Correlation.corr(myGraph_vector, vector_col).collect()[0]
matrix = first_row[0]
corrmatrix = matrix.toArray().tolist()
print(corrmatrix)

In [91]:
def plot_corr_matrix(correlations,attr,fig_no):
    """Plot a correlation matrix as a colour-coded grid with labelled axes.

    Args:
        correlations: 2-D sequence of correlation values passed to ``matshow``;
            assumed to have one row/column per entry of ``attr``.
        attr: sequence of attribute names used as tick labels on both axes.
        fig_no: matplotlib figure number to draw into.
    """
    fig = plt.figure(fig_no)
    ax = fig.add_subplot(111)
    ax.set_title("Correlation Matrix for Specified Attributes")
    # Colour scale pinned to the full correlation range [-1, 1]
    cax = ax.matshow(correlations, vmax=1, vmin=-1)
    # Fix one tick per attribute before labelling, so each label sits on its own
    # row/column regardless of where the automatic tick locator would place ticks.
    positions = list(range(len(attr)))
    labels = list(attr)
    ax.set_xticks(positions)
    ax.set_yticks(positions)
    ax.set_xticklabels(labels)
    ax.set_yticklabels(labels)
    fig.colorbar(cax)
    plt.show()

In [92]:
plot_corr_matrix(corrmatrix, columns, 234)