In [0]:

%spark.pyspark
# Four sample text passages wrapped in a one-column ('input') DataFrame.
text_data = spark.createDataFrame([
['''Machine learning can be applied to a wide variety
of data types, such as vectors, text, images, and
structured data. This API adopts the DataFrame from
Spark SQL in order to support a variety of data
types.'''],
['''DataFrame supports many basic and structured types; 
see the Spark SQL datatype reference for a list of supported types. 
In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.'''],
['''A DataFrame can be created either implicitly or explicitly from a regular RDD. 
See the code examples below and the Spark SQL programming guide for examples.'''],

['''Columns in a DataFrame are named. The code examples
below use names such as "text," "features," and
"label."''']
], ['input'])


In [1]:
%spark.pyspark
# Print the sample text DataFrame.
text_data.show()


In [2]:
%spark.pyspark
import pandas as pd
import numpy as np



In [3]:
%spark.pyspark
# Collapse to one partition and write as CSV with a header row,
# overwriting whatever already exists at the target path.
text_data.coalesce(1).write.mode('overwrite').csv('s3://lerawzone/users/gyang/outfile/text_data.csv',header='true')


In [4]:
%spark.pyspark
import pyspark.sql.types as typ
# (column name, Spark type) pairs describing the trimmed births dataset.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType()),
]
# One non-nullable StructField per (name, type) pair, in list order.
schema = typ.StructType(
    [typ.StructField(field_name, field_type, False) for field_name, field_type in labels]
)


In [5]:
%spark.pyspark
import xgboost


In [6]:
%spark.pyspark
from pyspark.sql.types import *


In [7]:
%spark.pyspark
# Rebinds `schema` to a two-field schema; both fields are nullable LongType.
schema=StructType([StructField('INFANT_ALIVE_AT_REPORT',LongType(),True),
                  StructField('BIRTH_PLACE',LongType(),True)])


In [8]:
%spark.pyspark
# Load the gzipped births CSV from S3, taking column names from the header row.
# No schema or inferSchema option is passed here.
births=spark.read.csv('s3://lerawzone/users/gyang/births_transformed.csv.gz',header=True)


In [9]:
%spark.pyspark
# Load the e-mail address CSV from S3 (header row supplies the column names).
email=spark.read.csv('s3://lerawzone/users/gyang/outfile/email_address.csv',header=True)

In [10]:
%spark.pyspark
# Write the first five columns of births as a single-partition CSV with header,
# overwriting any previous output at that path.
births.select(*births.columns[:5]).coalesce(1).write.mode('overwrite').csv('s3://lerawzone/users/gyang/outfile/births_test2.csv',header='true')

In [11]:
%spark.pyspark
# Read every file under the create_dt=2018-06-14 directory as Parquet.
parquet=spark.read.parquet('s3://lerefinedzone/core/paid_search/crealytics_order_feed/create_dt=2018-06-14/*')

In [12]:
%spark.pyspark
# Keep only the first five columns of births.
births1=births.select(*births.columns[:5])



In [13]:
%spark.pyspark
# Print the first 10 rows.
births1.show(10)


In [14]:
%spark.pyspark
# Inspect the column names of births. (The cell previously held only the
# incomplete expression `births.`, a SyntaxError that aborts a full run.)
births.columns


In [15]:
%spark.pyspark
# Print describe() summary statistics for every column.
births.describe().show()


In [16]:
%spark.pyspark
from pyspark.sql.functions import *
import pyspark.ml.feature as ft
from pyspark.ml.feature import Binarizer
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as ev
import pyspark.ml.tuning as tune
from pyspark.mllib.evaluation import MulticlassMetrics



In [17]:
%spark.pyspark
# Number of rows in births.
births.count()


In [18]:
%spark.pyspark
# Cast every column of births to integer in a single projection,
# keeping the original column names and order.
births=births.select([births[name].cast('integer').alias(name) for name in births.columns])


In [19]:
%spark.pyspark
# Confirm the column types after the cast.
births.printSchema()


In [20]:
%spark.pyspark
# Every column except the first is used as a predictor; the first column is
# the one the evaluators further down pass as labelCol.
features=births.columns[1:]
features


In [21]:
%spark.pyspark
# Null count per column: the isNull() flag is cast to integer and summed.
# `sum` and `col` here are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
births.select(*(sum(col(c).isNull().cast('integer')).alias(c) for c in births.columns)).show()


In [22]:
%spark.pyspark
# Print the number of distinct values in each column, one line per column
# (each count is a separate Spark job).
for column_name in births.columns:
    distinct_values=births.select(column_name).distinct()
    print(distinct_values.count())


In [23]:
%spark.pyspark
# Number of distinct rows in births.
births.distinct().count()

In [24]:
%spark.pyspark
# Register births as the temp view 'births_sql' so it can be queried with SQL.
births.createOrReplaceTempView('births_sql')


In [25]:
%spark.pyspark
# Distinct-row count of the view, via SQL.
spark.sql('select count(distinct *) from births_sql').show()


In [26]:
%spark.pyspark
# Total row count of the view, via SQL.
spark.sql('select count(*) from births_sql').show()


In [27]:
%spark.pyspark
# births2: births with duplicate rows removed.
births2=births.dropDuplicates()


In [28]:
%spark.pyspark
from pyspark.sql.functions import *

In [29]:
%spark.pyspark
# Toy (ID, Text) frame used to demonstrate manual one-hot encoding.
df = sqlContext.createDataFrame([
    (1, "a"),
    (2, "b"),
    (3, "c"),
], ["ID", "Text"])

# Distinct 'Text' values, flattened and collected into a Python list.
categories = df.select("Text").distinct().rdd.flatMap(lambda x: x).collect()

In [30]:
%spark.pyspark
# Print the toy frame.
df.show()

In [31]:
%spark.pyspark
# One indicator Column per category: 1 where Text equals the category, else 0,
# aliased with the category value itself.
exprs = [when(col("Text") == category, 1).otherwise(0).alias(category)
         for category in categories]
