This pipeline aims to process the SQL extracts of SES dataframe (raw dataframe) to the format iPredict/ML models could run on.

# set up env 

In [102]:
source("~/head.r")
require(knitr)
library(feather)
library(caret)
library(mice)
library(VIM)
library(xtable)
# specific package just for disply in jupyter nb.
library(repr)#, lib.loc='/gnshealthcare/software/R/3.1.1/lib/R/library')
library(IRdisplay)
options(repr.plot.width=6, repr.plot.height=5)

setwd("/gnshealthcare/shared/yyang/data/ses_data")

# read in data 

In [103]:
# census_dt <- feather::read_feather("census_tables.feather") %>% as.data.table

# life_dt <- feather::read_feather("lfstl_tables.feather") %>% as.data.table


# census_dt <- feather::read_feather("census_tables_2016n2017.feather") %>% as.data.table

# life_dt <- feather::read_feather("lfstl_tables_2016n2017.feather") %>% as.data.table


census_dt <- feather::read_feather("census_tables_oncology_m4_8_6.feather") %>% as.data.table

life_dt <- feather::read_feather("lfstl_tables_oncology_m4_8_6.feather") %>% as.data.table

In [104]:
census_dt %>% glimpse

Observations: 3,982
Variables: 45
$ indiv_enterprise_id            <chr> "10044079", "10049553", "10049744", "10051016", "10074", "1007869", "1008974...
$ c24_004_ocp_manufacturing_pp   <chr> "06", "06", "06", "11", "03", "05", "19", "04", "09", "08", "00", "05", "04"...
$ c08_001_wa16p_pp_wrk_in_cnty   <chr> "37", "77", "49", "55", "92", "61", "89", "88", "97", "56", "17", "12", "93"...
$ c23_006_empd_fa16p_pp          <chr> "55", "53", "72", "54", "49", "50", "50", "58", "57", "68", "56", "68", "59"...
$ c25_064_ohph_electricity       <chr> "30", "74", "18", "04", "10", "96", "39", "96", "88", "27", "16", "54", "98"...
$ c27_010_hi_pp_no_hi            <chr> "14", "38", "07", "16", "04", "17", "29", "21", "18", "03", "07", "13", "06"...
$ c05_001_ctltpp_ntv_citzn       <chr> "89", "58", "88", "72", "81", "90", "68", "74", "29", "92", "80", "88", "84"...
$ c19_043_hhip_ss                <chr> "45", "21", "12", "28", "45", "34", "38", "24", "31", "10", "16", "50", "33"...
$ c01_018_hspo

In [105]:
life_dt %>% glimpse

Observations: 3,982
Variables: 17
$ indiv_enterprise_id            <chr> "10044079", "10049553", "10049744", "10051016", "10074", "1007869", "1008974...
$ v3589_mcu_primtm_tv            <chr> "03", "07", "05", "05", "05", "06", "04", "05", "05", "08", "06", "02", "08"...
$ v4000_consmr_promnc_ind        <chr> "09", "02", "10", "01", "10", "01", "10", "06", "10", "07", "10", "10", "10"...
$ v8655_lfestg_cd                <chr> "345", NA, "335", "141", "215", NA, "262", NA, "355", "355", "281", "324", "...
$ v8671_incm_estd_hh_narw_rng    <chr> "A", "D", "8", "D", "B", NA, "7", "1", "B", "C", "D", "5", "5", "B", "9", "6...
$ v3102_cntry_of_orgn_high_dtl   <chr> "4G", "30", "4G", "4Z", "4G", "4J", "4G", "30", "30", "40", "4G", "4G", "4G"...
$ v8201_incm_estd_hh_highr_rng   <chr> "E", "J", "D", "I", "F", NA, "D", "1", "F", "F", "J", "8", "8", "F", "E", "A...
$ v8617_a2yrincr_2nd_indiv       <chr> "62", NA, "56", "54", NA, "50", NA, NA, "54", "44", "50", "62", "38", "42", ...
$ v8637_occup_

In [106]:
setkey(life_dt, indiv_enterprise_id)
setkey(census_dt, indiv_enterprise_id)

