In [1]:
#Load Libraries
# PySpark Machine Learning Library

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, VectorAssembler, OneHotEncoderEstimator, MinMaxScaler
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import col, sum, when

import os
import sys
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Library for confusion matrix, precision, test error
from pyspark.mllib.evaluation import MulticlassMetrics
# Library For Area under ROC curve and Area under precision-recall curve
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Assign resources to the application
sqlContext = SQLContext(sc)

sc = spark.sparkContext

# packages for data analysis
import numpy as np
import pandas as pd

In [2]:
#access data
birthDf = spark.read.options(header="true",\
                             inferSchema="true",\
                             nullValue="NA",\
                             mode="failfast")\
                            .csv("/FileStore/tables/lowbwt.csv")

In [3]:
#Explore data
birthDf.printSchema()

In [4]:
#check missing values
from pyspark.sql.functions import isnan, when, count, col
fileDF = birthDf[['ID', 'LOW', 'AGE', 'RACE', 'SMOKE', 'PTL',
                  'HT', 'UI', 'FTV']]

fileDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fileDF.columns]).show()

In [5]:
# Number of patients and number of patients by birth weight category
print('Number of Patients: ', birthDf.count())

print('Number of patients by birth weight category: ')
birthDf.groupby('LOW').count().show()

In [6]:
# From above, we can see this data is imbalanced.  For future weighting purposes, we store the imbalance percentage
# to a variable we can use later.

imbalanced = 130.0/59

print('Normal birth weight makes up {}x more of the data than low birth weight.'.format(imbalanced))

In [7]:
# show the stats summary for our two continuous numeric variables

print('Summary statistics for all three continuous numeric variables: ')
birthDf.select(['AGE', 'PTL', 'FTV']).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
AGE,189,23.238095238095237,5.2986779334042655,14,45
PTL,189,0.19576719576719576,0.4933419132673032,0,3
FTV,189,0.7936507936507936,1.0592861429875455,0,6


In [8]:
import matplotlib.pyplot as plt
%matplotlib inline

df_race=birthDf.crosstab('RACE', 'LOW').toPandas()
print('\nCrosstabulation between birth weight category and race: ')
print(df_race)

df_smoke=birthDf.crosstab('SMOKE', 'LOW').toPandas()
print('\nCrosstabulation between birth weight category and history of smoking while pregnant: ')
print(df_smoke)

df_ht=birthDf.crosstab('HT', 'LOW').toPandas()
print('\nCrosstabulation between birth weight category and history of hypertension: ')
print(df_ht)

df_ui=birthDf.crosstab('UI', 'LOW').toPandas()
print('\nCrosstabulation between birth weight category and uterine irritability: ')
print(df_ui)

In [9]:
# Bar charts for all crosstabulation tables between birth weight category and our categorical variables

df_race.plot.bar(x="RACE_LOW", legend=True, title="Birth weight category by mother's race")
df_smoke.plot.bar(x="SMOKE_LOW", legend=True, title="Birth weight category by mother's smoking habit while pregnant")
df_ht.plot.bar(x="HT_LOW", legend=True, title="Birth weight category by mother's history of hypertension")
df_ui.plot.bar(x="UI_LOW", legend=True, title="Birth weight category by mother's history of uterine irritability")
display(plt.show())

In [10]:
# Average age by birth weight
# Convert Age and number of physician appointments in first trimester to numeric

pdf=birthDf.toPandas()
pdf["AGE"]=pd.to_numeric(pdf.AGE)
pdf["FTV"]=pd.to_numeric(pdf.FTV)
pdf["PTL"]=pd.to_numeric(pdf.PTL)

df=sqlContext.createDataFrame(pdf)

print('Average age by birth weight category:')
PAGE=df.groupby(['LOW'])\
.agg({"AGE": "AVG"}).toPandas()
print(PAGE)

In [11]:
#Age distribution for all mothers

df.toPandas()["AGE"].plot.hist(x="Age", bins=[0,5,10,15,20,25,30,35,40,45], title="Age distribution")

In [12]:
# Age distribution for mothers of children with by birth weight category
# With sweet Maryland colors

df_age_low=df.select('AGE', 'LOW').filter(df['LOW']=='1')
pdf_age_low=df_age_low.select('AGE').toPandas()
df_age_high=df.select('AGE', 'LOW').filter(df['LOW']=='0')
pdf_age_high=df_age_high.select('AGE').toPandas()
plt.hist(pdf_age_low.AGE, color='red', alpha=0.5, bins=[0,5,10,15,20,25,30,35,40,45], label='Low birth weight')
plt.hist(pdf_age_high.AGE, color='yellow', alpha=0.5, bins=[0,5,10,15,20,25,30,35,40,45], label='Normal birth weight')
plt.title("Mother's age by birth weight category")
plt.legend(loc='upper right')
display(plt.show())

