<a href="https://colab.research.google.com/github/ysowti/Data-Science-Portfolio-Yahya/blob/master/Deep_Learning_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get update

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.sonic.net/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Deep Learning Pipeline").getOrCreate()

In [None]:
pip install elephas

In [None]:
# Spark Session, Pipeline, Functions, and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

# Keras / Deep Learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator

In [6]:
from google.colab import files
uploaded = files.upload()

Saving bank.csv to bank (1).csv


In [26]:
df = spark.read.csv('bank.csv', inferSchema=True, header=True)

In [27]:
df.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,deposit
0,59,admin.,married,secondary,no,2343,yes,no,unknown,5,may,1042,1,-1,0,unknown,yes
1,56,admin.,married,secondary,no,45,no,no,unknown,5,may,1467,1,-1,0,unknown,yes
2,41,technician,married,secondary,no,1270,yes,no,unknown,5,may,1389,1,-1,0,unknown,yes
3,55,services,married,secondary,no,2476,yes,no,unknown,5,may,579,1,-1,0,unknown,yes
4,54,admin.,married,tertiary,no,184,no,no,unknown,5,may,673,2,-1,0,unknown,yes


In [9]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [10]:
df = df.drop('day', 'month')
df.columns

['age',
 'job',
 'marital',
 'education',
 'default',
 'balance',
 'housing',
 'loan',
 'contact',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'deposit']

In [11]:
def select_features(df, lower_skew=None, upper_skew=None, dtypes='int32', drop_col = []):
  selected_features = []
  feature_list = list(df.toPandas().select_dtypes(include=[dtypes]).columns.drop(drop_col))
  if lower_skew or upper_skew:
    for feature in feature_list:
      if df.toPandas()[feature].kurtosis() < lower_skew or df.toPandas()[feature].kurtosis() > upper_skew:
        selected_features.append(feature)
  else:
    selected_features = feature_list
  return selected_features

In [12]:
label = 'deposit'
cat_features = select_features(df, dtypes='object', drop_col=[label])
num_features = select_features(df)
stages = []

for feature in cat_features:
  string_indexer = StringIndexer(inputCol=feature, outputCol=feature + '_index')
  encoder = OneHotEncoder(inputCol=string_indexer.getOutputCol(), outputCol= feature + '_class_vec')
  stages += [string_indexer, encoder]

unscaled_features = select_features(df, lower_skew=-2, upper_skew=2, dtypes='int32')
unscaled_assembler = VectorAssembler(inputCols=unscaled_features, outputCol='unscaled_features')
scaler = StandardScaler(inputCol='unscaled_features', outputCol='scaled_features')
stages += [unscaled_assembler, scaler]

unscaled_num_features = list(set(num_features) - set(unscaled_features))
num_str_assembler = VectorAssembler(inputCols=[cat + '_class_vec' for cat in cat_features] +
                                    unscaled_num_features, outputCol='assembled_inputs')
stages += [num_str_assembler]

final_assembler = VectorAssembler(inputCols=['assembled_inputs', 'scaled_features'], outputCol='features')
stages += [final_assembler]

label_indexer = StringIndexer(inputCol=label, outputCol='label_index')
stages += [label_indexer]

In [13]:
stages

[StringIndexer_07e341da0889,
 OneHotEncoder_cc7721ef5eb7,
 StringIndexer_3fa613e3c07f,
 OneHotEncoder_fc78a049d11a,
 StringIndexer_d99c999fc1bd,
 OneHotEncoder_f459d440330c,
 StringIndexer_8ca87bc9550f,
 OneHotEncoder_5dc1a9a23711,
 StringIndexer_863228da661a,
 OneHotEncoder_c684d573cd29,
 StringIndexer_4efb055e6620,
 OneHotEncoder_040230b12232,
 StringIndexer_94cde08c14c8,
 OneHotEncoder_f7096b861146,
 StringIndexer_2d9cf9550bcf,
 OneHotEncoder_aa23903e073e,
 VectorAssembler_728eef4041e9,
 StandardScaler_3bd7fbb8aba7,
 VectorAssembler_de890137d403,
 VectorAssembler_369fac0116a8,
 StringIndexer_e479ba02615f]

In [14]:
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)