In [107]:
tables()

               NAME  NROW NCOL MB
1:        census_dt 3,982   45  2
2:      colClass_dt    60    2  0
3:          life_dt 3,982   17  1
4:  replace_dt_part 4,102   60  2
5:           ses_dt 4,102  115  3
6: ses_dt_onehotted 4,102  258  7
7:             trsf 4,102  143  5
                                                                                                                                                                                                              COLS
1:                                                            indiv_enterprise_id,c24_004_ocp_manufacturing_pp,c08_001_wa16p_pp_wrk_in_cnty,c23_006_empd_fa16p_pp,c25_064_ohph_electricity,c27_010_hi_pp_no_hi,...
2:                                                                                                                                                                                                   varName,class
3:                                                                    indiv_enterprise_id,v3589

In [108]:
life_dt$indiv_enterprise_id %>% duplicated %>% sum

In [109]:
n_distinct(life_dt)

In [110]:
life_dt %>% nrow

## inference: due to SES variables could be collected at different timestamps, we could have an individual has more than one record of a SES variable.

In [111]:
life_dt[  "100238",  ]

indiv_enterprise_id,v3589_mcu_primtm_tv,v4000_consmr_promnc_ind,v8655_lfestg_cd,v8671_incm_estd_hh_narw_rng,v3102_cntry_of_orgn_high_dtl,v8201_incm_estd_hh_highr_rng,v8617_a2yrincr_2nd_indiv,v8637_occup_ip_indiv,v7616_1_a2yrincr_1_indv_cd,v8626_a2yrincr_ip_indiv,v8616_a2yrincr_1st_indiv,v8610_2_fng_1st_indiv_mid_init,v8640_num_of_src,v2778_brnd_nm_med_ppns,v8642_hmv_estd,v8644_home_purchs_yr_yyyy
100238,,,,,,,,,,,,,,,,


In [112]:
census_dt[  "100238",  ]

indiv_enterprise_id,c24_004_ocp_manufacturing_pp,c08_001_wa16p_pp_wrk_in_cnty,c23_006_empd_fa16p_pp,c25_064_ohph_electricity,c27_010_hi_pp_no_hi,c05_001_ctltpp_ntv_citzn,c19_043_hhip_ss,c01_018_hspo_a65p_tlpp,c08_019_wa16p_cmtm_mdn,⋯,c25_062_ohph_utlty_gas,c25_118_soohu_pct_mrtg,c24_032_ocp_pfp_nself_empd_pp,c25_074_hup_3_brm,c03_001_hspogn_tpp_hsp,c08_025_wa16p_pp_cmtm_60p_mn,c06_004_pobtpp_forgn_born,c16_001_lah_pp_english_only,c08_020_wa16p_pp_cmtm_1_29_mn,c15_007_ea_pp_atand_bd
100238,,,,,,,,,,⋯,,,,,,,,,,


In [157]:
#: the data.table S3 method `merge` will dispatch upon encountering class "data.table", which is fast join.
ses_dt <- merge(life_dt, census_dt, by = "indiv_enterprise_id") # by default, use shared key and inner join.

# or use a slower version below.

# plyr::join(life_dt, census_dt, by = "indiv_enterprise_id", type = "inner", match = "first")

In [158]:
ses_dt %>% dim

In [159]:
ses_dt %>% map_chr(.,class) %>% table

.
character 
       61 

In [160]:
ses_dt %>% tbl_vars

# setup parallel env

In [161]:
# #----set up parallel backend-----
# # cl <- makeCluster(8, type="SOCK")
# #cl <- makeCluster(detectCores() - 3, type="MPI")
# cl <- makeCluster(detectCores() - 3, type="SOCK", outfile="") # outfile setting will  Setting outfile to the empty string ("") will prevent snow from redirecting the output,
# #often resulting in the output from your print messages showing up on the terminal of the master process.
# registerDoSNOW(cl)
# clusterEvalQ(cl, {
#                source("~/head.r")
#                            })

# clusterEvalQ(cl, getwd())
# clusterEvalQ(cl, search())
# clusterEvalQ(cl, ls())

# getDoParWorkers()