type(exprs)


In [32]:
%spark.pyspark
# Replace df with ID plus the indicator columns.
# NOTE(review): this rebinds df to a frame without 'Text', which exprs still
# reference, so the cell looks non-re-runnable without rebuilding df first.
df=df.select('ID',*exprs)


In [33]:
%spark.pyspark
# Print the one-hot encoded frame.
df.show()

In [34]:
%spark.pyspark
# One-hot encoder reading BIRTH_PLACE and writing BIRTH_PLACE_VEC.
encoder=ft.OneHotEncoder(inputCol='BIRTH_PLACE',outputCol='BIRTH_PLACE_VEC')
type(encoder)

In [35]:
%spark.pyspark
# Check the encoder's configured output column name.
encoder.getOutputCol()


In [36]:
%spark.pyspark

# Assemble every predictor column into a single 'features' vector column.
# list(features) copies the names without binding a comprehension variable
# called `col`, which would clash with pyspark.sql.functions.col from the star
# import (and would leak into the namespace under Python 2).
featuresCreator=ft.VectorAssembler(inputCols=list(features),outputCol='features')



In [37]:
%spark.pyspark
# Rebind births2 to births with the assembled 'features' column added.
births2=featuresCreator.transform(births)

In [38]:
%spark.pyspark
# Candidate regularisation strengths 0.01 .. 0.09 in steps of 0.01.
# The built-in range() only accepts integers (float arguments raise TypeError),
# so an integer range is scaled instead.
regP=[k/100.0 for k in range(1,10)]


In [39]:
%spark.pyspark

# Logistic regression: at most 50 iterations, regParam 0.0,
# label column INFANT_ALIVE_AT_REPORT.
logistic=cl.LogisticRegression(maxIter=50,regParam=0.0,labelCol='INFANT_ALIVE_AT_REPORT')


In [40]:
%spark.pyspark
# Two-stage pipeline: assemble the feature vector, then fit logistic regression.
pipeline=Pipeline(stages=[featuresCreator,logistic])
# 70/30 split with a fixed seed. It is done here so births_train / births_test
# exist before the following cells display and fit on them.
births_train,births_test=births.randomSplit([0.7,0.3],seed=666)


In [41]:
%spark.pyspark
# Peek at the first 5 training rows.
births_train.show(5)

In [42]:
%spark.pyspark
# 70/30 split of births2 with a fixed seed.
births2_train,births2_test=births2.randomSplit([0.7,0.3],seed=666)


In [43]:
%spark.pyspark
# 70/30 split of births with a fixed seed.
births_train,births_test=births.randomSplit([0.7,0.3],seed=666)


In [44]:
%spark.pyspark
# births2 was produced by featuresCreator.transform, so it already carries the
# assembled 'features' column. Running the whole pipeline on it would re-run
# VectorAssembler, which rejects an output column that already exists.
# Fit only the classifier stage on the pre-assembled data.
model2=logistic.fit(births2_train)


In [45]:
%spark.pyspark
# Score the training split with model2.
train2=model2.transform(births2_train)


In [46]:
%spark.pyspark
# Score the test split with model2 and fetch one scored row.
test2=model2.transform(births2_test)
test2.take(1)


In [47]:
%spark.pyspark
# First 10 values of the 'probability' column.
test2.select('probability').show(10)


In [48]:
%spark.pyspark
# Fit the assemble + logistic pipeline on the raw training split.
model=pipeline.fit(births_train)



In [49]:
%spark.pyspark


In [50]:
%spark.pyspark
# Score the test split with the pipeline model and fetch one scored row.
test_model=model.transform(births_test)
test_model.take(1)


In [51]:
%spark.pyspark
# First 10 values of the 'probability' column.
test_model.select('probability').show(10)


In [52]:
%spark.pyspark
# Binary evaluator that reads scores from 'probability' and uses the first
# column of births_train as the label; also score the training split.
evaluator=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births_train.columns[0])
train_model=model.transform(births_train)


In [53]:
%spark.pyspark
# Same evaluator setup for the births2 splits (label = first column).
evaluator2=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births2_train.columns[0])


In [54]:
%spark.pyspark
# Area under ROC for model2: training split first, then test split.
print(evaluator2.evaluate(train2,{evaluator2.metricName:'areaUnderROC'}))
print(evaluator2.evaluate(test2,{evaluator2.metricName:'areaUnderROC'}))


In [55]:
%spark.pyspark
# Area under ROC for the pipeline model: training split first, then test split.
print(evaluator.evaluate(train_model,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderROC'}))


In [56]:
%spark.pyspark
import numpy as np
import pandas as pd


In [57]:
%spark.pyspark
# Preview the 6x3 zero matrix shape used for the result tables below.
np.zeros((6,3))


In [58]:
%spark.pyspark
# Sweep random-forest maxDepth over 5..10 and record AUC on both splits.
# Columns of `result`: [maxDepth, train AUC, test AUC].
depths=list(range(5,11))
# Size the table from the sweep itself so the two cannot drift apart.
result=np.zeros((len(depths),3))
# The evaluator is identical for every depth, so it is created once.
rf_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births.columns[0])
for i,depth in enumerate(depths):
    rf_classifier=cl.RandomForestClassifier(maxDepth=depth,labelCol='INFANT_ALIVE_AT_REPORT',numTrees=100)
    rf_pip=Pipeline(stages=[featuresCreator,rf_classifier])
    rf_model=rf_pip.fit(births_train)
    # rf_train / rf_test keep the last iteration's predictions after the loop.
    rf_train=rf_model.transform(births_train)
    rf_test=rf_model.transform(births_test)
    result[i,0]=depth
    result[i,1]=rf_eval.evaluate(rf_train,{rf_eval.metricName:'areaUnderROC'})
    result[i,2]=rf_eval.evaluate(rf_test,{rf_eval.metricName:'areaUnderROC'})



In [59]:
%spark.pyspark
# Display the depth / train AUC / test AUC table.
result

In [60]:
%spark.pyspark
# Re-evaluate rf_train / rf_test as left by the final iteration of the sweep
# above (its range ends at maxDepth 10).
rf_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births.columns[0])
print(rf_eval.evaluate(rf_train,{rf_eval.metricName:'areaUnderROC'}))
print(rf_eval.evaluate(rf_test,{rf_eval.metricName:'areaUnderROC'}))


