In [1]:
import pyspark.sql.types as typ

# Column names and Spark SQL types of the births CSV, in file order.
# NOTE(review): the schema built from this list is presumably applied to the
# file positionally, so entries must stay in the file's column order — confirm.
# Y/N/U-style flags are read as strings here and recoded to integers later.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

# Read schema: one field per (name, type) pair in `labels`, in the same order,
# each declared non-nullable.
# NOTE(review): Spark's CSV reader may not enforce nullable=False — confirm
# before relying on it to reject missing values.
schema = typ.StructType([
    typ.StructField(field_name, field_type, False)
    for field_name, field_type in labels
])

In [2]:

# Load the raw training data using the explicit schema built from `labels`;
# the first line of the file is treated as a header row.
# NOTE(review): hard-coded workspace path — consider moving it to a config constant.
births = (spark.read
          .schema(schema)
          .option('header', 'true')
          .csv('/FileStore/tables/8zkqo80m1507531980762/births_train_csv-dc6be.gz'))

In [3]:
# Number of rows in the raw births DataFrame.
births.count()

In [4]:
# Render the raw births DataFrame with the notebook's display() helper.
display(births)

In [5]:

# Recoding schemes used by recode(): scheme name -> {raw string value: integer}.
# In the 'YNU' scheme both 'N' and 'U' map to 0, i.e. "unknown" is folded into "no".
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}

In [6]:
# Number of columns in the raw frame (one per entry in `labels`).
len(births.columns)

In [7]:
# Subset of the raw columns carried into the analysis (see births_trimmed).
# NOTE(review): INFANT_ALIVE_AT_REPORT presumably serves as the label — TODO confirm.
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

In [8]:
# Project the raw frame down to the columns listed in selected_features.
births_trimmed = births.select(*selected_features)

In [9]:
import pyspark.sql.functions as func

# Row-level recoding function wrapped by the rec_integer UDF.
def recode(col, key, mapping=None):
    """Translate a raw categorical value into its integer recoding.

    Called once per row by the ``rec_integer`` UDF with the cell value
    ``col`` and the name ``key`` of the recoding scheme.

    Args:
        col: raw cell value (e.g. 'Y', 'N' or 'U' for the 'YNU' scheme).
        key: name of the recoding scheme, a top-level key of the mapping.
        mapping: optional ``{scheme: {raw value: integer}}`` dict. Defaults
            to the notebook-level ``recode_dictionary``.

    Returns:
        The recoded integer, or None (NULL once returned from the UDF) when
        ``col`` is missing or not listed in the scheme. Previously such a
        value raised KeyError inside the executor and failed the Spark job.

    Raises:
        KeyError: if ``key`` is not a known scheme.
    """
    if mapping is None:
        mapping = recode_dictionary
    return mapping[key].get(col)

def correct_cig(feat):
    """Build a column expression that replaces the code 99 in `feat` with 0.

    The result is `feat`'s own value wherever it differs from 99, and the
    literal 0 otherwise. A NULL value compares as NULL (not true), so NULLs
    also become 0.

    NOTE(review): 99 presumably encodes "unknown / not stated" in the source
    data — confirm against the data dictionary.
    """
    column = func.col(feat)
    is_known = column != 99
    return func.when(is_known, column).otherwise(0)

# Spark UDF wrapping recode(); its results are typed as IntegerType columns.
rec_integer = func.udf(recode, returnType=typ.IntegerType())

In [10]:
# List the raw column names.
births.columns

In [11]:
# Sanity check of the YNU UDF: a few raw INFANT_NICU_ADMISSION values shown
# next to their integer recoding.
births \
    .select('INFANT_NICU_ADMISSION',
            rec_integer('INFANT_NICU_ADMISSION', func.lit('YNU'))
                .alias('INFANT_NICU_ADMISSION_RECODE')) \
    .take(5)

In [12]:
# Start from the trimmed frame and replace the 99 code with 0 in each
# cigarette-count column (see correct_cig); column names and order are kept.
births_transformed = births_trimmed
for cig_col in ('CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI'):
    births_transformed = births_transformed.withColumn(cig_col, correct_cig(cig_col))