In [162]:
#: registerDoMC for small quick slave jobs
registerDoMC(detectCores() - 3)

In [163]:
getDoParWorkers()

***

# Assign column classes (factor or continuous) and create ordered factor copies of continuous variables (binned or not)

In [164]:
#: a function
class_designation <- function(dt, varNames) {
    # @desc: ths logic is we have all character type columns in the beginning (regardless of column values), 
    # if can coerce to numeric value, do it; otherwise(try and error/warning), coerce to factor. If error again, will stop,
    # then manually check is needed.
    #
    # @param dt: data.table object
    # @param varNames: string vector of colnames in the dt.
    # @return a modified dt[, varNames, with = FALSE] with coerced col class for each of the `varNames` specified.
    dt[ , map(.SD, ~try_catch(as.numeric(.), 
                              .e = function(e) return(as.factor(.)), 
                              .w = function(w) return(as.factor(.)))),
        .SDcols = varNames]
   
}
# then manually assign the dt's varNames columns with returned new values (after type conversion)                            
            
                              
#: a function                              
continuous_binnedCategorical <- function(dt, varNames, n_levels = 10, 
                                         cut_func = c(ggplot2::cut_number, ggplot2::cut_interval)) {
    # @desc: ths logic is at first check if those cols of varNames are numeric and integer columns,
    # then decide if distinct value > n_levels, if so bin them to n_levels ordered factors, otherwise keep the levels asIs and coerce to ordered factors. 
    #
    # @param dt: data.table object
    # @param varNames: string vector of colnames in the dt.
    # @param n_levels: an integer specifying how many levels you want.
    # @param cut_func: which cut function to use. `cut_interval` makes 'n' groups with equal range, `cut_number`
    #  makes 'n' groups with (approximately) equal numbers of observations.
    # @return a modified dt with numeric and integer columns coerced to ordered factors (might be binned)
    stop_if_not(dt[, map(.SD, ~(is.numeric(.)|is.integer(.))), .SDcols = varNames] %>% unlist %>% all, 
                msg = "Provided `varNames` cols in `dt` are not all numeric or integer type!")
    
    #: DONT use `ifelse`, doesnt work for vector here.
    #dt[ , map_dfc(.SD, ~ifelse(n_distinct(.) <= n_levels, yes = as.factor(.), no = ggplot2::cut_number(., n = n_levels )))
     #  , .SDcols = varNames]
    
    
    
    #: use standard if-else syntax.
    dt[ , .SD, .SDcols = varNames] %>% map_dfc(., ~if(n_distinct(.) <= n_levels) { as.ordered(.)} 
                                               else { 
                                                   #: if error or warning, use base::cut function to be safe.
                                                   try_catch(cut_func(., n = n_levels, ordered_result = TRUE ), 
                                                      .e = function(e) return( base::cut(., breaks = n_levels, ordered_result = TRUE) ), 
                                                      .w = function(w) return(base::cut(., breaks = n_levels, ordered_result = TRUE)))
                                               })
    # or equivalently, 
    #dt[ , map_dfc(.SD, ~if(n_distinct(.) <= n_levels) { as.factor(.)} else { ggplot2::cut_number(., n = n_levels )}), .SDcols = varNames]


}                              
                        
                              
                              

In [165]:
varNames <- ses_dt %>% colnames %>% setdiff(., "indiv_enterprise_id")

#varNames <- intersect(varNames, colnames(joined_dt))

In [166]:
catt("We have", length(varNames), "SES vars to process!")

We have 60 SES vars to process! 


In [167]:
#: execute `class_designation`
replace_dt_part <- class_designation(ses_dt, varNames)

ses_dt[ , (varNames) := replace_dt_part]

In [168]:
ses_dt %>% sapply(., class) %>% table

.
character    factor   numeric 
        1         6        54 

## add prefix to all the SES vars for naming pattern convenience

In [169]:
setnames(ses_dt, varNames, sprintf("ses_%s", varNames))

In [170]:
varNames = sprintf("ses_%s", varNames)

## generate varName - class mapping table

In [171]:
colClass_dt <- ses_dt[, .SD, .SDcols = varNames] %>% map_chr(., class) %>% as.data.table(., keep.rownames = TRUE)