In [30]:
df_transform.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,duration,campaign,pdays,previous,poutcome,deposit,job_index,job_class_vec,marital_index,marital_class_vec,education_index,education_class_vec,default_index,default_class_vec,housing_index,housing_class_vec,loan_index,loan_class_vec,contact_index,contact_class_vec,poutcome_index,poutcome_class_vec,unscaled_features,scaled_features,assembled_inputs,features,label_index
0,59,admin.,married,secondary,no,2343,yes,no,unknown,1042,1,-1,0,unknown,yes,3.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0)",0.0,"(1.0, 0.0, 0.0)",0.0,(1.0),1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2343.0, 1042.0, 1.0, -1.0, 0.0]","[0.7264185278681131, 3.0017712260834295, 0.367...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
1,56,admin.,married,secondary,no,45,no,no,unknown,1467,1,-1,0,unknown,yes,3.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0)",0.0,"(1.0, 0.0, 0.0)",0.0,(1.0),0.0,(1.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[45.0, 1467.0, 1.0, -1.0, 0.0]","[0.013951700279157103, 4.226102100445672, 0.36...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
2,41,technician,married,secondary,no,1270,yes,no,unknown,1389,1,-1,0,unknown,yes,2.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0)",0.0,"(1.0, 0.0, 0.0)",0.0,(1.0),1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[1270.0, 1389.0, 1.0, -1.0, 0.0]","[0.39374798565621155, 4.001401375268602, 0.367...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
3,55,services,married,secondary,no,2476,yes,no,unknown,579,1,-1,0,unknown,yes,4.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0)",0.0,"(1.0, 0.0, 0.0)",0.0,(1.0),1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2476.0, 579.0, 1.0, -1.0, 0.0]","[0.7676535531376218, 1.667970767660562, 0.3673...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
4,54,admin.,married,tertiary,no,184,no,no,unknown,673,2,-1,0,unknown,yes,3.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"(1.0, 0.0)",1.0,"(0.0, 1.0, 0.0)",0.0,(1.0),0.0,(1.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[184.0, 673.0, 2.0, -1.0, 0.0]","[0.05704695225255348, 1.938763949284211, 0.734...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0


In [16]:
df_transform_fin = df_transform.select('features', 'label_index')
df_transform_fin.show()

+--------------------+-----------+
|            features|label_index|
+--------------------+-----------+
|(30,[3,11,13,16,1...|        1.0|
|(30,[3,11,13,16,1...|        1.0|
|(30,[2,11,13,16,1...|        1.0|
|(30,[4,11,13,16,1...|        1.0|
|(30,[3,11,14,16,1...|        1.0|
|(30,[0,12,14,16,2...|        1.0|
|(30,[0,11,14,16,2...|        1.0|
|(30,[5,13,16,18,2...|        1.0|
|(30,[2,11,13,16,1...|        1.0|
|(30,[4,12,13,16,1...|        1.0|
|(30,[3,12,13,16,1...|        1.0|
|(30,[1,11,13,16,1...|        1.0|
|(30,[0,11,14,16,2...|        1.0|
|(30,[1,12,14,16,1...|        1.0|
|(30,[2,12,14,16,1...|        1.0|
|(30,[0,14,16,18,2...|        1.0|
|(30,[1,12,15,16,1...|        1.0|
|(30,[4,11,13,16,1...|        1.0|
|(30,[3,11,13,16,1...|        1.0|
|(30,[3,13,16,20,2...|        1.0|
+--------------------+-----------+
only showing top 20 rows



In [17]:
df_transform_fin = df_transform_fin.orderBy(rand())
train_data, test_data = df_transform_fin.randomSplit([0.8, 0.2], seed=1234)

In [18]:
nb_classes = train_data.select('label_index').distinct().count()
input_dim = len(train_data.select('features').first()[0])

In [20]:
model = keras.Sequential()
model.add(keras.layers.Dense(256, input_shape=(input_dim,), activation='relu', activity_regularizer=keras.regularizers.l2(0.01)))
model.add(keras.layers.Dropout(0.3))
model.add(keras.layers.Dense(256, activation='relu', activity_regularizer=keras.regularizers.l2(0.01)))
model.add(keras.layers.Dropout(0.3))
model.add(keras.layers.Dense(nb_classes, activation='sigmoid'))

In [21]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 256)               7936      
_________________________________________________________________
dropout (Dropout)            (None, 256)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 256)               65792     
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 2)                 514       
Total params: 74,242
Trainable params: 74,242
Non-trainable params: 0
_________________________________________________________________


In [22]:
# Set and Serialize Optimizer
optimizer_conf = keras.optimizers.Adam(lr=0.01)
opt_conf = keras.optimizers.serialize(optimizer_conf)

# Initialize SparkML Estimator and Get Settings
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("label_index")
estimator.set_keras_model_config(model.to_yaml())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)
estimator.set_epochs(25) 
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.10)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("binary_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_b9a9846d15f7

In [23]:
# Create Deep Learning Pipeline
dl_pipeline = Pipeline(stages=[estimator])

In [24]:
def dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                                  train_data=train_data,
                                  test_data=test_data,
                                  label='label_index'):
    
    fit_dl_pipeline = dl_pipeline.fit(train_data)
    pred_train = fit_dl_pipeline.transform(train_data)
    pred_test = fit_dl_pipeline.transform(test_data)
    
    pnl_train = pred_train.select(label, "prediction")
    pnl_test = pred_test.select(label, "prediction")
    
    pred_and_label_train = pnl_train.rdd.map(lambda row: (row[label], row['prediction']))
    pred_and_label_test = pnl_test.rdd.map(lambda row: (row[label], row['prediction']))
    
    metrics_train = MulticlassMetrics(pred_and_label_train)
    metrics_test = MulticlassMetrics(pred_and_label_test)
    
    print("Training Data Accuracy: {}".format(round(metrics_train.precision(),4)))
    print("Training Data Confusion Matrix")
    display(pnl_train.crosstab('label_index', 'prediction').toPandas())
    
    print("\nTest Data Accuracy: {}".format(round(metrics_test.precision(),4)))
    print("Test Data Confusion Matrix")
    display(pnl_test.crosstab('label_index', 'prediction').toPandas())

In [25]:
dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                              train_data=train_data,
                              test_data=test_data,
                              label='label_index');

>>> Fit model
>>> Synchronous training complete.
Training Data Accuracy: 0.7873
Training Data Confusion Matrix


Unnamed: 0,label_index_prediction,0.0,1.0
0,1.0,430,3843
1,0.0,3192,1471



Test Data Accuracy: 0.7884
Test Data Confusion Matrix


Unnamed: 0,label_index_prediction,0.0,1.0
0,1.0,83,933
1,0.0,822,388
