In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Iteration 4').getOrCreate()

In [2]:
df = spark.read.load("./cardio_train.csv", format="csv", header="true")

df.printSchema()

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- ap_hi: string (nullable = true)
 |-- ap_lo: string (nullable = true)
 |-- cholesterol: string (nullable = true)
 |-- gluc: string (nullable = true)
 |-- smoke: string (nullable = true)
 |-- alco: string (nullable = true)
 |-- active: string (nullable = true)
 |-- cardio: string (nullable = true)



In [3]:
# fix measuremnet errors
from pyspark.sql.types import (StructField,StringType,IntegerType,FloatType,StructType)

data_schema = [StructField('id',IntegerType(),True),
              StructField('age',IntegerType(),True),
              StructField('gender',IntegerType(),True),
              StructField('height',IntegerType(),True),
              StructField('weight',FloatType(),True),
              StructField('ap_hi',IntegerType(),True),
              StructField('ap_lo',IntegerType(),True),
              StructField('cholesterol',IntegerType(),True),
              StructField('gluc',IntegerType(),True),
              StructField('smoke',IntegerType(),True),
              StructField('alco',IntegerType(),True),
              StructField('active',IntegerType(),True),
              StructField('cardio',IntegerType(),True)]

final_struct = StructType(fields=data_schema)

df = spark.read.load("./cardio_train.csv", format="csv", header="true", schema=final_struct)

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: float (nullable = true)
 |-- ap_hi: integer (nullable = true)
 |-- ap_lo: integer (nullable = true)
 |-- cholesterol: integer (nullable = true)
 |-- gluc: integer (nullable = true)
 |-- smoke: integer (nullable = true)
 |-- alco: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- cardio: integer (nullable = true)



###### 3.1 Select the data

In [4]:
# Selecting attributes
df = df.select(df.columns[1:13])
df.show()

+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|18393|     2|   168|  62.0|  110|   80|          1|   1|    0|   0|     1|     0|
|20228|     1|   156|  85.0|  140|   90|          3|   1|    0|   0|     1|     1|
|18857|     1|   165|  64.0|  130|   70|          3|   1|    0|   0|     0|     1|
|17623|     2|   169|  82.0|  150|  100|          1|   1|    0|   0|     1|     1|
|17474|     1|   156|  56.0|  100|   60|          1|   1|    0|   0|     0|     0|
|21914|     1|   151|  67.0|  120|   80|          2|   2|    0|   0|     0|     0|
|22113|     1|   157|  93.0|  130|   80|          3|   1|    0|   0|     1|     0|
|22584|     2|   178|  95.0|  130|   90|          3|   3|    0|   0|     1|     1|
|17668|     1|   158|  71.0|  110|   70|          1|   1|    0|   0|     1|     0|
|198

In [5]:
# count number of rows
print(df.count())
# count number of columns
len(df.columns)

70000


12

###### 3.2 Clean the data

In [6]:
# Remove data errors
df = df.where("ap_lo > 0")
df = df.where("ap_hi > 0")
df = df.where("ap_hi > ap_lo")
df.count()

68742

In [7]:
# Remove outliers
import pandas as pd

pdf = df.toPandas()
print("Before:", pdf.shape)

Before: (68742, 12)


In [8]:
# height
Q1 = pdf["height"].quantile(0.25)
Q3 = pdf["height"].quantile(0.75)
IQR = Q3 - Q1

pdf.drop(pdf[(pdf["height"]>Q3+1.5*IQR) | (pdf["height"]<Q1-1.5*IQR)]
    .index,inplace=True)

In [9]:
print("After:", pdf.shape)

After: (68236, 12)


In [10]:
# weight
Q1 = pdf["weight"].quantile(0.25)
Q3 = pdf["weight"].quantile(0.75)
IQR = Q3 - Q1

pdf.drop(pdf[(pdf["weight"]>Q3+1.5*IQR) | (pdf["weight"]<Q1-1.5*IQR)]
    .index,inplace=True)

In [11]:
# ap_hi
Q1 = pdf["ap_hi"].quantile(0.25)
Q3 = pdf["ap_hi"].quantile(0.75)
IQR = Q3 - Q1

pdf.drop(pdf[(pdf["ap_hi"]>Q3+1.5*IQR) | (pdf["ap_hi"]<Q1-1.5*IQR)]
    .index,inplace=True)

In [12]:
# ap_lo
Q1 = pdf["ap_lo"].quantile(0.25)
Q3 = pdf["ap_lo"].quantile(0.75)
IQR = Q3 - Q1

pdf.drop(pdf[(pdf["ap_lo"]>Q3+1.5*IQR) | (pdf["ap_lo"]<Q1-1.5*IQR)]
    .index,inplace=True)

In [13]:
print(pdf.shape)
pdf

(62502, 12)


Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio
0,18393,2,168,62.0,110,80,1,1,0,0,1,0
1,20228,1,156,85.0,140,90,3,1,0,0,1,1
2,18857,1,165,64.0,130,70,3,1,0,0,0,1
3,17623,2,169,82.0,150,100,1,1,0,0,1,1
5,21914,1,151,67.0,120,80,2,2,0,0,0,0
6,22113,1,157,93.0,130,80,3,1,0,0,1,0
7,22584,2,178,95.0,130,90,3,3,0,0,1,1
8,17668,1,158,71.0,110,70,1,1,0,0,1,0
10,22530,1,169,80.0,120,80,1,1,0,0,1,0
11,18815,2,173,60.0,120,80,1,1,0,0,1,0


###### 3.3 Construct the data

In [14]:
import numpy as np

pdf.insert(6, "blood pressure", 0)