colClass_dt %>% head

colClass_dt %>% setnames(., c("varName","class"))

rn,.
ses_v3589_mcu_primtm_tv,numeric
ses_v4000_consmr_promnc_ind,numeric
ses_v8655_lfestg_cd,numeric
ses_v8671_incm_estd_hh_narw_rng,factor
ses_v3102_cntry_of_orgn_high_dtl,factor
ses_v8201_incm_estd_hh_highr_rng,factor


In [172]:
ses_dt %>% dim

# Deal with NA values in factor, ordered factor and continuous columns

## check several columns, decide how to do with NA

##### For example,

1. for factor, we could group NAs into "missing value" level

2. for continuou, we could fill with mean/median or fill with 0 (whether 0 already have been taken as values in the column? Be caucious, for example, NA in utilization may be able to be replaced with 0 while NA in age replaced with 0 dont make sense. ); otherwise, we could resort to imputation methods (e.g., package mice)

3. for later factorized (ordered) continuous variable (binned or not), we could adopt 1. or first do 2. when they are still in continuous scale then do factorization/categorization.

## Insert NA_vs_nonNA as factor columns (optional, just use subset from section "4"). 

In [173]:
#: bring in those significant variables

result_isna_association_pvalue <- foreach( vn = varNames, .combine = c) %do% {
    
    temp_value <- ses_dt[[vn]]
    
    temp_value_binary <- ifelse(is.na(temp_value), yes = 1, no = 0)
    
    message(mean(temp_value_binary))
    
    if (n_distinct(temp_value_binary) == 2) {
        
        set(ses_dt, i = NULL, j = sprintf("%s_isNA_factor", vn), value = temp_value_binary %>% as.factor)
        
    }
    
   
    message(vn, "finished!")
    
    
    return (NULL)
    
    
    
}

0.0353486104339347
ses_v3589_mcu_primtm_tvfinished!
0.0175524134568503
ses_v4000_consmr_promnc_indfinished!
0.148951730862994
ses_v8655_lfestg_cdfinished!
0.0960507069722087
ses_v8671_incm_estd_hh_narw_rngfinished!
0.0353486104339347
ses_v3102_cntry_of_orgn_high_dtlfinished!
0.098732325694783
ses_v8201_incm_estd_hh_highr_rngfinished!
0.375914188200878
ses_v8617_a2yrincr_2nd_indivfinished!
0.414188200877621
ses_v8637_occup_ip_indivfinished!
0.00999512432959532
ses_v7616_1_a2yrincr_1_indv_cdfinished!
0.0953193564115066
ses_v8626_a2yrincr_ip_indivfinished!
0.112627986348123
ses_v8616_a2yrincr_1st_indivfinished!
0.160165772793759
ses_v8610_2_fng_1st_indiv_mid_initfinished!
0.0121891760117016
ses_v8640_num_of_srcfinished!
0.0353486104339347
ses_v2778_brnd_nm_med_ppnsfinished!
0.0121891760117016
ses_v8642_hmv_estdfinished!
0.204046806435885
ses_v8644_home_purchs_yr_yyyyfinished!
0.00999512432959532
ses_c24_004_ocp_manufacturing_ppfinished!
0.00999512432959532
ses_c08_001_wa16p_pp_wrk_in_cnty

In [174]:
ses_dt %>% dim

## Fill factor with "missing value" and continuous with "median". 

In [175]:
colClass_dt %>% head
colClass_dt$class %>% table

varName,class
ses_v3589_mcu_primtm_tv,numeric
ses_v4000_consmr_promnc_ind,numeric
ses_v8655_lfestg_cd,numeric
ses_v8671_incm_estd_hh_narw_rng,factor
ses_v3102_cntry_of_orgn_high_dtl,factor
ses_v8201_incm_estd_hh_highr_rng,factor


.
 factor numeric 
      6      54 

In [176]:
catg_var_names <-  colClass_dt[ class %in% c("ordered","factor"), varName]
catt("categorical variables have", length(catg_var_names))

catg_var_withNAs_names <- ses_dt[ , map_lgl(.SD, ~sum(is.na(.))!=0), .SDcols = catg_var_names] %>% as.data.table(., keep.rownames = TRUE)

