In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/10 02:00:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# data processing from section 03
from pyspark.sql.types import (StructField,StringType,IntegerType,FloatType,StructType)
data_schema = [StructField('Entity',StringType(),True),
              StructField('Code',StringType(),True),
              StructField('Year',IntegerType(),True),
              StructField('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized',FloatType(),True)]
final_struct = StructType(fields=data_schema)
death_df = spark.read.csv("unsafewaterdeathrate.csv",schema=final_struct,header=True)

data_schema = [StructField('Entity',StringType(),True),
              StructField('Code',StringType(),True),
              StructField('Year',IntegerType(),True),
              StructField('GDP per capita, PPP (constant 2017 international $)',FloatType(),True)]
final_struct = StructType(fields=data_schema)
gdp_df = spark.read.csv("gdp-per-capita-worldbank.csv",schema=final_struct,header=True)

data_schema = [StructField('Type',StringType(),True),
              StructField('Region',StringType(),True),
              StructField("Residence Type",StringType(),True),
              StructField("Service Type",StringType(),True),
              StructField('Year',IntegerType(),True),
              StructField('Coverage',FloatType(),True),
              StructField('Population',FloatType(),True),
              StructField('Facility type',StringType(),True)]
final_struct = StructType(fields=data_schema)
wash_df = spark.read.csv("WASH.csv",schema=final_struct,header=True)

wash_df = wash_df.drop('Type')
death_df = death_df.drop('Code')

from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType, FloatType, DoubleType

numeric_cols = [f.name for f in wash_df.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType, FloatType))]

# Identify numeric columns
numeric_cols = [f.name for f in wash_df.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType, FloatType))]

for column in numeric_cols:
    Q1, Q3 = wash_df.approxQuantile(column, [0.25, 0.75], 0.05)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Coerce outliers using withColumn and when
    wash_df = wash_df.withColumn(column, 
                                 when(col(column) > upper_bound, upper_bound)
                                 .when(col(column) < lower_bound, lower_bound)
                                 .otherwise(col(column)))
    
Q1, Q3 = death_df.approxQuantile('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized', [0.25, 0.75], 0.05)
IQR = Q3 - Q1

# Define bounds for outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Replace outliers with closest point
death_df = death_df.withColumn ('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized', 
                               when(col('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized') < lower_bound, lower_bound)
                               .when(col('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized') > upper_bound, upper_bound)
                               .otherwise(col('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized')))

# filter irrelevant years
wash_df = wash_df.filter((col('Year') >= 2012) & (col('Year') <= 2019))
death_df = death_df.filter((col('Year') >= 2012) & (col('Year') <= 2019))
death_df = death_df.withColumnRenamed('Share of total deaths that are from all causes attributed to unsafe water source, in both sexes aged age-standardized', 'MortalityRate')
gdp_df = gdp_df.filter((col('Year') >= 2012) & (col('Year') <= 2019))

# compute the mean gdp by country
from pyspark.sql.functions import avg
mean_gdp_by_country = gdp_df.groupBy('Entity').agg(avg('GDP per capita, PPP (constant 2017 international $)')\
                                                   .alias('Mean GDP'))
# convert into incomelevels
income_category = mean_gdp_by_country.withColumn("IncomeLevel",
    when(col("`Mean GDP`") <= 1897.289, "Low income")
    .when((col("`Mean GDP`") > 1897.289) & (col("`Mean GDP`") <= 6464.184), "Lower middle income")
    .when((col("`Mean GDP`") > 6464.184) & (col("`Mean GDP`") <= 15324.779), "Upper middle income")
    .otherwise("High income")
)

death_gdp_df = death_df.join(income_category, death_df["Entity"] \
                             == income_category["Entity"], how='inner').drop(income_category["Entity"])

wash_df = wash_df.withColumnRenamed("Region", "IncomeLevel")

final_df = wash_df.join(death_gdp_df, on=['Year', 'IncomeLevel'], how='inner')

from pyspark.sql.types import IntegerType, DoubleType

final_df = (final_df.withColumn("Year", final_df["Year"].cast(IntegerType()))
                   .withColumn("Coverage", final_df["Coverage"].cast(DoubleType()))
                   .withColumn("MortalityRate", final_df["MortalityRate"].cast(DoubleType()))
                   .withColumn("Population", final_df["Population"].cast(DoubleType()))
                   .withColumn("Mean GDP", final_df["Mean GDP"].cast(DoubleType())))

from pyspark.sql.functions import col, when

final_df = final_df.withColumn("IncomeLevel", 
                               when(col("IncomeLevel") == "Lower middle income", 1)
                               .when(col("IncomeLevel") == "Upper middle income", 2)
                               .when(col("IncomeLevel") == "High income", 3)
                               .when(col("IncomeLevel") == "Low income", 0)
                               .otherwise(col("IncomeLevel")))

final_df = final_df.withColumn("Service Type", 
                               when(col("Service Type") == "Sanitation", 1)
                               .when(col("Service Type") == "Drinking water", 2)
                               .otherwise(col("Service Type")))

final_df = final_df.withColumn("Residence Type", 
                               when(col("Residence Type") == "total", 1)
                               .when(col("Residence Type") == "rural", 2)
                               .when(col("Residence Type") == "urban", 3)
                               .otherwise(col("Residence Type")))

