## CSE545-SDG3-Matrix


In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
import os

In [2]:
sc = pyspark.SparkContext(appName="SDG3")
sqlCtx = SQLContext(sc)
sc.version

'2.2.0'

In [3]:
data_path = r'../LinkPE14US'
print(os.path.join(data_path, "VS14LINK.USNUMPUB"))
df = sqlCtx.read.text(os.path.join(data_path, "VS14LINK.USNUMPUB"))  #350 mb
#df = sqlCtx.read.text("/VS14LINK.USDENPUB") # 5gb
#df = sqlCtx.read.text("/*.*PUB") # both
df

../LinkPE14US/VS14LINK.USNUMPUB


DataFrame[value: string]

In [4]:
def blank_as_null(x):
    return when(col(x) != ' ', col(x)).otherwise(np.nan)

In [5]:
# pandas data frame after adding column metadata
#pndf type = pyspark.sql.dataframe.DataFrame
pndf = df.select(
    df.value.substr(9,4).cast(IntegerType()).alias('Birth_Year'),
    df.value.substr(13,2).cast(IntegerType()).alias('Birth_Month'),
    df.value.substr(75,2).cast(IntegerType()).alias('Mothers_Age'),
    #df.value.substr(299,3).cast(IntegerType()).alias('Delivery_Weight_lbs'),
    df.value.substr(332,2).cast(IntegerType()).alias('Num_Prev_Cesareans'),
    #435
    df.value.substr(435,1).cast(IntegerType()).alias('Payment_Source'),
    df.value.substr(436,1).cast(IntegerType()).alias('Payment_Recode'),
    df.value.substr(444,2).cast(IntegerType()).alias('Five_Minute_APGAR_Score'),
    df.value.substr(446,1).cast(IntegerType()).alias('Five_Minute_APGAR_Recode'),
    df.value.substr(448,2).cast(IntegerType()).alias('Ten_Minute_APGAR_Score'),
    df.value.substr(450,1).cast(IntegerType()).alias('Ten_Minute_APGAR_Recode'),
    df.value.substr(454,1).cast(IntegerType()).alias('Plurality'),
#     df.value.substr(456,1).cast(IntegerType()).alias('Plurality_Imputed'),
    df.value.substr(475,1).cast(StringType()).alias('Sex_Of_Infant'),
#     df.value.substr(476,1).cast(IntegerType()).alias('Imputed_Sex'),
    df.value.substr(477,2).cast(IntegerType()).alias('Last_Normal_Menses_Month'),
    df.value.substr(481,4).cast(IntegerType()).alias('Last_Normal_Menses_Year'),
#     df.value.substr(488,4).cast(IntegerType()).alias('Combined_Gestation_Imputed'),
    df.value.substr(489,1).cast(IntegerType()).alias('Obstetric_Estimate_of_Gestation_Used_Flag'),
    df.value.substr(490,2).cast(IntegerType()).alias('Combined_Gestation_Detail_in_Weeks'),
    df.value.substr(492,1).cast(IntegerType()).alias('Combined_Gestation_Recode_Weeks'),
    df.value.substr(509,2).cast(IntegerType()).alias('Birth_Weight_Recode_14'),
    df.value.substr(511,1).cast(IntegerType()).alias('Birth_Weight_Recode_4'),
    df.value.substr(512,4).cast(IntegerType()).alias('Imputed_Birthwieght'),
    #Abnormal Conditions of the Newborn
    df.value.substr(517,1).cast(StringType()).alias('Assisted_Ventilation'),
    df.value.substr(518,1).cast(StringType()).alias('Assisted_Ventilation_Greater_Than_6_Hours'),
    df.value.substr(519,1).cast(StringType()).alias('Admission_to_NICU'),
    df.value.substr(520,1).cast(StringType()).alias('Surfactant'),
    df.value.substr(521,1).cast(StringType()).alias('Antibiotics'),
    df.value.substr(522,1).cast(StringType()).alias('Seizures'),
    df.value.substr(531,1).cast(IntegerType()).alias('No_Abnormal_Conditions_Checked'),
    #
    #Congenital Anomalies of the Newborn
    df.value.substr(537,1).cast(StringType()).alias('Anencephaly'),
    df.value.substr(538,1).cast(StringType()).alias('Meningomyelocele_or_Spina_Bifida'),
    df.value.substr(539,1).cast(StringType()).alias('Cyanotic_Congenital_Heart_Disease'),
    df.value.substr(540,1).cast(StringType()).alias('Congenital_Diaphragmatic_Hernia'),
    df.value.substr(541,1).cast(StringType()).alias('Omphalocele'),
    df.value.substr(542,1).cast(StringType()).alias('Gastroschisis'),
    df.value.substr(549,1).cast(StringType()).alias('Limb_Reduction_Defect'),
    df.value.substr(550,1).cast(StringType()).alias('Cleft_Lip_w_or_wo_Cleft_Palate'),
    df.value.substr(551,1).cast(StringType()).alias('Cleft_Palate_alone'),
    df.value.substr(553,1).cast(StringType()).alias('Suspected_Chromosomal_Disorder'),
    df.value.substr(554,1).cast(StringType()).alias('Hypospadias'),
    df.value.substr(561,1).cast(IntegerType()).alias('No_Congenital_Anomalies_Checked'),
    #
    df.value.substr(568,1).cast(StringType()).alias('Infant_Living_at_Time_of_Report'),
    df.value.substr(569,1).cast(StringType()).alias('Infant_Being_Breastfed')
    #870
    
).withColumn("id", monotonically_increasing_id())

