### DSE-230 Final Project
## Diabetes Risk Prediction from Personal Health Indicators
### Notebook 2: Data Preparation
***
#### Chunxia Tong  
#### Camm Perera
#### Sergey Gurvich

***
## Table of Contents:

* [1. Notebook Setup](#setup)
    * [1.1 Imports](#imports)
    * [1.2 Start Spark Session](#start_spark_session)
    * [1.3 Read Data](#read_data)
* [2. Data Preparation](#data_preparation)
    * [2.1 Feature Removal](#data_preparation)
    * [2.2 Dataset Cleaning](#cleaning)
    * [2.3 Feature Selection and Transformation](#feature_selection)
* [3. Stop Spark Session](#stop)
***

### 1. Notebook Setup <a class="anchor" id="setup"></a>
Please read the README file at https://github.com/spring-camm-sergey/dse230/blob/main/README.txt

_Note: for running locally, please place the CSV files into HDFS. Example:_
```
hadoop fs -copyFromLocal work/project/BRFSS_2020_main_dataset.csv /;
hadoop fs -copyFromLocal work/project/Behavioral_Risk_Factor_Surveillance_System__BRFSS__Historical_Questions.csv /;
hadoop fs -copyFromLocal work/project/BRFSS_feature_codes_map.csv /;
```

#### 1.1 Imports <a class="anchor" id="imports"></a>

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count  # 'abs' was unused and shadowed the Python builtin

#### 1.2 Start Spark Session <a class="anchor" id="start_spark_session"></a>

In [2]:
conf = pyspark.SparkConf().setAll([
    ('spark.master', 'local[*]'),
    ('spark.app.name', 'Basic Setup'),
    ('spark.memory.offHeap.enabled', 'true'),
    ('spark.memory.offHeap.size', '8g'),
    ('spark.executor.memory', '16g'),
    ('spark.driver.memory', '16g'),
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('OFF')
spark.version

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-31 00:04:29,415 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.2.1'

#### 1.3 Read Data <a class="anchor" id="read_data"></a>

In [3]:
# main dataset
orig_df = spark.read.option("header", True).csv('hdfs:///BRFSS_2020_main_dataset.csv', inferSchema=True)

# CSV of the questionnaire with the possible answers
questions_df = spark.read.option("header", True).csv('hdfs:///Behavioral_Risk_Factor_Surveillance_System__BRFSS__Historical_Questions.csv', inferSchema=True)

# mapping of feature codes (labels) to the actual questions
labels_map = spark.read.option("header", True).csv('hdfs:///BRFSS_feature_codes_map.csv', inferSchema=True)

## 2. Data Preparation <a class="anchor" id="data_preparation"></a>

### 2.1 Feature Removal
_During EDA we already removed the features with too many NULLs; the cell below repeats that step on the freshly loaded data._

In [4]:
# Note: this code was taken and modified per our needs from:
# https://stackoverflow.com/questions/51322445/how-to-drop-all-columns-with-null-values-in-a-pyspark-dataframe

def drop_null_columns(sdf, threshold=0):
    """
    Drop every column that has fewer than `threshold` non-null values,
    i.e. keep the columns where count(non-nulls) >= threshold.
    Note: simulates pandas.DataFrame.dropna(axis=1, thresh=threshold).

    Besides real NULLs, a value counts as null if it is NaN, an empty string,
    equals 99, or contains 'None', 'NULL' or 'NA' as a substring.

    :param DataFrame sdf: PySpark DataFrame to work with
    :param int threshold: minimum number of non-null values a column needs to be kept
    :return: cleaned PySpark DataFrame
    """
    # count the null-like values of every column in a single pass
    null_counts = sdf.select([count(when(
        col(c).isNull() |
        col(c).contains('None') |
        col(c).contains('NULL') |
        col(c).contains('NA') |
        (col(c) == '') |
        (col(c) == 99) |
        isnan(c),
        c)).alias(c) for c in sdf.columns]).collect()[0].asDict()

    # count the rows once instead of re-running the count for every column
    n_rows = sdf.count()
    to_drop = [k for k, v in null_counts.items() if v > n_rows - threshold]
    return sdf.drop(*to_drop)



cleaned_df = drop_null_columns(orig_df, threshold=300000)


print(f'Original number of samples: {orig_df.count()}, number of columns: {len(orig_df.columns)}')
print(f'Cleaned number of samples: {cleaned_df.count()}, number of columns: {len(cleaned_df.columns)}')

Original number of samples: 401958, number of columns: 279
Cleaned number of samples: 401958, number of columns: 112


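The drop condition in `drop_null_columns` compares each column's null count with the row count minus `threshold`, which is easy to misread. The pure-Python sketch below uses a hypothetical `split_columns` helper that works on precomputed null counts instead of a Spark DataFrame. It shows that the rule keeps exactly the columns with at least `threshold` non-null values. Here, with 401958 rows and `threshold=300000`, a column survives only if at most 101958 of its values count as null.

```python
def split_columns(null_counts, n_rows, threshold):
    """Split columns into (kept, dropped) using the drop_null_columns rule.

    A column is dropped when its null count exceeds n_rows - threshold,
    which is the same as having fewer than `threshold` non-null values
    (the behaviour of pandas DataFrame.dropna(axis=1, thresh=threshold)).
    """
    kept, dropped = [], []
    for column, nulls in null_counts.items():
        if nulls > n_rows - threshold:
            dropped.append(column)
        else:
            kept.append(column)
    return kept, dropped


# toy example with hypothetical counts: 10 rows, keep columns with >= 7 non-null values
kept, dropped = split_columns({'A': 0, 'B': 3, 'C': 4}, n_rows=10, threshold=7)
print(kept, dropped)  # ['A', 'B'] ['C']
```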
### 2.2 Dataset Cleaning <a class="anchor" id="cleaning"></a>
_During EDA we already dropped all remaining rows with NULLs; the cell below repeats that step._

In [5]:
cleaned_df = cleaned_df.na.drop()
print(f'Cleaned number of samples: {cleaned_df.count()}, and number of features: {len(cleaned_df.columns)}')

Cleaned number of samples: 326959, and number of features: 112

##### Create a backup of the cleaned DataFrame, so we can use it while re-running and debugging.

### 2.3 Feature Selection and Transformation  <a class="anchor" id="feature_selection"></a>

- We picked the features whose absolute correlation coefficient with the label was at least 0.1 in the EDA. Let's apply that selection here:

In [6]:
# Candidates from correlation coef. >= 0.1:

# +--------+-------------------+
# | feature|      corr_absolute|
# +--------+-------------------+
# | GENHLTH|0.24589367600251408|
# |  _AGE_G|0.19092127239726459|
# |  _AGE80|0.19072694276261376|
# |_AGEG5YR|0.18723669342527285|
# |   _BMI5|0.18616837031030695|
# | EMPLOY1|0.16976771483478573|
# |  _MICHD|0.16944105523216146|
# |_BMI5CAT|0.16832290901401584|
# | _RFHLTH|0.16507734173422267|
# |DIFFWALK|0.15201125568861884|
# |   WTKG3|0.15188968238715578|
# |_DRDXAR2|0.14709958030515652|
# |HAVARTH4|0.14709958030515652|
# |_HCVU651|0.14388852597614002|
# |_AGE65YR|0.14235057094120832|
# |_DRNKDRV|0.13059650197464265|
# | ALCDAY5|0.13001893982345375|
# | _RFBMI5|0.12933584562653766|
# |RMVTETH4|0.12854739562343914|
# |EXERANY2|0.11950726230964258|
# |CVDINFR4| 0.1161998615943308|
# |_TOTINDA|0.11388422824979928|
# |CHECKUP1|0.11007528748766457|
# |PNEUVAC4|0.10258756884001767|
# |_PHYS14D|0.10131009704045844|
# +--------+-------------------+

cols = [
    'GENHLTH',
    '_AGE_G',
    '_BMI5',
    '_AGE80',
    '_AGEG5YR',
    '_BMI5CAT',
    '_MICHD',
    'EMPLOY1',
    '_RFHLTH',
    'WTKG3',
    'DIFFWALK',
    '_DRDXAR2',
    'HAVARTH4',
    '_HCVU651',
    '_AGE65YR',
    '_RFBMI5',
    '_DRNKDRV',
    'ALCDAY5',
    'RMVTETH4',
    'EXERANY2',
    'CVDINFR4',
    '_TOTINDA',
    'CHECKUP1',
    'PNEUVAC4',
    'DIABETE4',
]

# leave only selected columns
df_selected_features = cleaned_df[cols]

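The selection above ranks features by the absolute value of their correlation with the label and keeps those at or above 0.1. The sketch below assumes Pearson correlation (the EDA notebook's exact method is not shown here) and works on a list of dicts instead of a Spark DataFrame. `pearson` and `select_features` are hypothetical helpers. In Spark, the same coefficient for one pair of columns is available through `DataFrame.stat.corr`.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


def select_features(rows, features, label, min_abs_corr=0.1):
    """Return (feature, |r|) pairs with |r| >= min_abs_corr, strongest first."""
    labels = [row[label] for row in rows]
    scored = []
    for feature in features:
        r = pearson([row[feature] for row in rows], labels)
        if abs(r) >= min_abs_corr:
            scored.append((feature, abs(r)))
    return sorted(scored, key=lambda item: item[1], reverse=True)


# toy data: 'x' tracks the label, 'z' is uncorrelated with it
rows = [
    {'x': 1, 'z': 1, 'y': 0},
    {'x': 2, 'z': -1, 'y': 0},
    {'x': 3, 'z': -1, 'y': 1},
    {'x': 4, 'z': 1, 'y': 1},
]
selected = select_features(rows, ['x', 'z'], 'y')
print(selected)
```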
##### Take care of the label column:

In [7]:
# rename the diabetes column to 'label'
df_selected_features = df_selected_features.withColumnRenamed('DIABETE4', 'label')

# Keep only the Yes/No answers to the diabetes question.
# Label response map:
# 1 Yes
# 2 Yes, but female told only during pregnancy
# 3 No
# 4 No, prediabetes or borderline diabetes
# 7 Don’t know / Not sure
# 9 Refused
df_selected_features = df_selected_features.filter(col('label').isin(1, 3))

# recode No (3) as 0, so the label is 1 = diabetes, 0 = no diabetes
df_selected_features = df_selected_features.withColumn('label', when(col('label') == 3, 0).otherwise(col('label')))

df_selected_features.groupby('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1| 42359|
|    0|274713|
+-----+------+

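The filter-then-replace logic above amounts to a small mapping from DIABETE4 codes to a binary label. Here is a pure-Python sketch of that mapping. `recode_label` is a hypothetical helper that operates on plain integers rather than a Spark column.

```python
# DIABETE4 response codes kept as the two classes (from the label response map above)
YES, NO = 1, 3


def recode_label(code):
    """Map a DIABETE4 code to a binary label, or None if the row should be dropped.

    1 (Yes) -> 1, 3 (No) -> 0; pregnancy-only (2), prediabetes (4),
    don't know (7) and refused (9) are filtered out.
    """
    if code == YES:
        return 1
    if code == NO:
        return 0
    return None


raw = [1, 3, 2, 4, 7, 9, 3]
labels = [lab for lab in (recode_label(c) for c in raw) if lab is not None]
print(labels)  # [1, 0, 0]
```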
##### Print each question with its possible answers and answer counts:

In [8]:
# we'll do the feature transformation in a separate DataFrame
df_features_transformation = df_selected_features

def print_feature_details(columns, n, print_counts=True):
    """
    Print info about a feature: its code, question, possible responses and, optionally, response counts.

    :param list columns: list of the features we work with
    :param int n: index of the feature to print info about
    :param bool print_counts: True to also print the count of each response
    :return: None
    """
    feature = columns[n]
    print(feature)
    print(labels_map.filter(labels_map['var'] == feature)[['question']].first())

    # question text and coded responses from the 2020 questionnaire
    details = questions_df.filter((questions_df['VariableName'] == feature) & (questions_df['year'] == 2020))
    print(details[['Question']].first())
    print(details[['Responses']].first())

    if print_counts:
        # counts come from the global df_features_transformation, so they reflect all cleaning done so far
        df_features_transformation.groupBy(feature).count().show()
    else:
        print('-' * 20)

In [9]:
for n in range(len(cols)-1):
    print_feature_details(cols, n, print_counts=False)

GENHLTH
Row(question='GENERAL HEALTH')
Row(Question='Would you say that in general your health is:')
Row(Responses='1=Excellent 2=Very good 3=Good 4=Fair 5=Poor 7=Don’t know/Not Sure 9=Refused')
--------------------
_AGE_G
Row(question='IMPUTED AGE IN SIX GROUPS')
None
None
--------------------
_BMI5
Row(question='COMPUTED BODY MASS INDEX')
None
None
--------------------
_AGE80
Row(question='IMPUTED AGE VALUE COLLAPSED ABOVE 80')
None
None
--------------------
_AGEG5YR
Row(question='REPORTED AGE IN FIVE-YEAR AGE CATEGORIES CALCULATED VARIABLE')
None
None
--------------------
_BMI5CAT
Row(question='COMPUTED BODY MASS INDEX CATEGORIES')
None
None
--------------------
_MICHD
Row(question='RESPONDENTS THAT HAVE EVER REPORTED HAVING CORONARY HEART DISEASE (CHD) OR MYOCARDIAL INFARCTION (MI)')
None
None
--------------------
EMPLOY1
Row(question='EMPLOYMENT STATUS')
Row(Question='Are you currently…?')
Row(Responses='1=Employed for wages 2=Self-employed 3=Out of work for 1 year or more 4=Out o

#### **After manually analyzing the variables and removing highly correlated ones (some variables are derived from others, e.g. '_BMI5CAT' from '_BMI5', and carry largely the same information), here is the list we are left with:**
- 'GENHLTH' - GENERAL HEALTH
- '_AGE80' - IMPUTED AGE VALUE COLLAPSED ABOVE 80
- '_BMI5' - COMPUTED BODY MASS INDEX
- '_MICHD' - RESPONDENTS THAT HAVE EVER REPORTED HAVING CORONARY HEART DISEASE (CHD) OR MYOCARDIAL INFARCTION (MI)
- 'EMPLOY1' - EMPLOYMENT STATUS
- 'DIFFWALK' - DIFFICULTY WALKING OR CLIMBING STAIRS
- '_DRDXAR2' - RESPONDENTS DIAGNOSED WITH ARTHRITIS
- 'ALCDAY5' - DAYS IN PAST 30 HAD ALCOHOLIC BEVERAGE
- 'RMVTETH4' - NUMBER OF PERMANENT TEETH REMOVED
- 'EXERANY2' - EXERCISE IN PAST 30 DAYS
- 'CHECKUP1' - LENGTH OF TIME SINCE LAST ROUTINE CHECKUP
- 'PNEUVAC4' - PNEUMONIA SHOT EVER
- 'DIABETE4' (label) - (EVER TOLD) YOU HAD DIABETES

In [10]:
cols_final = [
    'GENHLTH',
    '_AGE80',
    '_BMI5',
    '_MICHD',
    'EMPLOY1',
    'DIFFWALK',
    '_DRDXAR2',
    'ALCDAY5',
    'RMVTETH4',
    'EXERANY2',
    'CHECKUP1',
    'PNEUVAC4',
    'label'
]

df_features_transformation = df_features_transformation[cols_final]

##### Now let's go through the features one by one and apply any additional cleaning needed:

In [11]:
n=0
print_feature_details(cols_final, n)

GENHLTH
Row(question='GENERAL HEALTH')
Row(Question='Would you say that in general your health is:')
Row(Responses='1=Excellent 2=Very good 3=Good 4=Fair 5=Poor 7=Don’t know/Not Sure 9=Refused')


+-------+------+
|GENHLTH| count|
+-------+------+
|      1| 65113|
|      3| 92038|
|      5| 11933|
|      9|   160|
|      4| 34981|
|      7|   407|
|      2|112440|
+-------+------+

**Transformation actions:**  
_1. Remove answer 9 (Refused)_  
_2. Apply one-hot encoding to this feature (not done in this notebook)_

In [12]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['GENHLTH']!=9)

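One-hot encoding turns each GENHLTH code into its own 0/1 indicator, so a model does not treat the codes as evenly spaced numbers. The pure-Python sketch below is illustrative. `one_hot` is a hypothetical helper that keeps every category. In Spark ML this is usually done with `pyspark.ml.feature.OneHotEncoder`, which by default drops the last category (`dropLast=True`).

```python
def one_hot(values, categories=None):
    """One-hot encode a list of categorical codes.

    Returns (categories, vectors): each vector has a 1 at the position of its
    value's category and 0 elsewhere. Categories default to the sorted
    distinct values.
    """
    if categories is None:
        categories = sorted(set(values))
    index = {cat: i for i, cat in enumerate(categories)}
    vectors = []
    for v in values:
        vec = [0] * len(categories)
        vec[index[v]] = 1
        vectors.append(vec)
    return categories, vectors


# GENHLTH codes after removing 9 (7 = "Don't know" is still present)
cats, vecs = one_hot([1, 3, 5, 7, 2])
print(cats)     # [1, 2, 3, 5, 7]
print(vecs[1])  # [0, 0, 1, 0, 0]
```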
***
**Next Feature**
***

In [13]:
n+=1
print_feature_details(cols_final, n)

_AGE80
Row(question='IMPUTED AGE VALUE COLLAPSED ABOVE 80')
None
None




+------+-----+
|_AGE80|count|
+------+-----+
|    31| 3241|
|    65| 7524|
|    53| 4857|
|    78| 4036|
|    34| 3652|
|    28| 3415|
|    76| 4456|
|    27| 3235|
|    26| 3056|
|    44| 3636|
|    22| 3092|
|    47| 4094|
|    52| 5104|
|    40| 4625|
|    20| 2816|
|    57| 5700|
|    54| 4868|
|    48| 4400|
|    19| 2831|
|    64| 6626|
+------+-----+
only showing top 20 rows


**Transformation actions:**  
_Try one-hot encoding (not done in this notebook)_

***
**Next Feature**
***

In [14]:
n+=1
print_feature_details(cols_final, n)

_BMI5
Row(question='COMPUTED BODY MASS INDEX')
None
None


+-----+-----+
|_BMI5|count|
+-----+-----+
| 3175|  758|
| 3918|    4|
| 1829|  174|
| 2366|   10|
| 2866|   81|
| 4101|   58|
| 4519|  121|
| 3749|   64|
| 1645|   22|
| 1959|   15|
| 2122|   64|
| 3794|   15|
| 2142|    7|
| 1591|    2|
| 2659|    1|
| 5300|    8|
| 1580|    4|
| 4818|    5|
| 2387|  126|
| 3475|  368|
+-----+-----+
only showing top 20 rows

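**Transformation actions:**  
_1. Remove outliers: `_BMI5` stores BMI with two implied decimal places (3175 = 31.75), so we keep values below 6000, i.e. BMI below 60_  
_2. Apply scaling (not done in this notebook)_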

In [15]:
# remove outliers: _BMI5 is BMI x 100, so this keeps BMI below 60
df_features_transformation = df_features_transformation.filter(df_features_transformation['_BMI5']<6000)

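Here is a pure-Python sketch of the two BMI steps: decoding the implied decimals and applying the outlier cut-off, then z-score scaling. `decode_bmi5` and `standardize` are hypothetical helpers, and the value 6500 in the sample is made up to show the cut-off. Scaling uses the population standard deviation here. Spark ML's `pyspark.ml.feature.StandardScaler` offers the same kind of transformation on assembled vector columns.

```python
import math


def decode_bmi5(raw):
    """_BMI5 stores BMI with two implied decimal places (3175 -> 31.75)."""
    return raw / 100


def standardize(values):
    """Z-score scaling: subtract the mean, divide by the population std dev."""
    n = len(values)
    mean = sum(values) / n
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
    return [(v - mean) / std for v in values]


raw_bmi5 = [3175, 1829, 2387, 6500, 4519]
# same cut-off as the cell above: keep raw _BMI5 < 6000, i.e. BMI < 60
bmi = [decode_bmi5(r) for r in raw_bmi5 if r < 6000]
print(bmi)  # [31.75, 18.29, 23.87, 45.19]
scaled = standardize(bmi)
```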
***
**Next Feature**
***

In [16]:
n+=1
print_feature_details(cols_final, n)

_MICHD
Row(question='RESPONDENTS THAT HAVE EVER REPORTED HAVING CORONARY HEART DISEASE (CHD) OR MYOCARDIAL INFARCTION (MI)')
None
None


+------+------+
|_MICHD| count|
+------+------+
|     1| 27941|
|     2|288525|
+------+------+

**Transformation actions:**  
_1. Replace 2 by 0_

In [17]:
df_features_transformation = df_features_transformation.withColumn("_MICHD", when(df_features_transformation["_MICHD"] == 2, 0).otherwise(df_features_transformation["_MICHD"]))

***
**Next Feature**
***

In [18]:
n+=1
print_feature_details(cols_final, n)

EMPLOY1
Row(question='EMPLOYMENT STATUS')
Row(Question='Are you currently…?')
Row(Responses='1=Employed for wages 2=Self-employed 3=Out of work for 1 year or more 4=Out of work for less than 1 year 5=A homemaker 6=A student 7=Retired 8=Unable to work 9=Refused')

+-------+------+
|EMPLOY1| count|
+-------+------+
|      1|132701|
|      6|  8820|
|      3|  5321|
|      5| 12042|
|      9|  1930|
|      4| 12578|
|      8| 19575|
|      7| 95351|
|      2| 28148|
+-------+------+

**Transformation actions:**   
_1. Remove 9_

In [19]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['EMPLOY1']!=9)

***
**Next Feature**
***

In [20]:
n+=1
print_feature_details(cols_final, n)

DIFFWALK
Row(question='DIFFICULTY WALKING OR CLIMBING STAIRS')
Row(Question='Do you have serious difficulty walking or climbing stairs?')
Row(Responses='1=Yes 2=No 7=Don’t know/Not Sure 9=Refused')


+--------+------+
|DIFFWALK| count|
+--------+------+
|       1| 45128|
|       9|   112|
|       7|   709|
|       2|268587|
+--------+------+

**Transformation actions:**   
_1. Remove 9_

In [21]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['DIFFWALK']!=9)

***
**Next Feature**
***

In [22]:
n+=1
print_feature_details(cols_final, n)

_DRDXAR2
Row(question='RESPONDENTS DIAGNOSED WITH ARTHRITIS')
None
None


+--------+------+
|_DRDXAR2| count|
+--------+------+
|       1| 99442|
|       2|214982|
+--------+------+

**Transformation actions:**  
_1. Replace 2 by 0_

In [23]:
df_features_transformation = df_features_transformation.withColumn("_DRDXAR2", when(df_features_transformation["_DRDXAR2"] == 2, 0).otherwise(df_features_transformation["_DRDXAR2"]))

***
**Next Feature**
***

In [24]:
n+=1
print_feature_details(cols_final, n)

ALCDAY5
Row(question='DAYS IN PAST 30 HAD ALCOHOLIC BEVERAGE')
Row(Question='During the past 30 days, how many days per week or per month did you have at least one drink of any alcoholic beverage such as beer, wine, a malt beverage or liquor?')
Row(Responses='101-107=Days per week 201-230=Days in past 30 days 888=No drinks in past 30 days 777=Don’t know/Not sure 999=Refused')

+-------+-----+
|ALCDAY5|count|
+-------+-----+
|    211|   51|
|    101|13408|
|    210| 7054|
|    103| 7988|
|    223|   78|
|    222|  150|
|    209|  216|
|    230|13733|
|    225| 2747|
|    224|  155|
|    206| 3242|
|    777| 2532|
|    212| 1558|
|    218|  213|
|    205| 8656|
|    227|  232|
|    207| 2298|
|    202|17826|
|    107| 5981|
|    217|   90|
+-------+-----+
only showing top 20 rows

**Transformation actions:**   
_1. Remove 999_  
_2. Replace 888 by 0_

In [25]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['ALCDAY5']!=999)
df_features_transformation = df_features_transformation.withColumn("ALCDAY5", when(df_features_transformation["ALCDAY5"] == 888, 0).otherwise(df_features_transformation["ALCDAY5"]))

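After this cell, ALCDAY5 still mixes two scales. Codes 101-107 are days per week and 201-230 are days in the past 30 days, so 107 (every day) ends up numerically below 201 (one day in the month). Code 777 (Don't know) is also kept. If a single numeric scale is wanted, a sketch like the one below could be applied. The hypothetical `alcohol_days_per_30` helper and the weekly-to-monthly factor 30/7 are assumptions, not part of the original pipeline.

```python
def alcohol_days_per_30(code):
    """Convert an ALCDAY5 code to drinking days in the past 30 days.

    101-107: days per week   -> scaled to 30 days (assumption: days/week * 30/7)
    201-230: days in past 30 -> used as is
    888 (or 0 after the recode above): no drinks -> 0
    777 / 999 (don't know / refused): -> None
    """
    if code in (0, 888):
        return 0.0
    if 101 <= code <= 107:
        return (code - 100) * 30 / 7
    if 201 <= code <= 230:
        return float(code - 200)
    return None


converted = [alcohol_days_per_30(c) for c in (888, 107, 230, 205, 777)]
print(converted)  # [0.0, 30.0, 30.0, 5.0, None]
```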
***
**Next Feature**
***

In [26]:
n+=1
print_feature_details(cols_final, n)

RMVTETH4
Row(question='NUMBER OF PERMANENT TEETH REMOVED')
Row(Question='Not including teeth lost for injury or orthodontics, how many of your permanent teeth have been removed because of tooth decay or gum disease?')
Row(Responses='1=1 to 5 2=6 or more, but not all 3=All 8=None 7=Don’t know/Not sure 9=Refused')

+--------+------+
|RMVTETH4| count|
+--------+------+
|       1| 90164|
|       3| 19432|
|       9|   364|
|       8|164706|
|       7|  5821|
|       2| 32998|
+--------+------+

**Transformation actions:**   
_1. Remove 9_  
_2. Replace 8 by 0_

In [27]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['RMVTETH4']!=9)
df_features_transformation = df_features_transformation.withColumn("RMVTETH4", when(df_features_transformation["RMVTETH4"] == 8, 0).otherwise(df_features_transformation["RMVTETH4"]))

***
**Next Feature**
***

In [28]:
n+=1
print_feature_details(cols_final, n)

EXERANY2
Row(question='EXERCISE IN PAST 30 DAYS')
Row(Question='During the past month, other than your regular job, did you participate in any physical activities or exercises such as running, calisthenics, golf, gardening, or walking for exercise?')
Row(Responses='1=Yes 2=No 7=Don’t know/Not Sure 9=Refused')


+--------+------+
|EXERANY2| count|
+--------+------+
|       1|242555|
|       9|    87|
|       7|   302|
|       2| 70177|
+--------+------+

**Transformation actions:**   
_1. Remove 9_ 

In [29]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['EXERANY2']!=9)

***
**Next Feature**
***

In [30]:
n+=1
print_feature_details(cols_final, n)

CHECKUP1
Row(question='LENGTH OF TIME SINCE LAST ROUTINE CHECKUP')
Row(Question='About how long has it been since you last visited a doctor for a routine checkup?')
Row(Responses='1=Within past year (anytime less than 12 months ago) 2=Within past 2 years (1 year but less than 2 years ago) 3=Within past 5 years (2 years but less than 5 years ago) 4=5 or more years ago 7=Don’t know/Not sure 8=Never 9=Refused')

+--------+------+
|CHECKUP1| count|
+--------+------+
|       1|242338|
|       3| 16221|
|       9|   228|
|       4| 14400|
|       8|  1421|
|       7|  2925|
|       2| 35501|
+--------+------+

**Transformation actions:**   
_1. Remove 9_  
_2. Replace 8 by 0_

In [31]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['CHECKUP1']!=9)
df_features_transformation = df_features_transformation.withColumn("CHECKUP1", when(df_features_transformation["CHECKUP1"] == 8, 0).otherwise(df_features_transformation["CHECKUP1"]))

***
**Next Feature**
***

In [32]:
n+=1
print_feature_details(cols_final, n)

PNEUVAC4
Row(question='PNEUMONIA SHOT EVER')
Row(Question='Have you ever had a pneumonia shot also known as a pneumococcal vaccine?')
Row(Responses='1=Yes 2=No 7=Don’t know/Not Sure 9=Refused')


+--------+------+
|PNEUVAC4| count|
+--------+------+
|       1|118534|
|       9|   103|
|       7| 25760|
|       2|168409|
+--------+------+

**Transformation actions:**   
_1. Remove 9_  

In [33]:
df_features_transformation = df_features_transformation.filter(df_features_transformation['PNEUVAC4']!=9)

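Each per-feature cell above follows one or both of two patterns: drop a refusal code, or replace one code with another. The hypothetical table-driven sketch below gathers those rules, plus the `_BMI5` cut-off, in one place. It is pure Python over a list of dicts, not Spark. In PySpark the same rule table could drive a loop of `filter` and `withColumn` calls. Because every rule touches only its own column, the order in which they are applied does not change the result.

```python
# Per-feature rules from the cells above: codes to drop and codes to replace.
RULES = {
    'GENHLTH':  {'drop': {9},   'replace': {}},
    '_MICHD':   {'drop': set(), 'replace': {2: 0}},
    'EMPLOY1':  {'drop': {9},   'replace': {}},
    'DIFFWALK': {'drop': {9},   'replace': {}},
    '_DRDXAR2': {'drop': set(), 'replace': {2: 0}},
    'ALCDAY5':  {'drop': {999}, 'replace': {888: 0}},
    'RMVTETH4': {'drop': {9},   'replace': {8: 0}},
    'EXERANY2': {'drop': {9},   'replace': {}},
    'CHECKUP1': {'drop': {9},   'replace': {8: 0}},
    'PNEUVAC4': {'drop': {9},   'replace': {}},
}


def apply_rules(rows, rules=RULES, bmi_max=6000):
    """Drop rows with a 'drop' code or an outlying _BMI5, then apply replacements."""
    cleaned = []
    for row in rows:
        if row.get('_BMI5', 0) >= bmi_max:
            continue
        if any(row[f] in r['drop'] for f, r in rules.items() if f in row):
            continue
        new_row = dict(row)
        for feature, rule in rules.items():
            if feature in new_row:
                new_row[feature] = rule['replace'].get(new_row[feature], new_row[feature])
        cleaned.append(new_row)
    return cleaned


example = [
    {'GENHLTH': 2, '_BMI5': 3175, '_MICHD': 2, 'ALCDAY5': 888},
    {'GENHLTH': 9, '_BMI5': 2500, '_MICHD': 1, 'ALCDAY5': 205},  # refused GENHLTH
    {'GENHLTH': 3, '_BMI5': 6500, '_MICHD': 1, 'ALCDAY5': 101},  # BMI outlier
]
print(apply_rules(example))  # [{'GENHLTH': 2, '_BMI5': 3175, '_MICHD': 0, 'ALCDAY5': 0}]
```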
***
##### Take a final look at the dataset:

In [34]:
df_features_transformation.groupby('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1| 41786|
|    0|270917|
+-----+------+

##### The dataset is highly imbalanced (about 6.5 negatives per positive), so let's balance it by undersampling the majority class:

In [35]:
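# undersample the majority class: keep all positives and ~15.41% of the negatives
# (0.1541 is close to the class ratio 41786 / 270917 ≈ 0.1542); sampleBy keeps each
# row independently, so the resulting class sizes are only approximately equal
df_features_transformation = df_features_transformation.sampleBy('label', fractions={1: 1.0, 0: 0.1541}, seed=42)
df_features_transformation.groupby('label').count().show()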

+-----+-----+
|label|count|
+-----+-----+
|    1|41786|
|    0|41769|
+-----+-----+

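`sampleBy` performs stratified sampling without replacement. Each row is kept independently with the fraction given for its stratum, and strata missing from `fractions` are dropped. That is why the balanced classes above are close to each other but not identical. The pure-Python sketch below reproduces that behaviour. `sample_by` is a hypothetical helper, not Spark's implementation. The sketch also computes the fraction that would equalise the class counts reported before balancing.

```python
import random


def sample_by(rows, key, fractions, seed=None):
    """Stratified Bernoulli sampling without replacement.

    Each row is kept independently with probability fractions[row[key]]
    (rows whose stratum is missing from `fractions` are dropped), so the
    resulting class sizes are only approximately fraction * class size.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions.get(row[key], 0.0)]


# fraction that equalises the classes reported above
minority, majority = 41786, 270917
fraction = minority / majority
print(round(fraction, 4))  # 0.1542

rows = [{'label': 1}] * 100 + [{'label': 0}] * 1000
sampled = sample_by(rows, 'label', {1: 1.0, 0: 0.1}, seed=42)
```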
In [36]:
print('Final number of rows:',df_features_transformation.count())
print('Final number of columns:',len(df_features_transformation.columns))

Final number of rows: 83555
Final number of columns: 13

## 3. Stop Spark Session <a class="anchor" id="stop"></a>

##### Save to HDFS for use in the next notebook.
**Note: this produces 'df_features_transformation.csv' in the HDFS root directory, which Notebook 3 needs. Spark writes it as a directory containing a single part-*.csv file (because of `coalesce(1)`), not as a plain file.**

In [None]:
# write a single CSV part file (coalesce(1)) with a header row, overwriting any previous run
df_features_transformation = df_features_transformation.coalesce(1)
df_features_transformation.write.csv('hdfs:///df_features_transformation.csv', mode='overwrite', header=True)

In [38]:
spark.stop()