In [0]:
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/lucas.guo@autuni.ac.nz/suicide.csv")

In [0]:
df1.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- suicides_no: string (nullable = true)
 |-- population: string (nullable = true)
 |-- suicides/100k pop: string (nullable = true)
 |-- country-year: string (nullable = true)
 |-- HDI for year: string (nullable = true)
 |--  gdp_for_year ($) : string (nullable = true)
 |-- gdp_per_capita ($): string (nullable = true)
 |-- generation: string (nullable = true)



In [0]:
df1.show()

+-------+----+------+-----------+-----------+----------+-----------------+------------+------------+------------------+------------------+---------------+
|country|year|   sex|        age|suicides_no|population|suicides/100k pop|country-year|HDI for year| gdp_for_year ($) |gdp_per_capita ($)|     generation|
+-------+----+------+-----------+-----------+----------+-----------------+------------+------------+------------------+------------------+---------------+
|Albania|1987|  male|15-24 years|         21|    312900|             6.71| Albania1987|        null|     2,156,624,900|               796|   Generation X|
|Albania|1987|  male|35-54 years|         16|    308000|             5.19| Albania1987|        null|     2,156,624,900|               796|         Silent|
|Albania|1987|female|15-24 years|         14|    289700|             4.83| Albania1987|        null|     2,156,624,900|               796|   Generation X|
|Albania|1987|  male|  75+ years|          1|     21800|             4

In [0]:
df1.describe().show()

+-------+----------+------------------+------+-----------+------------------+------------------+------------------+--------------+-------------------+------------------+------------------+----------+
|summary|   country|              year|   sex|        age|       suicides_no|        population| suicides/100k pop|  country-year|       HDI for year| gdp_for_year ($) |gdp_per_capita ($)|generation|
+-------+----------+------------------+------+-----------+------------------+------------------+------------------+--------------+-------------------+------------------+------------------+----------+
|  count|     27820|             27820| 27820|      27820|             27820|             27820|             27820|         27820|               8364|             27820|             27820|     27820|
|   mean|      null|2001.2583752695903|  null|       null|242.57440690150972|1844793.6173975556|12.816097411933894|          null| 0.7766011477761785|              null|16866.464414090584|      null|


In [0]:
from pyspark.sql.functions import col,isnan, when, count
df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns]
   ).show()

+-------+----+---+---+-----------+----------+-----------------+------------+------------+------------------+------------------+----------+
|country|year|sex|age|suicides_no|population|suicides/100k pop|country-year|HDI for year| gdp_for_year ($) |gdp_per_capita ($)|generation|
+-------+----+---+---+-----------+----------+-----------------+------------+------------+------------------+------------------+----------+
|      0|   0|  0|  0|          0|         0|                0|           0|       19456|                 0|                 0|         0|
+-------+----+---+---+-----------+----------+-----------------+------------+------------+------------------+------------------+----------+



In [0]:
df1.groupBy("sex").count().show(truncate=False)

+------+-----+
|sex   |count|
+------+-----+
|female|13910|
|male  |13910|
+------+-----+



In [0]:
df1.groupBy("age").count().show(truncate=False)

+-----------+-----+
|age        |count|
+-----------+-----+
|55-74 years|4642 |
|25-34 years|4642 |
|5-14 years |4610 |
|75+ years  |4642 |
|15-24 years|4642 |
|35-54 years|4642 |
+-----------+-----+



In [0]:
df1.groupBy("generation").count().show(truncate=False)

+---------------+-----+
|generation     |count|
+---------------+-----+
|Generation X   |6408 |
|Generation Z   |1470 |
|Millenials     |5844 |
|Silent         |6364 |
|G.I. Generation|2744 |
|Boomers        |4990 |
+---------------+-----+



In [0]:
df1.printSchema()
df1.show()


root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- suicides_no: string (nullable = true)
 |-- population: string (nullable = true)
 |-- suicide_rate: string (nullable = true)
 |-- country-year: string (nullable = true)
 |-- HDI for year: string (nullable = true)
 |-- gdp_year: float (nullable = true)
 |-- gdp_capita: string (nullable = true)
 |-- generation: string (nullable = true)
 |-- suicide_risk: integer (nullable = false)
 |-- log_GDP: double (nullable = true)