pndf.printSchema
pndf = pndf.na.fill({'Num_Prev_Cesareans': 0.0})
#pndf = pndf.withColumn("Infant_Living", blank_as_null("Infant_Living"))

In [6]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(pndf) for column in list(set(pndf.columns)-set(['Five_Minute_APGAR_Score'])) ]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_numeric").fit(pndf) for column in list(  set(pndf.columns) - set(['Birth_Year', 'Birth_Month', 'Mothers_Age', 'Delivery_Weight_lbs', 'Num_Prev_Cesareans', 'Payment_Source', 'Payment_Recode', 'Five_Minute_APGAR_Score', 'Five_Minute_APGAR_Recode', 'Ten_Minute_APGAR_Score', 'Ten_Minute_APGAR_Recode', 'Plurality', 'Plurality_Imputed', 'Imputed_Sex', 'Last_Normal_Menses_Month', 'Last_Normal_Menses_Year', 'Combined_Gestation_Imputed', 'Obstetric_Estimate_of_Gestation_Used_Flag', 'Combined_Gestation_Detail_in_Weeks', 'Combined_Gestation_Recode_Weeks', 'Birth_Weight_Recode_14', 'Birth_Weight_Recode_4', 'Imputed_Birthwieght', 'No_Abnormal_Conditions_Checked', 'No_Congenital_Anomalies_Checked'])) ]
pipeline = Pipeline(stages=indexers)
pndf = pipeline.fit(pndf).transform(pndf)
# drop_list = {'Sex_of_Infant', 'Assisted_Ventilation', 'Assisted_Ventilation_Greater_Than_6_Hours','Admission_to_NICU',
#             'Surfactant', 'Antibiotics', 'Seizures', 'Anencephaly', 'Meningomyelocele_or_Spina_Bifida', 'Cyanotic_Congenital_Heart_Disease',
#             'Congenital_Diaphragmatic_Hernia', 'Omphalocele', 'Gastroschisis', 'Limb_Reduction_Defect', 'Cleft_Lip_w_or_wo_Cleft_Palate',
#             'Cleft_Palate_alone', 'Suspected_Chromosomal_Disorder', 'Hypospadias', 'Infant_Living_at_Time_of_Report', 'Infant_Being_Breastfed'}

# print(pndf.select([column for column in pndf.columns if column not in drop_list]))
# # pndf.drop(df.Sex_of_Infant).collect()
# pndf.drop('Sex_of_Infant')
# # print(pndf.columns)
# pndf.show()

# pca_cols = list(set(pca_cols).difference(set(drop_list)))
# stringIndexer = StringIndexer(inputCol="Infant_Living_at_Time_of_Report", outputCol='Infant_Living_Index')
# pndf = stringIndexer.fit(pndf).transform(pndf)
pndf.head(1)

