<a href="https://colab.research.google.com/github/nhuyen183/DS-DV-Project/blob/master/LungCancerDSS_preprocessing288.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Installing Spark and dependencies
# Java 8
# Apache Spark 3.1.1 (prebuilt for Hadoop 3.2)
# findspark (used to locate Spark on the system)
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

#Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()


# Step 1: Define the problem
What sorts of people are likely to have lung cancer?

# Step 2: Gather the data
The datasets can be found here:
* https://www.kaggle.com/datasets/aemreusta/brfss-2020-survey-data
* https://www.kaggle.com/datasets/sakinak/behavioral-risk-factor-surveillance-survey-201619

In [2]:
#@title Create Spark entry points
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

In [4]:
#@title Import Spark MLlib libraries
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import OneVsRest

# Step 3: Prepare data for consumption

In [5]:
#@title Mount content to drive for kaggle data download
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
!mkdir -p ~/.kaggle

In [8]:
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/kaggle.json

In [9]:
! chmod 600 ~/.kaggle/kaggle.json

In [10]:
! kaggle datasets download aemreusta/brfss-2020-survey-data

Downloading brfss-2020-survey-data.zip to /content
 77% 37.0M/48.3M [00:00<00:00, 146MB/s] 
100% 48.3M/48.3M [00:00<00:00, 114MB/s]


In [11]:
! kaggle datasets download sakinak/behavioral-risk-factor-surveillance-survey-201619

Downloading behavioral-risk-factor-surveillance-survey-201619.zip to /content
 94% 221M/234M [00:02<00:00, 150MB/s]
100% 234M/234M [00:02<00:00, 106MB/s]


In [12]:
!ls

behavioral-risk-factor-surveillance-survey-201619.zip
brfss-2020-survey-data.zip
drive
sample_data
spark-3.1.1-bin-hadoop3.2
spark-3.1.1-bin-hadoop3.2.tgz


In [13]:
!unzip brfss-2020-survey-data.zip

Archive:  brfss-2020-survey-data.zip
  inflating: brfss2020.csv           


In [14]:
!unzip behavioral-risk-factor-surveillance-survey-201619.zip

Archive:  behavioral-risk-factor-surveillance-survey-201619.zip
  inflating: 2016.csv                
  inflating: 2017.csv                
  inflating: 2018.csv                
  inflating: 2019.csv                


In [None]:
from subprocess import check_output
print('-'*10, 'Files', '-'*10)
print(check_output(['ls', './']).decode('utf8'))

---------- Files ----------
2016.csv
2017.csv
2018.csv
2019.csv
behavioral-risk-factor-surveillance-survey-201619.zip
brfss2020.csv
brfss-2020-survey-data.zip
drive
sample_data
spark-3.1.1-bin-hadoop3.2
spark-3.1.1-bin-hadoop3.2.tgz



## About the BRFSS dataset and Prediction task

The Behavioral Risk Factor Surveillance System (BRFSS) is a collaborative project between all of the states in the United States and participating US territories and the Centers for Disease Control and Prevention (CDC).

BRFSS’s objective is to collect uniform state-specific data on health risk behaviors, chronic diseases and conditions, access to health care, and use of preventive health services related to the leading causes of death and disability in the United States. BRFSS conducts both landline and mobile phone-based surveys with individuals over the age of 18. General factors assessed by the BRFSS in 2020 included health status and healthy days, exercise, insufficient sleep, chronic health conditions, oral health, tobacco use, cancer screenings, and access to healthcare.

The aim of this project is to build a model with relatively high accuracy and AUC that could serve as a decision aid for people at high risk of developing lung cancer.

The data contains information about 401958 unique survey participants and a total of 279 different features, from which I selected the ones related to lung cancer. Each example in the dataset contains the following data for one individual:

### Categorical Features
*   `_AGE65YR`: Two-level age category: `18 <= AGE <= 64`: `1`; `65 <= AGE <= 99`: `2`
*   `SEXVAR`: Sex of respondent: `Male: 1`; `Female: 2`
*   `_BMI5CAT`: Four-category Body Mass Index (BMI): `_BMI5 < 1850: Underweight`; `1850 <= _BMI5 < 2500: Normal`; `2500 <= _BMI5 < 3000: Overweight`; `3000 <= _BMI5 < 9999: Obese`
*   `GENHLTH`: General health status: Would you say that in general your health is: `1: Excellent`; `2: Very good`; `3: Good`; `4: Fair`; `5: Poor`
*   `SMOKE100`: Have you smoked at least 100 cigarettes in your entire life? [Note: 5 packs = 100 cigarettes] `1: Yes`; `2: No`
*   `_SMOKER3`: Four-level smoker status: `Everyday smoker: 1`; `Someday smoker: 2`; `Former smoker: 3`; `Non-smoker: 4`

### Lung Cancer Screening Features
*   `LCSFIRST`: How old were you when you first started to smoke cigarettes regularly? `Value 1-100 in years`
*   `LCSLAST`: How old were you when you last smoked cigarettes regularly? `Value 1-100 in years`
*   `LCSNUMCG`: On average, when you smoke/smoked regularly, about how many cigarettes do/did you usually smoke each day? `Value 1-300 in number of cigarettes`
*   `LCSCTSCN`: In the last 12 months, did you have a CT or CAT scan? Responses include: `Yes, to check for lung cancer`, `No (did not have a CT scan)`, `Had a CT scan, but for another reason`.
*   `CNCRTYP1`: What type of cancer was it? (If Response = 2 (Two) or 3 (Three or more), ask: “With your most recent diagnosis of cancer, what type of cancer was it?”) Examples include: `Lung cancer: 24`, `Others: 1-30`
*   `STOPSMK2`: During the past 12 months, have you stopped smoking for one day or longer because you were trying to quit smoking? `Yes: 1` or `No: 2`.
*   `ECIGARET`: Have you ever used an e-cigarette or other electronic vaping product, even just one time, in your entire life? `Yes: 1` or `No: 2`.
*   `ECIGNOW`: Do you now use e-cigarettes or other electronic vaping products every day, some days, or not at all? `Every day: 1`; `Some days: 2` or `Not at all: 3`
*   `ASTHMA3`: (Ever told) (you had) asthma? `Yes: 1` or `No: 2`.

### Prediction Task
The prediction task is to **predict early whether a person has a high risk of lung cancer.**

### Label
*   `CNCRTYP1`: What type of cancer (lung cancer = 24)





In [15]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from matplotlib import pyplot as plt
from matplotlib import rcParams
from sklearn.model_selection import train_test_split
import seaborn as sns

# The following lines adjust the granularity of reporting. 
pd.options.display.max_rows = 10
pd.options.display.float_format = "{:.1f}".format

from google.colab import widgets
# For facets
from IPython.display import display, HTML
import base64
!pip install facets-overview==1.0.0
from facets_overview.feature_statistics_generator import FeatureStatisticsGenerator

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting facets-overview==1.0.0
  Downloading facets_overview-1.0.0-py2.py3-none-any.whl (24 kB)
Installing collected packages: facets-overview
Successfully installed facets-overview-1.0.0


In [None]:
# load packages
import sys
print('Python version: {}'.format(sys.version))

import pandas as pd
print('pandas version: {}'.format(pd.__version__))

import matplotlib
print('matplotlib version: {}'.format(matplotlib.__version__))

import numpy as np
print('numpy version: {}'.format(np.__version__))

import scipy as sp
print('scipy version: {}'.format(sp.__version__))

import IPython
# import the function, not the module, so later display(HTML(...)) calls keep working
from IPython.display import display # pretty printing of dataframe in Jupyter notebook
print('IPython version: {}'.format(IPython.__version__))

import pyspark
print('Apache Spark Pyspark version: {}'.format(pyspark.__version__)) # pyspark version

# misc libraries
import random
import time

# ignore warnings
import warnings
warnings.filterwarnings('ignore')
print('-'*25)

Python version: 3.7.15 (default, Oct 12 2022, 19:14:55) 
[GCC 7.5.0]
pandas version: 1.3.5
matplotlib version: 3.2.2
numpy version: 1.21.6
scipy version: 1.7.3
IPython version: 7.9.0
Apache Spark Pyspark version: 3.1.1
-------------------------


In [16]:
#@title Data Integration
data_2020 = spark.read.csv('./brfss2020.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_2020.dtypes)

---------- data types ----------


Unnamed: 0,0,1
0,_STATE,double
1,FMONTH,double
2,IDATE,int
3,IMONTH,int
4,IDAY,int
...,...,...
274,_STOLDNA,double
275,_VIRCOLN,double
276,_SBONTIM,double
277,_CRCREC1,double


In [17]:
data_2020F = data_2020.select('SEXVAR', '_AGE65YR', '_BMI5CAT', 'GENHLTH', 'SMOKE100', '_SMOKER3',
                  'LCSFIRST', 'LCSLAST', 'LCSNUMCG', 'LCSCTSCN', 'CNCRTYP1',
                  'STOPSMK2', 'ASTHMA3') #'ECIGARET',  'ECIGNOW'
data_2020F.show(6)

+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|SEXVAR|_AGE65YR|_BMI5CAT|GENHLTH|SMOKE100|_SMOKER3|LCSFIRST|LCSLAST|LCSNUMCG|LCSCTSCN|CNCRTYP1|STOPSMK2|ASTHMA3|
+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|   2.0|     1.0|     1.0|    2.0|     1.0|     1.0|    null|   null|    null|    null|    null|     2.0|    1.0|
|   2.0|     2.0|     3.0|    3.0|    null|     9.0|    null|   null|    null|    null|    null|    null|    1.0|
|   2.0|     2.0|    null|    3.0|     2.0|     4.0|    null|   null|    null|    null|    null|    null|    2.0|
|   2.0|     2.0|    null|    1.0|     2.0|     4.0|    null|   null|    null|    null|    null|    null|    2.0|
|   2.0|     2.0|     2.0|    2.0|     2.0|     4.0|    null|   null|    null|    null|    null|    null|    2.0|
|   1.0|     2.0|     3.0|    4.0|     1.0|     3.0|    null|   null|    null|    null| 

In [18]:
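# Read only the 2016-2019 files; the glob './*.csv' would also match brfss2020.csv
data_201719 = spark.read.csv(['./2016.csv', './2017.csv', './2018.csv', './2019.csv'],
                             inferSchema=True, header=True)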

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_201719.dtypes)

---------- data types ----------


Unnamed: 0,0,1
0,_c0,string
1,_STATE,string
2,FMONTH,string
3,IDATE,string
4,IMONTH,string
...,...,...
354,_RFSEAT2,int
355,_RFSEAT3,int
356,_FLSHOT6,string
357,_PNEUMO2,string


In [19]:
data_201719 = data_201719.withColumnRenamed('SEX', 'SEXVAR')

In [20]:
data_201719F = data_201719.select('SEXVAR', '_AGE65YR', '_BMI5CAT', 'GENHLTH', 'SMOKE100', '_SMOKER3',
                  'LCSFIRST', 'LCSLAST', 'LCSNUMCG', 'LCSCTSCN', 'CNCRTYP1',
                  'STOPSMK2', 'ASTHMA3')
data_201719F.show(6)

+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|SEXVAR|_AGE65YR|_BMI5CAT|GENHLTH|SMOKE100|_SMOKER3|LCSFIRST|LCSLAST|LCSNUMCG|LCSCTSCN|CNCRTYP1|STOPSMK2|ASTHMA3|
+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|     2|       2|       3|      2|       2|       4|      NA|     NA|      NA|      NA|      NA|      NA|      2|
|     1|       2|       3|      2|       2|       4|      NA|     NA|      NA|      NA|      NA|      NA|      2|
|     1|       2|       3|      3|       2|       4|      NA|     NA|      NA|      NA|      NA|      NA|      2|
|     2|       2|       3|      4|       2|       4|      NA|     NA|      NA|      NA|      NA|      NA|      2|
|     2|       2|       2|      4|       1|       3|      NA|     NA|      NA|      NA|      NA|      NA|      1|
|     1|       2|       3|      3|       1|       1|      NA|     NA|      NA|      NA| 

In [None]:
data_201719F.dtypes

[('SEXVAR', 'string'),
 ('_AGE65YR', 'string'),
 ('_BMI5CAT', 'string'),
 ('GENHLTH', 'string'),
 ('SMOKE100', 'string'),
 ('_SMOKER3', 'string'),
 ('LCSFIRST', 'string'),
 ('LCSLAST', 'string'),
 ('LCSNUMCG', 'string'),
 ('LCSCTSCN', 'string'),
 ('CNCRTYP1', 'string'),
 ('STOPSMK2', 'string'),
 ('ASTHMA3', 'string')]

In [21]:
#@title Data Preprocessing
data_raw = data_201719F.unionByName(data_2020F)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_raw.dtypes)

---------- data types ----------


Unnamed: 0,0,1
0,SEXVAR,string
1,_AGE65YR,string
2,_BMI5CAT,string
3,GENHLTH,string
4,SMOKE100,string
...,...,...
8,LCSNUMCG,string
9,LCSCTSCN,string
10,CNCRTYP1,string
11,STOPSMK2,string


In [22]:
# Define schema explicitly
from pyspark.sql.types import *
data_raw.columns

['SEXVAR',
 '_AGE65YR',
 '_BMI5CAT',
 'GENHLTH',
 'SMOKE100',
 '_SMOKER3',
 'LCSFIRST',
 'LCSLAST',
 'LCSNUMCG',
 'LCSCTSCN',
 'CNCRTYP1',
 'STOPSMK2',
 'ASTHMA3']

In [None]:
# data summary
print('-'*10, 'data summary', '-'*10)
data_raw.describe().toPandas()

---------- data summary ----------


Unnamed: 0,summary,SEXVAR,_AGE65YR,_BMI5CAT,GENHLTH,SMOKE100,_SMOKER3,LCSFIRST,LCSLAST,LCSNUMCG,LCSCTSCN,CNCRTYP1,STOPSMK2,ASTHMA3
0,count,2109624.0,1270241.0,1228884.0,1984870.0,1793543.0,1270241.0,1721720.0,1721302.0,1595944.0,1393340.0,1458472.0,1739174.0,2109630.0
1,mean,2.394912388995665,2.108056660114104,2.519164937715839,2.292883933355446,2.3590680056629214,3.021296141198459,74.31277320580718,4.766056365371695,8.857359703971053,5.540070272109681,5.304107725788901,183.6050363361328,1.8717644000801856
2,stddev,1.6098799338491885,1.416020021850503,1.3927996139464058,1.1576795535374298,4.873513040928731,1.6794138102230625,150.9758019887514,37.54385771800232,37.04485916907272,18.08974775408709,9.528022264880647,325.8416456791889,0.5203057765660856
3,min,1.0,1.0,1.0,1.0,1.0,0.0,0.142255930631018,1.0,1.0,1.0,1.0,1.0,1.0
4,max,,9.0,,,,,,,,,,,


In [None]:
# we first check which values are NULL values for each column
# then we convert the boolean values to int (0 and 1), then we can count how many 1's exist in each column.
print('-'*25)
print('0: is not NULL')
print('1: is NULL')
print('-'*25)
print(' '*25)
# we build the column expressions directly with col() instead of eval() on strings.
from pyspark.sql.functions import col
data_raw.select([col(x).isNull().cast("int").alias(x) for x in data_raw.columns]).show(n=10)

-------------------------
0: is not NULL
1: is NULL
-------------------------
                         
+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|SEXVAR|_AGE65YR|_BMI5CAT|GENHLTH|SMOKE100|_SMOKER3|LCSFIRST|LCSLAST|LCSNUMCG|LCSCTSCN|CNCRTYP1|STOPSMK2|ASTHMA3|
+------+--------+--------+-------+--------+--------+--------+-------+--------+--------+--------+--------+-------+
|     0|       0|       0|      0|       0|       0|       0|      0|       0|       0|       0|       0|      0|
|     0|       0|       0|      0|       0|       0|       0|      0|       0|       0|       0|       0|      0|
|     0|       0|       0|      0|       0|       0|       0|      0|       0|       0|       0|       0|      0|
|     0|       0|       0|      0|       0|       0|       0|      0|       0|       0|       0|       0|      0|
|     0|       0|       0|      0|       0|       0|       0|      0|       0|       0|       0|  

In [None]:
print('Columns with null values:')
print('-'*25)
from pyspark.sql.functions import col
data_raw.select([col(x).isNull().cast("int").alias(x) for x in data_raw.columns]).\
    groupBy().sum().toPandas()

Columns with null values:
-------------------------


Unnamed: 0,sum(SEXVAR),sum(_AGE65YR),sum(_BMI5CAT),sum(GENHLTH),sum(SMOKE100),sum(_SMOKER3),sum(LCSFIRST),sum(LCSLAST),sum(LCSNUMCG),sum(LCSCTSCN),sum(CNCRTYP1),sum(STOPSMK2),sum(ASTHMA3)
0,12,839395,880752,124766,316093,839395,387916,388334,513692,716296,651164,370462,6


In [23]:
print(data_raw.count(),data_raw.na.drop(how="any").count())

2109636 868519


In [24]:
data_raw = data_raw.na.drop(how="any")

In [25]:
data_raw.filter(data_raw.CNCRTYP1 != 'NA').count()

27515
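
The count above suggests that `na.drop` removed true nulls but kept rows where a missing answer is the literal string `NA`, as in the 2016-2019 preview. After the union, the 2020 values are also stringified doubles such as `2.0`, so the same code can be spelled two ways. The plain-Python sketch below shows one way to normalize both cases. The helper name `normalize_code` is hypothetical, and the sketch assumes the survey codes are whole numbers.

```python
from typing import Optional

def normalize_code(raw: Optional[str]) -> Optional[int]:
    """Map '2', '2.0' and ' 2 ' to 2; map None, '' and 'NA' to None."""
    if raw is None:
        return None
    text = raw.strip()
    if text == "" or text.upper() == "NA":
        return None
    try:
        # Assumption: codes are whole numbers, so '24.0' and '24' both become 24
        return int(float(text))
    except ValueError:
        return None  # any other non-numeric text is treated as missing

samples = ["2", "2.0", "NA", None, "24.0"]
print([normalize_code(s) for s in samples])
```

In Spark, the same idea could be applied by casting each column to a numeric type before calling `na.drop`, so that `NA` becomes a real null.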

In [None]:
data_raw.printSchema()

root
 |-- SEXVAR: string (nullable = true)
 |-- _AGE65YR: string (nullable = true)
 |-- _BMI5CAT: string (nullable = true)
 |-- GENHLTH: string (nullable = true)
 |-- SMOKE100: string (nullable = true)
 |-- _SMOKER3: string (nullable = true)
 |-- LCSFIRST: string (nullable = true)
 |-- LCSLAST: string (nullable = true)
 |-- LCSNUMCG: string (nullable = true)
 |-- LCSCTSCN: string (nullable = true)
 |-- CNCRTYP1: string (nullable = true)
 |-- STOPSMK2: string (nullable = true)
 |-- ASTHMA3: string (nullable = true)



In [26]:
#@title User Defined Functions (UDF) for prediction label

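from pyspark.sql.functions import udf

def lung_cancer_label(code):
    # After the union, CNCRTYP1 is a string: code 24 can appear as '24' or '24.0'.
    try:
        return "Yes" if float(code) == 24 else "No"
    except (TypeError, ValueError):  # null, 'NA', or other non-numeric text
        return "No"

y_udf = udf(lung_cancer_label, StringType())
data_raw = data_raw.withColumn("HasLungCancer", y_udf('CNCRTYP1')).drop("CNCRTYP1")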

In [None]:
data_raw.printSchema()

root
 |-- SEXVAR: string (nullable = true)
 |-- _AGE65YR: string (nullable = true)
 |-- _BMI5CAT: string (nullable = true)
 |-- GENHLTH: string (nullable = true)
 |-- SMOKE100: string (nullable = true)
 |-- _SMOKER3: string (nullable = true)
 |-- LCSFIRST: string (nullable = true)
 |-- LCSLAST: string (nullable = true)
 |-- LCSNUMCG: string (nullable = true)
 |-- LCSCTSCN: string (nullable = true)
 |-- STOPSMK2: string (nullable = true)
 |-- ASTHMA3: string (nullable = true)
 |-- HasLungCancer: string (nullable = true)



In [27]:
data_raw.groupBy(data_raw.HasLungCancer).count().show()
# The data is heavily imbalanced: very few respondents are labeled "Yes".

+-------------+------+
|HasLungCancer| count|
+-------------+------+
|           No|868363|
|          Yes|   156|
+-------------+------+
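
To put the imbalance in numbers, the sketch below uses the two counts from the table above to compute the positive rate and a pair of "balanced" class weights. The weighting formula, total / (number of classes × class count), is a common heuristic and an assumption here; the notebook itself does not use class weights.

```python
counts = {"No": 868363, "Yes": 156}  # taken from the groupBy output above

total = sum(counts.values())
n_classes = len(counts)

# Share of positive ("Yes") examples in the data
positive_rate = counts["Yes"] / total

# "Balanced" weights: each class contributes equally to a weighted loss
class_weights = {label: total / (n_classes * n) for label, n in counts.items()}

print(f"positive rate: {positive_rate:.5%}")
print({label: round(w, 3) for label, w in class_weights.items()})
```

With a positive rate this low, a model that always predicts "No" would score very high accuracy, which is why AUC and per-class metrics matter more than accuracy for this task.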



## Analyzing the BRFSS Dataset with Facets
It is important to understand your dataset *before* diving straight into the prediction task. 

Some important questions to investigate when auditing a dataset for fairness:

* **Are there missing feature values for a large number of observations?**
* **Are there features that are missing that might affect other features?**
* **Are there any unexpected feature values?**
* **What signs of data skew do you see?**

In [None]:
#@title Visualize the Data in Facets
fsg = FeatureStatisticsGenerator()
dataframes = [
    {'table': data_raw.toPandas(), 'name': 'trainData'}]
censusProto = fsg.ProtoFromDataFrames(dataframes)
protostr = base64.b64encode(censusProto.SerializeToString()).decode("utf-8")


HTML_TEMPLATE = """<script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script>
        <link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html">
        <facets-overview id="elem"></facets-overview>
        <script>
          document.querySelector("#elem").protoInput = "{protostr}";
        </script>"""
html = HTML_TEMPLATE.format(protostr=protostr)
display(HTML(html))

Py4JJavaError: ignored

In [None]:
#@title Set the Number of Data Points to Visualize in Facets Dive

SAMPLE_SIZE = 61 #@param
  
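# X_train is not defined in this notebook, so draw the sample from data_raw instead
train_dive = data_raw.sample(fraction=0.001, seed=42).limit(SAMPLE_SIZE).toPandas().to_json(orient='records')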

HTML_TEMPLATE = """<script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script>
        <link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html">
        <facets-dive id="elem" height="600"></facets-dive>
        <script>
          var data = {jsonstr};
          document.querySelector("#elem").data = data;
        </script>"""
html = HTML_TEMPLATE.format(jsonstr=train_dive)
display(HTML(html))

# Step 4: Decision Tree Classification with PySpark

In [28]:
#@title Process categorical columns
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer, BucketedRandomProjectionLSH,VectorSlicer
from pyspark.sql.window import Window
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.sql.functions import array, create_map, struct
from pyspark.ml import Pipeline

# categorical columns
categorical_columns = data_raw.columns[0:12]

In [29]:
#@title Build StringIndexer stages
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer_stages
stringindexer_stages += [StringIndexer(inputCol='HasLungCancer', outputCol='label')]

In [30]:
#@title Build OneHotEncoder stages
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

In [31]:
#@title Build VectorAssembler stage
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features') 

In [32]:
#@title Build Pipeline model
# all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

In [33]:
#@title Fit pipeline model
pipeline_model = pipeline.fit(data_raw)
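
As a rough illustration of what the fitted stages do, the plain-Python sketch below mimics the two encoding steps. By default `StringIndexer` gives index 0 to the most frequent value, and `OneHotEncoder` drops the last category so that it is encoded as an all-zero vector. The function names and the toy column are assumptions for illustration, not Spark's implementation.

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign index 0.0 to the most frequent value, 1.0 to the next, and so on.

    Ties are broken alphabetically here; that choice is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

# Toy column with the kind of mixed values seen in the raw data
column = ["2", "2", "1", "NA", "2", "1"]
mapping = fit_string_indexer(column)
print(mapping)
print([one_hot(mapping[v], len(mapping)) for v in column])
```

Because every distinct string becomes its own category, values such as `2`, `2.0`, and `NA` would each get a separate slot. That may explain why the encoded vectors in the transformed output are wider than the documented code lists suggest.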

In [34]:
#@title Transform data
final_columns = feature_columns + ['features', 'label']
df_raw = pipeline_model.transform(data_raw).\
            select(final_columns)
            
df_raw.show(5)

+-------------+---------------+---------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------+--------------------+-----+
|onehot_SEXVAR|onehot__AGE65YR|onehot__BMI5CAT|onehot_GENHLTH|onehot_SMOKE100|onehot__SMOKER3|onehot_LCSFIRST| onehot_LCSLAST|onehot_LCSNUMCG|onehot_LCSCTSCN|onehot_STOPSMK2|onehot_ASTHMA3|            features|label|
+-------------+---------------+---------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+--------------+--------------------+-----+
|(6,[0],[1.0])|  (7,[2],[1.0])|  (9,[2],[1.0])|(12,[1],[1.0])|  (5,[0],[1.0])|  (8,[0],[1.0])|(103,[0],[1.0])|(151,[0],[1.0])| (87,[0],[1.0])| (10,[0],[1.0])|  (6,[1],[1.0])| (7,[0],[1.0])|(411,[0,8,15,23,3...|  0.0|
|(6,[1],[1.0])|  (7,[2],[1.0])|  (9,[2],[1.0])|(12,[1],[1.0])|  (5,[0],[1.0])|  (8,[0],[1.0])|(103,[0],[1.0])|(151,[0],[1.0])| (87,[

In [35]:
#@title Split data into training and test sets
training, test = df_raw.randomSplit([0.8, 0.2], seed=1234)
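
`randomSplit` assigns rows to the two sets at random without looking at the label, so with very few positive rows the share of positives in each set can drift. The plain-Python sketch below shows a stratified alternative that splits each class separately. The function `stratified_split` and the toy rows are assumptions for illustration; in PySpark a similar effect could be approximated with `DataFrame.sampleBy`.

```python
import random
from collections import defaultdict

def stratified_split(rows, label_of, train_fraction=0.8, seed=1234):
    """Split rows so that each label keeps roughly the same train/test ratio."""
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)
    train, test = [], []
    for group in by_label.values():
        rng.shuffle(group)
        cut = round(len(group) * train_fraction)
        train.extend(group[:cut])
        test.extend(group[cut:])
    return train, test

# Toy data (made up): 95 negative rows and 5 positive rows
rows = [("a%d" % i, 0.0) for i in range(95)] + [("b%d" % i, 1.0) for i in range(5)]
train, test = stratified_split(rows, label_of=lambda r: r[1])
print(len(train), len(test))
```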


In [None]:
training.printSchema() #'onehot_SEXVAR', 'onehot__AGE65YR', 'onehot__BMI5CAT', 'onehot_GENHLTH',
                       #     'onehot_SMOKE100', 'onehot__SMOKER3', 'onehot_LCSFIRST', 'onehot_LCSLAST',
                       #     'onehot_LCSNUMCG', 'onehot_LCSCTSCN', 'onehot_STOPSMK2', 'onehot_ASTHMA3', 

root
 |-- onehot_SEXVAR: vector (nullable = true)
 |-- onehot__AGE65YR: vector (nullable = true)
 |-- onehot__BMI5CAT: vector (nullable = true)
 |-- onehot_GENHLTH: vector (nullable = true)
 |-- onehot_SMOKE100: vector (nullable = true)
 |-- onehot__SMOKER3: vector (nullable = true)
 |-- onehot_LCSFIRST: vector (nullable = true)
 |-- onehot_LCSLAST: vector (nullable = true)
 |-- onehot_LCSNUMCG: vector (nullable = true)
 |-- onehot_LCSCTSCN: vector (nullable = true)
 |-- onehot_STOPSMK2: vector (nullable = true)
 |-- onehot_ASTHMA3: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [50]:
#@title Data balancing using SMOTE
# SMOTE uses k-nearest neighbors to synthesize new minority-class samples
from imblearn.over_sampling import SMOTE

features = training.select(['features']).toPandas()

labels = training.select('label').toPandas()

In [51]:
sm = SMOTE(sampling_strategy = 'not majority', k_neighbors = 50, random_state = 42)

features, labels = sm.fit_resample(features, labels)

ValueError: ignored
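
SMOTE creates each synthetic minority example by picking a minority point, choosing one of its k nearest minority neighbors, and interpolating between the two. The sketch below illustrates only that step in plain Python. The function `smote_sketch` and the toy points are assumptions for illustration, not imbalanced-learn's implementation. Note that `fit_resample` expects a 2-D numeric array of shape `(n_samples, n_features)`, whereas `features` above is a single-column frame of Spark vector objects. Since the traceback is not shown, whether that caused the `ValueError` is unconfirmed.

```python
import random

def smote_sketch(minority, n_new, k=2, seed=42):
    """Generate n_new synthetic points by interpolating between minority neighbors."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        x = rng.choice(minority)
        # k nearest minority neighbors of x (excluding x), by squared Euclidean distance
        neighbors = sorted(
            (p for p in minority if p is not x),
            key=lambda p: sum((a - b) ** 2 for a, b in zip(x, p)),
        )[:k]
        nb = rng.choice(neighbors)
        gap = rng.random()  # position along the segment from x to nb
        synthetic.append([a + gap * (b - a) for a, b in zip(x, nb)])
    return synthetic

# Toy minority class (made up): the four corners of the unit square
minority = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
for point in smote_sketch(minority, n_new=3):
    print(point)
```

Interpolating one-hot encoded features produces fractional values that no real respondent could have, which is worth keeping in mind before applying SMOTE to the `features` vector built here.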

In [None]:
features['label'] = labels.values
features = spark.createDataFrame(features)

In [52]:
#@title Build cross validation 
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [53]:
#@title Parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5]).\
    build()

In [54]:
#@title Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
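
The `areaUnderROC` metric can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it directly from that definition. The function name `area_under_roc` and the toy scores are assumptions for illustration; the pairwise loop is quadratic and only meant to show the idea, not how Spark computes the metric.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(labels, scores))  # 0.75
```

Unlike accuracy, this value does not depend on the class proportions, which makes it a more informative score for a label as rare as `HasLungCancer`.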

In [55]:
#@title Cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [56]:
#@title Fit cross validation model
# Fit on the training split only, so the test split stays unseen during model selection
cv_model = cv.fit(training)

Py4JJavaError: ignored

In [None]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']

In [None]:
#@title Prediction on training data
pred_training_cv = cv_model.transform(training)
pred_training_cv.select(show_columns).show(5, truncate=False)

In [None]:
#@title Prediction on test data
pred_test_cv = cv_model.transform(test)
pred_test_cv.select(show_columns).show(5, truncate=False)

In [None]:
#@title Confusion matrix
# Evaluate on the held-out test split and count each (label, prediction) pair
label_and_pred = cv_model.transform(test).select('label', 'prediction')
label_and_pred.groupBy('label', 'prediction').count().show()
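
The cell above counts (label, prediction) pairs. The plain-Python sketch below shows how such counts turn into accuracy, precision, and recall. The helper names and the toy data are made up for illustration and are not results from this notebook.

```python
from collections import Counter

def confusion_counts(labels, predictions):
    """Count (label, prediction) pairs, e.g. (1.0, 0.0) is a false negative."""
    return Counter(zip(labels, predictions))

def precision_recall(counts, positive=1.0, negative=0.0):
    tp = counts[(positive, positive)]
    fp = counts[(negative, positive)]
    fn = counts[(positive, negative)]
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Toy data (made up for illustration): 2 positives among 10 rows
labels      = [0.0] * 8 + [1.0] * 2
predictions = [0.0] * 7 + [1.0] + [1.0, 0.0]

counts = confusion_counts(labels, predictions)
accuracy = (counts[(0.0, 0.0)] + counts[(1.0, 1.0)]) / len(labels)
print(counts, accuracy, precision_recall(counts))
```

In the toy data, accuracy is 0.8 even though only half of the positives are found, which is the pattern to watch for on an imbalanced label.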

In [None]:
print('The best MaxDepth is:', cv_model.bestModel._java_obj.getMaxDepth())