+-------+----+------+-----------+-----------+----------+------------+------------+------------+-----------+----------+---------------+------------+------------------+
|country|year|   sex|        age|suicides_no|population|suicide_rate|country-year|HDI for year|   gdp_year|gdp_capita|     generation|suicide_risk|           log_GDP|
+-------+----+------+-----------+-----------+----------+------------+------------+--------

In [0]:
df1 = df1.withColumnRenamed(" gdp_for_year ($) ","gdp_year")
df1 = df1.withColumnRenamed("gdp_per_capita ($)","gdp_capita")
df1 = df1.withColumnRenamed("suicides/100k pop","suicide_rate")

In [0]:
from pyspark.sql.functions import udf,concat,col,lit
commaRep = udf(lambda x:x.replace(',',''))
df1 = df1.withColumn('gdp_year',commaRep('gdp_year'))
df1 = df1.withColumn('gdp_year',df1.gdp_year.cast('float'))

In [0]:
df1.describe().show()

+-------+----------+------------------+------+-----------+------------------+------------------+------------------+--------------+-------------------+--------------------+------------------+----------+------------------+------------------+
|summary|   country|              year|   sex|        age|       suicides_no|        population|      suicide_rate|  country-year|       HDI for year|            gdp_year|        gdp_capita|generation|      suicide_risk|           log_GDP|
+-------+----------+------------------+------+-----------+------------------+------------------+------------------+--------------+-------------------+--------------------+------------------+----------+------------------+------------------+
|  count|     27820|             27820| 27820|      27820|             27820|             27820|             27820|         27820|               8364|               27820|             27820|     27820|             27820|             27820|
|   mean|      null|2001.2583752695903| 

In [0]:
from pyspark.sql import functions as F
df1 = df1.withColumn("suicide_risk", F.when(df1.suicide_rate>12.81,1).otherwise(0))
df1 = df1.withColumn("log_GDP", F.log(df1.gdp_year))
df1 = df1.withColumn("log_GDP_c", F.log(df1.gdp_capita))

In [0]:
df2 = df1.drop('suicides_no','population','suicide_rate','country-year','HDI for year','gdp_year','gdp_capita')
df2.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- generation: string (nullable = true)
 |-- suicide_risk: integer (nullable = false)
 |-- log_GDP: double (nullable = true)
 |-- log_GDP_c: double (nullable = true)



In [0]:
df2.select('suicide_risk').describe().show()

+-------+------------------+
|summary|      suicide_risk|
+-------+------------------+
|  count|             27820|
|   mean|0.3148454349388929|
| stddev| 0.464462637216317|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [0]:
df2.groupBy("suicide_risk").count().show(truncate=False)

+------------+-----+
|suicide_risk|count|
+------------+-----+
|1           |8759 |
|0           |19061|
+------------+-----+



In [0]:
from pyspark.sql.functions import col,isnan, when, count
df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns]
   ).show()

+-------+----+---+---+----------+------------+-------+
|country|year|sex|age|generation|suicide_risk|log_GDP|
+-------+----+---+---+----------+------------+-------+
|      0|   0|  0|  0|         0|           0|      0|
+-------+----+---+---+----------+------------+-------+



In [0]:
from pyspark.sql import functions as f
display(df1.select(f.skewness(df1['gdp_year']),f.skewness(df1['log_GDP'])))

skewness(gdp_year),skewness(log_GDP)
7.233365001894616,-0.1376893621641173


In [0]:
df1.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- suicides_no: string (nullable = true)
 |-- population: string (nullable = true)
 |-- suicide_rate: string (nullable = true)
 |-- country-year: string (nullable = true)
 |-- HDI for year: string (nullable = true)
 |-- gdp_year: float (nullable = true)
 |-- gdp_capita: string (nullable = true)
 |-- generation: string (nullable = true)
 |-- suicide_risk: integer (nullable = false)
 |-- log_GDP: double (nullable = true)



In [0]:
display(df1.select('suicides_no'))

suicides_no
21
16
14
1
9
1
6
4
1
0


In [0]:
display(df1.select('gdp_year'))

