In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
from pyspark.sql.functions import mean, avg
from pyspark.sql import DataFrameStatFunctions
from pyspark.sql.functions import array, col, explode, struct, lit
from pyspark.sql.functions import when, count, col

In [2]:
# Use Spark to read csv file
SuicideStats = spark.read.csv("data/who_suicide_statistics.csv",inferSchema=True,header=True)
SuicideStats.createOrReplaceTempView('suicideStats')
SuicideStats = spark.sql("SELECT * FROM suicideStats WHERE population >= 0")

SuicideStats = SuicideStats.na.drop(subset="suicides_no")
bounds = {
    c: dict(
        zip(["q1", "q3"], SuicideStats.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in SuicideStats.columns if c in ["suicides_no", "population"]
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)

SuicideStats = SuicideStats.select(
    "*",
    *[
        when(
            col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in SuicideStats.columns  if c in ["suicides_no", "population"]
    ]
)

SuicideStats.createOrReplaceTempView('SuicideStats')
SuicideStats = spark.sql("SELECT country, year, sex, age, suicides_no, population FROM SuicideStats WHERE suicides_no_out = 0 and population_out=0 ")

SuicideStats = SuicideStats.withColumn("SuicidePer100k", col("suicides_no")/col("population")*100000)

SuicideStats = SuicideStats.withColumnRenamed("country","Country")\
    .withColumnRenamed("year","Year")\
    .withColumnRenamed("sex","Sex")\
    .withColumnRenamed("age","Age")\
    .withColumnRenamed("suicides_no","SuicidesNo")\
    .withColumnRenamed("population","Population")

In [3]:
HDIStats = spark.read.csv("data/Human Development Index.csv",inferSchema=True, header=True, nullValue="..")
HDIStats = HDIStats.drop("HDI Rank")

HDIStatsMean = HDIStats.na.fill(0)
HDIStatsMean = HDIStatsMean.select(col('Country'), (sum(col(x) for x in HDIStats.columns[1:]) / len(HDIStats.columns)-1).alias("mean"))
HDIStats = HDIStats.alias('a')\
    .join(HDIStatsMean.alias('b'),col('b.Country') == col('a.Country'))\
    .select([when(col('a.'+xx).isNull(), col('b.mean')).otherwise(col('a.'+xx)).alias(xx) for xx in HDIStats.columns])

cols, dtypes = zip(*((c, t) for (c, t) in HDIStats.dtypes if c not in ['Country']))
kvs = explode(array([
      struct(lit(c).alias("Year"), col(c).alias("HDIScore")) for c in cols
    ])).alias("kvs")
    
HDIStats = HDIStats.select(['Country'] + [kvs]).select(['Country'] + ["kvs.Year", "kvs.HDIScore"])

In [4]:
#WDIStats
WDIStats = spark.read.csv("data/World_Development_Indicators.csv",inferSchema=True,header=True)

def fill_with_mean(df, exclude=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c not in exclude
    ))
    return df.na.fill(stats.first().asDict())

WDIStats = fill_with_mean(WDIStats, ["Country Name", "Year"])

WDIStats = WDIStats.withColumnRenamed("Country Name","Country")\
    .withColumnRenamed("Employment to population ratio","EmploymentToPopulationRatio")\
    .withColumnRenamed("GDP per capita","GDPPerCapita")\
    .withColumnRenamed("Gini index","GiniIndex")

In [5]:
#Integrate Data Sources
datasource = SuicideStats\
    .join(HDIStats, (SuicideStats.Country == HDIStats.Country) & (SuicideStats.Year == HDIStats.Year))\
    .join(WDIStats, (SuicideStats.Country == WDIStats.Country) & (SuicideStats.Year == WDIStats.Year))\
    .select([SuicideStats.Country, SuicideStats.Year, SuicideStats.Sex, SuicideStats.Age, SuicideStats.SuicidesNo,
             SuicideStats.Population, SuicideStats.SuicidePer100k, HDIStats.HDIScore,
             WDIStats.EmploymentToPopulationRatio, WDIStats.GDP, WDIStats.GDPPerCapita, WDIStats.GiniIndex])

print((datasource.count(), len(datasource.columns)))


(17665, 12)


In [6]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

In [18]:
(train, test) = datasource.randomSplit([0.7, 0.3], seed =722)
print("The size of dataset is: ",(datasource.count(), len(datasource.columns)))
print("The size of training data is: ", (train.count(), len(train.columns)))
print("The size of test data is: ", (test.count(), len(test.columns)))

The size of dataset is:  (17665, 12)
The size of training data is:  (12335, 12)
The size of test data is:  (5330, 12)


In [8]:
sexIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
ageIndexer = StringIndexer(inputCol="Age", outputCol="AgeIndex")

featuresCols = ['HDIScore', 'EmploymentToPopulationRatio', 'GDP', 'GDPPerCapita', 'GiniIndex', 'SexIndex', 'AgeIndex']
featureAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
featureIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="indexedFeatures", maxCategories=4)


gbt = GBTRegressor(featuresCol="indexedFeatures",labelCol="SuicidePer100k", maxIter=5, maxDepth=5)

In [9]:
pipeline = Pipeline(stages=[sexIndexer, ageIndexer, featureAssembler, featureIndexer, gbt])
model = pipeline.fit(train)

In [10]:
predictions = model.transform(t est)
predictions.show(6)

+-------+----+------+-----------+----------+----------+------------------+--------+---------------------------+-------------+------------+-----------------+--------+--------+--------------------+--------------------+--------------------+
|Country|Year|   Sex|        Age|SuicidesNo|Population|    SuicidePer100k|HDIScore|EmploymentToPopulationRatio|          GDP|GDPPerCapita|        GiniIndex|SexIndex|AgeIndex|         rawFeatures|     indexedFeatures|          prediction|
+-------+----+------+-----------+----------+----------+------------------+--------+---------------------------+-------------+------------+-----------------+--------+--------+--------------------+--------------------+--------------------+
|Albania|1992|  male|15-24 years|         9|    263700|3.4129692832764507|   0.608|                54.30599976|4.038036613E9| 1243.605824|38.62148387096775|     1.0|     2.0|[0.608,54.3059997...|[0.608,54.3059997...|  13.457439361649962|
|Albania|1992|  male|25-34 years|         7|    

In [11]:
display(predictions.select("SuicidePer100k", "prediction", *featuresCols))

DataFrame[SuicidePer100k: double, prediction: double, HDIScore: double, EmploymentToPopulationRatio: double, GDP: double, GDPPerCapita: double, GiniIndex: double, SexIndex: double, AgeIndex: double]

In [22]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="SuicidePer100k", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 11.9187


In [13]:
model.stages[-1].featureImportances

SparseVector(7, {0: 0.0873, 1: 0.1068, 2: 0.2416, 3: 0.1958, 4: 0.0604, 5: 0.1519, 6: 0.1562})

In [29]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Define a grid of hyperparameters to test:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
# In this example notebook, we keep these values small.  In practice, to get the highest accuracy, you would likely want to try deeper trees (10 or higher) and more trees in the ensemble (>100)
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [2, 5])\
  .build()

# We define an evaluation metric.  This tells CrossValidator how well we are doing by comparing the true labels with predictions.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())

# Declare the CrossValidator, which runs model tuning for us.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)

In [31]:
pipelineWithCV = Pipeline(stages=[sexIndexer, ageIndexer, featureAssembler, featureIndexer, cv])
modelWithCV = pipelineWithCV.fit(train)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1035, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o940.getParam