#### Pipeline deployment on Small EC2 instance
Initiates a spark session, extracts heart disease data into a pandas DF from an AWS S3 bucket using boto3, converts that pandas DF into a spark DF using PyArrow, performs some exploratory analyses, makes some transformations in prep for MLib model building, builds and evaluates a Linear Regression ML model, and writes final DF to parquet for further analysis

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

import boto3
import pandas as pd
from io import BytesIO

In [2]:
spark = (SparkSession
        .builder
        .appName("heart-disease-risks")
        .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/11 22:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
filePath = "/home/ubuntu/heart-disease-risks/data/heart_data.csv"

In [5]:
heartDF = (spark.read.format("csv")
          .option("header","true")
          .option("inferSchema","true")
          .load(filePath))

                                                                                

In [6]:
heartDF.columns

['index',
 'id',
 'age',
 'gender',
 'height',
 'weight',
 'ap_hi',
 'ap_lo',
 'cholesterol',
 'gluc',
 'smoke',
 'alco',
 'active',
 'cardio']

In [7]:
heartDF.show(n=10)

+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|    0|  0|18393|     2|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|
|    1|  1|20228|     1|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|
|    2|  2|18857|     1|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|
|    3|  3|17623|     2|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|
|    4|  4|17474|     1|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|
|    5|  8|21914|     1|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|
|    6|  9|22113|     1|   157|  93.0|  130|   80|          3|   1|    0|   0|     1|     0|
|    7| 12|22584|     2|   178|  95.0|  130|   90|          3|   3|   

In [8]:
heartDF.createOrReplaceTempView("heart_tbl")

In [9]:
spark.sql("""SELECT cardio, count(id)
             FROM heart_tbl
             GROUP BY cardio""").show(10)

[Stage 3:>                                                          (0 + 1) / 1]

+------+---------+
|cardio|count(id)|
+------+---------+
|     1|    34979|
|     0|    35021|
+------+---------+



                                                                                

In [3]:
### Load the same df from an s3 bucket instead of csv hosted locally

# Define our s3 resource, bucket, & key
s3 = boto3.resource('s3')

#for bucket in s3.buckets.all():
#        print(bucket.name)


bucket_name = 'grand-corndog'
key = 'datasets/heart_data.csv'

obj = s3.Object(bucket_name, key)
with BytesIO(obj.get()['Body'].read()) as bio:
    df = pd.read_csv(bio)

df.head()

Unnamed: 0,index,id,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,0,0,18393,2,168,62.0,110,80,1,1,0,0,1,0
1,1,1,20228,1,156,85.0,140,90,3,1,0,0,1,1
2,2,2,18857,1,165,64.0,130,70,3,1,0,0,0,1
3,3,3,17623,2,169,82.0,150,100,1,1,0,0,1,1
4,4,4,17474,1,156,56.0,100,60,1,1,0,0,0,0


In [4]:
### Enable PyArrow for faster processing
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [5]:
s3_heartDF = spark.createDataFrame(df)

  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


In [7]:
s3_heartDF.show(10)

23/02/11 22:32:33 WARN TaskSetManager: Stage 1 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|    0|  0|18393|     2|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|
|    1|  1|20228|     1|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|
|    2|  2|18857|     1|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|
|    3|  3|17623|     2|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|
|    4|  4|17474|     1|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|
|    5|  8|21914|     1|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|
|    6|  9|22113|     

In [8]:
s3_heartDF.createOrReplaceTempView("heart_tbl_s3")

In [9]:
spark.sql("""SELECT gender, count(id)
             FROM heart_tbl_s3
             WHERE cardio = 1
             GROUP BY gender""").show(10)

23/02/11 22:32:56 WARN TaskSetManager: Stage 2 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


[Stage 2:>                                                          (0 + 1) / 1]

+------+---------+
|gender|count(id)|
+------+---------+
|     1|    22616|
|     2|    12363|
+------+---------+



                                                                                

In [11]:
s3_heartDF_2 = (s3_heartDF.withColumn(
                    "gender",
                    expr("CASE WHEN gender = 1 THEN 'M' ELSE 'F' END")))
s3_heartDF_2.show(10)

23/02/11 22:37:22 WARN TaskSetManager: Stage 5 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|    0|  0|18393|     F|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|
|    1|  1|20228|     M|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|
|    2|  2|18857|     M|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|
|    3|  3|17623|     F|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|
|    4|  4|17474|     M|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|
|    5|  8|21914|     M|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|
|    6|  9|22113|     

In [14]:
s3_heartDF_2.dtypes

[('index', 'bigint'),
 ('id', 'bigint'),
 ('age', 'bigint'),
 ('gender', 'string'),
 ('height', 'bigint'),
 ('weight', 'double'),
 ('ap_hi', 'bigint'),
 ('ap_lo', 'bigint'),
 ('cholesterol', 'bigint'),
 ('gluc', 'bigint'),
 ('smoke', 'bigint'),
 ('alco', 'bigint'),
 ('active', 'bigint'),
 ('cardio', 'bigint')]

In [41]:
df.describe

<bound method NDFrame.describe of        index     id    age  gender  height  weight  ap_hi  ap_lo  cholesterol  \
0          0      0  18393       2     168    62.0    110     80            1   
1          1      1  20228       1     156    85.0    140     90            3   
2          2      2  18857       1     165    64.0    130     70            3   
3          3      3  17623       2     169    82.0    150    100            1   
4          4      4  17474       1     156    56.0    100     60            1   
...      ...    ...    ...     ...     ...     ...    ...    ...          ...   
69995  69995  99993  19240       2     168    76.0    120     80            1   
69996  69996  99995  22601       1     158   126.0    140     90            2   
69997  69997  99996  19066       2     183   105.0    180     90            3   
69998  69998  99998  22431       1     163    72.0    135     80            1   
69999  69999  99999  20540       1     170    72.0    120     80           

In [32]:
# Cholesterol values look a bit fishy, so we might make some modifications or assumptions.
df.cholesterol.describe()

count    70000.000000
mean         1.366871
std          0.680250
min          1.000000
25%          1.000000
50%          1.000000
75%          2.000000
max          3.000000
Name: cholesterol, dtype: float64

In [33]:
# Range is 1-3, so instead of taking dataset notes at face value we're going to create low(1), med(2), and high(3)
# cholesterol categories and then OHC them.

In [37]:
s3_heartDF_3 = (s3_heartDF_2.withColumn("cholesterol", when(s3_heartDF_2.cholesterol == "1", "Low")
                                        .when(s3_heartDF_2.cholesterol == "2", "Med")
                                        .when(s3_heartDF_2.cholesterol == "3", "High")
                                        )
               )
s3_heartDF_3.show(10)

23/02/11 22:59:43 WARN TaskSetManager: Stage 16 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|    0|  0|18393|     F|   168|  62.0|  110|   80|        Low|   1|    0|   0|     1|     0|
|    1|  1|20228|     M|   156|  85.0|  140|   90|       High|   1|    0|   0|     1|     1|
|    2|  2|18857|     M|   165|  64.0|  130|   70|       High|   1|    0|   0|     0|     1|
|    3|  3|17623|     F|   169|  82.0|  150|  100|        Low|   1|    0|   0|     1|     1|
|    4|  4|17474|     M|   156|  56.0|  100|   60|        Low|   1|    0|   0|     0|     0|
|    5|  8|21914|     M|   151|  67.0|  120|   80|        Med|   2|    0|   0|     0|     0|
|    6|  9|22113|    

In [39]:
# Same deal with glucose levels. Data is not as described so we'll clean this up
df.gluc.describe()

count    70000.000000
mean         1.226457
std          0.572270
min          1.000000
25%          1.000000
50%          1.000000
75%          1.000000
max          3.000000
Name: gluc, dtype: float64

In [40]:
s3_heartDF_4 = (s3_heartDF_3.withColumn("gluc", when(s3_heartDF_3.gluc == "1", "Low")
                                        .when(s3_heartDF_3.gluc == "2", "Med")
                                        .when(s3_heartDF_3.gluc == "3", "High")
                                        )
               )
s3_heartDF_4.show(10)

23/02/11 23:02:30 WARN TaskSetManager: Stage 17 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|    0|  0|18393|     F|   168|  62.0|  110|   80|        Low| Low|    0|   0|     1|     0|
|    1|  1|20228|     M|   156|  85.0|  140|   90|       High| Low|    0|   0|     1|     1|
|    2|  2|18857|     M|   165|  64.0|  130|   70|       High| Low|    0|   0|     0|     1|
|    3|  3|17623|     F|   169|  82.0|  150|  100|        Low| Low|    0|   0|     1|     1|
|    4|  4|17474|     M|   156|  56.0|  100|   60|        Low| Low|    0|   0|     0|     0|
|    5|  8|21914|     M|   151|  67.0|  120|   80|        Med| Med|    0|   0|     0|     0|
|    6|  9|22113|    

In [42]:
trainDF, testDF = s3_heartDF_4.randomSplit([.8, .2], seed=42)

print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

23/02/11 23:04:39 WARN TaskSetManager: Stage 18 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:04:39 WARN TaskSetManager: Stage 21 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
There are 55851 rows in the training set, and 14149 in the test set


In [43]:
from pyspark.ml.feature import RFormula

In [44]:
s3_heartDF_4.dtypes

[('index', 'bigint'),
 ('id', 'bigint'),
 ('age', 'bigint'),
 ('gender', 'string'),
 ('height', 'bigint'),
 ('weight', 'double'),
 ('ap_hi', 'bigint'),
 ('ap_lo', 'bigint'),
 ('cholesterol', 'string'),
 ('gluc', 'string'),
 ('smoke', 'bigint'),
 ('alco', 'bigint'),
 ('active', 'bigint'),
 ('cardio', 'bigint')]

In [48]:
rf = RFormula(formula = "cardio ~ ."
             ,featuresCol = "features"
             ,labelCol = "heart_disease")
model = rf.fit(trainDF)

23/02/11 23:13:43 WARN TaskSetManager: Stage 34 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:13:44 WARN TaskSetManager: Stage 37 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:13:45 WARN TaskSetManager: Stage 40 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


In [49]:
model.transform(trainDF).show()

23/02/11 23:13:48 WARN TaskSetManager: Stage 43 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+--------------------+-------------+
|index| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|            features|heart_disease|
+-----+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+--------------------+-------------+
|    0|  0|18393|     F|   168|  62.0|  110|   80|        Low| Low|    0|   0|     1|     0|(15,[2,4,5,6,7,8,...|          0.0|
|    1|  1|20228|     M|   156|  85.0|  140|   90|       High| Low|    0|   0|     1|     1|[1.0,1.0,20228.0,...|          1.0|
|    3|  3|17623|     F|   169|  82.0|  150|  100|        Low| Low|    0|   0|     1|     1|[3.0,3.0,17623.0,...|          1.0|
|    4|  4|17474|     M|   156|  56.0|  100|   60|        Low| Low|    0|   0|     0|     

In [54]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [55]:
lr = LinearRegression(labelCol="heart_disease", predictionCol="heart_disease_pred")
pipeline = Pipeline(stages = [rf, lr])

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)

23/02/11 23:17:32 WARN TaskSetManager: Stage 44 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:17:33 WARN TaskSetManager: Stage 47 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:17:34 WARN TaskSetManager: Stage 50 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
23/02/11 23:17:35 WARN Instrumentation: [4e01b012] regParam is zero, which might cause numerical instability and overfitting.
23/02/11 23:17:35 WARN TaskSetManager: Stage 53 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


[Stage 53:>                                                         (0 + 1) / 1]

23/02/11 23:17:35 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/11 23:17:35 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/02/11 23:17:37 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/02/11 23:17:38 WARN TaskSetManager: Stage 54 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [56]:
predDF.select("features", "heart_disease", "heart_disease_pred").show(10)

23/02/11 23:18:41 WARN TaskSetManager: Stage 55 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
+--------------------+-------------+-------------------+
|            features|heart_disease| heart_disease_pred|
+--------------------+-------------+-------------------+
|[2.0,2.0,18857.0,...|          1.0| 0.6685603696888976|
|[6.0,9.0,22113.0,...|          0.0| 0.9345983072349767|
|[8.0,13.0,17668.0...|          0.0| 0.3601323104516745|
|[13.0,21.0,19809....|          0.0|0.48328201941608906|
|[19.0,29.0,21755....|          0.0|0.43216122345669006|
|[23.0,33.0,23376....|          0.0| 0.4731861953701907|
|[29.0,40.0,21057....|          0.0|0.47575570969291847|
|[35.0,47.0,20404....|          0.0| 0.3756621510874405|
|[45.0,60.0,17471....|          1.0| 0.3127038485883452|
|[46.0,61.0,18207....|          1.0| 0.7003578728862407|
+--------------------+-------------+-------------------+
only showing top 10 rows



In [57]:
from pyspark.ml.evaluation import RegressionEvaluator

In [58]:
regressionEvaluator = RegressionEvaluator(
    predictionCol="heart_disease_pred",
    labelCol="heart_disease",
    metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

23/02/11 23:19:32 WARN TaskSetManager: Stage 56 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


[Stage 56:>                                                         (0 + 1) / 1]

RMSE is 0.5


                                                                                

In [59]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(r2)

23/02/11 23:20:01 WARN TaskSetManager: Stage 57 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.
0.1237012646554525


[Stage 57:>                                                         (0 + 1) / 1]                                                                                

In [None]:
### Explore schema of our final df

In [67]:
s3_heartDF_4.schema

StructType([StructField('index', LongType(), True), StructField('id', LongType(), True), StructField('age', LongType(), True), StructField('gender', StringType(), False), StructField('height', LongType(), True), StructField('weight', DoubleType(), True), StructField('ap_hi', LongType(), True), StructField('ap_lo', LongType(), True), StructField('cholesterol', StringType(), True), StructField('gluc', StringType(), True), StructField('smoke', LongType(), True), StructField('alco', LongType(), True), StructField('active', LongType(), True), StructField('cardio', LongType(), True)])

In [69]:
from pyspark.sql.types import StructType

schema = [i for i in s3_heartDF_4.schema]
schema

[StructField('index', LongType(), True),
 StructField('id', LongType(), True),
 StructField('age', LongType(), True),
 StructField('gender', StringType(), False),
 StructField('height', LongType(), True),
 StructField('weight', DoubleType(), True),
 StructField('ap_hi', LongType(), True),
 StructField('ap_lo', LongType(), True),
 StructField('cholesterol', StringType(), True),
 StructField('gluc', StringType(), True),
 StructField('smoke', LongType(), True),
 StructField('alco', LongType(), True),
 StructField('active', LongType(), True),
 StructField('cardio', LongType(), True),
 StructField('features', VectorUDT(), True),
 StructField('heart_disease', DoubleType(), True),
 StructField('heart_disease_pred', DoubleType(), False)]

In [80]:
### save df to parquet for further analysis
location = "/home/ubuntu/heart-disease-risks/tmp"
s3_heartDF_4.write.format("parquet").mode("overwrite").save(location)

23/02/11 23:36:31 WARN TaskSetManager: Stage 58 contains a task of very large size (7661 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [81]:
spark.stop()