gdp_year
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0
2156624900.0


In [0]:
display(df1.select('gdp_capita'))

gdp_capita
796
796
796
796
796
796
796
796
796
796


In [0]:
display(df2.select('log_GDP'))

log_GDP
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803
21.491810288442803


In [0]:
display(df2)

country,year,sex,age,generation,suicide_risk,log_GDP
Albania,1987,male,15-24 years,Generation X,0,21.491810288442803
Albania,1987,male,35-54 years,Silent,0,21.491810288442803
Albania,1987,female,15-24 years,Generation X,0,21.491810288442803
Albania,1987,male,75+ years,G.I. Generation,0,21.491810288442803
Albania,1987,male,25-34 years,Boomers,0,21.491810288442803
Albania,1987,female,75+ years,G.I. Generation,0,21.491810288442803
Albania,1987,female,35-54 years,Silent,0,21.491810288442803
Albania,1987,female,25-34 years,Boomers,0,21.491810288442803
Albania,1987,male,55-74 years,G.I. Generation,0,21.491810288442803
Albania,1987,female,5-14 years,Generation X,0,21.491810288442803


In [0]:
df2.columns

Out[74]: ['country', 'year', 'sex', 'age', 'generation', 'suicide_risk', 'log_GDP']

In [0]:
df2.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- generation: string (nullable = true)
 |-- suicide_risk: integer (nullable = false)
 |-- log_GDP: double (nullable = true)



In [0]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier,MultilayerPerceptronClassifier


country_indexer = StringIndexer(inputCol='country',outputCol='countryIndex')
country_encoder = OneHotEncoder(inputCol='countryIndex',outputCol='contryVec')


gender_indexer = StringIndexer(inputCol='sex',outputCol='sexIndex')
gender_encoder = OneHotEncoder(inputCol='sexIndex',outputCol='sexVec')

year_indexer = StringIndexer(inputCol='year',outputCol='yearIndex')
year_encoder = OneHotEncoder(inputCol='yearIndex',outputCol='yearVec')

age_indexer = StringIndexer(inputCol='age',outputCol='ageIndex')
age_encoder = OneHotEncoder(inputCol='ageIndex',outputCol='ageVec')

generation_indexer = StringIndexer(inputCol='generation',outputCol='generationIndex')
generation_encoder = OneHotEncoder(inputCol='generationIndex',outputCol='generationVec')

assembler = VectorAssembler(inputCols=['contryVec',
 'sexVec',
 'yearVec',
 'ageVec',
 'generationVec',
 'log_GDP_c',                                      
 'log_GDP'],outputCol='features')

LR = LogisticRegression(featuresCol='features',labelCol='suicide_risk',maxIter =100, regParam=0.0,elasticNetParam=0.0)
DT = DecisionTreeClassifier(featuresCol='features',labelCol='suicide_risk', maxBins = 120, maxDepth=12, impurity='entropy',)
RF = RandomForestClassifier(featuresCol='features',labelCol='suicide_risk', maxBins = 120, maxDepth=12, impurity='entropy',numTrees=40)


<h2>LR</h2>

In [0]:
pipeline = Pipeline(stages=[country_indexer,country_encoder,
                           gender_indexer,gender_encoder,
                            year_indexer,year_encoder,
                            age_indexer,age_encoder,
                            generation_indexer,generation_encoder,
                           assembler,LR])

train, test = df2.randomSplit([0.8,.2])
fit_model = pipeline.fit(train)
results = fit_model.transform(test)
results.select('features','rawPrediction','probability','prediction').show()