In [61]:
%spark.pyspark
# Sweep gradient-boosted-tree maxDepth over 5..10 and record AUC on both splits.
# Columns of `result1`: [maxDepth, train AUC, test AUC].
depths=list(range(5,11))
# Size the table from the sweep itself so the two cannot drift apart.
result1=np.zeros((len(depths),3))
# The evaluator is identical for every depth, so it is created once.
GB_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births.columns[0])
for i,depth in enumerate(depths):
    GB_classifier=cl.GBTClassifier(labelCol='INFANT_ALIVE_AT_REPORT',maxDepth=depth)
    GB_pip=Pipeline(stages=[featuresCreator,GB_classifier])
    GB_model=GB_pip.fit(births_train)
    # GB_train / GB_test keep the last iteration's predictions after the loop.
    GB_train=GB_model.transform(births_train)
    GB_test=GB_model.transform(births_test)
    result1[i,0]=depth
    result1[i,1]=GB_eval.evaluate(GB_train,{GB_eval.metricName:'areaUnderROC'})
    result1[i,2]=GB_eval.evaluate(GB_test,{GB_eval.metricName:'areaUnderROC'})



In [62]:
%spark.pyspark
# Display the GBT depth / train AUC / test AUC table.
result1


In [63]:
%spark.pyspark


In [64]:
%spark.pyspark
# Re-evaluate GB_train / GB_test as left by the final iteration of the GBT
# sweep above.
GB_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol=births.columns[0])
print(GB_eval.evaluate(GB_train,{GB_eval.metricName:'areaUnderROC'}))
print(GB_eval.evaluate(GB_test,{GB_eval.metricName:'areaUnderROC'}))


In [65]:
%spark.pyspark
# 3 x 3 parameter grid over maxIter and regParam for the logistic model.
grid=tune.ParamGridBuilder().addGrid(logistic.maxIter,[2,10,50]).addGrid(logistic.regParam,[0.01,0.05,0.3]).build()
# Cross-validator scoring each grid point with `evaluator`.
cv=tune.CrossValidator(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)
# Rebinds `pipeline` to a feature-preparation pipeline (encode, then assemble).
# NOTE(review): featuresCreator's inputCols were built before the encoder
# existed, so BIRTH_PLACE_VEC does not appear to be fed into 'features' - confirm.
pipeline=Pipeline(stages=[encoder,featuresCreator])
data_transformer=pipeline.fit(births_train)


In [66]:
%spark.pyspark
# Run the cross-validated grid search on the transformed training split.
cvModel=cv.fit(data_transformer.transform(births_train))


In [67]:
%spark.pyspark
# Transform both splits, score them with the cross-validated model and print
# area under ROC (training split first, then test split).
data_test=data_transformer.transform(births_test)
data_train=data_transformer.transform(births_train)
results_train=cvModel.transform(data_train)
results_test=cvModel.transform(data_test)
print(evaluator.evaluate(results_train,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results_test,{evaluator.metricName:'areaUnderROC'}))


In [68]:
%spark.pyspark
# NOTE(review): repeats the cross-validation fit already run two cells above
# with the same inputs - confirm whether this second fit is needed.
cvModel=cv.fit(data_transformer.transform(births_train))



In [69]:
%spark.pyspark
# Linear SVM baseline: assemble features, fit, score both splits, print AUC.
svc_classifier=cl.LinearSVC(labelCol='INFANT_ALIVE_AT_REPORT')
pip=Pipeline(stages=[featuresCreator,svc_classifier])
svc_model=pip.fit(births_train)
train_prob=svc_model.transform(births_train)
test_prob=svc_model.transform(births_test)
# No rawPredictionCol is passed, so the evaluator's default column is used.
svc_eval=ev.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
print(svc_eval.evaluate(train_prob,{svc_eval.metricName:'areaUnderROC'}))
print(svc_eval.evaluate(test_prob,{svc_eval.metricName:'areaUnderROC'}))



In [70]:
%spark.pyspark
# Load the daily weather CSV from S3 (header row supplies the column names).
weather=spark.read.csv('s3://lerawzone/users/gyang/daily_weather.csv',header=True)


In [71]:
%spark.pyspark
# Row count and schema of the raw weather data.
print(weather.count())
weather.printSchema()

In [72]:
%spark.pyspark
# Drop the 'number' column and list what remains.
weather=weather.drop('number')
weather.columns


In [73]:
%spark.pyspark
# Cast every weather column to float in one projection, keeping names and order.
weather=weather.select([col(name).cast('float').alias(name) for name in weather.columns])


In [74]:
%spark.pyspark
# Remove rows that contain nulls.
weather=weather.dropna()


In [75]:
%spark.pyspark
# Print the first 6 rows.
weather.show(6)


In [76]:
%spark.pyspark
# Binarise the target in place: 0 when relative_humidity_3pm < 24.99999, else 1.
weather=weather.withColumn('relative_humidity_3pm',when(col('relative_humidity_3pm')<24.99999,0).otherwise(1))



In [77]:
%spark.pyspark
# Schema and row count after cleaning and binarising.
weather.printSchema()
print(weather.count())


In [78]:
%spark.pyspark
# Rebinds `features` to every weather column except the last, and builds an
# assembler that packs them into a 'features' vector column.
features=weather.columns[:-1]
featuresAss=ft.VectorAssembler(inputCols=features,outputCol='features')


In [79]:
%spark.pyspark
# 70/30 split with a fixed seed; print the size of each part.
train_weather,test_weather=weather.randomSplit([0.7,0.3],seed=13234)
print(train_weather.count(),test_weather.count())