catg_var_withNAs_names %>% head

catg_var_withNAs_names <- catg_var_withNAs_names[. == TRUE, rn]

catt("categorical variables with any NAs have", length(catg_var_withNAs_names))

categorical variables have 6 


rn,.
ses_v8671_incm_estd_hh_narw_rng,True
ses_v3102_cntry_of_orgn_high_dtl,True
ses_v8201_incm_estd_hh_highr_rng,True
ses_v8637_occup_ip_indiv,True
ses_v8610_2_fng_1st_indiv_mid_init,True
ses_v8642_hmv_estd,True


categorical variables with any NAs have 6 


### assign value for "catg_part_dt"

In [177]:
#: memory efficient version, replace in place, serial.
for (var in catg_var_withNAs_names) {
    
    message("+ ", appendLF = FALSE)
    #temp <- temp %>% unlist
    temp <- ses_dt[[var]]
    #head(temp) %>% print
    levels(temp) <- c(levels(temp), "missing_value")
    temp[is.na(temp)] <- "missing_value"
    set(ses_dt, i = NULL, j = var, value = temp )
    
}

# or

# #: parallel, faster with spread on 29 cores, but need more memory.
# catg_part_dt <- foreach (temp = joined_dt[, catg_var_withNAs_names, with = FALSE], .inorder = TRUE, .combine = cbind) %dopar% {
    
#     #message("+ ", appendLF = FALSE)
#     #temp <- temp %>% unlist
#     #temp <- joined_dt[ , var, with = FALSE] %>% unlist 
#     #head(temp) %>% print
#     levels(temp) <- c(levels(temp), "missing_value")
#     temp[is.na(temp)] <- "missing_value"
#     #set(joined_dt, i = NULL, j = var, value = temp )
#     #head(temp) %>% print
#     return(temp %>% as.data.table)
    
# }

# #: then assign collected data.table back to `joined_dt`s corresponding colnames.
# joined_dt[, (catg_var_withNAs_names) := catg_part_dt]

+ + + + + + 

In [178]:
# setnames(catg_part_dt, catg_var_withNAs_names)

In [179]:
#: check if all filled now
ses_dt[ , map_lgl(.SD, ~sum(is.na(.))!=0), .SDcols = catg_var_names] %>% table

.
FALSE 
    6 

### assign value for "contin_part_dt"

In [180]:
contin_var_names <-  colClass_dt[ class %in% c("numeric", "double", "integer"), varName]
catt("continuous variables have", length(contin_var_names))

contin_var_withNAs_names <- ses_dt[ , map_lgl(.SD, ~sum(is.na(.))!=0), .SDcols = contin_var_names] %>% as.data.table(., keep.rownames = TRUE)

contin_var_withNAs_names %>% head

contin_var_withNAs_names <- contin_var_withNAs_names[. == TRUE, rn]

catt("continuous variables with any NAs have", length(contin_var_withNAs_names))

continuous variables have 54 


rn,.
ses_v3589_mcu_primtm_tv,True
ses_v4000_consmr_promnc_ind,True
ses_v8655_lfestg_cd,True
ses_v8617_a2yrincr_2nd_indiv,True
ses_v7616_1_a2yrincr_1_indv_cd,True
ses_v8626_a2yrincr_ip_indiv,True


continuous variables with any NAs have 54 


In [181]:
ses_dt[, contin_var_withNAs_names, with = FALSE] %>% map_int(., ~sum(is.na(.))) %>% table %>% sort

.
  50   72  391  462  611  837 1542  145   41 
   1    1    1    1    1    1    1    2   45 

In [182]:
for (var in contin_var_withNAs_names) {
    
    set(ses_dt, i = which(is.na(ses_dt[[var]])), j = var, value =  ses_dt[[var]] %>% median(., na.rm = TRUE))
    message("+ ", appendLF = FALSE)
}

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 

In [183]:
#: check and confirm
ses_dt[, contin_var_withNAs_names, with = FALSE] %>% map_int(., ~sum(is.na(.))) %>% table %>% sort