+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(144,[64,127,136,...|[10.2370862679860...|[0.99996418422375...|       0.0|
|(144,[64,127,132,...|[8.32313568784032...|[0.99975722573368...|       0.0|
|(144,[64,127,137,...|[20.0092282359235...|[0.99999999795777...|       0.0|
|(144,[64,127,135,...|[7.67802644847383...|[0.99953732663541...|       0.0|
|(144,[64,100,127,...|[4.57167746959669...|[0.98976523448714...|       0.0|
|(144,[64,129,137,...|[19.7110788737323...|[0.99999999724838...|       0.0|
|(144,[64,129,135,...|[7.37987708628256...|[0.99937671119141...|       0.0|
|(144,[64,128,132,...|[8.12841027879293...|[0.99970505016653...|       0.0|
|(144,[64,128,135,...|[7.48330103942644...|[0.99943791826302...|       0.0|
|(144,[64,100,128,...|[4.37695206054931...|[0.98759229200344...|       0.0|
|(144,[64,10

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='suicide_risk')
acc = my_eval.evaluate(results)
print(acc)
y_pred=results.select("prediction").collect()
y_orig=results.select("suicide_risk").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

0.8858967191133715
Confusion Matrix:
[[3615  232]
 [ 295 1462]]


In [0]:
print("Coefficients:"+str())

<h2>DT</h2>

In [0]:
pipeline = Pipeline(stages=[country_indexer,country_encoder,
                           gender_indexer,gender_encoder,
                            year_indexer,year_encoder,
                            age_indexer,age_encoder,
                            generation_indexer,generation_encoder,
                           assembler,DT])

train, test = df2.randomSplit([0.8,.2])
fit_model = pipeline.fit(train)
results = fit_model.transform(test)
results.select('features','rawPrediction','probability','prediction').show()


+--------------------+--------------+--------------------+----------+
|            features| rawPrediction|         probability|prediction|
+--------------------+--------------+--------------------+----------+
|(144,[65,127,135,...|[1423.0,487.0]|[0.74502617801047...|       0.0|
|(144,[65,131,136,...|[1423.0,487.0]|[0.74502617801047...|       0.0|
|(144,[65,131,134,...|[1423.0,487.0]|[0.74502617801047...|       0.0|
|(144,[65,100,128,...| [1629.0,78.0]|[0.95430579964850...|       0.0|
|(144,[65,100,128,...|[1465.0,216.0]|[0.87150505651397...|       0.0|
|(144,[65,100,123,...| [5110.0,51.0]|[0.99011819414842...|       0.0|
|(144,[65,100,123,...|[1465.0,216.0]|[0.87150505651397...|       0.0|
|(144,[65,123,134,...|[1423.0,487.0]|[0.74502617801047...|       0.0|
|(144,[65,100,122,...| [1629.0,78.0]|[0.95430579964850...|       0.0|
|(144,[65,122,136,...|[1423.0,487.0]|[0.74502617801047...|       0.0|
|(144,[65,122,139,...|   [971.0,0.0]|           [1.0,0.0]|       0.0|
|(144,[65,100,121,..

In [0]:
fit_model.stages[-1].featureImportances

Out[171]: SparseVector(144, {0: 0.0072, 1: 0.0041, 2: 0.0115, 3: 0.0105, 5: 0.0021, 6: 0.0017, 7: 0.0052, 8: 0.0054, 9: 0.0134, 10: 0.0079, 13: 0.0038, 16: 0.0081, 17: 0.0006, 18: 0.004, 19: 0.013, 20: 0.0124, 23: 0.0016, 24: 0.0116, 25: 0.0039, 26: 0.0116, 27: 0.0064, 34: 0.0051, 35: 0.0056, 36: 0.0039, 37: 0.0126, 38: 0.0054, 47: 0.0053, 48: 0.0018, 51: 0.0144, 52: 0.0126, 55: 0.0036, 56: 0.0027, 57: 0.0114, 58: 0.0022, 59: 0.0056, 68: 0.0097, 70: 0.0021, 72: 0.0068, 73: 0.002, 74: 0.0094, 75: 0.0025, 76: 0.0068, 78: 0.0025, 81: 0.0043, 82: 0.0058, 83: 0.0039, 87: 0.0061, 90: 0.0027, 100: 0.3013, 101: 0.0001, 102: 0.0001, 103: 0.0002, 104: 0.0001, 105: 0.0002, 108: 0.0013, 109: 0.0014, 111: 0.0004, 112: 0.0007, 113: 0.0003, 114: 0.0001, 115: 0.0006, 116: 0.0013, 117: 0.0003, 119: 0.0016, 120: 0.0001, 122: 0.0002, 123: 0.001, 124: 0.0002, 125: 0.0004, 126: 0.0002, 128: 0.0004, 129: 0.0007, 130: 0.0012, 131: 0.0003, 132: 0.0398, 133: 0.0313, 134: 0.0502, 135: 0.0803, 136: 0.0569, 137: 

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='suicide_risk')
acc = my_eval.evaluate(results)
print(acc)
y_pred=results.select("prediction").collect()
y_orig=results.select("suicide_risk").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

0.8431072151406704
Confusion Matrix:
[[3528  298]
 [ 414 1341]]


<h2>RF</h2>

In [0]:
pipeline = Pipeline(stages=[country_indexer,country_encoder,
                           gender_indexer,gender_encoder,
                            year_indexer,year_encoder,
                            age_indexer,age_encoder,
                            generation_indexer,generation_encoder,
                           assembler,RF])

train, test = df2.randomSplit([0.8,.2])
fit_model = pipeline.fit(train)
results = fit_model.transform(test)
results.select('features','rawPrediction','probability','prediction').show()


+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(144,[66,100,127,...|[34.0760427681327...|[0.85190106920331...|       0.0|
|(144,[66,127,138,...|[28.7105884748330...|[0.71776471187082...|       0.0|
|(144,[66,127,132,...|[36.7529134563728...|[0.91882283640932...|       0.0|
|(144,[66,127,136,...|[28.4054688453401...|[0.71013672113350...|       0.0|
|(144,[66,100,129,...|[34.4866182406542...|[0.86216545601635...|       0.0|
|(144,[66,100,129,...|[34.0760427681327...|[0.85190106920331...|       0.0|
|(144,[66,100,128,...|[34.4707466491910...|[0.86176866622977...|       0.0|
|(144,[66,100,128,...|[34.4866182406542...|[0.86216545601635...|       0.0|
|(144,[66,100,128,...|[33.8182124626142...|[0.84545531156535...|       0.0|
|(144,[66,128,133,...|[28.6319889362010...|[0.71579972340502...|       0.0|
|(144,[66,10

In [0]:
fit_model.stages[-1].featureImportances

Out[174]: SparseVector(144, {0: 0.0008, 1: 0.001, 2: 0.004, 3: 0.0006, 4: 0.0007, 5: 0.0003, 6: 0.0014, 7: 0.0008, 8: 0.0008, 9: 0.0034, 10: 0.0039, 11: 0.0005, 12: 0.0012, 13: 0.0001, 14: 0.0029, 15: 0.0013, 16: 0.0074, 17: 0.0033, 18: 0.0076, 19: 0.0003, 20: 0.0008, 21: 0.0004, 22: 0.0045, 23: 0.0083, 24: 0.0005, 25: 0.0008, 26: 0.0066, 27: 0.0006, 28: 0.0018, 29: 0.0103, 30: 0.0002, 31: 0.0005, 32: 0.0014, 33: 0.0004, 34: 0.0001, 35: 0.0003, 36: 0.0005, 37: 0.0001, 38: 0.0071, 39: 0.0034, 40: 0.0008, 41: 0.0068, 42: 0.0003, 43: 0.0028, 44: 0.0027, 45: 0.0028, 46: 0.0019, 47: 0.0004, 48: 0.0012, 49: 0.0003, 50: 0.0065, 51: 0.0003, 52: 0.0006, 53: 0.0029, 54: 0.0047, 55: 0.0001, 56: 0.0095, 57: 0.0003, 58: 0.0066, 59: 0.0012, 60: 0.001, 61: 0.0068, 62: 0.0023, 63: 0.0028, 64: 0.005, 65: 0.0003, 66: 0.0041, 67: 0.0002, 68: 0.0025, 69: 0.0014, 70: 0.0024, 71: 0.0006, 72: 0.0, 73: 0.003, 74: 0.0059, 75: 0.0022, 76: 0.0045, 77: 0.0001, 78: 0.0026, 79: 0.0036, 80: 0.0053, 81: 0.0046, 82: 0

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='suicide_risk')
acc = my_eval.evaluate(results)
print(acc)
y_pred=results.select("prediction").collect()
y_orig=results.select("suicide_risk").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

0.7899995717783357
Confusion Matrix:
[[3578  149]
 [ 681 1111]]