In [80]:
%spark.pyspark
# Add the 'features' vector column to both splits (rebinding the split names).
train_weather=featuresAss.transform(train_weather)
test_weather=featuresAss.transform(test_weather)


In [81]:
%spark.pyspark
# Decision tree (depth 3, at least 5 instances per node, gini impurity)
# predicting the binarised humidity label. Rebinds `model`.
dt=cl.DecisionTreeClassifier(featuresCol='features',labelCol='relative_humidity_3pm',maxDepth=3,minInstancesPerNode=5,impurity='gini')
model=dt.fit(train_weather)


In [82]:
%spark.pyspark
# Score the training split with the decision tree.
train_pred=model.transform(train_weather)


In [83]:
%spark.pyspark
# Print the 'probability' column of the training predictions.
train_pred.select(['probability']).show()


In [84]:
%spark.pyspark
# Score the test split and compare label with prediction for 10 rows.
test_pred=model.transform(test_weather)
test_pred.select('relative_humidity_3pm','prediction').show(10)

In [85]:
%spark.pyspark
# Display the `labels` list of (name, type) pairs defined earlier.
labels


In [86]:
%spark.pyspark
# Rebinds evaluator2 for the weather model: scores come from 'prediction',
# labels from 'relative_humidity_3pm'.
evaluator2=ev.BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='relative_humidity_3pm')

In [87]:
%spark.pyspark
# (prediction, label) pairs for the training split; fetch the first 10.
predictions_mat=train_pred.select('prediction','relative_humidity_3pm')
predictions_mat.rdd.take(10)


In [88]:
%spark.pyspark
# Accuracy on the training and test predictions.
# BinaryClassificationEvaluator only offers areaUnderROC / areaUnderPR, so asking
# it for 'accuracy' is rejected; accuracy is a MulticlassClassificationEvaluator
# metric, and that evaluator scores binary labels as well.
accuracy_eval=ev.MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='relative_humidity_3pm',metricName='accuracy')
print(accuracy_eval.evaluate(train_pred))
print(accuracy_eval.evaluate(test_pred))


In [89]:
%spark.pyspark
# Confusion matrix for the training split: rows of (prediction, label) are
# turned into tuples and handed to the RDD-based MulticlassMetrics.
predictions_mat=train_pred.select('prediction','relative_humidity_3pm')
metrics=MulticlassMetrics(predictions_mat.rdd.map(tuple))
metrics.confusionMatrix().toArray()


In [90]:
%spark.pyspark
# Accuracy via the multiclass evaluator: training split first, then test split.
evaluator3=ev.MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='relative_humidity_3pm')
print(evaluator3.evaluate(train_pred,{evaluator3.metricName:'accuracy'}))
print(evaluator3.evaluate(test_pred,{evaluator3.metricName:'accuracy'}))



In [91]:
%spark.pyspark
import pyspark.ml.clustering as clus



In [92]:
%spark.pyspark
# Display the births feature assembler that the clustering pipeline reuses.
featuresCreator


In [93]:
%spark.pyspark
# k-means with 5 clusters on the assembled births features.
# NOTE(review): no seed is passed, so cluster assignments may differ between runs.
kmeans=clus.KMeans(k=5,featuresCol='features')
clus_pip=Pipeline(stages=[featuresCreator,kmeans])
clus_model=clus_pip.fit(births_train)


In [94]:
%spark.pyspark
# Assign training rows to clusters (rebinds train_pred) and group by cluster id.
train_pred=clus_model.transform(births_train)
train_pred.groupBy('prediction').count()


In [95]:
%spark.pyspark
# Cluster sizes for the training split, then each size as a share of the total
# rounded to 3 decimals. `round` is pyspark.sql.functions.round (star import).
train_cluster=train_pred.groupBy('prediction').count()
train_total=train_pred.count()
train_cluster.show()
train_cluster=train_cluster.withColumn('Perc',round(train_cluster['count']/train_total,3))
train_cluster.show()


In [96]:
%spark.pyspark
# Assign test rows to clusters and show the first 10 assignments.
pred=clus_model.transform(births_test)
pred.select(['prediction']).show(10)


In [97]:
%spark.pyspark

# Cluster sizes for the test split, then each size as a share of the total
# rounded to 3 decimals. `round` is pyspark.sql.functions.round (star import).
test_cluster=pred.groupBy('prediction').count()
test_total=pred.count()
test_cluster.show()
test_cluster=test_cluster.withColumn('Perc',round(test_cluster['count']/test_total,3))
test_cluster.show()


In [98]:
%spark.pyspark
# Rebinds df to a five-row (other, lat, long) frame for a small k-means demo.
df = spark.createDataFrame([(0, 33.3, -17.5),
                              (1, 40.4, -20.5),
                              (2, 28., -23.9),
                              (3, 29.5, -19.0),
                              (4, 32.8, -18.84)
                             ],
                              ["other","lat", "long"])

df.show()


In [99]:
%spark.pyspark
# Interpreter smoke test.
print('hello')


In [100]:
%spark.pyspark
# Assemble lat/long into 'features' and fit a 2-cluster k-means on the demo frame.
df_vec=ft.VectorAssembler(inputCols=['lat','long'],outputCol='features')
df_kmean=clus.KMeans(k=2,featuresCol='features')
df_pip=Pipeline(stages=[df_vec,df_kmean])
df_kmodel=df_pip.fit(df)


In [101]:
%spark.pyspark

# Skewness of each column named in `numerical`, aliased '<column>_skew'.
# NOTE(review): `numerical` is not assigned anywhere in the visible notebook and
# ccFraud is only loaded in a much later cell - confirm the execution order.
ccFraud.agg(*[skewness(col).alias(col+'_skew') for col in numerical]).show()


In [102]:
%spark.pyspark
# Pairwise correlation matrix (list of rows) over the columns in `numerical`;
# every cell is a separate DataFrame.corr call.
n_num=len(numerical)
corr=[[ccFraud.corr(row_name,col_name) for col_name in numerical] for row_name in numerical]
corr