In [13]:
#Average number of first trimester physician appointments by birth weight category

print('Average number of first trimester appointments by birth weight category: ')
PFTV=df.groupby(['LOW'])\
.agg({"FTV": "AVG"}).toPandas()
print(PFTV)

In [14]:
#First trimester appointments distribution for all mothers
df.toPandas()["FTV"].plot.hist(x="First trimester appointments", bins=[0,1,2,3,4,5,6], title="First trimester appointments distribution")

In [15]:
# Age distribution for mothers of children with low birth weight
# With sweet Virginia Tech colors / Go Hokies! (and UMUC! I love you guys too but you're covered by Maryland colors)

df_ftv_low=df.select('FTV', 'LOW').filter(df['LOW']=='1')
pdf_ftv_low=df_ftv_low.select('FTV').toPandas()
df_ftv_high=df.select('FTV', 'LOW').filter(df['LOW']=='0')
pdf_ftv_high=df_ftv_high.select('FTV').toPandas()
plt.hist(pdf_ftv_low.FTV, color='maroon', alpha=0.5, bins=[0,1,2,3,4,5,6], label='Low birth weight')
plt.hist(pdf_ftv_high.FTV, color='orange', alpha=0.5, bins=[0,1,2,3,4,5,6], label='Normal birth weight')
plt.title('First trimester physician visits by birth weight category')
plt.legend(loc='upper right')
display(plt.show())

In [16]:
print('Average age by birth weight category:')
PAGE=df.groupby(['LOW'])\
.agg({"PTL": "AVG"}).toPandas()
print(PAGE)

In [17]:
# Preterm labor distribution for all mothers
df.toPandas()["PTL"].plot.hist(x="Number of preterm labors", bins=[0,1,2,3,4], title="Preterm labor distribution")

In [18]:
# Age distribution for mothers of children with by birth weight category
# With glorious summer colors
df_ptl_low=df.select('PTL', 'LOW').filter(df['LOW']=='1')
pdf_ptl_low=df_ptl_low.select('PTL').toPandas()
df_ptl_high=df.select('PTL', 'LOW').filter(df['LOW']=='0')
pdf_ptl_high=df_ptl_high.select('PTL').toPandas()
plt.hist(pdf_ptl_low.PTL, color='green', alpha=0.5, bins=[0,1,2,3,4], label='Low birth weight')
plt.hist(pdf_ptl_high.PTL, color='yellow', alpha=0.5, bins=[0,1,2,3,4], label='Normal birth weight')
plt.title('History of preterm labor by birth weight category')
plt.legend(loc='upper right')
display(plt.show())

In [19]:
data = sc.textFile("/FileStore/tables/lowbwt.csv")
print ("Total records in the data set:", birthDf.count())
print ("The first 5 rows")
birthDf.take(5)

In [20]:
# Label is the target variable LOW
# All binary and continuous numeric variables will be converted to float and used "as-is," since there is really no point to
# one-hot encode a binary variable.  Given that, only one variable remains which cannot be considered binary or continuous
# numeric--Race.  Since Race is the only categorical variable, we skip the whole one-hot encoding and then assembling
# part of the pipeline by just one-hot encoding it manually in the parser.  PersonID is just there to be there
# because we aren't really going to do anything with it at all except include it as an identifier when looking at predictions.
# FTV and PTL are transformed via square root to semi-normalize them, as they are clearly skewed to the left in the plot above.

LabeledDocument = Row("PersonID", "Age", "Race_1", "Race_2", "Smoke", "PTL", "sqrt_PTL", "HT", "UI", "FTV", "sqrt_FTV", "label")

# Define a function that parses each row in a CSV file and returns an object of type LabeledDocument

def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    race = int(values[3])
    race_1=0.0
    race_2=0.0
    if race==1:
        race_1=1.0
    elif race==2:
        race_2=1.0
    pid = str(values[0])
    age = float(values[2])
    smoke = float(values[4])
    ptl = float(values[5])
    sqrt_ptl = float(np.sqrt(ptl))
    ht = float(values[6])
    ui = float(values[7])
    ftv = float(values[8])
    sqrt_ftv = float(np.sqrt(ftv))
    labels = float(values[1])
    
    return LabeledDocument(pid, age, race_1, race_2, smoke, ptl, sqrt_ptl, ht, ui, ftv, sqrt_ftv, labels)

In [21]:
#Parse and Load the data into a dataframe. The code calls the parsing function defined above

documents=data.filter(lambda s: "LOW" not in s).map(parseDocument)
BirthWeightData = documents.toDF()
print ("Number of records: " + str(BirthWeightData.count()))
print ( "First 5 records: ")
BirthWeightData.take(5)