In [13]:
# Render the frame after the CIG_* columns have been corrected.
display(births_transformed)

In [14]:
# (name, dataType) pair for every column of the trimmed frame, in column order.
cols = [(field.name, field.dataType) for field in births_trimmed.schema.fields]


In [15]:
# Inspect the (name, dataType) pairs of the trimmed frame.
cols

In [16]:
# Detect the Y/N/U-coded columns: string columns of the trimmed frame whose
# distinct values include 'Y'. Each check runs one Spark job
# (distinct + collect) per string column.
# Fix: the `'Y' in ...` test now sits inside the string-type branch. It was
# previously dedented, so for integer columns it re-tested the stale distinct
# values of the preceding string column (and would raise NameError if the
# first column were not a string).
YNU_cols = []

for col_name, col_type in cols:
  if col_type == typ.StringType():
    distinct_vals = births_trimmed.select(col_name).distinct().rdd.map(lambda r: r[0]).collect()
    if 'Y' in distinct_vals:
      YNU_cols.append(col_name)

In [17]:
# Inspect the columns detected as Y/N/U-coded.
YNU_cols

In [18]:
# Select expressions for the recoded frame: each Y/N/U column is replaced by
# its integer recoding under the same name; other columns pass through by name.
exprs_YNU = list(map(
  lambda c: rec_integer(c, func.lit('YNU')).alias(c) if c in YNU_cols else c,
  births_transformed.columns
))

In [19]:
# Apply the Y/N/U recoding to births_transformed.
# NOTE(review): this reassigns births_transformed in place. Re-running this cell
# without first re-running the cell that rebuilds births_transformed from
# births_trimmed feeds already-recoded integers back into the UDF.
births_transformed = births_transformed.select(exprs_YNU)

<h3>Get to know your data</h3>
* Descriptive statistics

In [21]:
import pyspark.mllib.stat as st
import numpy as np

In [22]:
# Numeric columns summarised with mllib statistics; every other column of
# births_transformed is treated as categorical further down.
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
               ]


In [23]:
# RDD with one plain Python list per row, holding the numeric columns in
# numeric_cols order; used as input to Statistics.colStats and Statistics.corr.
numeric_rdd = (births_transformed
               .select(numeric_cols)
               .rdd
               .map(list))

In [24]:
# Peek at the first five rows of numeric_rdd.
numeric_rdd.take(5)

In [25]:
# Column-wise summary statistics (mean, variance, max, ...) over numeric_rdd.
mllib_stats = st.Statistics.colStats(numeric_rdd)

In [26]:
# Per-column means, in numeric_cols order.
mllib_stats.mean()

In [27]:
# Per-column maxima, in numeric_cols order.
mllib_stats.max()

In [28]:
# Print "column : mean : standard deviation" for every numeric column
# (standard deviation = sqrt of the column variance).
# Formatted into a single string so the line prints the same text on Python 2
# and is no longer a SyntaxError on Python 3.
for col, m, v in zip(numeric_cols, mllib_stats.mean(), mllib_stats.variance()):
  print('%s : %s : %s' % (col, m, np.sqrt(v)))

In [29]:
# Every column of the recoded frame that is not in numeric_cols is treated as categorical.
categorical_cols = list(filter(lambda c: c not in numeric_cols, births_transformed.columns))

In [30]:
# RDD with one plain Python list per row, holding the categorical columns in
# categorical_cols order.
categorical_rdd = (births_transformed
                   .select(categorical_cols)
                   .rdd
                   .map(list))

In [31]:
# Peek at the first five rows of categorical_rdd.
categorical_rdd.take(5)

In [32]:
# Print a frequency table "(value, count)" for every categorical column.
# countByValue() counts on the executors and returns only {value: count} to the
# driver. The previous groupBy(...).collect() shipped every row of the dataset
# to the driver just to take len() of each group.
# `i=i` binds the current column index into the lambda explicitly rather than
# relying on the closure being serialized before the loop advances.
for i, col in enumerate(categorical_cols):
  counts = categorical_rdd.map(lambda r, i=i: r[i]).countByValue()
  print('%s %s\n' % (col, ' '.join(str(item) for item in counts.items())))
  