In [103]:
%spark.pyspark

# Drop the ccFraud name and force a garbage-collection pass.
import gc
del ccFraud
gc.collect()

In [104]:
%spark.pyspark
# NOTE(review): ccFraud is deleted in the preceding cell, so this raises
# NameError unless it has been re-created in between.
ccFraud.count()

In [105]:
%spark.pyspark
# Rebinds df to a 10-row sample of the core_clickstream.daily table.
df=spark.sql("SELECT * FROM core_clickstream.daily LIMIT 10")



In [106]:
%spark.pyspark
# Schema and rows of the clickstream sample.
df.printSchema()
df.show()

In [107]:
%spark.pyspark
# Geography and year columns only.
df1=df.select('geo_city','geo_region','geo_zip','year')
df1.show()


In [108]:
%spark.pyspark
# NOTE(review): `x` is not assigned anywhere in the visible notebook - confirm
# where it is supposed to come from.
x.show()


In [109]:
%spark.pyspark
# Rebinds df1 to a sample frame that deliberately contains repeated rows and
# repeated ids, for the de-duplication cells that follow.
df1 = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])





In [110]:
%spark.pyspark

# Total rows versus distinct rows; a difference means exact duplicates exist.
print(df1.count())
print(df1.distinct().count())


In [111]:
%spark.pyspark
# Remove exact duplicate rows from the sample frame built above (df1).
# At this point `df` still holds the clickstream query result, which presumably
# lacks the 'id' column that the following cells rely on, so start from df1.
df=df1.dropDuplicates()


In [112]:
%spark.pyspark
# Print the frame after removing exact duplicates.
df.show()


In [113]:
%spark.pyspark
# Remove rows that are identical in every column other than 'id'.
df=df.dropDuplicates(subset=[c for c in df.columns if c!='id'])


In [114]:
%spark.pyspark
# Print the frame after the id-insensitive de-duplication.
df.show()


In [115]:
%spark.pyspark
import pyspark.sql.functions as fn 


In [116]:
%spark.pyspark
# Count of ids versus count of distinct ids; a gap means ids are reused.
df.agg(fn.count('id').alias('count'),fn.countDistinct('id').alias('distinct')).show()