In [22]:
# Divide the data into training and test set with proportions 80/20
# We will not use randomSplit used in class because it does not produce
# training and testing sets stratified by our target variable ('label').
# Since this dataset is exceedingly small (ironically, since we're using SparkML),
# it is important to make sure our test/train split is stratified so we have "enough"
# of each label to train and test on.

random_seed = 1988
train = BirthWeightData.sampleBy("label", fractions={0: 0.8, 1: 0.8}, seed=random_seed)
test = BirthWeightData.subtract(train)

print("Number of records in the training set: " + str(train.count()))
print("Number of records in the test set: " + str(test.count()))
print("\nProportion of training set to total dataset: " + str(train.count()/(test.count()+train.count()))) 
# Output first 20 records in the training set

print ("\nFirst 5 records in the training set: ")
train.show(5)

print ("\nFirst 5 records in the testing set: ")
test.show(5)

In [23]:
# set up Logistic Regression using SparkML pipeline

assembler = VectorAssembler(inputCols=["Age", "Smoke", "Race_1", "Race_2", "sqrt_PTL", "HT", "UI", "sqrt_FTV"], outputCol="features")
lr = LogisticRegression(maxIter=100, regParam=0.01)
pipeline = Pipeline(stages=[assembler, lr])

In [24]:
# train the logistic regression model
model = pipeline.fit(train)

In [25]:
# Make predictions for test data and print columns of interest
prediction = model.transform(test)
selected = prediction.select("PersonID", "label", "prediction", "probability")
for row in selected.collect():
    print (row)

In [26]:
#Evaluate the Model (Logistic Regression)
#We evaluate the model on the training set and test set. The purpose is to measure the model's predictive accuracy, including the accuracy for new data.

def full_metrics(model, data, curve_metrics=False):
    pred_m=model.transform(data).select("prediction", "label")
    eval_m=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy_m=eval_m.evaluate(pred_m)
    predictionAndLabels_m=pred_m.rdd
    metrics_m=MulticlassMetrics(predictionAndLabels_m)
    precision_m=metrics_m.precision(1.0)
    recall_m=metrics_m.recall(1.0)
    f1Measure_m = metrics_m.fMeasure(1.0, 1.0)
    print ("Accuracy = %s" %accuracy_m)
    print ("Error = %s" % (1-accuracy_m))
    print ("Precision = %s" %precision_m)
    print ("Recall = %s" %recall_m)
    print ("F1 Measure = %s" % f1Measure_m)
    
    if curve_metrics:
        bin_m=BinaryClassificationMetrics(predictionAndLabels_m)
        print("Area under PR = %s" % bin_m.areaUnderPR)
        print("Area under ROC = %s" % bin_m.areaUnderROC)

def pipe_to_metrics(model, train, test, curve_metrics=False, graph_ROC=False, model_trained=False):
    if not model_trained:
        model = model.fit(train)
    
    prediction = model.transform(train)
    print("Training confusion matrix: ")
    prediction.crosstab('label', 'prediction').show()
    
    prediction = model.transform(test)
    print("\nTest confusion matrix: ")
    prediction.crosstab('label', 'prediction').show()
    
    print ("\nModel evaluation for the train data: ")
    full_metrics(model, train, curve_metrics=curve_metrics)
    print ("\nModel evaluation for the test data: ")
    full_metrics(model, test, curve_metrics=curve_metrics)
    
    if graph_ROC:
        ROC=model.stages[-1].summary.roc
        df=ROC.toPandas()
        df.plot(x='FPR', y='TPR', label="ROC", legend=False)


In [27]:
pipe_to_metrics(model, train, test, curve_metrics=True, graph_ROC=True, model_trained=True)

In [28]:
#Building a Decision Tree Model
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol='label')
pipeline_dt = Pipeline(stages=[assembler, dt])
pipe_to_metrics(pipeline_dt, train, test, curve_metrics=False, graph_ROC=False, model_trained=False)

In [29]:
#Building a Random Forest Model

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label', numTrees=100)
pipeline_rf = Pipeline(stages=[assembler, rf])
pipe_to_metrics(pipeline_rf, train, test, curve_metrics=False, graph_ROC=False, model_trained=False)

In [30]:
#Building a Gradient Boosted Decision Tree Model

from pyspark.ml.classification import GBTClassifier

gb = GBTClassifier(labelCol='label', maxIter=50)
pipeline_gb = Pipeline(stages=[assembler, gb])
pipe_to_metrics(pipeline_gb, train, test, curve_metrics=False, graph_ROC=False, model_trained=False)

In [31]:
#Building a Perceptron

from pyspark.ml.classification import MultilayerPerceptronClassifier

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
mp = MultilayerPerceptronClassifier(featuresCol=scaler.getOutputCol(), maxIter=100, layers=[8,8,2], blockSize=1, seed=random_seed)
pipeline_mp = Pipeline(stages=[assembler, scaler, mp])
pipe_to_metrics(pipeline_mp, train, test, curve_metrics=False, graph_ROC=False, model_trained=False)