In [33]:
# Frequency table of each categorical column, sorted from most to least frequent value.
# countByValue() replaces groupBy(...).map(len): groupBy shuffles and
# materialises every row of each group only for its length to be taken.
for i, col in enumerate(categorical_cols):
  counts = categorical_rdd.map(lambda r, i=i: r[i]).countByValue()
  print(col, sorted(counts.items(), key=lambda el: el[1], reverse=True))

In [34]:
# Pearson correlation matrix of the numeric columns; rows and columns follow
# numeric_cols order. (Pearson is also mllib's default method.)
corrs = st.Statistics.corr(numeric_rdd, method='pearson')

In [35]:
# Correlation between MOTHER_AGE_YEARS (numeric_cols[0]) and MOTHER_HEIGHT_IN
# (numeric_cols[6]), looked up by name instead of magic indices.
corrs[numeric_cols.index('MOTHER_AGE_YEARS')][numeric_cols.index('MOTHER_HEIGHT_IN')]


In [36]:
# Row/column order of the correlation matrix.
numeric_cols

In [37]:
import matplotlib.pyplot as plt

In [38]:
# NOTE(review): scratch matplotlib smoke test; `p` is not used anywhere below —
# candidate for removal.
p = plt.plot([1,2,3,4],[2,3,4,5])

In [39]:
import seaborn as sns
import matplotlib.pyplot as plt

# Annotated heatmap of the Pearson correlation matrix of numeric_cols.
# `corrs` is a bare matrix, so the column names must be passed as tick labels;
# without them the axes show only positional indices and the plot cannot be
# read without cross-referencing numeric_cols.
plt.figure(figsize=(12,10))
sns.heatmap(corrs, annot=True, cbar=True,
            xticklabels=numeric_cols, yticklabels=numeric_cols)
plt.title('Correlation between numeric features')
plt.xticks(rotation=90)
#plt.savefig('/FileStore/tables/8zkqo80m1507531980762/a.jpeg')

<h3>Create the final dataset</h3>

In [41]:
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

In [42]:
# HashingTF demo: terms are hashed into a feature vector of size 7.
hashing = ft.HashingTF(7)

In [43]:
# Term-frequency vector of a toy document ('a' x3, 'b' x2, 'c' x1).
hashing.transform(['a','b','a','a','b','c'])

In [44]:
# Render the fully recoded frame.
display(
  births_transformed)

In [45]:
# Reduced feature set: selected_features minus CIG_BEFORE, CIG_2_TRI, CIG_3_TRI,
# MOTHER_DELIVERY_WEIGHT and MOTHER_WEIGHT_GAIN.
# NOTE(review): presumably these were dropped as highly correlated with kept
# columns (see the correlation heatmap) — confirm. Not yet used in this notebook.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_1_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

In [46]:

# First row of the recoded frame.
births_transformed.first()

In [47]:
from pyspark.mllib.regression import LabeledPoint
# Toy two-point labelled dataset for trying out LogisticRegressionWithLBFGS.
pos = sc.parallelize([ LabeledPoint(1.0, [1.0, 0.0, 3.0]), LabeledPoint(0.0, [0.0, 0.0, 3.0])])
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

In [48]:
# Fit logistic regression (L-BFGS, at most 10 iterations) on the toy data.
LR_model = LogisticRegressionWithLBFGS.train(pos,iterations=10)

In [49]:
# Predicted class of each toy point, using the model's RDD overload of predict().
LR_model.predict(pos.map(lambda point: point.features)).collect()

In [50]:
# A single LabeledPoint built from a plain list, to inspect how features are stored.
x = LabeledPoint(1.0, [1.0, 0.0, 3.0])

In [51]:
# Inspect the type LabeledPoint stores its features as.
type(x.features)