https://docs.databricks.com/_static/notebooks/xgboost-pyspark.html

https://www.databricks.com/blog/2020/11/16/how-to-train-xgboost-with-spark.html

In [None]:
# Reuse the active SparkSession (e.g. the one Databricks provides) or build a new one.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .getOrCreate()
)

Run locally (optional): connect to a standalone Spark cluster instead of using the session created above.


https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.SparkContext.html#:~:text=A%20SparkContext%20represents%20the%20connection,parameters%20here%20or%20through%20conf%20.

In [None]:
# Optional: connect to a standalone Spark cluster instead of the session above.
# findspark.init() must run BEFORE pyspark is imported, because it is what puts
# the Spark installation on sys.path.
import findspark
# The key here is putting the path to the Spark download on your Mac VM.
findspark.init('/spark-3.3.0-bin-hadoop3.2')

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *

# Use the same 'spark://ip:port' used when connecting the worker(s) to the master node.
sc = pyspark.SparkContext(master='spark://ip:port', appName='Heart_Disease_Example')
# SQLContext is deprecated since Spark 3.0; SparkSession exposes the same .read API.
spark = SparkSession(sc)
spark

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np
from pyspark.ml.tuning import CrossValidator
import plotly.graph_objects as go

# Load the heart-disease CSV, letting Spark infer column types from the data.
df = spark.read.csv('heart.csv', inferSchema=True, header=True)
# Only a cell's last expression is displayed, so show (rows, columns) together
# instead of silently discarding the row count.
(df.count(), len(df.columns))

The dataset has 303 rows and 14 columns.

In [None]:
# (column name, Spark type) pairs as inferred from the CSV.
column_types = df.dtypes
column_types

1. age: The person’s age in years

2. sex: The person’s sex (1 = male, 0 = female)

3. cp: The chest pain experienced (0 = typical angina, 1= atypical angina, 2= non-anginal pain, 3 = asymptomatic)

4. trestbps: The person’s resting blood pressure (mm Hg on admission to the hospital)

5. chol: The person’s cholesterol measurement in mg/dl

6. fbs: The person’s fasting blood sugar (> 120 mg/dl, 1 = true; 0 = false).

7. restecg: Resting electrocardiographic measurement (0 = normal, 1 = having ST-T wave abnormality, 2 = showing probable or definite left ventricular hypertrophy by Estes’ criteria)

8. thalach: The person’s maximum heart rate achieved

9. exang: Exercise induced angina (1 = yes; 0 = no)

10. oldpeak: ST depression induced by exercise relative to rest

11. slope: the slope of the peak exercise ST segment (0 = upsloping, 1 = flat, 2 = downsloping)

12. ca: The number of major vessels (0–4)


Check for missing values:

In [None]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Count nulls per column: each output column holds the number of null entries.
# Use F.sum rather than `from pyspark.sql.functions import sum`, which would
# shadow Python's built-in sum() for the rest of the notebook.
df.select(*(F.sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

Summary of Dataset



In [None]:
# Summary statistics per column, transposed so each feature is one row.
df.describe().toPandas().T

Pie Chart of Target Variable

In [None]:
# Collect to pandas for plotting; the dataset is small (303 rows).
df2 = df.toPandas()
# Rows per target class. .size() counts rows directly instead of counting a
# specific column (which would undercount if that column had nulls).
df22 = df2.groupby('target').size().reset_index(name='counts')
colors = ['gold', 'mediumturquoise', 'darkorange', 'lightgreen']
fig = go.Figure(data=[go.Pie(labels=df22.target,
                             values=df22.counts)])
fig.update_traces(hoverinfo='label+percent', textinfo='value+percent', textfont_size=20, textfont_color='black',
                  marker=dict(colors=colors, line=dict(color='#000000', width=2)))
fig.update_layout(title='Heart Disease vs. Absence of Heart Disease', title_x=0.5)

Histograms of Feature Variables:

In [None]:
from plotly.subplots import make_subplots

# One histogram per feature (every column except the trailing 'target'),
# laid out row-major on a 4x4 grid. Driving both the titles and the traces from
# the same column list keeps each histogram under its own title.
# (The hand-written version plotted 'thalach' under the 'ca' title.)
feature_names = df2.columns[:-1]
fig = make_subplots(rows=4, cols=4, start_cell="top-left",
                    subplot_titles=feature_names)
for i, name in enumerate(feature_names):
    fig.add_trace(go.Histogram(x=df2[name], name=name),
                  row=i // 4 + 1, col=i % 4 + 1)
fig.update_layout(title='Histograms of Variables', title_x=0.5)

Log-transform `oldpeak`, then plot the correlation matrix heatmap:


In [None]:
# Add log(oldpeak + 1) and compare its distribution with the raw column side by side.
df3 = df.withColumn('oldpeaklog', F.log(df['oldpeak'] + 1))
df33 = df3.toPandas()
panels = ['oldpeak', 'oldpeaklog']
fig = make_subplots(rows=1, cols=2, start_cell="top-left", subplot_titles=panels)
for position, column in enumerate(panels, start=1):
    fig.add_trace(go.Histogram(x=df33[column], name=column), row=1, col=position)
fig.update_layout(title='Transforming oldpeak', title_x=0.5)

In [None]:
# Pairwise Pearson correlations of all (numeric) columns, annotated to 2 decimals.
corr = df33.corr()
fig = go.Figure(data=go.Heatmap(z=corr.values,
                                # z[i][j] is drawn at (x[j], y[i]): columns on x, rows on y.
                                x=corr.columns.values,
                                y=corr.index.values,
                                text=np.round(corr.values, 2),
                                # Plotly template referencing the `text` array. A Python
                                # f-string here would evaluate an undefined name `text`.
                                texttemplate="%{text}"))
fig.update_layout(title=dict(text='Correlation Matrix Heatmap', font=dict(size=20), x=0.5))


Initialize Stages

StringIndexer:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html

VectorAssembler:

https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html

In [None]:
# Stage 1: index the numeric 'target' column into the 'label' column the models use.
# NOTE(review): StringIndexer orders labels by frequency by default
# (stringOrderType='frequencyDesc'), so label 0 is the MOST FREQUENT target value,
# not necessarily target == 0 — confirm against the threshold logic further down.
label_stringIdx = StringIndexer(inputCol='target', outputCol='label')

# Stage 2: pack the numeric predictors (log-transformed oldpeak instead of the raw
# column) into a single 'features' vector; 'keep' retains rows with invalid values.
numericCols = 'age sex cp trestbps chol fbs restecg thalach exang slope ca thal oldpeaklog'.split()
assemblerInputs = numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='keep')

stages = [label_stringIdx, assembler]

Set up Pipeline


In [None]:
from pyspark.ml import Pipeline

# Columns kept after transformation: model inputs first, then the raw predictors and target.
selectedCols = ['label', 'features'] + [
    'age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg',
    'thalach', 'exang', 'slope', 'ca', 'thal', 'oldpeaklog', 'target',
]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df3)
# NOTE(review): df3 is overwritten here, so re-running this cell on its own will
# likely fail ('label'/'features' already exist) — re-run from the oldpeaklog cell.
df3 = pipelineModel.transform(df3).select(selectedCols)
df3.printSchema()

Split into training (70%) and test (30%) sets

In [None]:
# 70/30 random split with a fixed seed so the split is reproducible.
train, test = df3.randomSplit([0.7, 0.3], seed=2018)
# Class balance of each split.
for split in (train, test):
    split.groupby('target').count().show()

## Models


### Random Forests


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest with default hyperparameters and a fixed seed.
rf = RandomForestClassifier(labelCol='label', featuresCol='features', seed=101)
rfModel = rf.fit(train)
predictions_rf = rfModel.transform(test)

Accuracy of Model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_rf = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_rf.evaluate(predictions_rf)

Confusion Matrix

In [None]:
# Confusion matrix: one row per true label, one column per predicted label.
confusion_rf = predictions_rf.crosstab('label', 'prediction')
confusion_rf.show()

ROC & Precision Recall Curves

In [None]:
from handyspark import *
# plt was previously only imported in a later cell, so this cell failed on a fresh kernel.
import matplotlib.pyplot as plt
# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
bcm = BinaryClassificationMetrics(predictions_rf, scoreCol='probability', labelCol='label')
# Get metrics from evaluator
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Plot both ROC and PR curves side by side
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

Testing Various Thresholds

In [None]:
# Explicit imports: FloatType previously came only from a star import in the
# optional local-run cell, so this cell raised NameError when that cell was skipped.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.types import FloatType

# UDFs extracting the first / second entry of the 'probability' vector as a Python float.
split1_udf = F.udf(lambda value: value[0].item(), FloatType())
split2_udf = F.udf(lambda value: value[1].item(), FloatType())


def test_threshold(model, prob):
    """Show a confusion table for a custom probability threshold.

    Parameters
    ----------
    model : DataFrame
        Despite the name, a *predictions* DataFrame (output of ``Model.transform``)
        with 'rawPrediction', 'probability' and 'target' columns.
    prob : float
        Threshold applied to the first probability entry, P(label == 0).

    Rows with P(label == 0) > prob are predicted as 1, all others as 0, and the
    result is cross-tabulated (prediction rows x target columns) and shown.

    NOTE(review): mapping class_0 -> prediction 1 is only right if StringIndexer
    assigned label 0 to target == 1 (its default frequency ordering does that
    when target == 1 is the majority class) — confirm.
    """
    output2 = model.select('rawPrediction', 'target', 'probability',
                           split1_udf('probability').alias('class_0'),
                           split2_udf('probability').alias('class_1'))
    output2 = output2.withColumn('prediction', when(col('class_0') > prob, 1).otherwise(0))
    output2.crosstab('prediction', 'target').show()

In [None]:
# Random forest at a 0.6 threshold.
test_threshold(predictions_rf, prob=0.6)

In [None]:
# Random forest at a stricter 0.7 threshold.
test_threshold(predictions_rf, prob=0.7)

Feature Importances

In [None]:
# plt was previously only imported in a later cell, so this failed on a fresh kernel.
import matplotlib.pyplot as plt

# Importances of the baseline forest, in VectorAssembler input order.
feat_imps = rfModel.featureImportances
x_values = list(range(len(feat_imps)))
# featureImportances is a Spark vector; convert it to a dense array for matplotlib.
plt.bar(x_values, feat_imps.toArray(), orientation='vertical')
plt.xticks(x_values, ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','slope','ca','thal','oldpeaklog'], rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
# Trailing ';' suppresses the Text(...) repr of the last call.
plt.title('Feature Importances');

The most important features are cp, thalach, ca, and oldpeaklog.
#### Tune Hyperparameters

In [None]:
# 3 forest sizes x 1 depth x 3 subset strategies x 2 impurities x 2 bin counts = 36 candidates.
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [200, 210, 220])
    .addGrid(rf.maxDepth, [10])
    .addGrid(rf.featureSubsetStrategy, ["sqrt", "log2", "onethird"])
    .addGrid(rf.impurity, ['gini', 'entropy'])
    .addGrid(rf.maxBins, [22, 32])
    .build()
)
# 3-fold CV scored with BinaryClassificationEvaluator's default metric (areaUnderROC).
evaluator = BinaryClassificationEvaluator()
rf_crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid_rf,
    evaluator=evaluator,
    numFolds=3,
)
rf_cvModel = rf_crossval.fit(train)
predictions_rf_cv = rf_cvModel.transform(test)