conditionlist = [
    (pdf["ap_hi"]<120) & (pdf["ap_lo"]<80),
    (pdf["ap_hi"]>=120) & (pdf["ap_hi"]<130) & (pdf["ap_lo"]<80),
    (pdf["ap_hi"]>=130) & (pdf["ap_hi"]<140) | (pdf["ap_lo"]>=80) & (pdf["ap_lo"]<90),
    (pdf["ap_hi"]>=140) & (pdf["ap_hi"]<=180) | (pdf["ap_lo"]>=90) & (pdf["ap_lo"]<=120),
    (pdf["ap_hi"]>180) & (pdf["ap_lo"]>120)]

choicelist = [1, 2, 3, 4, 5]
pdf["blood pressure"] = np.select(conditionlist, choicelist, default=0)
pdf

Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,blood pressure,cholesterol,gluc,smoke,alco,active,cardio
0,18393,2,168,62.0,110,80,3,1,1,0,0,1,0
1,20228,1,156,85.0,140,90,4,3,1,0,0,1,1
2,18857,1,165,64.0,130,70,3,3,1,0,0,0,1
3,17623,2,169,82.0,150,100,4,1,1,0,0,1,1
5,21914,1,151,67.0,120,80,3,2,2,0,0,0,0
6,22113,1,157,93.0,130,80,3,3,1,0,0,1,0
7,22584,2,178,95.0,130,90,3,3,3,0,0,1,1
8,17668,1,158,71.0,110,70,1,1,1,0,0,1,0
10,22530,1,169,80.0,120,80,3,1,1,0,0,1,0
11,18815,2,173,60.0,120,80,3,1,1,0,0,1,0


In [15]:
pdf = pdf.drop(["ap_hi", "ap_lo"], axis=1)
pdf.columns

Index(['age', 'gender', 'height', 'weight', 'blood pressure', 'cholesterol',
       'gluc', 'smoke', 'alco', 'active', 'cardio'],
      dtype='object')

###### 3.4 Integrate various data sources

In [16]:
data_a = spark.read.load("./cardio_train_a.csv", format="csv", header="true")
data_b = spark.read.load("./cardio_train_b.csv", format="csv", header="true")

In [17]:
data_a.show()

+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+
| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|
+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+
|  0|18393|     2|   168|    62|  110|   80|          1|   1|    0|   0|     1|
|  1|20228|     1|   156|    85|  140|   90|          3|   1|    0|   0|     1|
|  2|18857|     1|   165|    64|  130|   70|          3|   1|    0|   0|     0|
|  3|17623|     2|   169|    82|  150|  100|          1|   1|    0|   0|     1|
|  4|17474|     1|   156|    56|  100|   60|          1|   1|    0|   0|     0|
|  8|21914|     1|   151|    67|  120|   80|          2|   2|    0|   0|     0|
|  9|22113|     1|   157|    93|  130|   80|          3|   1|    0|   0|     1|
| 12|22584|     2|   178|    95|  130|   90|          3|   3|    0|   0|     1|
| 13|17668|     1|   158|    71|  110|   70|          1|   1|    0|   0|     1|
| 14|19834|     1|   164|    68|  110|  

In [18]:
data_b.show()

+---+------+
| id|cardio|
+---+------+
|  0|     0|
|  1|     1|
|  2|     1|
|  3|     1|
|  4|     0|
|  8|     0|
|  9|     0|
| 12|     1|
| 13|     0|
| 14|     0|
| 15|     0|
| 16|     0|
| 18|     0|
| 21|     0|
| 23|     0|
| 24|     1|
| 25|     0|
| 27|     0|
| 28|     0|
| 29|     0|
+---+------+
only showing top 20 rows



In [19]:
df_inner = data_a.join(data_b, on=['id'], how='inner')
df_inner.show()

+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
| id|  age|gender|height|weight|ap_hi|ap_lo|cholesterol|gluc|smoke|alco|active|cardio|
+---+-----+------+------+------+-----+-----+-----------+----+-----+----+------+------+
|  0|18393|     2|   168|    62|  110|   80|          1|   1|    0|   0|     1|     0|
|  1|20228|     1|   156|    85|  140|   90|          3|   1|    0|   0|     1|     1|
|  2|18857|     1|   165|    64|  130|   70|          3|   1|    0|   0|     0|     1|
|  3|17623|     2|   169|    82|  150|  100|          1|   1|    0|   0|     1|     1|
|  4|17474|     1|   156|    56|  100|   60|          1|   1|    0|   0|     0|     0|
|  8|21914|     1|   151|    67|  120|   80|          2|   2|    0|   0|     0|     0|
|  9|22113|     1|   157|    93|  130|   80|          3|   1|    0|   0|     1|     0|
| 12|22584|     2|   178|    95|  130|   90|          3|   3|    0|   0|     1|     1|
| 13|17668|     1|   158|    71|  110|   70

###### 3.5 Format the data

In [20]:
pdf["age"] = (pdf["age"] / 365).round().astype("int")
pdf

Unnamed: 0,age,gender,height,weight,blood pressure,cholesterol,gluc,smoke,alco,active,cardio
0,50,2,168,62.0,3,1,1,0,0,1,0
1,55,1,156,85.0,4,3,1,0,0,1,1
2,52,1,165,64.0,3,3,1,0,0,0,1
3,48,2,169,82.0,4,1,1,0,0,1,1
5,60,1,151,67.0,3,2,2,0,0,0,0
6,61,1,157,93.0,3,3,1,0,0,1,0
7,62,2,178,95.0,3,3,3,0,0,1,1
8,48,1,158,71.0,1,1,1,0,0,1,0
10,62,1,169,80.0,3,1,1,0,0,1,0
11,52,2,173,60.0,3,1,1,0,0,1,0


In [21]:
pdf.to_csv("new_train.csv", index=False)