# Mllib package of pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master('local') \
        .appName('test') \
        .config('spark.executor.memory', '1gb') \
        .getOrCreate()

In [4]:
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

In [5]:
births = spark.read.csv('births_train.csv', header=True, schema=schema)

In [6]:
recode_dictionary = {
    'YUN' : {
        'Y' : 1,
        'N' : 0,
        'U' : 0
    }
}

In [9]:
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)

In [19]:
births_trimmed.show(5)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+
|                     N|          1|              29|                 99|        99|       99|       99|       99|              99|              999|                   999|                99|           N| 

In [13]:
births_trimmed.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: string (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: integer (nullable = true)
 |-- FATHER_COMBINED_AGE: integer (nullable = true)
 |-- CIG_BEFORE: integer (nullable = true)
 |-- CIG_1_TRI: integer (nullable = true)
 |-- CIG_2_TRI: integer (nullable = true)
 |-- CIG_3_TRI: integer (nullable = true)
 |-- MOTHER_HEIGHT_IN: integer (nullable = true)
 |-- MOTHER_PRE_WEIGHT: integer (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)
 |-- DIABETES_PRE: string (nullable = true)
 |-- DIABETES_GEST: string (nullable = true)
 |-- HYP_TENS_PRE: string (nullable = true)
 |-- HYP_TENS_GEST: string (nullable = true)
 |-- PREV_BIRTH_PRETERM: string (nullable = true)



In [21]:
import pyspark.sql.functions as func

# 自定义UDF函数，将INFANT_NICU_ADMISSION列转换成我们想要的样子 Y->1 N->0 U->0
def recode(col, key):
    return recode_dictionary[key][col]

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat)) \
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

In [14]:
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

In [15]:
births_transformed.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: string (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: integer (nullable = true)
 |-- FATHER_COMBINED_AGE: integer (nullable = true)
 |-- CIG_BEFORE: integer (nullable = true)
 |-- CIG_1_TRI: integer (nullable = true)
 |-- CIG_2_TRI: integer (nullable = true)
 |-- CIG_3_TRI: integer (nullable = true)
 |-- MOTHER_HEIGHT_IN: integer (nullable = true)
 |-- MOTHER_PRE_WEIGHT: integer (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)
 |-- DIABETES_PRE: string (nullable = true)
 |-- DIABETES_GEST: string (nullable = true)
 |-- HYP_TENS_PRE: string (nullable = true)
 |-- HYP_TENS_GEST: string (nullable = true)
 |-- PREV_BIRTH_PRETERM: string (nullable = true)



In [18]:
births_transformed.select(['CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI']).show(10)

+----------+---------+---------+---------+
|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|
+----------+---------+---------+---------+
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         6|        4|        2|        2|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
|         0|        0|        0|        0|
+----------+---------+---------+---------+
only showing top 10 rows



In [35]:
cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        if 'Y' in dis:
            YUN_cols.append(s[0])

In [36]:
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x) 
    if x in YNU_cols 
    else x 
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

In [37]:
births_transformed.select(YNU_cols[-5:]).show(5)

++
||
++
||
||
||
||
||
++
only showing top 5 rows



# Get to know your data

In [41]:
# using colStats() to know the descriptive statistics
import pyspark.mllib.stat as st
import numpy as np
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
               ]
numeric_rdd = births_transformed.select(numeric_cols).rdd.map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

for col, m, v in zip(numeric_cols, 
                     mllib_stats.mean(), 
                     mllib_stats.variance()):
    print('{0}: \t {1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

MOTHER_AGE_YEARS: 	 28.30 	 6.08
FATHER_COMBINED_AGE: 	 44.55 	 27.55
CIG_BEFORE: 	 1.43 	 5.18
CIG_1_TRI: 	 0.91 	 3.83
CIG_2_TRI: 	 0.70 	 3.31
CIG_3_TRI: 	 0.58 	 3.11
MOTHER_HEIGHT_IN: 	 65.12 	 6.45
MOTHER_PRE_WEIGHT: 	 214.50 	 210.21
MOTHER_DELIVERY_WEIGHT: 	 223.63 	 180.01
MOTHER_WEIGHT_GAIN: 	 30.74 	 26.23


In [42]:
numeric_rdd.take(5)

[[29, 99, 0, 0, 0, 0, 99, 999, 999, 99],
 [22, 29, 0, 0, 0, 0, 65, 180, 198, 18],
 [38, 40, 0, 0, 0, 0, 63, 155, 167, 12],
 [39, 42, 0, 0, 0, 0, 60, 128, 152, 24],
 [18, 99, 6, 4, 2, 2, 61, 110, 130, 20]]

In [43]:
# for categorical variables

categorical_cols = [e for e in births_transformed.columns
                       if e not in numeric_cols]

categorical_rdd = births_transformed.select(categorical_cols).rdd.map(lambda row: [e for e in row])

for i, col in enumerate(categorical_cols):
    agg = categorical_rdd.groupBy(lambda row: row[i]).map(lambda row: (row[0], len(row[1])))
    print(col, sorted(agg.collect(), key=lambda e: e[1], reverse=True))

INFANT_ALIVE_AT_REPORT [('Y', 23349), ('N', 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [('N', 44689), ('Y', 548), ('U', 192)]
DIABETES_GEST [('N', 43259), ('Y', 1978), ('U', 192)]
HYP_TENS_PRE [('N', 44156), ('Y', 1081), ('U', 192)]
HYP_TENS_GEST [('N', 43110), ('Y', 2127), ('U', 192)]
PREV_BIRTH_PRETERM [('N', 42896), ('Y', 2341), ('U', 192)]


In [45]:
corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
    correlated = [
        (numeric_cols[j], corrs[i][j])
        for j, e in enumerate(el)
        if e == 1.0 and j != i]
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))

CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60


In [46]:
# drop most of highly correlated features

features_to_keep = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_1_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in features_to_keep])

In [47]:
import pyspark.mllib.linalg as ln

for cat in categorical_cols[1:]:
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()    

    agg_rdd = agg \
        .rdd\
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row: 
                 [0 if e == None else e for e in row]) \
        .collect()

    row_length = len(agg.collect()[0]) - 1
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)
    
    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))

BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0


In [48]:
# labeldPointes
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

hashing = ft.HashingTF(7)

births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [
            list(hashing.transform(row[1]).toArray()) 
                if col == 'BIRTH_PLACE' 
                else row[i] 
            for i, col 
            in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if type(e) == int else e 
                      for e in row]) \
    .map(lambda row: [item for sublist in row 
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(
            row[0], 
            ln.Vectors.dense(row[1:]))
        )

In [55]:
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

In [61]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS.train(births_train, iterations=10)

In [None]:
# predict the test set
LR_results = (
        births_test.map(lambda row: row.label) \
        .zip(LR_Model \
             .predict(births_test\
                      .map(lambda row: row.features)))
    ).map(lambda row: (row[0], row[1] * 1.0))

In [None]:
import pyspark.mllib.evaluation as ev
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()

In [None]:
selector = ft.ChiSqSelector(4).fit(births_train)

topFeatures_train = (
        births_train.map(lambda row: row.label) \
        .zip(selector \
             .transform(births_train \
                        .map(lambda row: row.features)))
    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))

topFeatures_test = (
        births_test.map(lambda row: row.label) \
        .zip(selector \
             .transform(births_test \
                        .map(lambda row: row.features)))
    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))