Accuracy of Best CV Model


In [None]:
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_rf_cv = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_rf_cv.evaluate(predictions_rf_cv)

Feature Importances of Best Model

In [None]:
import matplotlib.pyplot as plt

# Importances from the best cross-validated forest, in VectorAssembler input order.
feature_labels = ['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg',
                  'thalach', 'exang', 'slope', 'ca', 'thal', 'oldpeaklog']
feat_imps = rf_cvModel.bestModel.featureImportances
x_values = [*range(len(feat_imps))]
plt.bar(x_values, feat_imps, orientation='vertical')
plt.xticks(x_values, feature_labels, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

Get Hyperparameter Values of Best Random Forests CV Model

In [None]:
best_rf = rf_cvModel.bestModel
# getNumTrees is accessed without parentheses: on PySpark tree-ensemble models it
# is a property. The other getters are ordinary methods.
for caption, value in (
    ('Num Trees: ', best_rf.getNumTrees),
    ('Max Depth: ', best_rf.getMaxDepth()),
    ('Feature Subset Strategy: ', best_rf.getFeatureSubsetStrategy()),
    ('Impurity: ', best_rf.getImpurity()),
    ('Max Bins: ', best_rf.getMaxBins()),
):
    print(caption + str(value))

Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# L2-penalised (elasticNetParam=0) logistic regression with regParam=0.3.
lr = LogisticRegression(featuresCol='features', labelCol='label',
                        maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train)
predictions_lr = lrModel.transform(test)

Accuracy of Model

evaluator_lr = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_lr.evaluate(predictions_lr)

In [None]:
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_lr = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_lr.evaluate(predictions_lr)

In [None]:
# Fitted coefficients (one per assembled feature) followed by the intercept.
for caption, value in (("Coefficients: ", lrModel.coefficients),
                       ("Intercept: ", lrModel.intercept)):
    print(caption + str(value))

In [None]:
# Confusion matrix: one row per true label, one column per predicted label.
confusion_lr = predictions_lr.crosstab('label', 'prediction')
confusion_lr.show()

In [None]:
# ROC / PR metrics for logistic regression using handyspark's extended
# BinaryClassificationMetrics, built from the predictions DataFrame together
# with its probability and label columns.
bcm = BinaryClassificationMetrics(predictions_lr, scoreCol='probability', labelCol='label')
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Both curves side by side.
fig, (roc_ax, pr_ax) = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=roc_ax)
bcm.plot_pr_curve(ax=pr_ax)