final_df = final_df.withColumn("Facility type", 
                               when(col("Facility type") == "Improved latrine and other", 1)
                               .when(col("Facility type") == "Septic tank", 2)
                               .when(col("Facility type") == "Sewer", 3)
                               .when(col("Facility type") == "Non-piped improved", 4)
                               .when(col("Facility type") == "Piped improved", 5)
                               .otherwise(col("Facility type")))

final_df = (final_df.withColumn("IncomeLevel", final_df["IncomeLevel"].cast(IntegerType()))
                   .withColumn("Residence Type", final_df["Residence Type"].cast(IntegerType()))
                   .withColumn("Service Type", final_df["Service Type"].cast(IntegerType()))
                   .withColumn("Facility type", final_df["Facility type"].cast(IntegerType()))
           )

                                                                                

## Single Decision Tree

## Spark Formatting of Data

In [3]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [4]:
final_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- IncomeLevel: integer (nullable = true)
 |-- Residence Type: integer (nullable = true)
 |-- Service Type: integer (nullable = true)
 |-- Coverage: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- Facility type: integer (nullable = true)
 |-- Entity: string (nullable = true)
 |-- MortalityRate: double (nullable = true)
 |-- Mean GDP: double (nullable = true)



In [5]:
final_df.head()

                                                                                

Row(Year=2019, IncomeLevel=0, Residence Type=3, Service Type=2, Coverage=69.44994354248047, Population=149290160.0, Facility type=5, Entity='Chad', MortalityRate=7.417001601308584, Mean GDP=1709.7349700927734)

In [6]:
final_df.columns

['Year',
 'IncomeLevel',
 'Residence Type',
 'Service Type',
 'Coverage',
 'Population',
 'Facility type',
 'Entity',
 'MortalityRate',
 'Mean GDP']

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Combine all features into one vector named features.
assembler = VectorAssembler(
  inputCols=['Year',
             'IncomeLevel',
             'Residence Type',
             'Facility type',
             'MortalityRate',
             'Mean GDP'],
              outputCol="features")

In [8]:
output = assembler.transform(final_df)
print(output)

DataFrame[Year: int, IncomeLevel: int, Residence Type: int, Service Type: int, Coverage: double, Population: double, Facility type: int, Entity: string, MortalityRate: double, Mean GDP: double, features: vector]


In [9]:
from pyspark.ml.feature import StringIndexer

In [10]:
indexer = StringIndexer(inputCol="Coverage", outputCol="CoverageIndex")
output_fixed = indexer.fit(output).transform(output)
print(output_fixed)

                                                                                

DataFrame[Year: int, IncomeLevel: int, Residence Type: int, Service Type: int, Coverage: double, Population: double, Facility type: int, Entity: string, MortalityRate: double, Mean GDP: double, features: vector, CoverageIndex: double]


In [11]:
final_data = output_fixed.select("features",'CoverageIndex')
train_data,test_data = final_data.randomSplit([0.8,0.2])

## The classfier

In [12]:
# Let's import the relevant classifiers. 
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [13]:
dtc = DecisionTreeClassifier(labelCol='CoverageIndex', featuresCol='features')
dtc_model = dtc.fit(train_data)
dtc_predictions = dtc_model.transform(test_data)

                                                                                

### Evaluation metrices

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="CoverageIndex", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

[Stage 49:>                                                         (0 + 1) / 1]

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 86.01%


                                                                                

## Random Forest (L.O.2)

## Spark Formatting of Data

In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Combine all features into one vector named features.
assembler = VectorAssembler(
  inputCols=['Year',
             'IncomeLevel',
             'Residence Type',
             'Service Type',
             'Facility type',
             'Population',
             'Coverage',
             'Mean GDP'],
              outputCol="features")

In [16]:
output = assembler.transform(final_df)
print(output)

DataFrame[Year: int, IncomeLevel: int, Residence Type: int, Service Type: int, Coverage: double, Population: double, Facility type: int, Entity: string, MortalityRate: double, Mean GDP: double, features: vector]


In [17]:
indexer = StringIndexer(inputCol="MortalityRate", outputCol="MortalityRateIndex")
output_fixed = indexer.fit(output).transform(output)
print(output_fixed)

DataFrame[Year: int, IncomeLevel: int, Residence Type: int, Service Type: int, Coverage: double, Population: double, Facility type: int, Entity: string, MortalityRate: double, Mean GDP: double, features: vector, MortalityRateIndex: double]


In [18]:
final_data = output_fixed.select("features",'MortalityRateIndex')
train_data,test_data = final_data.randomSplit([0.8,0.2])

## The classfier

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline
rfc = DecisionTreeClassifier(labelCol='MortalityRateIndex', featuresCol='features')
rfc_model = rfc.fit(train_data)
rfc_predictions = rfc_model.transform(test_data)

                                                                                

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="MortalityRateIndex", predictionCol="prediction", metricName="accuracy")
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
print("Here are the results!")
print('-'*40)
print('A Random Forest has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))

[Stage 91:>                                                         (0 + 1) / 1]

Here are the results!
----------------------------------------
A Random Forest has an accuracy of: 81.88%


                                                                                