In [117]:
%spark.pyspark
# Sample frame with missing values (None) scattered across several columns.
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28, 'M', 100000),
(2, 167.2, 5.4, 45, 'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
df_miss.show()



In [118]:
%spark.pyspark
# Print the schema of the sample frame.
df_miss.printSchema()


In [119]:
%spark.pyspark
# Rows where age is null.
df_miss[df_miss.age.isNull()].show()

In [120]:
%spark.pyspark
# Missing-value count per row, as (id, n_missing) pairs.
# len() over a filtered list is used instead of sum(): the star import of
# pyspark.sql.functions rebinds `sum` to the Column aggregate, which cannot add
# Python booleans. `is None` is the idiomatic null test.
df_miss.rdd.map(lambda row: (row['id'],len([c for c in row if c is None]))).collect()


In [121]:
%spark.pyspark
# Inspect the row with id 3.
df_miss.where('id==3').show()


In [122]:
%spark.pyspark
# Preview the frame without the 'income' column (df_miss itself is unchanged).
df_miss.drop('income').show()

In [123]:
%spark.pyspark
from pyspark.sql.functions import *

In [124]:
%spark.pyspark
# Column names at positions 1..3.
df_miss.columns[1:4]

In [125]:
%spark.pyspark
# Print the sample frame again.
df_miss.show()

In [126]:
%spark.pyspark
# Names of the columns that contain at least one null.
# toPandas() yields a one-row frame; taking row 0 gives a 1-D boolean mask that
# matches the 1-D array of column names (a (1, 6)-shaped mask cannot index it).
null_flags=df_miss.agg(*[(sum(col(c).isNull().cast('int'))>0).alias(c) for c in df_miss.columns]).toPandas()
np.array(df_miss.columns)[null_flags.values[0].astype(bool)]


In [127]:
%spark.pyspark
# Same selection with both the names and the mask reshaped to (1, 6).
np.array(df_miss.columns).reshape(1,6)[df_miss.agg(*[(sum(col(c).isNull().cast('int'))>0).alias(c) for c in df_miss.columns]).toPandas().values.reshape(1,6)]

In [128]:
%spark.pyspark
import numpy as np
import pandas as pd

In [129]:
%spark.pyspark
# Boolean-mask indexing demo: keeps the names at positions 0, 2 and 4.
np.array(df_miss.columns)[np.array([True,False,True,False,True,False])]

In [130]:
%spark.pyspark
# Working copy without the 'income' column.
df_miss_noincome=df_miss.select([c for c in df_miss.columns if c!='income'])


In [131]:
%spark.pyspark
# Print the frame without 'income'.
df_miss_noincome.show()


In [132]:
%spark.pyspark
# Approximate median (0.5 quantile, relative error 0.01) for the columns at
# positions 1..3, stored as plain scalars keyed by column name.
median={name: df_miss_noincome.approxQuantile(name,[0.5],0.01)[0] for name in df_miss_noincome.columns[1:4]}
median

In [133]:
%spark.pyspark
# Preview the frame with nulls replaced by the per-column medians.
df_miss_noincome.fillna(median).show()


In [134]:
%spark.pyspark
# Per-column means (gender excluded) as a {column: mean} dict.
# `mean` is the pyspark aggregate from the star import. 'records' is the
# documented to_dict orient; the abbreviation 'record' is rejected by current
# pandas releases.
means=df_miss_noincome.agg(*[mean(c).alias(c) for c in df_miss_noincome.columns if c!='gender']).toPandas().to_dict('records')[0]
means



In [135]:
%spark.pyspark
# Null count per column (gender excluded).
# agg() requires aggregate expressions, so the isNull() flag is cast to int and
# summed rather than passed bare; `sum`/`col` are the pyspark versions.
df_miss_noincome.agg(*[sum(col(c).isNull().cast('int')).alias(c) for c in df_miss_noincome.columns if c!='gender']).show()

In [136]:
%spark.pyspark
# Row count per gender value.
df_gen_miss=df_miss_noincome.groupby('gender').count()

In [137]:
%spark.pyspark
# Most frequent gender value.
# max_count is computed here so the cell does not depend on a later cell having
# been run first; `max` is the pyspark aggregate from the star import.
max_count=df_gen_miss.agg(max('count').alias('max_count')).collect()[0]['max_count']
df_gen_miss.select('gender').filter(df_gen_miss['count']==max_count).collect()[0]['gender']

In [138]:
%spark.pyspark
# Largest per-gender row count. `max` is pyspark.sql.functions.max via the star
# import, not the Python builtin.
max_count=df_gen_miss.agg(max('count').alias('max_count')).collect()[0]['max_count']

In [139]:
%spark.pyspark
# Preview the frame with nulls replaced by the per-column means.
df_miss_noincome.fillna(means).show()


In [140]:
%spark.pyspark
# Print df_miss_noincome; the fillna previews above did not reassign it.
df_miss_noincome.show()

In [141]:
%spark.pyspark

# Sample frame for the outlier-detection cells.
df_outliers = spark.createDataFrame([
(1, 143.5, 5.3, 28),
(2, 154.2, 5.5, 45),
(3, 342.3, 5.1, 99),
(4, 144.5, 5.5, 33),
(5, 133.2, 5.4, 54),
(6, 124.1, 5.1, 21),
(7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
df_outliers.show()


In [142]:
%spark.pyspark
# Approximate median of every non-id column. Each value is the one-element
# list returned by approxQuantile, not a bare scalar.
median={name: df_outliers.approxQuantile(name,[0.5],0.01) for name in df_outliers.columns[1:]}
median


In [143]:
%spark.pyspark
# Outlier fences per non-id column: [Q1 - 1.5*IQR, Q3 + 1.5*IQR], with the
# quartiles approximated at relative error 0.05.
cols=df_outliers.columns[1:]
bounds={}
# The loop variable must not be called `col`: that would rebind
# pyspark.sql.functions.col to a string and break later cells that call col(...).
for name in cols:
    quantiles=df_outliers.approxQuantile(name,[0.25,0.75],0.05)
    IQR=quantiles[1]-quantiles[0]
    bounds[name]=[quantiles[0]-1.5*IQR,quantiles[1]+1.5*IQR]
# Shows the quartiles of the last column processed.
quantiles


In [144]:
%spark.pyspark
# For each column in cols, a boolean '<column>_o' flag that is true when the
# value falls below the lower bound or above the upper bound; id is kept.
outliers=df_outliers.select(*['id']+[((df_outliers[c]<bounds[c][0]) | (df_outliers[c]>bounds[c][1])).alias(c+'_o') for c in cols])
outliers.show()




In [145]:
%spark.pyspark
# Attach the flags to the data by id, then list the flagged weights and ages.
# NOTE(review): df_outliers is rebound to the joined frame, so re-running this
# cell would join the flag columns a second time - confirm this is acceptable.
df_outliers=df_outliers.join(outliers,on='id')
df_outliers.filter('weight_o').select('id','weight').show()
df_outliers.filter('age_o').select('id','age').show()


In [146]:
%sh
# Download the gzipped ccFraud dataset.
wget http://tomdrabas.com/data/LearningPySpark/ccFraud.csv.gz


In [147]:
%spark.pyspark
import pyspark.sql.types as typ


In [148]:
%spark.pyspark
# Rebinds `labels` to the (column name, Spark type) pairs for the full births
# file, in column order.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]
# One non-nullable field per pair. StructType / StructField are the unqualified
# names from the pyspark.sql.types star import.
schema=StructType([StructField(i[0],i[1],False) for i in labels])


In [149]:
%spark.pyspark


In [150]:
%spark.pyspark
# Rebinds births2 to the training births CSV, read with the explicit schema.
births2=spark.read.csv('s3://lerawzone/users/gyang/births_train.csv.gz',header=True,schema=schema)


In [151]:
%spark.pyspark
# Rebinds `features` to a fresh list holding every births2 column name.
features=list(births2.columns)
features


In [152]:
%spark.pyspark
# Data types of the columns from position 20 onwards.
births2[features[20:]].dtypes


In [153]:
%spark.pyspark
# Subset of columns kept for modelling; the first entry is the target used by
# the earlier models in this notebook.
selected_features = ['INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_BEFORE',
'CIG_1_TRI',
'CIG_2_TRI',
'CIG_3_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT',
'MOTHER_WEIGHT_GAIN',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM'
]


In [154]:
%spark.pyspark
# Project births2 down to the selected columns.
births_trimmed=births2[selected_features]


In [155]:
%spark.pyspark
# Rows and data types of the trimmed frame.
births_trimmed.show()
births_trimmed.dtypes


In [156]:
%spark.pyspark
# Lookup tables for recoding categorical flags into integers, keyed by table
# name. 'YNU' maps 'Y' to 1 and both 'N' and 'U' to 0.
# (The name now matches what recode() reads; it was misspelled
# `recode_dictionry`, so every recode() call raised NameError.)
recode_dictionary={'YNU':{'Y':1,'N':0,"U":0}}
def recode(col,key):
    """Return the integer code for value `col` from lookup table `key`.

    Args:
        col: the raw value to translate (e.g. 'Y').
        key: name of the table in recode_dictionary (e.g. 'YNU').

    Raises:
        KeyError: if `key` is not a known table or `col` is not in that table.
    """
    return recode_dictionary[key][col]
def correct_cig(feat):
    """Return a Column for `feat` in which the value 99 is replaced by 0.

    Args:
        feat: name of the column to clean.
    """
    return when(col(feat)!=99,col(feat)).otherwise(0)


In [157]:
%spark.pyspark
# Wrap recode as a Spark UDF declared to return IntegerType.
rec_integer=udf(recode,IntegerType())

In [158]:
%spark.pyspark
# The four cigarette columns (CIG_BEFORE and CIG_1_TRI..CIG_3_TRI).
selected_features[4:8]


In [159]:
%spark.pyspark
# Replace the value 99 in CIG_BEFORE with 0; other values are kept.
births_trans=births_trimmed.withColumn('CIG_BEFORE',when(births_trimmed.CIG_BEFORE!=99,births_trimmed.CIG_BEFORE).otherwise(0))



In [160]:
%spark.pyspark
# Apply the same 99 -> 0 replacement to CIG_1_TRI, CIG_2_TRI and CIG_3_TRI.
for feat in selected_features[5:8]:
    births_trans=births_trans.withColumn(feat,correct_cig(feat))


In [161]:
%spark.pyspark
# Print the frame after the cigarette-column cleanup.
births_trans.show()


In [162]:
%spark.pyspark
# Per value of the first column, the sum of each column at positions 11..18,
# aliased '<column>_sum'. `sum` is the pyspark aggregate from the star import.
births2.groupby(features[0]).agg(*[sum(i).alias(i+'_sum') for i in births2[features[11:19]].columns]).show()


In [163]:
%spark.pyspark
# Load the ccFraud CSV; no header option is passed.
# NOTE(review): this reads directly from an http:// URL - confirm the cluster's
# Spark/Hadoop can read that scheme. A %sh cell elsewhere in this notebook
# downloads a gzipped copy of the dataset instead.
ccFraud=spark.read.csv('http://packages.revolutionanalytics.com/datasets/ccFraud.csv')


In [164]:
%spark.pyspark
# Load the one-time-buyers CSV from S3 (header row supplies the column names).
onetime_buyers=spark.read.csv('s3://lerawzone/users/gyang/onetime_clean3.csv',header=True)


In [165]:
%spark.pyspark
# Number of rows loaded.
onetime_buyers.count()


In [166]:
%spark.pyspark
# Drop the '_c0' column.
onetime_buyers=onetime_buyers.drop('_c0')


In [167]:
%spark.pyspark
import pyspark.ml.feature as ft
from pyspark.sql.functions import *
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev 
import pandas as pd
import numpy as np 


In [168]:
%spark.pyspark
# Working copy of onetime_buyers without the listed *_C4DMD / *_C4ORDER,
# *_UNITS and *_DEMAND columns.
onetime_buyers1=onetime_buyers.drop('TOTAL_C4DMD','MEN_C4DMD', 'MEN_C4ORDER', 'WM_C4DMD', 'WM_C4ORDER', 'KIDS_C4DMD',
       'KIDS_C4ORDER', 'HOME_C4DMD', 'HOME_C4ORDER', 'UNIFORM_C4DMD012','UNIFORM_C4ORDER012', 'OTHER_C4DMD', 'OTHER_C4ORDER',
        'HOME_ACC_UNITS','BED_UNITS', 'BATH_UNITS', 'WM_OUTERWEAR_UNITS',
       'WM_ACTIVEWEAR_UNITS', 'WM_SWEATERS_UNITS', 'WM_WOVEN_TOPS_UNITS',
       'WM_BOTTOMS_UNITS', 'WM_JACKETS_UNITS', 'WM_SLEEPWEAR_UNITS',
       'WM_KNIT_TOPS_UNITS', 'WM_DRESSES_UNITS', 'WM_SWIMWEAR_UNITS',
       'WXR_OUTERWEAR_UNITS', 'WXR_ACTIVEWEAR_UNITS', 'WXR_SWEATERS_UNITS',
       'WXR_WOVEN_TOPS_UNITS', 'WXR_BOTTOMS_UNITS', 'WXR_JACKETS_UNITS',
       'WXR_SLEEPWEAR_UNITS', 'WXR_KNIT_TOPS_UNITS', 'WXR_DRESSES_UNITS',
       'WXR_SWIMWEAR_UNITS', 'MN_SOFT_ACCESSORIES_UNITS',
       'MN_OUTERWEAR_UNITS', 'MN_SWIMWEAR_UNITS', 'MN_BOTTOMS_UNITS',
       'MN_SLEEPWEAR_UNITS', 'MN_WOVEN_TOPS_UNITS',
       'MN_DRESS_SHIRTS_UNITS', 'MN_CWA_UNITS', 'MN_SWEATERS_UNITS',
       'MN_ACTIVE_WEAR_UNITS', 'MN_KNIT_TOPS_UNITS',
       'MN_DRESS_SEPARATES_UNITS', 'WM_SOFT_ACC_UNITS', 'WM_BAGS_UNITS',
       'KIDS_FTWR_UNITS', 'WM_FTWR_UNITS', 'MN_FTWR_UNITS', 'WM_CWA_UNITS',
       'UNF_GIRLS_UNITS', 'UNF_BOYS_UNITS', 'UNF_COED_UNITS',
       'UNF_OUTERWEAR_UNITS', 'KIDS_BABY_UNITS', 'KIDS_SOFT_ACC_UNITS',
       'BOYS_BOTTOMS_UNITS', 'GIRLS_DRESSES_UNITS', 'KIDS_SLEEPWEAR_UNITS',
       'GIRLS_OUTERWEAR_UNITS', 'GIRLS_BOTTOMS_UNITS',
       'BOYS_OUTERWEAR_UNITS', 'BOYS_SWIMWEAR_UNITS', 'GIRLS_TOPS_UNITS',
       'BOYS_TOPS_UNITS', 'KIDS_BAGS_UNITS', 'GIRLS_SWIMEAR_UNITS',
       'KIDS_CWA_UNITS', 'LEBO_UNITS','KIDS_HOME_DEMAND',
       'HOME_ACC_DEMAND', 'BED_DEMAND', 'BATH_DEMAND',
       'WM_OUTERWEAR_DEMAND', 'WM_ACTIVEWEAR_DEMAND', 'WM_SWEATERS_DEMAND',
       'WM_WOVEN_TOPS_DEMAND', 'WM_BOTTOMS_DEMAND', 'WM_JACKETS_DEMAND',
       'WM_SLEEPWEAR_DEMAND', 'WM_KNIT_TOPS_DEMAND', 'WM_DRESSES_DEMAND',
       'WM_SWIMWEAR_DEMAND', 'WXR_OUTERWEAR_DEMAND',
       'WXR_ACTIVEWEAR_DEMAND', 'WXR_SWEATERS_DEMAND',
       'WXR_WOVEN_TOPS_DEMAND', 'WXR_BOTTOMS_DEMAND', 'WXR_JACKETS_DEMAND',
       'WXR_SLEEPWEAR_DEMAND', 'WXR_KNIT_TOPS_DEMAND',
       'WXR_DRESSES_DEMAND', 'WXR_SWIMWEAR_DEMAND',
       'MN_SOFT_ACCESSORIES_DEMAND', 'MN_OUTERWEAR_DEMAND',
       'MN_SWIMWEAR_DEMAND', 'MN_BOTTOMS_DEMAND', 'MN_SLEEPWEAR_DEMAND',
       'MN_WOVEN_TOPS_DEMAND', 'MN_DRESS_SHIRTS_DEMAND', 'MN_CWA_DEMAND',
       'MN_SWEATERS_DEMAND', 'MN_ACTIVE_WEAR_DEMAND',
       'MN_KNIT_TOPS_DEMAND', 'MN_DRESS_SEPARATES_DEMAND',
       'WM_SOFT_ACC_DEMAND', 'WM_BAGS_DEMAND', 'KIDS_FTWR_DEMAND',
       'WM_FTWR_DEMAND', 'MN_FTWR_DEMAND', 'WM_CWA_DEMAND',
       'UNF_GIRLS_DEMAND', 'UNF_BOYS_DEMAND', 'UNF_COED_DEMAND',
       'UNF_OUTERWEAR_DEMAND', 'KIDS_BABY_DEMAND', 'KIDS_SOFT_ACC_DEMAND',
       'BOYS_BOTTOMS_DEMAND', 'GIRLS_DRESSES_DEMAND',
       'KIDS_SLEEPWEAR_DEMAND', 'GIRLS_OUTERWEAR_DEMAND',
       'GIRLS_BOTTOMS_DEMAND', 'BOYS_OUTERWEAR_DEMAND',
       'BOYS_SWIMWEAR_DEMAND', 'GIRLS_TOPS_DEMAND', 'BOYS_TOPS_DEMAND',
       'KIDS_BAGS_DEMAND', 'GIRLS_SWIMEAR_DEMAND', 'KIDS_CWA_DEMAND',
       'LEBO_DEMAND')


In [169]:
%spark.pyspark
# Cast every remaining column to float in one projection, keeping names and order.
onetime_buyers1=onetime_buyers1.select([col(name).cast('float').alias(name) for name in onetime_buyers1.columns])


In [170]:
%spark.pyspark
# List the columns that remain.
onetime_buyers1.columns


In [171]:
%spark.pyspark
# Rebinds `features` to every column except the first and the last, and builds
# an assembler that packs them into a 'features' vector column.
features=onetime_buyers1.columns[1:-1]
featureCreator=ft.VectorAssembler(inputCols=features,outputCol='features')



In [172]:
%spark.pyspark
# 70/30 split with a fixed seed.
train_ot1,test_ot1=onetime_buyers1.randomSplit([0.7,0.3],seed=123)




In [173]:
%spark.pyspark
# Add the 'features' vector column to both splits (rebinding the split names).
train_ot1=featureCreator.transform(train_ot1)
test_ot1=featureCreator.transform(test_ot1)


In [174]:
%spark.pyspark
# Try every combination of GBT maxDepth (3, 4, 5) and maxIter (50, 60): six
# fits in total, one row of `result` each.
# Columns of `result`: [maxDepth, maxIter, train AUC, test AUC].
result=np.zeros((6,4))
m=0
for i in range(3,6):
    for j in range(50,70,10):
        GB=cl.GBTClassifier(maxDepth=i,labelCol='resp',maxIter=j)
        # Fit and score on train_ot1 / test_ot1, the assembled splits defined
        # above; `train_ot` / `test_ot` are never defined in this notebook.
        onetimeModel=GB.fit(train_ot1)
        train_pred=onetimeModel.transform(train_ot1)
        test_pred=onetimeModel.transform(test_ot1)
        GB_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='resp')
        result[m,0]=i
        result[m,1]=j
        result[m,2]=GB_eval.evaluate(train_pred,{GB_eval.metricName:'areaUnderROC'})
        result[m,3]=GB_eval.evaluate(test_pred,{GB_eval.metricName:'areaUnderROC'})
        m+=1

    


In [175]:
%spark.pyspark
# Display the depth / iterations / train AUC / test AUC table.
result


In [176]:
%spark.pyspark
# Fit a GBT with maxDepth 3 and 150 iterations on the assembled training split,
# score both splits and print area under ROC (train first, then test).
GB_best=cl.GBTClassifier(maxDepth=3,maxIter=150,labelCol='resp')
onetime_best=GB_best.fit(train_ot1)
train_best=onetime_best.transform(train_ot1)
test_best=onetime_best.transform(test_ot1)
GB_eval=ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='resp')
print(GB_eval.evaluate(train_best,{GB_eval.metricName:'areaUnderROC'}))
print(GB_eval.evaluate(test_best,{GB_eval.metricName:'areaUnderROC'}))

In [177]:
%spark.pyspark
# Feature-importance vector of the fitted model.
onetime_best.featureImportances


In [178]:
%spark.pyspark
# Feature names next to their importances.
# featureImportances can be a SparseVector, whose .values holds only the
# non-zero entries; toArray() always yields one value per feature, so the two
# columns are guaranteed to have the same length.
pd.DataFrame({'Features':features,'Importance':onetime_best.featureImportances.toArray()})

In [179]:
%spark.pyspark
import tensorflow as tf


In [180]:
%spark.pyspark
