
In [1]:
#@title Installing Spark and dependencies
# Java 8
# Apache Spark 3.1.1 prebuilt for Hadoop 3.2
# findspark (used to locate the Spark installation on the system)
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

#Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()


# Step 1: Define the problem
Which kinds of people are likely to have lung cancer?

# Step 2: Gather the data
The datasets can be found here:
* https://www.kaggle.com/datasets/aemreusta/brfss-2020-survey-data
* https://www.kaggle.com/datasets/sakinak/behavioral-risk-factor-surveillance-survey-201619

In [2]:
#@title Create Spark entry points
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the Spark session; the SparkContext is available from it
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [4]:
#@title Import Spark MLlib classifiers
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Step 3: Prepare data for consumption

In [5]:
#@title Mount Google Drive (it holds the kaggle.json API token used for the download)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
! mkdir -p ~/.kaggle

In [8]:
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/kaggle.json

In [9]:
! chmod 600 ~/.kaggle/kaggle.json

In [10]:
! kaggle datasets download aemreusta/brfss-2020-survey-data

Downloading brfss-2020-survey-data.zip to /content
 68% 33.0M/48.3M [00:00<00:00, 89.0MB/s]
100% 48.3M/48.3M [00:00<00:00, 109MB/s] 


In [11]:
! kaggle datasets download sakinak/behavioral-risk-factor-surveillance-survey-201619

Downloading behavioral-risk-factor-surveillance-survey-201619.zip to /content
 95% 222M/234M [00:03<00:00, 64.8MB/s]
100% 234M/234M [00:03<00:00, 75.7MB/s]


In [12]:
!ls

behavioral-risk-factor-surveillance-survey-201619.zip
brfss-2020-survey-data.zip
drive
sample_data
spark-3.1.1-bin-hadoop3.2
spark-3.1.1-bin-hadoop3.2.tgz


In [13]:
!unzip brfss-2020-survey-data.zip

Archive:  brfss-2020-survey-data.zip
  inflating: brfss2020.csv           


In [14]:
!unzip behavioral-risk-factor-surveillance-survey-201619.zip

Archive:  behavioral-risk-factor-surveillance-survey-201619.zip
  inflating: 2016.csv                
  inflating: 2017.csv                
  inflating: 2018.csv                
  inflating: 2019.csv                


In [15]:
from subprocess import check_output
print('-'*10, 'Files', '-'*10)
print(check_output(['ls', './']).decode('utf8'))

---------- Files ----------
2016.csv
2017.csv
2018.csv
2019.csv
behavioral-risk-factor-surveillance-survey-201619.zip
brfss2020.csv
brfss-2020-survey-data.zip
drive
sample_data
spark-3.1.1-bin-hadoop3.2
spark-3.1.1-bin-hadoop3.2.tgz



## About the BRFSS dataset and Prediction task

The Behavioral Risk Factor Surveillance System (BRFSS) is a collaborative project between all of the states in the United States and participating US territories and the Centers for Disease Control and Prevention (CDC).

BRFSS’s objective is to collect uniform, state-specific data on health risk behaviors, chronic diseases and conditions, access to health care, and use of preventive health services related to the leading causes of death and disability in the United States. BRFSS conducts both landline and mobile phone surveys of adults aged 18 and older. General factors assessed by the BRFSS in 2020 included health status and healthy days, exercise, insufficient sleep, chronic health conditions, oral health, tobacco use, cancer screenings, and access to health care.

The aim of this project is to build a model with relatively high accuracy and AUC that could serve as a decision aid for identifying people at high risk of developing lung cancer.

The 2020 data contain information about 401,958 unique survey participants. From a total of 279 features, I selected those related to lung cancer. Each example in the dataset contains the following features for an individual respondent:

### Categorical Features
*   `_AGE65YR`: Two-level age category: `1` for `18 <= AGE <= 64` and `2` for `65 <= AGE <= 99`
*   `SEXVAR`: Sex of respondent: `Male: 1` and `Female: 2`
*   `_BMI5CAT`: Four-category Body Mass Index (BMI), derived from `_BMI5` (BMI with two implied decimal places): `1: Underweight` (`_BMI5 < 1850`); `2: Normal` (`1850 <= _BMI5 < 2500`); `3: Overweight` (`2500 <= _BMI5 < 3000`); `4: Obese` (`3000 <= _BMI5 < 9999`)
*   `GENHLTH`: Health status: Would you say that in general your health is: `1: Excellent`; `2: Very good` ; `3: Good` ; `4: Fair` ; `5: Poor`
*   `SMOKE100`: Have you smoked at least 100 cigarettes in your entire life? [Note: 5 packs = 100 cigarettes] `1: Yes` ; `2: No`
*   `_SMOKER3`: Four-level smoker status: Everyday smoker: `1`, Someday smoker: `2`, Former smoker: `3`, Non-smoker: `4`
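The `_BMI5CAT` thresholds above can be written as a small function. This is a sketch, assuming (as the thresholds suggest) that `_BMI5` stores BMI multiplied by 100 and that the categories are coded `1` to `4` in the order listed; `bmi_to_bmi5` and `bmi5_to_category` are hypothetical helpers, not BRFSS or notebook code.

```python
def bmi_to_bmi5(weight_kg, height_m):
    """BMI (kg/m^2) scaled by 100; rounded to an integer here for illustration."""
    return round(weight_kg / height_m ** 2 * 100)


def bmi5_to_category(bmi5):
    """Map a _BMI5 value to the four _BMI5CAT codes; None if missing or out of range."""
    if bmi5 is None:
        return None
    if bmi5 < 1850:
        return 1  # Underweight
    if bmi5 < 2500:
        return 2  # Normal
    if bmi5 < 3000:
        return 3  # Overweight
    if bmi5 < 9999:
        return 4  # Obese
    return None  # outside the documented range


# Example: 70 kg at 1.75 m is a BMI of about 22.86, i.e. category 2 (Normal)
print(bmi5_to_category(bmi_to_bmi5(70, 1.75)))  # 2
```

Note that the boundaries are half-open (`<`), so a value of exactly `2500` already falls into the Overweight category.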

### Lung Cancer Screening and Related Features
*   `LCSFIRST`: How old were you when you first started to smoke cigarettes regularly? `Value 1-100 in years`
*   `LCSLAST`: How old were you when you last smoked cigarettes regularly? `Value 1-100 in years`
*   `LCSNUMCG`: On average, when you smoke/smoked regularly, about how many cigarettes do/did you usually smoke each day? `Value 1-300 in number of cigarettes`
*   `LCSCTSCN`: In the last 12 months, did you have a CT or CAT scan? Responses include: `Yes, to check for lung cancer`, `No (did not have a CT scan)`, and `Had a CT scan, but for other reason`.
*   `CNCRTYP1`: What type of cancer was it? (If Response = 2 (Two) or 3 (Three or more), ask: “With your most recent diagnosis of cancer, what type of cancer was it?”) Lung cancer is coded `24`; other cancer types use codes in the range `1-30`.
*   `STOPSMK2`: During the past 12 months, have you stopped smoking for one day or longer because you were trying to quit smoking? `Yes: 1` or `No: 2`.
*   `ECIGARET`: Have you ever used an e-cigarette or other electronic vaping product, even just one time, in your entire life? `Yes: 1` or `No: 2`.
*   `ECIGNOW`: Do you now use e-cigarettes or other electronic vaping products every day, some days, or not at all? `Every day: 1`; `Some days: 2` or `Not at all: 3`
*   `ASTHMA3`: (Ever told) (you had) asthma? `Yes: 1` or `No: 2`.

### Prediction Task
The prediction task is to **predict early whether a person is at high risk of lung cancer.**

### Label
*   `CNCRTYP1`: Type of cancer. The binary label `HasLungCancer` is `1` when `CNCRTYP1 == 24` (lung cancer) and `0` otherwise.
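The stated aim is a model with high accuracy and AUC. ROC AUC equals the probability that a randomly chosen positive case receives a higher score than a randomly chosen negative one, which can be computed from ranks (the Mann-Whitney U statistic). The following is a minimal pure-Python sketch; `roc_auc` is a hypothetical helper, and in Spark the same metric is available through `BinaryClassificationEvaluator` with `metricName="areaUnderROC"`.

```python
def roc_auc(labels, scores):
    """ROC AUC via the Mann-Whitney U statistic; tied scores get their average rank."""
    if len(labels) != len(scores):
        raise ValueError("labels and scores must have the same length")
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = len(labels) - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("AUC needs at least one positive and one negative example")

    # Assign 1-based ranks in ascending score order, averaging over ties.
    order = sorted(range(len(scores)), key=lambda i: scores[i])
    ranks = [0.0] * len(scores)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and scores[order[j + 1]] == scores[order[i]]:
            j += 1
        avg_rank = (i + j) / 2 + 1  # mean of ranks i+1 .. j+1
        for k in range(i, j + 1):
            ranks[order[k]] = avg_rank
        i = j + 1

    # U counts (positive, negative) pairs where the positive is ranked higher.
    rank_sum_pos = sum(r for r, y in zip(ranks, labels) if y == 1)
    u = rank_sum_pos - n_pos * (n_pos + 1) / 2
    return u / (n_pos * n_neg)


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

A constant score, such as always predicting "no lung cancer", gives an AUC of exactly 0.5 however imbalanced the classes are. This makes AUC more informative than accuracy for this label.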





In [16]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from matplotlib import pyplot as plt
from matplotlib import rcParams
from sklearn.model_selection import train_test_split
import seaborn as sns

# The following lines adjust the granularity of reporting. 
pd.options.display.max_rows = 10
pd.options.display.float_format = "{:.1f}".format

from google.colab import widgets
# For facets
from IPython.core.display import display, HTML
import base64
!pip install facets-overview==1.0.0
from facets_overview.feature_statistics_generator import FeatureStatisticsGenerator

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting facets-overview==1.0.0
  Downloading facets_overview-1.0.0-py2.py3-none-any.whl (24 kB)
Installing collected packages: facets-overview
Successfully installed facets-overview-1.0.0


In [17]:
# load packages and report their versions
import sys
print('Python version: {}'.format(sys.version))

import pandas as pd
print('pandas version: {}'.format(pd.__version__))

import matplotlib
print('matplotlib version: {}'.format(matplotlib.__version__))

import numpy as np
print('numpy version: {}'.format(np.__version__))

import scipy as sp
print('scipy version: {}'.format(sp.__version__))

import IPython
from IPython.display import display  # pretty printing of DataFrames in Jupyter
print('IPython version: {}'.format(IPython.__version__))

import pyspark
print('Apache Spark Pyspark version: {}'.format(pyspark.__version__))  # PySpark version

# misc libraries
import random
import time

# ignore warnings
import warnings
warnings.filterwarnings('ignore')
print('-'*25)

Python version: 3.9.16 (main, Dec  7 2022, 01:11:51) 
[GCC 9.4.0]
pandas version: 1.4.4
matplotlib version: 3.7.1
numpy version: 1.22.4
scipy version: 1.10.1
IPython version: 7.9.0
Apache Spark Pyspark version: 3.1.1
-------------------------


In [63]:
#@title Data Integration 1: load the 2020 survey
from pyspark.sql.types import *

data_2020 = spark.read.csv('./brfss2020.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_2020.dtypes)

---------- data types ----------


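|     | 0        | 1      |
|-----|----------|--------|
| 0   | _STATE   | double |
| 1   | FMONTH   | double |
| 2   | IDATE    | int    |
| 3   | IMONTH   | int    |
| 4   | IDAY     | int    |
| ... | ...      | ...    |
| 274 | _STOLDNA | double |
| 275 | _VIRCOLN | double |
| 276 | _SBONTIM | double |
| 277 | _CRCREC1 | double |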


In [64]:
data_2020F = data_2020.select('SEXVAR', '_AGE65YR', '_BMI5CAT', 'GENHLTH',
                              'CHECKUP1',  # time since last routine checkup
                              'EXERANY2',  # exercise in past 30 days
                              'SMOKE100', '_SMOKER3',
                              'STOPSMK2',  # tried to quit smoking
                              'LASTSMK2',  # time since quitting
                              # 'LCSFIRST', 'LCSLAST', 'LCSNUMCG',
                              'ECIGARET', 'ECIGNOW',
                              'LCSCTSCN',
                              'CHCCOPD2',  # ever told you had COPD, emphysema or chronic bronchitis
                              'ASTHMA3',
                              'CVDSTRK3',  # ever had a stroke
                              'CVDCRHD4',  # ever diagnosed with angina or coronary heart disease
                              'CNCRTYP1')  # type of cancer (label source)
data_2020F.show()

+------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|SEXVAR|_AGE65YR|_BMI5CAT|GENHLTH|CHECKUP1|EXERANY2|SMOKE100|_SMOKER3|STOPSMK2|LASTSMK2|ECIGARET|ECIGNOW|LCSCTSCN|CHCCOPD2|ASTHMA3|CVDSTRK3|CVDCRHD4|CNCRTYP1|
+------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|   2.0|     1.0|     1.0|    2.0|     4.0|     1.0|     1.0|     1.0|     2.0|    null|     1.0|    3.0|    null|     1.0|    1.0|     2.0|     2.0|    null|
|   2.0|     2.0|     3.0|    3.0|     1.0|     1.0|    null|     9.0|    null|    null|    null|   null|    null|     2.0|    1.0|     2.0|     2.0|    null|
|   2.0|     2.0|    null|    3.0|     1.0|     1.0|     2.0|     4.0|    null|    null|     2.0|   null|    null|     2.0|    2.0|     2.0|     2.0|    null|
|   2.0|     2.0|    null|    1.0|     2.0|   

In [65]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# All selected columns hold numeric survey codes; cast every one of them to int
data_2020F = data_2020F.select([col(c).cast(IntegerType()).alias(c) for c in data_2020F.columns])

In [66]:
# Rows before dropping nulls: 401,958
# Drop rows missing any of these core features; other columns may still contain
# nulls, which the recoding UDFs later map to default codes.
data_2020F = data_2020F.na.drop(how='any', subset=["GENHLTH", "CHECKUP1", "EXERANY2", "SMOKE100",
                                                   "CVDSTRK3", "CVDCRHD4", "CHCCOPD2"])
data_2020F.count()

384092

In [67]:
from pyspark.sql.functions import col

print('Columns with null values:')
print('-'*25)
# Flag nulls as 0/1 per column, then sum each flag over all rows
data_2020F.select([col(c).isNull().cast("int").alias(c) for c in data_2020F.columns])\
    .groupBy().sum().toPandas()

Columns with null values:
-------------------------


| | sum(SEXVAR) | sum(_AGE65YR) | sum(_BMI5CAT) | sum(GENHLTH) | sum(CHECKUP1) | sum(EXERANY2) | sum(SMOKE100) | sum(_SMOKER3) | sum(STOPSMK2) | sum(LASTSMK2) | sum(ECIGARET) | sum(ECIGNOW) | sum(LCSCTSCN) | sum(CHCCOPD2) | sum(ASTHMA3) | sum(CVDSTRK3) | sum(CVDCRHD4) | sum(CNCRTYP1) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 28988 | 0 | 0 | 0 | 0 | 0 | 331670 | 280367 | 119536 | 333342 | 352845 | 0 | 0 | 0 | 0 | 361416 |


In [23]:
data_2020F.filter(data_2020F.CNCRTYP1 ==24.0).count()

440

In [24]:
data_2020F.count()

384092

In [25]:
data_2020F.printSchema

<bound method DataFrame.printSchema of DataFrame[SEXVAR: int, _AGE65YR: int, _BMI5CAT: int, GENHLTH: int, CHECKUP1: int, EXERANY2: int, SMOKE100: int, _SMOKER3: int, STOPSMK2: int, LASTSMK2: int, ECIGARET: int, ECIGNOW: int, LCSCTSCN: int, CHCCOPD2: int, ASTHMA3: int, CVDSTRK3: int, CVDCRHD4: int, CNCRTYP1: int]>

In [68]:
#@title User Defined Functions (UDFs) for recoding the features and the prediction label

from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Label: 1 if the most recent cancer is lung cancer (CNCRTYP1 == 24), else 0
y_udf = udf(lambda y: 1 if y == 24 else 0, StringType())

# Feature recoders. Nulls reach Python as None and fall through to the default value.
# All UDFs are declared StringType, so the recoded columns are strings.
x_age = udf(lambda x: 0 if x in (1, 3) else 1, StringType())              # codes 1 and 3 -> 0; code 2 (65+) and null -> 1
x_bmi = udf(lambda x: x if x in (1, 2, 3, 4) else 2, StringType())         # unknown BMI -> 2 (Normal)
x_health = udf(lambda x: x if x in (1, 2, 3, 4, 5) else 0, StringType())   # 0 = missing
x_checkup = udf(lambda x: x if x in (1, 2, 3, 4, 8) else 0, StringType())  # 0 = missing
x_smoke3 = udf(lambda x: x if x in (1, 2, 3, 4) else 4, StringType())      # unknown -> 4 (non-smoker)
x_lastsmk = udf(lambda x: x if x in (1, 2, 3, 4, 5, 6, 7, 8) else 0, StringType())  # 0 = missing
x_ecig = udf(lambda x: x if x in (1, 2, 3) else 0, StringType())           # 0 = missing
x_CT = udf(lambda x: x if x in (1, 3) else 0, StringType())                # keep codes 1 and 3 (had a CT scan); else 0
x_bool = udf(lambda x: 1 if x == 1 else 0, StringType())                   # code 1 (yes / male) -> 1; anything else -> 0

processed_2020 = data_2020F.withColumn("Gender", x_bool('SEXVAR')).drop("SEXVAR")\
                    .withColumn("Age65", x_age('_AGE65YR')).drop("_AGE65YR")\
                    .withColumn("BMI", x_bmi('_BMI5CAT')).drop("_BMI5CAT")\
                    .withColumn("GeneralHealth", x_health('GENHLTH')).drop("GENHLTH")\
                    .withColumn("CheckupFreq", x_checkup('CHECKUP1')).drop("CHECKUP1")\
                    .withColumn("Exercise", x_bool('EXERANY2')).drop("EXERANY2")\
                    .withColumn("Smoked100", x_bool('SMOKE100')).drop("SMOKE100")\
                    .withColumn("SmokerStatus", x_smoke3('_SMOKER3')).drop("_SMOKER3")\
                    .withColumn("StopSmoking", x_bool('STOPSMK2')).drop("STOPSMK2")\
                    .withColumn("TimeSinceQuitSmk", x_lastsmk('LASTSMK2')).drop("LASTSMK2")\
                    .withColumn("EverUsedEcig", x_bool('ECIGARET')).drop("ECIGARET")\
                    .withColumn("EcigLevel", x_ecig('ECIGNOW')).drop("ECIGNOW")\
                    .withColumn("HasCTScan", x_CT('LCSCTSCN')).drop("LCSCTSCN")\
                    .withColumn("HasChronicDisease", x_bool('CHCCOPD2')).drop("CHCCOPD2")\
                    .withColumn("HasAsthma", x_bool('ASTHMA3')).drop("ASTHMA3")\
                    .withColumn("HadStroke", x_bool('CVDSTRK3')).drop("CVDSTRK3")\
                    .withColumn("HadHeartDisease", x_bool('CVDCRHD4')).drop("CVDCRHD4")\
                    .withColumn("HasLungCancer", y_udf('CNCRTYP1')).drop("CNCRTYP1")
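Because the recoding logic lives inside anonymous lambdas, edge cases such as nulls and "don't know" codes are easy to miss. The following minimal sketch is not part of the original notebook. It mirrors the same rules as plain Python functions so they can be unit-tested without a Spark session. The function names are hypothetical, and the functions return ints rather than the UDFs' strings.

```python
# Plain-Python mirrors of the recoding UDFs above (hypothetical names, int outputs)
def recode_age65(x):
    """x_age: codes 1 and 3 -> 0; anything else (including None) -> 1."""
    return 0 if x in (1, 3) else 1


def recode_bool(x):
    """x_bool: code 1 -> 1; any other code or None -> 0."""
    return 1 if x == 1 else 0


def recode_label(y):
    """y_udf: CNCRTYP1 == 24 (lung cancer) -> 1, else 0."""
    return 1 if y == 24 else 0


def recode_keep(valid_codes, default):
    """Build a recoder that keeps codes in valid_codes and maps the rest to default."""
    valid = frozenset(valid_codes)
    return lambda x: x if x in valid else default


recode_bmi = recode_keep({1, 2, 3, 4}, 2)
recode_health = recode_keep({1, 2, 3, 4, 5}, 0)
recode_checkup = recode_keep({1, 2, 3, 4, 8}, 0)
recode_smoke3 = recode_keep({1, 2, 3, 4}, 4)
recode_lastsmk = recode_keep(range(1, 9), 0)
recode_ecig = recode_keep({1, 2, 3}, 0)
recode_ct = recode_keep({1, 3}, 0)

# Edge cases worth knowing: an unknown smoker status becomes "non-smoker" (4),
# and a null age category is treated as 65 or older (1).
print(recode_smoke3(9), recode_age65(None))  # 4 1
```

Tests like these make implicit choices visible, for example that a respondent with an unknown smoking status ends up in the non-smoker group.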

In [69]:
#Cleaning missing data encoded as 0
processed_2020 = processed_2020.filter(processed_2020.GeneralHealth!=0)

In [70]:
processed_2020 = processed_2020.filter(processed_2020.CheckupFreq!=0)

In [71]:
processed_2020.groupBy(processed_2020.HasLungCancer).count().show()
# Data is highly imbalanced :)

+-------------+------+
|HasLungCancer| count|
+-------------+------+
|            0|378221|
|            1|   432|
+-------------+------+
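With only 432 positives among 378,653 cleaned 2020 rows, accuracy alone is misleading: a model that always predicts "no lung cancer" is already about 99.9% accurate. The following sketch uses hypothetical pure-Python helpers to compute that baseline and the "balanced" class weights popularized by scikit-learn's `class_weight="balanced"`. Such weights could be stored in a column and passed to Spark's `LogisticRegression` through its `weightCol` parameter. Class weighting is one common option for imbalance; the original notebook does not apply it.

```python
# Class counts printed above for the 2020 data after cleaning
counts_2020 = {0: 378221, 1: 432}


def majority_baseline_accuracy(class_counts):
    """Accuracy of a trivial model that always predicts the most frequent class."""
    return max(class_counts.values()) / sum(class_counts.values())


def balanced_class_weights(class_counts):
    """Weight each class by n_samples / (n_classes * n_samples_in_class)."""
    total = sum(class_counts.values())
    n_classes = len(class_counts)
    return {label: total / (n_classes * n) for label, n in class_counts.items()}


print(round(majority_baseline_accuracy(counts_2020), 4))  # 0.9989
weights = balanced_class_weights(counts_2020)
print({label: round(w, 2) for label, w in weights.items()})  # {0: 0.5, 1: 438.26}
```

With these weights, each class contributes the same total weight (half of the row count) to the training loss.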



In [29]:
processed_2020.count()

384092

In [30]:
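# Imputation: fill missing smoking-history values with the column mean.
# NOTE: 'FirstSmokedAge', 'LastSmokedAge' and 'AvgNumCigADay' would only exist if
# LCSFIRST, LCSLAST and LCSNUMCG were kept in the select above and renamed;
# with the current feature set these columns are absent.
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay'],
    outputCols=['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay']
).setStrategy("mean")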

In [31]:
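# Only run the imputer when its input columns are present in the DataFrame
if set(imputer.getInputCols()).issubset(processed_2020.columns):
    processed_2020 = imputer.fit(processed_2020).transform(processed_2020)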

KeyboardInterrupt: ignored
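An `Imputer` with strategy `"mean"` replaces each missing entry with the mean of that column's observed values. Spark treats nulls, and by default NaN, as missing. The following pure-Python sketch applies the same rule to a single column; it uses a hypothetical `mean_impute` helper and made-up values.

```python
def mean_impute(values):
    """Replace None entries with the mean of the observed (non-None) entries."""
    observed = [v for v in values if v is not None]
    if not observed:
        raise ValueError("cannot impute a column with no observed values")
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]


# Example with made-up ages at which respondents started smoking
print(mean_impute([16, None, 20, None, 18]))  # [16, 18.0, 20, 18.0, 18]
```

Mean imputation is sensitive to outliers. `setStrategy("median")` is the built-in alternative in Spark's `Imputer`.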

In [None]:
processed_2020.show(5)

In [None]:
# Show the value counts of every column, most frequent category first
column_subset = list(processed_2020.columns)
for col_ in column_subset:
    (processed_2020.groupBy(col_).count()
        .dropna(subset=col_)
        .orderBy('count', ascending=False)
        .show())

column_subset

In [None]:
processed_2020.write.option("header",True).csv("data2020")

In [45]:
#@title Data Integration 2: load the 2017 survey
data_2017 = spark.read.csv('./2017.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_2017.dtypes)

---------- data types ----------


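|     | 0        | 1      |
|-----|----------|--------|
| 0   | _c0      | int    |
| 1   | _STATE   | int    |
| 2   | FMONTH   | int    |
| 3   | IDATE    | int    |
| 4   | IMONTH   | int    |
| ... | ...      | ...    |
| 354 | _RFSEAT2 | int    |
| 355 | _RFSEAT3 | int    |
| 356 | _FLSHOT6 | string |
| 357 | _PNEUMO2 | string |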


In [72]:
data_2017F = data_2017.select('SEX', '_AGE65YR', '_BMI5CAT', 'GENHLTH',
                              'CHECKUP1',  # time since last routine checkup
                              'EXERANY2',  # exercise in past 30 days
                              'SMOKE100', '_SMOKER3',
                              'STOPSMK2',  # tried to quit smoking
                              'LASTSMK2',  # time since quitting
                              # 'LCSFIRST', 'LCSLAST', 'LCSNUMCG',
                              'ECIGARET', 'ECIGNOW',
                              'LCSCTSCN',
                              'CHCCOPD1',  # ever told you had COPD, emphysema or chronic bronchitis
                              'ASTHMA3',
                              'CVDSTRK3',  # ever had a stroke
                              'CVDCRHD4',  # ever diagnosed with angina or coronary heart disease
                              'CNCRTYP1')  # type of cancer (label source)
data_2017F.show(6)

+---+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|SEX|_AGE65YR|_BMI5CAT|GENHLTH|CHECKUP1|EXERANY2|SMOKE100|_SMOKER3|STOPSMK2|LASTSMK2|ECIGARET|ECIGNOW|LCSCTSCN|CHCCOPD1|ASTHMA3|CVDSTRK3|CVDCRHD4|CNCRTYP1|
+---+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|  2|       2|       3|      2|       2|       1|       2|       4|      NA|      NA|       2|     NA|      NA|       2|      2|       2|       2|      NA|
|  1|       2|       3|      2|       1|       1|       2|       4|      NA|      NA|       2|     NA|      NA|       2|      2|       2|       2|      NA|
|  1|       2|       3|      3|       1|       2|       2|       4|      NA|      NA|       2|     NA|      NA|       2|      2|       2|       2|      NA|
|  2|       2|       3|      4|       1|      NA|       2|      

In [None]:
# Row count before dropping nulls (450,016)
#data_2017F = data_2017F.na.drop(how="any")
data_2017F.count()

In [73]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast every selected column to int; string codes such as "NA" become null
data_2017F = data_2017F.select([col(c).cast(IntegerType()).alias(c) for c in data_2017F.columns])

In [74]:
# Rows before dropping nulls: 450,016
# Drop rows missing any of these core features; other columns may still contain
# nulls, which the recoding UDFs later map to default codes.
data_2017F = data_2017F.na.drop(how='any', subset=["_BMI5CAT", "_AGE65YR", "GENHLTH", "CHECKUP1", "EXERANY2",
                                                   "SMOKE100", "_SMOKER3", "ECIGARET", "CHCCOPD1", "ASTHMA3",
                                                   "CVDSTRK3", "CVDCRHD4"])
data_2017F.count()

391031

In [75]:
from pyspark.sql.functions import col

print('Columns with null values:')
print('-'*25)
# Flag nulls as 0/1 per column, then sum each flag over all rows
data_2017F.select([col(c).isNull().cast("int").alias(c) for c in data_2017F.columns])\
    .groupBy().sum().toPandas()

Columns with null values:
-------------------------


| | sum(SEX) | sum(_AGE65YR) | sum(_BMI5CAT) | sum(GENHLTH) | sum(CHECKUP1) | sum(EXERANY2) | sum(SMOKE100) | sum(_SMOKER3) | sum(STOPSMK2) | sum(LASTSMK2) | sum(ECIGARET) | sum(ECIGNOW) | sum(LCSCTSCN) | sum(CHCCOPD1) | sum(ASTHMA3) | sum(CVDSTRK3) | sum(CVDCRHD4) | sum(CNCRTYP1) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 333110 | 279537 | 0 | 329178 | 343043 | 0 | 0 | 0 | 0 | 383363 |
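The null summary shows that the smoking follow-up and screening questions are missing for most respondents. This is presumably because they are asked only of a subset; for example, `CNCRTYP1` is asked only of respondents who report a cancer. The following short sketch converts the printed 2017 counts into fractions of the 391,031 remaining rows. Columns not listed had no nulls, and `null_fractions` is a hypothetical helper.

```python
# Null counts per column from the 2017 summary above (all other columns had 0 nulls)
null_counts_2017 = {
    "STOPSMK2": 333110,
    "LASTSMK2": 279537,
    "ECIGNOW": 329178,
    "LCSCTSCN": 343043,
    "CNCRTYP1": 383363,
}
total_rows_2017 = 391031


def null_fractions(null_counts, total):
    """Share of rows that are null, per column."""
    return {column: n / total for column, n in null_counts.items()}


# Print the columns from most to least missing
fractions = null_fractions(null_counts_2017, total_rows_2017)
for column, frac in sorted(fractions.items(), key=lambda kv: -kv[1]):
    print(f"{column:10s} {frac:6.1%}")
```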


In [None]:
data_2017F.count()

In [76]:
#@title Apply the recoding UDFs (defined in the 2020 section) to the 2017 data

from pyspark.sql.functions import udf
from pyspark.sql.types import *

processed_2017 = data_2017F.withColumn("Gender", x_bool('SEX')).drop("SEX")\
                    .withColumn("Age65", x_age('_AGE65YR')).drop("_AGE65YR")\
                    .withColumn("BMI", x_bmi('_BMI5CAT')).drop("_BMI5CAT")\
                    .withColumn("GeneralHealth", x_health('GENHLTH')).drop("GENHLTH")\
                    .withColumn("CheckupFreq", x_checkup('CHECKUP1')).drop("CHECKUP1")\
                    .withColumn("Exercise", x_bool('EXERANY2')).drop("EXERANY2")\
                    .withColumn("Smoked100", x_bool('SMOKE100')).drop("SMOKE100")\
                    .withColumn("SmokerStatus", x_smoke3('_SMOKER3')).drop("_SMOKER3")\
                    .withColumn("StopSmoking", x_bool('STOPSMK2')).drop("STOPSMK2")\
                    .withColumn("TimeSinceQuitSmk", x_lastsmk('LASTSMK2')).drop("LASTSMK2")\
                    .withColumn("EverUsedEcig", x_bool('ECIGARET')).drop("ECIGARET")\
                    .withColumn("EcigLevel", x_ecig('ECIGNOW')).drop("ECIGNOW")\
                    .withColumn("HasCTScan", x_CT('LCSCTSCN')).drop("LCSCTSCN")\
                    .withColumn("HasChronicDisease", x_bool('CHCCOPD1')).drop("CHCCOPD1")\
                    .withColumn("HasAsthma", x_bool('ASTHMA3')).drop("ASTHMA3")\
                    .withColumn("HadStroke", x_bool('CVDSTRK3')).drop("CVDSTRK3")\
                    .withColumn("HadHeartDisease", x_bool('CVDCRHD4')).drop("CVDCRHD4")\
                    .withColumn("HasLungCancer", y_udf('CNCRTYP1')).drop("CNCRTYP1")
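The 2017 chain above repeats the 2020 chain except for two raw column names (`SEX` instead of `SEXVAR`, `CHCCOPD1` instead of `CHCCOPD2`), and the 2018 data differs only in using `SEX1`. The following hypothetical refactoring sketch is not part of the original notebook. It keeps one shared feature plan plus a per-year rename table, so the chains cannot drift apart. The plan itself is plain Python; the closing comment shows how it could drive the existing UDFs.

```python
# Hypothetical shared recoding plan: (raw 2020 column, feature name, UDF name)
FEATURE_PLAN = [
    ("SEXVAR", "Gender", "x_bool"),
    ("_AGE65YR", "Age65", "x_age"),
    ("_BMI5CAT", "BMI", "x_bmi"),
    ("GENHLTH", "GeneralHealth", "x_health"),
    ("CHECKUP1", "CheckupFreq", "x_checkup"),
    ("EXERANY2", "Exercise", "x_bool"),
    ("SMOKE100", "Smoked100", "x_bool"),
    ("_SMOKER3", "SmokerStatus", "x_smoke3"),
    ("STOPSMK2", "StopSmoking", "x_bool"),
    ("LASTSMK2", "TimeSinceQuitSmk", "x_lastsmk"),
    ("ECIGARET", "EverUsedEcig", "x_bool"),
    ("ECIGNOW", "EcigLevel", "x_ecig"),
    ("LCSCTSCN", "HasCTScan", "x_CT"),
    ("CHCCOPD2", "HasChronicDisease", "x_bool"),
    ("ASTHMA3", "HasAsthma", "x_bool"),
    ("CVDSTRK3", "HadStroke", "x_bool"),
    ("CVDCRHD4", "HadHeartDisease", "x_bool"),
    ("CNCRTYP1", "HasLungCancer", "y_udf"),
]

# Raw column names that differ from 2020 in the other survey years (from the selects)
YEAR_RENAMES = {
    2017: {"SEXVAR": "SEX", "CHCCOPD2": "CHCCOPD1"},
    2018: {"SEXVAR": "SEX1", "CHCCOPD2": "CHCCOPD1"},
    2020: {},
}


def plan_for_year(year):
    """Return (raw column, feature name, UDF name) triples for one survey year."""
    renames = YEAR_RENAMES[year]
    return [(renames.get(raw, raw), new, udf_name) for raw, new, udf_name in FEATURE_PLAN]


# In the notebook the plan could drive a single loop, for example:
#   udfs = {"x_bool": x_bool, "x_age": x_age, "y_udf": y_udf}  # plus the other recoders
#   for raw, new, udf_name in plan_for_year(2017):
#       df = df.withColumn(new, udfs[udf_name](raw)).drop(raw)
print(plan_for_year(2017)[0])  # ('SEX', 'Gender', 'x_bool')
```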

In [78]:
#Cleaning missing data encoded as 0
processed_2017 = (processed_2017.filter(processed_2017.GeneralHealth!=0))

In [77]:
processed_2017 = (processed_2017.filter(processed_2017.CheckupFreq!=0))

In [79]:
processed_2017.groupBy(processed_2017.HasLungCancer).count().show()
# Data is highly imbalanced :)

+-------------+------+
|HasLungCancer| count|
+-------------+------+
|            0|385657|
|            1|   153|
+-------------+------+



In [80]:
processed_2017.show()

+------+-----+---+-------------+-----------+--------+---------+------------+-----------+----------------+------------+---------+---------+-----------------+---------+---------+---------------+-------------+
|Gender|Age65|BMI|GeneralHealth|CheckupFreq|Exercise|Smoked100|SmokerStatus|StopSmoking|TimeSinceQuitSmk|EverUsedEcig|EcigLevel|HasCTScan|HasChronicDisease|HasAsthma|HadStroke|HadHeartDisease|HasLungCancer|
+------+-----+---+-------------+-----------+--------+---------+------------+-----------+----------------+------------+---------+---------+-----------------+---------+---------+---------------+-------------+
|     0|    1|  3|            2|          2|       1|        0|           4|          0|               0|           0|        0|        0|                0|        0|        0|              0|            0|
|     1|    1|  3|            2|          1|       1|        0|           4|          0|               0|           0|        0|        0|                0|        0|      

In [None]:
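# Imputation: fill missing smoking-history values with the column mean.
# NOTE: 'FirstSmokedAge', 'LastSmokedAge' and 'AvgNumCigADay' would only exist if
# LCSFIRST, LCSLAST and LCSNUMCG were kept in the select above and renamed;
# with the current feature set these columns are absent.
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay'],
    outputCols=['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay']
).setStrategy("mean")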

In [None]:
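# Only run the imputer when its input columns are present in the DataFrame
if set(imputer.getInputCols()).issubset(processed_2017.columns):
    processed_2017 = imputer.fit(processed_2017).transform(processed_2017)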

In [None]:
processed_2017.show(5)

In [None]:
# Show the value counts of every 2017 column, most frequent category first
column_subset = list(processed_2017.columns)
for col_ in column_subset:
    (processed_2017.groupBy(col_).count()
        .dropna(subset=col_)
        .orderBy('count', ascending=False)
        .show())

column_subset

In [55]:
#@title Data Integration 3: load the 2018 survey
data_2018 = spark.read.csv('./2018.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_2018.dtypes)

---------- data types ----------


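|     | 0        | 1      |
|-----|----------|--------|
| 0   | _c0      | int    |
| 1   | _STATE   | int    |
| 2   | FMONTH   | int    |
| 3   | IDATE    | int    |
| 4   | IMONTH   | int    |
| ... | ...      | ...    |
| 271 | _HFOB3YR | string |
| 272 | _FS5YR   | string |
| 273 | _FOBTFS  | string |
| 274 | _CRCREC  | string |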


In [81]:
data_2018F = data_2018.select('SEX1', '_AGE65YR', '_BMI5CAT', 'GENHLTH',
                              'CHECKUP1',  # time since last routine checkup
                              'EXERANY2',  # exercise in past 30 days
                              'SMOKE100', '_SMOKER3',
                              'STOPSMK2',  # tried to quit smoking
                              'LASTSMK2',  # time since quitting
                              # 'LCSFIRST', 'LCSLAST', 'LCSNUMCG',
                              'ECIGARET', 'ECIGNOW',
                              'LCSCTSCN',
                              'CHCCOPD1',  # ever told you had COPD, emphysema or chronic bronchitis
                              'ASTHMA3',
                              'CVDSTRK3',  # ever had a stroke
                              'CVDCRHD4',  # ever diagnosed with angina or coronary heart disease
                              'CNCRTYP1')  # type of cancer (label source)
data_2018F.show(6)

+----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|SEX1|_AGE65YR|_BMI5CAT|GENHLTH|CHECKUP1|EXERANY2|SMOKE100|_SMOKER3|STOPSMK2|LASTSMK2|ECIGARET|ECIGNOW|LCSCTSCN|CHCCOPD1|ASTHMA3|CVDSTRK3|CVDCRHD4|CNCRTYP1|
+----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+-------+--------+--------+--------+
|   2|       2|       2|      2|       1|       2|       2|       4|      NA|      NA|      NA|     NA|      NA|       2|      2|       2|       2|      NA|
|   2|       1|       4|      3|       2|       1|       1|       1|       1|      NA|      NA|     NA|      NA|       2|      2|       2|       2|      NA|
|   2|       2|       3|      5|       1|       1|       2|       4|      NA|      NA|      NA|     NA|      NA|       2|      2|       1|       2|      NA|
|   1|       2|       3|      1|       1|       1|       2

In [82]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast every selected column to int; string codes such as "NA" become null
data_2018F = data_2018F.select([col(c).cast(IntegerType()).alias(c) for c in data_2018F.columns])

In [83]:
# Rows before dropping nulls: 437,436
# Drop rows missing any of these core features; other columns may still contain
# nulls, which the recoding UDFs later map to default codes.
data_2018F = data_2018F.na.drop(how='any', subset=["_BMI5CAT", "_AGE65YR", "GENHLTH", "CHECKUP1", "EXERANY2",
                                                   "SMOKE100", "_SMOKER3", "CHCCOPD1", "ASTHMA3",
                                                   "CVDSTRK3", "CVDCRHD4"])
data_2018F.count()

395940

In [84]:
from pyspark.sql.functions import col

print('Columns with null values:')
print('-'*25)
# Flag nulls as 0/1 per column, then sum each flag over all rows
data_2018F.select([col(c).isNull().cast("int").alias(c) for c in data_2018F.columns])\
    .groupBy().sum().toPandas()

Columns with null values:
-------------------------


| | sum(SEX1) | sum(_AGE65YR) | sum(_BMI5CAT) | sum(GENHLTH) | sum(CHECKUP1) | sum(EXERANY2) | sum(SMOKE100) | sum(_SMOKER3) | sum(STOPSMK2) | sum(LASTSMK2) | sum(ECIGARET) | sum(ECIGNOW) | sum(LCSCTSCN) | sum(CHCCOPD1) | sum(ASTHMA3) | sum(CVDSTRK3) | sum(CVDCRHD4) | sum(CNCRTYP1) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 337453 | 283638 | 144485 | 349732 | 356144 | 0 | 0 | 0 | 0 | 390052 |


In [None]:
data_2018F.count()

In [None]:
data_2018F.printSchema()

In [86]:
#@title Apply the recoding UDFs (defined in the 2020 section) to the 2018 data

from pyspark.sql.functions import udf
from pyspark.sql.types import *
y_udf = udf(lambda y: 1 if y == 24 else 0, StringType())

processed_2018 = data_2018F.withColumn("Gender", x_bool('SEX1')).drop("SEX1")\
                    .withColumn("Age65", x_age('_AGE65YR')).drop("_AGE65YR")\
                    .withColumn("BMI", x_bmi('_BMI5CAT')).drop("_BMI5CAT")\
                    .withColumn("GeneralHealth", x_health('GENHLTH')).drop("GENHLTH")\
                    .withColumn("CheckupFreq", x_checkup('CHECKUP1')).drop("CHECKUP1")\
                    .withColumn("Exercise", x_bool('EXERANY2')).drop("EXERANY2")\
                    .withColumn("Smoked100", x_bool('SMOKE100')).drop("SMOKE100")\
                    .withColumn("SmokerStatus", x_smoke3('_SMOKER3')).drop("_SMOKER3")\
                    .withColumn("StopSmoking", x_bool('STOPSMK2')).drop("STOPSMK2")\
                    .withColumn("TimeSinceQuitSmk", x_lastsmk('LASTSMK2')).drop("LASTSMK2")\
                    .withColumn("EverUsedEcig", x_bool('ECIGARET')).drop("ECIGARET")\
                    .withColumn("EcigLevel", x_ecig('ECIGNOW')).drop("ECIGNOW")\
                    .withColumn("HasCTScan", x_CT('LCSCTSCN')).drop("LCSCTSCN")\
                    .withColumn("HasChronicDisease", x_bool('CHCCOPD1')).drop("CHCCOPD1")\
                    .withColumn("HasAsthma", x_bool('ASTHMA3')).drop("ASTHMA3")\
                    .withColumn("HadStroke", x_bool('CVDSTRK3')).drop("CVDSTRK3")\
                    .withColumn("HadHeartDisease", x_bool('CVDCRHD4')).drop("CVDCRHD4")\
                    .withColumn("HasLungCancer", y_udf('CNCRTYP1')).drop("CNCRTYP1")
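The `y_udf` label reduces the cancer-type code to a binary target: 24 becomes 1 and every other value, including null, becomes 0. A plain-Python sketch of that mapping (the code 24 is taken from the cell above; the input list is hypothetical):

```python
LUNG_CANCER_CODE = 24  # CNCRTYP1 value this notebook treats as lung cancer

def lung_cancer_label(cancer_type):
    """Map a CNCRTYP1 value to the binary HasLungCancer label."""
    return 1 if cancer_type == LUNG_CANCER_CODE else 0

codes = [24, 5, None, 24, 19]  # hypothetical CNCRTYP1 values
labels = [lung_cancer_label(c) for c in codes]
print(labels)  # [1, 0, 0, 1, 0]
```

In PySpark the same label can be built without a Python UDF, for example with `F.when(F.col('CNCRTYP1') == 24, 1).otherwise(0)`. That keeps the work in the JVM instead of shipping rows to a Python worker, and it yields an integer column rather than the UDF's string column; a null code still falls through to `otherwise(0)`.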
                    

In [62]:
# Check how many lung-cancer cases would be lost by removing rows where CheckupFreq is 0 (missing)
(processed_2018.filter(processed_2018.CheckupFreq==0)).filter(processed_2018.HasLungCancer==1).count()

1

In [87]:
#Cleaning missing data encoded as 0
processed_2018 = (processed_2018.filter(processed_2018.GeneralHealth!=0))

In [88]:
processed_2018 = (processed_2018.filter(processed_2018.CheckupFreq!=0))
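The two filters above drop rows whose GeneralHealth or CheckupFreq holds 0, the code this notebook uses for missing answers, and the earlier check counted how many lung-cancer rows such a filter removes. A plain-Python sketch of both steps on hypothetical rows:

```python
MISSING = 0  # missing answers are encoded as 0 after the UDFs in this notebook

rows = [  # hypothetical records
    {"GeneralHealth": 2, "CheckupFreq": 1, "HasLungCancer": 0},
    {"GeneralHealth": 0, "CheckupFreq": 1, "HasLungCancer": 0},
    {"GeneralHealth": 3, "CheckupFreq": 0, "HasLungCancer": 1},
    {"GeneralHealth": 4, "CheckupFreq": 2, "HasLungCancer": 1},
]

def drop_missing(rows, columns):
    """Keep only rows where none of `columns` holds the missing code."""
    return [r for r in rows if all(r[c] != MISSING for c in columns)]

kept = drop_missing(rows, ["GeneralHealth", "CheckupFreq"])
# positives before filtering minus positives that survive it
positives_lost = sum(r["HasLungCancer"] for r in rows) - sum(r["HasLungCancer"] for r in kept)
print(len(kept), positives_lost)  # 2 1
```

With so few positive rows, checking `positives_lost` before each filter is a cheap way to see what a cleaning step costs the minority class.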

In [89]:
processed_2018.groupBy(processed_2018.HasLungCancer).count().show()
# Heavily imbalanced: far fewer positive (1) than negative (0) rows

+-------------+------+
|HasLungCancer| count|
+-------------+------+
|            0|391156|
|            1|   143|
+-------------+------+
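With 143 positives against 391,156 negatives, accuracy says little on its own: a model that always predicts 0 is already right about 99.96% of the time, roughly 2,735 negatives per positive. The arithmetic, using the counts from the table above:

```python
negatives, positives = 391156, 143  # counts from the HasLungCancer table above

total = negatives + positives
imbalance_ratio = negatives / positives   # negatives per positive
baseline_accuracy = negatives / total     # accuracy of always predicting 0

print(f"{imbalance_ratio:.0f}:1")
print(f"{baseline_accuracy:.4%}")
```

This is one reason a ranking metric such as area under the ROC curve, which the evaluator later in this notebook uses, is more informative here than plain accuracy.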



In [91]:
processed_2018.show()

+------+-----+---+-------------+-----------+--------+---------+------------+-----------+----------------+------------+---------+---------+-----------------+---------+---------+---------------+-------------+
|Gender|Age65|BMI|GeneralHealth|CheckupFreq|Exercise|Smoked100|SmokerStatus|StopSmoking|TimeSinceQuitSmk|EverUsedEcig|EcigLevel|HasCTScan|HasChronicDisease|HasAsthma|HadStroke|HadHeartDisease|HasLungCancer|
+------+-----+---+-------------+-----------+--------+---------+------------+-----------+----------------+------------+---------+---------+-----------------+---------+---------+---------------+-------------+
|     0|    1|  2|            2|          1|       0|        0|           4|          0|               0|           0|        0|        0|                0|        0|        0|              0|            0|
|     0|    0|  4|            3|          2|       1|        1|           1|          1|               0|           0|        0|        0|                0|        0|      

In [92]:
processed_2018.write.option("header",True).csv("df_test")

KeyboardInterrupt: ignored

In [None]:
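#Imputation
# Fill missing values with each column's mean. These columns belong to the 2019
# feature set (processed_2019); Imputer needs numeric input columns, so string
# columns have to be cast first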
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay'],
    outputCols = ['FirstSmokedAge', 'LastSmokedAge', 'AvgNumCigADay']
).setStrategy("mean")
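`Imputer` with `strategy="mean"` replaces missing entries in each numeric input column with the mean of that column's non-missing values. Nulls are always treated as missing, and `setMissingValue` can add a sentinel such as 0. A plain-Python sketch of that computation, assuming 0 is the missing code as elsewhere in this notebook (the ages are hypothetical):

```python
def impute_mean(values, missing=0):
    """Replace None and the `missing` sentinel with the mean of the remaining values."""
    observed = [v for v in values if v is not None and v != missing]
    if not observed:
        return list(values)  # sketch choice: nothing to average, leave unchanged
    mean = sum(observed) / len(observed)
    return [mean if (v is None or v == missing) else v for v in values]

first_smoked_age = [16, 0, 20, None, 18]  # hypothetical; 0 and None are missing
print(impute_mean(first_smoked_age))  # [16, 18.0, 20, 18.0, 18]
```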

In [None]:
processed_2018 = imputer.fit(processed_2018).transform(processed_2018)

In [None]:
processed_2018.show(5)

In [None]:
column_subset = processed_2020.columns
for col_ in column_subset:
    # value counts for this column, most frequent first (null group removed)
    temp_col = processed_2020.groupBy(col_).count().dropna(subset=col_)
    temp_col.orderBy(temp_col['count'].desc()).show()

column_subset

In [None]:
#437436
data_2018F = data_2018F.na.drop(how="any")
data_2018F.count()

In [None]:
data_2018F.show(6)

In [None]:
data_2018F.groupBy(data_2018F.CNCRTYP1==24).count().show()
# Heavily imbalanced: far fewer lung-cancer (True) than other (False) rows

In [None]:
test_data = data_2018F.filter(data_2018F.CNCRTYP1 == 24)
test_data.show()

In [None]:
df_train.groupBy(df_train.HasLungCancer).count().show()


In [None]:
df_train.write.option("header",True).csv("df_train")

In [None]:
#@title Data Integration 4
data_2019 = spark.read.csv('./2019.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_2019.dtypes)

In [None]:
data_2019F = data_2019.select('SEXVAR', '_AGE65YR', '_BMI5CAT', 'GENHLTH', 'SMOKE100', '_SMOKER3',
                  'LCSFIRST', 'LCSLAST', 'LCSNUMCG', 'LCSCTSCN', 'CNCRTYP1',
                  'STOPSMK2', 'ASTHMA3', 'CHCCOPD2') #'ECIGARET',  'ECIGNOW'
data_2019F.show(6)

In [None]:
from pyspark.sql.functions import col

print('Columns with null values:')
print('-'*25)
data_2019F.select([col(c).isNull().cast("int").alias(c) for c in data_2019F.columns]).\
    groupBy().sum().toPandas()

In [None]:
#data_2019F = data_2019F.na.drop(how="any")
data_2019F.count()

In [None]:
data_2019F.show(6)

In [None]:
data_2019F.filter(data_2019F.CNCRTYP1 != 'NA').count()

In [None]:
data_2019F.printSchema()

In [None]:
#@title User Defined Functions (UDF) for prediction label

from pyspark.sql.functions import udf
from pyspark.sql.types import *
y_udf = udf(lambda y: '1' if y == '24' else '0', StringType())
# map 'NA' and the listed special codes to '0', the missing-value code used in this notebook
x_udf = udf(lambda x: '0' if x in ('NA', '7', '77', '777', '9', '99', '888', '999') else x, StringType())

processed_2019 = data_2019F.withColumn("Gender", x_udf('SEXVAR')).drop("SEXVAR")\
                    .withColumn("Age65", x_udf('_AGE65YR')).drop("_AGE65YR")\
                    .withColumn("GeneralHealth", x_udf('GENHLTH')).drop("GENHLTH")\
                    .withColumn("Smoked100", x_udf('SMOKE100')).drop("SMOKE100")\
                    .withColumn("SmokerStatus", x_udf('_SMOKER3')).drop("_SMOKER3")\
                    .withColumn("FirstSmokedAge", x_udf('LCSFIRST')).drop("LCSFIRST")\
                    .withColumn("LastSmokedAge", x_udf('LCSLAST')).drop("LCSLAST")\
                    .withColumn("AvgNumCigADay", x_udf('LCSNUMCG')).drop("LCSNUMCG")\
                    .withColumn("HasCTScan", x_udf('LCSCTSCN')).drop("LCSCTSCN")\
                    .withColumn("StopSmoking", x_udf('STOPSMK2')).drop("STOPSMK2")\
                    .withColumn("HasAsthma", x_udf('ASTHMA3')).drop("ASTHMA3")\
                    .withColumn("HasChronicDisease", x_udf('CHCCOPD2')).drop("CHCCOPD2")\
                    .withColumn("HasLungCancer", y_udf('CNCRTYP1')).drop("CNCRTYP1")
                    

In [None]:
processed_2019.printSchema()

In [None]:
processed_2019.groupBy(processed_2019.HasLungCancer).count().show()
# Heavily imbalanced: far fewer positive (1) than negative (0) rows

In [93]:
#@title Data Preprocessing
data = processed_2020.unionByName(processed_2017)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data.dtypes)

---------- data types ----------


Unnamed: 0,0,1
0,Gender,string
1,Age65,string
2,BMI,string
3,GeneralHealth,string
4,CheckupFreq,string
...,...,...
13,HasChronicDisease,string
14,HasAsthma,string
15,HadStroke,string
16,HadHeartDisease,string


In [99]:
data.write.option("header",True).csv("df_train")

In [94]:
data_raw = data.unionByName(processed_2018)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_raw.dtypes)

---------- data types ----------


Unnamed: 0,0,1
0,Gender,string
1,Age65,string
2,BMI,string
3,GeneralHealth,string
4,CheckupFreq,string
...,...,...
13,HasChronicDisease,string
14,HasAsthma,string
15,HadStroke,string
16,HadHeartDisease,string


In [101]:
processed_2018.write.option("header",True).csv("df_test")

In [95]:
# Define schema explicitly
from pyspark.sql.types import *
data_raw.columns

['Gender',
 'Age65',
 'BMI',
 'GeneralHealth',
 'CheckupFreq',
 'Exercise',
 'Smoked100',
 'SmokerStatus',
 'StopSmoking',
 'TimeSinceQuitSmk',
 'EverUsedEcig',
 'EcigLevel',
 'HasCTScan',
 'HasChronicDisease',
 'HasAsthma',
 'HadStroke',
 'HadHeartDisease',
 'HasLungCancer']

In [96]:
data_raw.groupBy(data_raw.HasLungCancer).count().show()
# Heavily imbalanced: far fewer positive (1) than negative (0) rows

+-------------+-------+
|HasLungCancer|  count|
+-------------+-------+
|            0|1155034|
|            1|    728|
+-------------+-------+



In [None]:
# data summary
print('-'*10, 'data summary', '-'*10)
data_raw.describe().toPandas()

In [97]:
data_raw_copy = data_raw.select('*')

In [None]:
data_raw_copy = data_raw_copy.where(data_raw_copy.Age65 != '0')
data_raw_copy = data_raw_copy.where(data_raw_copy.Gender != '0')

In [None]:
from pyspark.sql.functions import *

# Replace the missing-value code '0' with a default category in each column.
# '^0$' matches only values that are exactly '0'; an unanchored '0' would also
# rewrite the zeros inside multi-digit values such as '10' or '20'.
#.withColumn("LastSmokedAge", regexp_replace('LastSmokedAge', '^0$', '30'))
#.withColumn("AvgNumCigADay", regexp_replace('AvgNumCigADay', '^0$', '2'))
#.withColumn("FirstSmokedAge", regexp_replace('FirstSmokedAge', '^0$', '18'))  # not a column of data_raw
data_raw_copy = data_raw_copy.withColumn("BMI", regexp_replace('BMI', '^0$', '2'))\
                    .withColumn("GeneralHealth", regexp_replace('GeneralHealth', '^0$', '2'))\
                    .withColumn("Smoked100", regexp_replace('Smoked100', '^0$', '2'))\
                    .withColumn("SmokerStatus", regexp_replace('SmokerStatus', '^0$', '4'))\
                    .withColumn("HasCTScan", regexp_replace('HasCTScan', '^0$', '2'))\
                    .withColumn("StopSmoking", regexp_replace('StopSmoking', '^0$', '2'))\
                    .withColumn("HasAsthma", regexp_replace('HasAsthma', '^0$', '2'))\
                    .withColumn("HasChronicDisease", regexp_replace('HasChronicDisease', '^0$', '2'))
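`regexp_replace` substitutes every match of the pattern inside the string, so an unanchored pattern `'0'` rewrites any zero digit, not only a value that is exactly `'0'`. Python's `re.sub` behaves the same way for these simple patterns, which makes the difference easy to see with and without `^...$` anchors (the codes are hypothetical):

```python
import re

codes = ["0", "2", "10", "20"]  # hypothetical string-encoded answers

unanchored = [re.sub("0", "18", c) for c in codes]   # replaces every '0' character
anchored = [re.sub("^0$", "18", c) for c in codes]   # replaces only the exact value '0'

print(unanchored)  # ['18', '2', '118', '218']
print(anchored)    # ['18', '2', '10', '20']
```

For single-digit category codes such as BMI the two patterns agree; the anchors matter for multi-digit values such as ages or cigarette counts.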

In [98]:
# dtypes yields (name, type) pairs; every column is a string here, so all are listed
column_subset = [c for c, t in data_raw_copy.dtypes if t == "string"]
for col_ in column_subset:
    # value counts for this column, most frequent first (null group removed)
    temp_col = data_raw_copy.groupBy(col_).count().dropna(subset=col_)
    temp_col.orderBy(temp_col['count'].desc()).show()

column_subset

+------+------+
|Gender| count|
+------+------+
|     0|623500|
|     1|532262|
+------+------+

+-----+------+
|Age65| count|
+-----+------+
|    0|748097|
|    1|407665|
+-----+------+

+---+------+
|BMI| count|
+---+------+
|  3|404614|
|  2|374028|
|  4|358566|
|  1| 18554|
+---+------+

+-------------+------+
|GeneralHealth| count|
+-------------+------+
|            2|387816|
|            3|355945|
|            1|205349|
|            4|150728|
|            5| 55924|
+-------------+------+

+-----------+------+
|CheckupFreq| count|
+-----------+------+
|          1|899943|
|          2|124722|
|          3| 64696|
|          4| 59355|
|          8|  7046|
+-----------+------+

+--------+------+
|Exercise| count|
+--------+------+
|       1|864567|
|       0|291195|
+--------+------+

+---------+------+
|Smoked100| count|
+---------+------+
|        0|664458|
|        1|491304|
+---------+------+

+------------+------+
|SmokerStatus| count|
+------------+------+
|           4|66542

['Gender',
 'Age65',
 'BMI',
 'GeneralHealth',
 'CheckupFreq',
 'Exercise',
 'Smoked100',
 'SmokerStatus',
 'StopSmoking',
 'TimeSinceQuitSmk',
 'EverUsedEcig',
 'EcigLevel',
 'HasCTScan',
 'HasChronicDisease',
 'HasAsthma',
 'HadStroke',
 'HadHeartDisease',
 'HasLungCancer']

In [None]:
# dtypes yields (name, type) pairs; every column is a string here, so all are listed
column_subset = [c for c, t in data_raw.dtypes if t == "string"]
for col_ in column_subset:
    # value counts for this column, most frequent first (null group removed)
    temp_col = data_raw.groupBy(col_).count().dropna(subset=col_)
    temp_col.orderBy(temp_col['count'].desc()).show()

column_subset

In [None]:
# Mode imputation: fill nulls in each string column with its most frequent value.
# The most frequent value is computed with nulls excluded; fillna() touches only nulls.
column_subset = [c for c, t in data_raw.dtypes if t == "string"]
for col_ in column_subset:
    temp_col = data_raw.groupBy(col_).count().dropna(subset=col_)
    frequent_category = temp_col.orderBy(
                     temp_col['count'].desc()).collect()[0][0]
    data_raw = data_raw.fillna(frequent_category, subset=[col_])
data_raw.show()
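The loop above looks up each column's most frequent category to use as a fill value, which is mode imputation. A plain-Python sketch with `collections.Counter` on hypothetical values; ties are broken here by first occurrence, whereas the Spark `orderBy` leaves the order of equal counts unspecified:

```python
from collections import Counter

def impute_mode(values):
    """Fill None entries with the most frequent non-missing value."""
    counts = Counter(v for v in values if v is not None)
    if not counts:
        return list(values)  # nothing observed, leave unchanged
    mode = counts.most_common(1)[0][0]
    return [mode if v is None else v for v in values]

general_health = ["2", None, "3", "2", None, "1"]  # hypothetical column
print(impute_mode(general_health))  # ['2', '2', '3', '2', '2', '1']
```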

In [None]:
data_raw.write.option("header",True).csv("final_data")

# Step 4: Decision Tree Classification with PySpark

In [None]:
#@title Process categorical columns
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer, BucketedRandomProjectionLSH,VectorSlicer
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.sql.functions import array, create_map, struct
from pyspark.ml import Pipeline

# categorical columns
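# the first 12 columns, Gender through EcigLevel; HasCTScan, HasChronicDisease,
# HasAsthma, HadStroke, HadHeartDisease and the label are not included
categorical_columns = data_raw.columns[0:12]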

In [None]:
#@title Build StringIndexer stages
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer_stages
stringindexer_stages += [StringIndexer(inputCol='HasLungCancer', outputCol='label')]

In [None]:
#@title Build OneHotEncoder stages
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

In [None]:
#@title Build VectorAssembler stage
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features') 

In [None]:
#@title Build Pipeline model
# all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)
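The pipeline chains `StringIndexer` (by default, most frequent value gets index 0, with equal frequencies sorted alphabetically) and `OneHotEncoder` (by default `dropLast=True`, so the last index becomes an all-zeros vector). This ordering is also why the label indexer maps the majority value `'0'` to 0.0 and `'1'` to 1.0. A plain-Python sketch of both steps on hypothetical BMI codes; Spark produces sparse vectors, the sketch uses dense lists:

```python
from collections import Counter

def string_index(values):
    """Value -> index, most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, size, drop_last=True):
    """Dense one-hot list; with drop_last the last index maps to all zeros."""
    length = size - 1 if drop_last else size
    return [1.0 if i == index else 0.0 for i in range(length)]

bmi = ["3", "2", "3", "4", "2", "3"]  # hypothetical column
mapping = string_index(bmi)
print(mapping)  # {'3': 0, '2': 1, '4': 2}
print([one_hot(mapping[v], len(mapping)) for v in ["3", "2", "4"]])
# [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]
```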

In [None]:
#@title Fit pipeline model
pipeline_model = pipeline.fit(data_raw)

In [None]:
#@title Transform data
final_columns = feature_columns + ['features', 'label']
df_raw = pipeline_model.transform(data_raw).\
            select(final_columns)
            
df_raw.show(5)

In [None]:
#@title Split data into training and test sets
training, test = df_raw.randomSplit([0.8, 0.2], seed=1234)
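`randomSplit` assigns rows to splits at random, so with only a few hundred positive rows the number of positives in each split depends on the seed. A stratified split fixes the class proportions instead; Spark's `DataFrame.sampleBy` offers per-class sampling fractions. A plain-Python sketch of the idea on hypothetical labels (an alternative technique, not what the cell above does):

```python
import random

def stratified_split(labels, train_fraction=0.8, seed=1234):
    """Return (train_idx, test_idx) with train_fraction of every class in train."""
    rng = random.Random(seed)
    by_class = {}
    for i, y in enumerate(labels):
        by_class.setdefault(y, []).append(i)
    train, test = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        cut = round(len(indices) * train_fraction)
        train.extend(indices[:cut])
        test.extend(indices[cut:])
    return sorted(train), sorted(test)

labels = [1] * 10 + [0] * 90  # hypothetical: 10 positives, 90 negatives
train_idx, test_idx = stratified_split(labels)
print(sum(labels[i] for i in train_idx), sum(labels[i] for i in test_idx))  # 8 2
```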

In [None]:
##@title Data Imputing
#    inputCols = ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)'],
#    outputCols = ["{}_imputed".format(a) for a in ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)']]
#).setStrategy("mean")

In [None]:
training.printSchema()

In [None]:
#@title Data balancing using SMOTE
# SMOTE oversamples the minority class by interpolating between each minority sample and its k nearest minority-class neighbors
from imblearn.over_sampling import SMOTE

features = training.select(['features']).toPandas()

labels = training.select('label').toPandas()

In [None]:
sm = SMOTE(sampling_strategy = 'not majority', k_neighbors = 50, random_state = 42)

features, labels = sm.fit_resample(features, labels)

In [None]:
features['label'] = labels.values
features = spark.createDataFrame(features)
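SMOTE creates each synthetic minority sample on the segment between a minority point and one of its k nearest minority-class neighbors, x_new = x + u·(x_nn − x) with u drawn uniformly from [0, 1). Note that imblearn works on numeric arrays, so Spark vectors in a `features` column generally need converting (for example with each vector's `toArray()`) before `fit_resample`. A plain-Python sketch of the interpolation step on hypothetical 2-D points, using Euclidean distance; it is not imblearn's implementation:

```python
import math
import random

def smote(minority, n_new, k=2, seed=42):
    """Generate n_new synthetic points from `minority` (list of equal-length lists)."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        x = rng.choice(minority)
        # k nearest minority neighbors of x, excluding x itself
        neighbors = sorted(
            (p for p in minority if p is not x),
            key=lambda p: math.dist(x, p),
        )[:k]
        nn = rng.choice(neighbors)
        u = rng.random()  # uniform in [0, 1)
        synthetic.append([xi + u * (ni - xi) for xi, ni in zip(x, nn)])
    return synthetic

minority = [[1.0, 1.0], [2.0, 1.0], [1.0, 2.0], [2.0, 2.0]]  # hypothetical points
new_points = smote(minority, n_new=5)
print(len(new_points))  # 5
```

Because every new point is a convex combination of two existing minority points, it stays inside the region the minority class already covers.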

In [None]:
#@title Build decision tree classifier
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [None]:
#@title Parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5]).\
    build()

In [None]:
#@title Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
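With `metricName="areaUnderROC"` the evaluator reports the area under the ROC curve. That area can be read as the probability that a randomly chosen positive receives a higher score than a randomly chosen negative, with ties counting one half; ties are common with decision trees, whose scores come from a small number of leaves. A quadratic plain-Python sketch on hypothetical scores:

```python
def roc_auc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count half
    return wins / (len(pos) * len(neg))

scores = [0.9, 0.8, 0.7, 0.85]  # hypothetical positive-class scores
labels = [1, 1, 0, 0]
print(roc_auc(scores, labels))  # 0.75
```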

In [None]:
#@title Cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)
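`ParamGridBuilder` builds the Cartesian product of every grid added to it, and `CrossValidator` fits one model per fold for every parameter map, then refits the best map on the whole input dataset. With four `maxDepth` values and `numFolds=4` that is 16 fits plus one refit, as this small count shows:

```python
from itertools import product

grid = {"maxDepth": [2, 3, 4, 5]}  # same values as the ParamGridBuilder above
num_folds = 4

# Cartesian product of all grid values -> one dict per parameter map
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]
cv_fits = num_folds * len(param_maps)
total_fits = cv_fits + 1  # final refit of the best parameter map on all input data

print(len(param_maps), cv_fits, total_fits)  # 4 16 17
```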

In [None]:
#@title Fit cross validation model
# fit on the training split only, so the test split stays unseen during model selection
cv_model = cv.fit(training)

In [None]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']

In [None]:
#@title Prediction on training data
pred_training_cv = cv_model.transform(training)
pred_training_cv.select(show_columns).show(5, truncate=False)

In [None]:
#@title Prediction on test data
pred_test_cv = cv_model.transform(test)
pred_test_cv.select(show_columns).show(5, truncate=False)

In [None]:
#@title Confusion matrix
# count each (label, prediction) pair on the held-out test split
label_and_pred = pred_test_cv.select('label', 'prediction')
label_and_pred.groupBy('label', 'prediction').count().orderBy('label', 'prediction').show()
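The confusion matrix counts each (label, prediction) pair. From those four counts, precision and recall for the positive class follow directly, and with this class imbalance they are more telling than accuracy. A plain-Python sketch on hypothetical pairs:

```python
from collections import Counter

def confusion_counts(pairs):
    """Count (label, prediction) pairs for a binary problem."""
    c = Counter(pairs)  # missing pairs count as 0
    return {
        "tp": c[(1.0, 1.0)], "fn": c[(1.0, 0.0)],
        "fp": c[(0.0, 1.0)], "tn": c[(0.0, 0.0)],
    }

pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0)]
m = confusion_counts(pairs)
precision = m["tp"] / (m["tp"] + m["fp"])  # share of predicted positives that are real
recall = m["tp"] / (m["tp"] + m["fn"])     # share of real positives that are found
print(m, round(precision, 3), round(recall, 3))
# {'tp': 2, 'fn': 1, 'fp': 1, 'tn': 2} 0.667 0.667
```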

In [None]:
print('The best MaxDepth is:', cv_model.bestModel._java_obj.getMaxDepth())