In [32]:

# #Building a Logistic Regression Model with Weighted Cost Function
# After the whirlwind of classifiers we just built and evaluated, we arere going to bring it back to logistic regression because, well, this assignment is about building a classifier with logistic regression in Spark. A big issue we have with this data is that it is imbalanced and the logistic regression model built previously was decently accurate for the majority case but absolutely horrible (worse than random chance) at the minority case. We create a new weighted column to feed this logistic regression model to see if we can increase the accuracy and precision (recall may go down because of the weighting).

train=train.withColumn("classWeights", when(train.label == 1, imbalanced).otherwise(1))
wlr = LogisticRegression(maxIter=100, regParam=0.01, weightCol="classWeights")
pipeline_wlr = Pipeline(stages=[assembler, wlr])
pipe_to_metrics(pipeline_wlr, train, test, curve_metrics=False, graph_ROC=False, model_trained=False)

In [33]:
# Tuning the Logistic Regression Model
# The last LogReg model was probably the best overall performer with the highest F1-score on the test set. With this current problem, however, we really want to predict low birth rate better than normal birth weight--which is an issue, because we have much less data on low birth rate in our already super tiny dataset. To try to get around this, we will split the data into a train/test/val split and use the validation set to tune a weight multiplier.

random_seed = 123
train = BirthWeightData.sampleBy("label", fractions={0: 0.7, 1: 0.7}, seed=random_seed)
other = BirthWeightData.subtract(train)
test = other.sampleBy("label", fractions={0:0.6, 1:0.6}, seed=random_seed+1)
val = other.subtract(test)

In [34]:
def eval_model(model, data):
    pred_m=model.transform(data).select("prediction", "label")
    predictionAndLabels_m=pred_m.rdd
    metrics_m=MulticlassMetrics(predictionAndLabels_m)
    return metrics_m.precision(1.0), metrics_m.recall(1.0), metrics_m.fMeasure(1.0, 1.0)

In [35]:
# This cell trains multiple models for different weights and creates a list of their metrics so they can be graphed and
# compared to determine the best weighting for our purposes

m_range=[]
m_precision=[]
m_recall=[]
m_f1 = []

for index in range(5, 40):
    multiplier = 0.2*index
    train=train.withColumn("classWeights", when(train.label == 1, multiplier).otherwise(1))
    wlr = LogisticRegression(maxIter=100, regParam=0.01, weightCol="classWeights")
    pipeline_wlr = Pipeline(stages=[assembler, wlr])
    model = pipeline_wlr.fit(train)
    p, r, f = eval_model(model, val)
    m_range.append(multiplier)
    m_precision.append(p)
    m_recall.append(r)
    m_f1.append(f)

In [36]:
plt.plot(m_range, m_precision, color='red', linestyle='-', marker='o')
plt.plot(m_range, m_recall, color='orange', linestyle='-', marker='o')
plt.plot(m_range, m_f1, color='blue', linestyle='-', marker='o')
plt.xlabel('Weight multiplier')
plt.ylabel('Metrics')
plt.title('Metrics by weight multiplier')
display(plt.show())

In [37]:
# Clearly, we get the "best-ish" (but still not awesome) results with a multiplier greater than or equal to 6.2

# a weight multiplier of 6.4 is used instead of 6.2 because it is a more stable position than 6.2, which
# is at the tipping point between a huge increase and a plateau.

best_multiplier = 6.4
train=train.withColumn("classWeights", when(train.label == 1, best_multiplier).otherwise(1))
wlr = LogisticRegression(maxIter=100, regParam=0.01, weightCol="classWeights")
pipeline_wlr = Pipeline(stages=[assembler, wlr])
pipe_to_metrics(pipeline_wlr, train, test, curve_metrics=True, graph_ROC=True)

In [38]:
# This last model with weight multiplier of 6.4 is definitely not perfect, but it's the best so far for our purposes. When it comes to predicting whether a child will be born with a low birth weight or not, it is more important to predict low birth weight when their is low birth weight than it is to predict normal birth weight when the child has normal birth weight. If the model predicts low birth weight and the child is born with a normal birth weight then it's really a "no harm, no foul" situation. It's better to prepare for the worst and hope for the best than to be underprepared when a child is born with a low birth weight. With this last model, if it predicts low birth weight then there is a 92% chance (11/12 on the test set) that the child will have a low birth weight--so a 1 indicates we should take necessary precautions. Granted, with a 1 predicted there is still only a 32% chance that the child actually comes out with a low birth weight, so there will be a lot of situations where additional precautions are unnecessary--but that is a much better situation then not being prepared for a low birth weight when it occurs.