[Row(Birth_Year=2013, Birth_Month=7, Mothers_Age=24, Num_Prev_Cesareans=0, Payment_Source=2, Payment_Recode=2, Five_Minute_APGAR_Score=9, Five_Minute_APGAR_Recode=4, Ten_Minute_APGAR_Score=88, Ten_Minute_APGAR_Recode=5, Plurality=1, Sex_Of_Infant='F', Last_Normal_Menses_Month=10, Last_Normal_Menses_Year=2012, Obstetric_Estimate_of_Gestation_Used_Flag=None, Combined_Gestation_Detail_in_Weeks=42, Combined_Gestation_Recode_Weeks=1, Birth_Weight_Recode_14=11, Birth_Weight_Recode_4=3, Imputed_Birthwieght=4119, Assisted_Ventilation='N', Assisted_Ventilation_Greater_Than_6_Hours='N', Admission_to_NICU='N', Surfactant='N', Antibiotics='N', Seizures='N', No_Abnormal_Conditions_Checked=1, Anencephaly='N', Meningomyelocele_or_Spina_Bifida='N', Cyanotic_Congenital_Heart_Disease='N', Congenital_Diaphragmatic_Hernia='N', Omphalocele='N', Gastroschisis='N', Limb_Reduction_Defect='N', Cleft_Lip_w_or_wo_Cleft_Palate='N', Cleft_Palate_alone='N', Suspected_Chromosomal_Disorder='N', Hypospadias='N', No_Co

In [7]:
from pyspark.ml.feature import VectorAssembler

drop_list = ['Sex_Of_Infant', 'Assisted_Ventilation', 'Assisted_Ventilation_Greater_Than_6_Hours','Admission_to_NICU',
            'Surfactant', 'Antibiotics', 'Seizures', 'Anencephaly', 'Meningomyelocele_or_Spina_Bifida', 'Cyanotic_Congenital_Heart_Disease',
            'Congenital_Diaphragmatic_Hernia', 'Omphalocele', 'Gastroschisis', 'Limb_Reduction_Defect', 'Cleft_Lip_w_or_wo_Cleft_Palate',
            'Cleft_Palate_alone', 'Suspected_Chromosomal_Disorder', 'Hypospadias', 'Infant_Living_at_Time_of_Report', 'Infant_Being_Breastfed']


pca_cols = pndf.columns
pca_cols = [e for e in pca_cols if e not in drop_list]

pndf.na.drop()
assembler = VectorAssembler(inputCols=pca_cols, outputCol='features')
vector_df = assembler.transform(pndf.na.drop())
vector_df.head()

Row(Birth_Year=2013, Birth_Month=12, Mothers_Age=30, Num_Prev_Cesareans=0, Payment_Source=1, Payment_Recode=1, Five_Minute_APGAR_Score=0, Five_Minute_APGAR_Recode=1, Ten_Minute_APGAR_Score=0, Ten_Minute_APGAR_Recode=1, Plurality=1, Sex_Of_Infant='M', Last_Normal_Menses_Month=99, Last_Normal_Menses_Year=9999, Obstetric_Estimate_of_Gestation_Used_Flag=1, Combined_Gestation_Detail_in_Weeks=25, Combined_Gestation_Recode_Weeks=0, Birth_Weight_Recode_14=3, Birth_Weight_Recode_4=1, Imputed_Birthwieght=964, Assisted_Ventilation='Y', Assisted_Ventilation_Greater_Than_6_Hours='N', Admission_to_NICU='Y', Surfactant='Y', Antibiotics='Y', Seizures='N', No_Abnormal_Conditions_Checked=0, Anencephaly='N', Meningomyelocele_or_Spina_Bifida='N', Cyanotic_Congenital_Heart_Disease='N', Congenital_Diaphragmatic_Hernia='N', Omphalocele='N', Gastroschisis='N', Limb_Reduction_Defect='N', Cleft_Lip_w_or_wo_Cleft_Palate='N', Cleft_Palate_alone='N', Suspected_Chromosomal_Disorder='N', Hypospadias='N', No_Congenit

### Scaling and Normalization

In [8]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', 
                        outputCol='scaledFeatures', withMean=False, withStd=True) # TODO: should withMean be True??
scaled_df = scaler.fit(vector_df).transform(vector_df)
scaled_df.head()

Row(Birth_Year=2013, Birth_Month=12, Mothers_Age=30, Num_Prev_Cesareans=0, Payment_Source=1, Payment_Recode=1, Five_Minute_APGAR_Score=0, Five_Minute_APGAR_Recode=1, Ten_Minute_APGAR_Score=0, Ten_Minute_APGAR_Recode=1, Plurality=1, Sex_Of_Infant='M', Last_Normal_Menses_Month=99, Last_Normal_Menses_Year=9999, Obstetric_Estimate_of_Gestation_Used_Flag=1, Combined_Gestation_Detail_in_Weeks=25, Combined_Gestation_Recode_Weeks=0, Birth_Weight_Recode_14=3, Birth_Weight_Recode_4=1, Imputed_Birthwieght=964, Assisted_Ventilation='Y', Assisted_Ventilation_Greater_Than_6_Hours='N', Admission_to_NICU='Y', Surfactant='Y', Antibiotics='Y', Seizures='N', No_Abnormal_Conditions_Checked=0, Anencephaly='N', Meningomyelocele_or_Spina_Bifida='N', Cyanotic_Congenital_Heart_Disease='N', Congenital_Diaphragmatic_Hernia='N', Omphalocele='N', Gastroschisis='N', Limb_Reduction_Defect='N', Cleft_Lip_w_or_wo_Cleft_Palate='N', Cleft_Palate_alone='N', Suspected_Chromosomal_Disorder='N', Hypospadias='N', No_Congenit

In [9]:
from pyspark.ml.feature import Normalizer
nrmlzer = Normalizer(inputCol='scaledFeatures', outputCol='normalizeFeatures', p=1.0)
l1Normalized = nrmlzer.transform(scaled_df)
l1Normalized.head()

Row(Birth_Year=2013, Birth_Month=12, Mothers_Age=30, Num_Prev_Cesareans=0, Payment_Source=1, Payment_Recode=1, Five_Minute_APGAR_Score=0, Five_Minute_APGAR_Recode=1, Ten_Minute_APGAR_Score=0, Ten_Minute_APGAR_Recode=1, Plurality=1, Sex_Of_Infant='M', Last_Normal_Menses_Month=99, Last_Normal_Menses_Year=9999, Obstetric_Estimate_of_Gestation_Used_Flag=1, Combined_Gestation_Detail_in_Weeks=25, Combined_Gestation_Recode_Weeks=0, Birth_Weight_Recode_14=3, Birth_Weight_Recode_4=1, Imputed_Birthwieght=964, Assisted_Ventilation='Y', Assisted_Ventilation_Greater_Than_6_Hours='N', Admission_to_NICU='Y', Surfactant='Y', Antibiotics='Y', Seizures='N', No_Abnormal_Conditions_Checked=0, Anencephaly='N', Meningomyelocele_or_Spina_Bifida='N', Cyanotic_Congenital_Heart_Disease='N', Congenital_Diaphragmatic_Hernia='N', Omphalocele='N', Gastroschisis='N', Limb_Reduction_Defect='N', Cleft_Lip_w_or_wo_Cleft_Palate='N', Cleft_Palate_alone='N', Suspected_Chromosomal_Disorder='N', Hypospadias='N', No_Congenit

### Dimensionality Reduction: PCA

In [10]:
from pyspark.ml.feature import PCA
num_principal_comp = 4
pca = PCA(k=num_principal_comp, inputCol='normalizeFeatures', outputCol='features_pca')
pca_model = pca.fit(l1Normalized)
pca_feat = pca_model.transform(l1Normalized).select('features_pca')
pca_feat.show(truncate=False)

+------------------------------------------------------------------------------------+
|features_pca                                                                        |
+------------------------------------------------------------------------------------+
|[-0.9773906524390217,0.05025056921897018,0.02195834976615432,-0.061329670995928745] |
|[-0.9774646996160999,0.050171893164933624,0.02218523129324467,-0.0610788982575477]  |
|[-0.9758110964779552,0.049409839402227505,0.02234098722348654,-0.061229296345224955]|
|[-0.9757377918591028,0.04970255599909392,0.022200371318900278,-0.061021684925443916]|
|[-0.9759353825070085,0.04970956691001368,0.022185242979935016,-0.06091589992850187] |
|[-0.9777686007037071,0.05000521836032045,0.0222233366139892,-0.061410655762089164]  |
|[-0.9757855758507324,0.049479657677946896,0.02231091434692497,-0.061166216721117064]|
|[-0.9756880064575638,0.04948736834263715,0.022300402960926995,-0.06103840703773347] |
|[-0.9756676225831385,0.04947396835266351,0