In [184]:
dim(ses_dt)

In [185]:
ses_dt %>% sapply(., class) %>% table

.
character    factor   numeric 
        1        66        54 

# one hot encoding

In [186]:
library(caret)

In [187]:
catg_var_names

In [188]:
ses_dt[ , catg_var_names, with = FALSE] %>% map(., n_distinct)

In [189]:
ses_dt[ , catg_var_names, with = FALSE] %>% map(., unique)

In [190]:
RHS <- paste(catg_var_names, collapse = " + ")

In [191]:
dmy <- dummyVars(sprintf(" ~ %s", RHS), data = ses_dt, sep = "_") # use "_" instead of "."
trsf <- data.table(predict(dmy, newdata = ses_dt))

In [192]:
trsf %>% dim

In [193]:
trsf %>% sapply(., class) %>% table

.
numeric 
    143 

## cbind and remove original factor cols (since we have one-hot encoding variables)

In [194]:
ses_dt[ , (catg_var_names) := NULL]
ses_dt_onehotted <- cbind(ses_dt, trsf)

In [195]:
ses_dt_onehotted %>% sapply(., class) %>% table

.
character    factor   numeric 
        1        60       197 

In [196]:
ses_dt_onehotted

indiv_enterprise_id,ses_v3589_mcu_primtm_tv,ses_v4000_consmr_promnc_ind,ses_v8655_lfestg_cd,ses_v8617_a2yrincr_2nd_indiv,ses_v7616_1_a2yrincr_1_indv_cd,ses_v8626_a2yrincr_ip_indiv,ses_v8616_a2yrincr_1st_indiv,ses_v8640_num_of_src,ses_v2778_brnd_nm_med_ppns,⋯,ses_v8642_hmv_estd_K,ses_v8642_hmv_estd_L,ses_v8642_hmv_estd_M,ses_v8642_hmv_estd_N,ses_v8642_hmv_estd_O,ses_v8642_hmv_estd_P,ses_v8642_hmv_estd_Q,ses_v8642_hmv_estd_R,ses_v8642_hmv_estd_S,ses_v8642_hmv_estd_missing_value
10044079,3,9,345,62,62,62,62,40,7,⋯,0,0,0,0,0,0,0,0,0,0
10049553,7,2,314,56,46,64,56,40,4,⋯,0,0,0,1,0,0,0,0,0,0
10049744,5,10,335,56,56,54,56,46,6,⋯,0,0,1,0,0,0,0,0,0,0
10051016,5,1,141,54,26,18,26,41,8,⋯,0,0,0,0,0,0,1,0,0,0
10074,5,10,215,56,44,44,44,40,10,⋯,0,0,0,0,0,0,1,0,0,0
1007869,6,1,314,50,54,50,54,39,8,⋯,0,0,0,0,0,0,0,0,0,0
10089743,4,10,262,56,48,48,48,38,9,⋯,0,0,0,0,0,0,0,0,0,0
10152833,5,6,314,56,44,44,44,44,6,⋯,0,0,0,0,0,0,0,0,0,0
10166078,5,10,355,54,64,54,64,39,4,⋯,0,0,0,0,0,0,0,0,0,0
10169064,8,7,355,44,60,44,60,34,7,⋯,0,0,0,0,0,0,1,0,0,0


# save to disk

In [197]:
# write_feather(ses_dt_onehotted, path = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_100k_Ning.feather")

# fwrite(ses_dt_onehotted, file = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_100k_Ning.csv")


# write_feather(ses_dt_onehotted, path = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_2.5million_Ning.feather")

# fwrite(ses_dt_onehotted, file = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_2.5million_Ning.csv")


write_feather(ses_dt_onehotted, path = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_oncology_m4_8_6.feather")

fwrite(ses_dt_onehotted, file = "/gnshealthcare/shared/yyang/data/ses_data/ses_dt_onehotted_oncology_m4_8_6.csv")

In [198]:
ses_dt_onehotted %>% tbl_vars

In [199]:
#ses_dt_onehotted <- read_feather(path = "/gnshealthcare/shared/yyang/data/ses_data/joined_dt_NAfilled.feather") %>% as.data.table