Tune Hyperparameters


In [None]:
# Hyperparameter grid for logistic regression.
# regParam and elasticNetParam are fractional. The previous `int(x)` wrapping
# truncated every value to 0, which collapsed the grid to duplicate unregularised fits.
# Literal lists also avoid np.arange's floating-point endpoint surprises.
paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [10, 20])
    .addGrid(lr.regParam, [0.1, 0.2, 0.3, 0.4])
    .addGrid(lr.elasticNetParam, [0.0, 0.1])
    .build()
)
# 3-fold CV scored with BinaryClassificationEvaluator's default metric (areaUnderROC).
evaluator = BinaryClassificationEvaluator()
lr_crossval = CrossValidator(estimator=lr,
                             estimatorParamMaps=paramGrid_lr,
                             evaluator=evaluator,
                             numFolds=3)
lr_cvModel = lr_crossval.fit(train)
predictions_lr_cv = lr_cvModel.transform(test)

Overall Accuracy of Best CV Model

In [None]:
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_lr_cv = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_lr_cv.evaluate(predictions_lr_cv)

Logistic Regression V.2 (binomial GLM, which also reports standard errors, t-values, and p-values)

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression
import pandas as pd

# Logistic regression refit as a binomial GLM with a logit link. Unlike
# LogisticRegression, its training summary reports standard errors, t-values
# and p-values for each coefficient.
glr = GeneralizedLinearRegression(family="binomial", link="logit", maxIter=10,
                                  regParam=0.0)
model = glr.fit(train)
summary = model.summary

# Predictor names in VectorAssembler order: train's columns are
# ['label', 'features', <predictors...>, 'target'].
variables = train.columns[2:-1]
coefficients = list(model.coefficients)
# When an intercept is fitted, Spark appends its statistics as the LAST element
# of each summary array. Label that row explicitly so names and statistics line up.
if model.getFitIntercept():
    variables = variables + ['intercept']
    coefficients = coefficients + [model.intercept]

pd.DataFrame(
    {
        'Coefficient': coefficients,
        'Std. Error': summary.coefficientStandardErrors,
        'T Value': summary.tValues,
        'P Value': summary.pValues,
    },
    index=variables,
)

Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes with default settings on the assembled feature vector.
nb = NaiveBayes(labelCol='label', featuresCol='features')
nb_model = nb.fit(train)
predictions_nb = nb_model.transform(test)

Overall Accuracy of Model

In [None]:
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_nb = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_nb.evaluate(predictions_nb)

Confusion Matrix

In [None]:
# Confusion matrix: one row per true label, one column per predicted label.
confusion_nb = predictions_nb.crosstab('label', 'prediction')
confusion_nb.show()

ROC and PR Curves

In [None]:
from handyspark import *
from matplotlib import pyplot as plt
%matplotlib inline
# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
bcm = BinaryClassificationMetrics(predictions_nb, scoreCol='probability', labelCol='label')
# Get metrics from evaluator
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Plot both ROC and PR curves
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

Tune Hyperparameters

In [None]:
# Tune the smoothing parameter over 1..9.
paramGrid_nb = ParamGridBuilder().addGrid(nb.smoothing, list(range(1, 10))).build()
# 3-fold CV scored with BinaryClassificationEvaluator's default metric (areaUnderROC).
evaluator = BinaryClassificationEvaluator()
nb_crossval = CrossValidator(
    estimator=nb,
    estimatorParamMaps=paramGrid_nb,
    evaluator=evaluator,
    numFolds=3,
)
nb_cvModel = nb_crossval.fit(train)
predictions_nb_cv = nb_cvModel.transform(test)

Evaluate Best CV Model

In [None]:
# metricName must be explicit: the evaluator's default metric is weighted F1, not accuracy.
evaluator_nb_cv = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
evaluator_nb_cv.evaluate(predictions_nb_cv)