In [1]:
# Spark Session, Pipeline, Functions, and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

import pandas as pd

# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.optimizers import Adam

# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator

Using TensorFlow backend.




In [2]:
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
import itertools

In [5]:
conf=SparkConf().setMaster("local")
sc=SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [6]:
#sc.stop()

In [8]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data.csv')
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Temerrut_flag: integer (nullable = true)
 |-- Basvuru_tarihi: string (nullable = true)
 |-- Kullanim_orani: double (nullable = true)
 |-- Musteri_yasi: integer (nullable = true)
 |-- Dpd_30_adeti: integer (nullable = true)
 |-- Borcun_gelire_orani: double (nullable = true)
 |-- Gelir: double (nullable = true)
 |-- Acik_kredi_sayisi: integer (nullable = true)
 |-- Onceki_temerrut_sayisi: integer (nullable = true)
 |-- Acik_ev_kredi_sayisi: integer (nullable = true)
 |-- Son_2_yil_dpd_31_60_adeti: integer (nullable = true)
 |-- Bakmakla_yukumlu_kisi_sayisi: double (nullable = true)
 |-- Guncel_yil_ort_vdsz_mev_tutari: double (nullable = true)
 |-- Onceki_yil_max_vdsz_mev_tutari: double (nullable = true)
 |-- Onceki_yil_min_vdsz_mev_tutari: double (nullable = true)
 |-- Rotatif_krediler_kullanim_orani: string (nullable = true)
 |-- Kredi_vadesi: string (nullable = true)
 |-- Guncel_yil_gunluk_ort_kk_risk_tuta

In [9]:
drop_list = ['_c0', 'ID', 'Basvuru_tarihi']
data = data.select([column for column in data.columns if column not in drop_list])

In [10]:
from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

In [11]:
exprs = [(count_not_null(c) / count("*")).alias(c) for c in data.columns]
completeness = data.agg(*exprs).toPandas()

In [12]:
completeness.to_dict()

{'Temerrut_flag': {0: 1.0},
 'Kullanim_orani': {0: 1.0},
 'Musteri_yasi': {0: 1.0},
 'Dpd_30_adeti': {0: 1.0},
 'Borcun_gelire_orani': {0: 1.0},
 'Gelir': {0: 1.0},
 'Acik_kredi_sayisi': {0: 1.0},
 'Onceki_temerrut_sayisi': {0: 1.0},
 'Acik_ev_kredi_sayisi': {0: 1.0},
 'Son_2_yil_dpd_31_60_adeti': {0: 1.0},
 'Bakmakla_yukumlu_kisi_sayisi': {0: 0.97384},
 'Guncel_yil_ort_vdsz_mev_tutari': {0: 0.7908733333333333},
 'Onceki_yil_max_vdsz_mev_tutari': {0: 0.7909466666666667},
 'Onceki_yil_min_vdsz_mev_tutari': {0: 0.7909733333333333},
 'Rotatif_krediler_kullanim_orani': {0: 0.7896733333333333},
 'Kredi_vadesi': {0: 1.0},
 'Guncel_yil_gunluk_ort_kk_risk_tutari': {0: 0.36102},
 'Guncel_vdsz_mev_tutari': {0: 0.028373333333333334},
 'SGK_kodu': {0: 0.01058},
 'Evin_sahiplik_durumu': {0: 1.0},
 'Istenen_evin_fiyati': {0: 1.0},
 'Toplam_calisma_suresi': {0: 0.2904},
 'Onceki_yil_ort_vdsz_mev_tutari': {0: 0.028373333333333334},
 'Calisma_tipi': {0: 1.0},
 'Son_1_yil_kredi_basvuru_sayisi': {0: 1.0}

In [13]:
drop_list = ['Bakmakla_yukumlu_kisi_sayisi', 'Guncel_yil_ort_vdsz_mev_tutari', 'Onceki_yil_max_vdsz_mev_tutari',
            'Onceki_yil_min_vdsz_mev_tutari', 'Rotatif_krediler_kullanim_orani', 'Guncel_yil_gunluk_ort_kk_risk_tutari',
            'Guncel_vdsz_mev_tutari', 'SGK_kodu', 'Toplam_calisma_suresi', 'Onceki_yil_ort_vdsz_mev_tutari', 'KK_limiti',
            'En_son_calisma_gun_sayisi', 'Isyeri_tel_no']
data = data.select([column for column in data.columns if column not in drop_list])
data.printSchema()

root
 |-- Temerrut_flag: integer (nullable = true)
 |-- Kullanim_orani: double (nullable = true)
 |-- Musteri_yasi: integer (nullable = true)
 |-- Dpd_30_adeti: integer (nullable = true)
 |-- Borcun_gelire_orani: double (nullable = true)
 |-- Gelir: double (nullable = true)
 |-- Acik_kredi_sayisi: integer (nullable = true)
 |-- Onceki_temerrut_sayisi: integer (nullable = true)
 |-- Acik_ev_kredi_sayisi: integer (nullable = true)
 |-- Son_2_yil_dpd_31_60_adeti: integer (nullable = true)
 |-- Kredi_vadesi: string (nullable = true)
 |-- Evin_sahiplik_durumu: string (nullable = true)
 |-- Istenen_evin_fiyati: integer (nullable = true)
 |-- Calisma_tipi: integer (nullable = true)
 |-- Son_1_yil_kredi_basvuru_sayisi: integer (nullable = true)
 |-- Son_6_ay_kredi_basvuru_sayisi: integer (nullable = true)
 |-- Son_3_ay_kredi_basvuru_sayisi: integer (nullable = true)
 |-- Son_2_yil_kabul_edilmis_kredi_sayisi: integer (nullable = true)
 |-- Son_1_yil_kabul_edilmis_kredi_sayisi: integer (nullable

<div class="alert alert-block alert-info">
<b>Step 8:</b> DENEME
</div>

In [14]:
deneme=["Temerrut_flag","Musteri_yasi","Dpd_30_adeti"]
data = data.select([column for column in data.columns if column in deneme])
data.printSchema()

root
 |-- Temerrut_flag: integer (nullable = true)
 |-- Musteri_yasi: integer (nullable = true)
 |-- Dpd_30_adeti: integer (nullable = true)



<div class="alert alert-block alert-info">
<b>Step 8:</b> DENEME
</div>

In [20]:
#Data Preprocessing
# one hot encoding and assembling
#encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='y')]
num_var = [i[0] for i in data.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='Temerrut_flag')]

#string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
#onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
#label_indexes = StringIndexer(inputCol = 'y', outputCol = 'label', handleInvalid = 'keep')
assembler = VectorAssembler(inputCols = num_var , outputCol = "features")
assembled = assembler.transform(data)

In [21]:
assembled.select("features").rdd.take(10)

[Row(features=DenseVector([45.0, 2.0])),
 Row(features=DenseVector([40.0, 0.0])),
 Row(features=DenseVector([38.0, 1.0])),
 Row(features=DenseVector([30.0, 0.0])),
 Row(features=DenseVector([49.0, 1.0])),
 Row(features=DenseVector([74.0, 0.0])),
 Row(features=DenseVector([57.0, 0.0])),
 Row(features=DenseVector([39.0, 0.0])),
 Row(features=DenseVector([27.0, 0.0])),
 Row(features=DenseVector([57.0, 0.0]))]

In [22]:
assembled = assembled.orderBy(rand())
# Split Data into Train / Test Sets
train_data, test_data = assembled.randomSplit([.8, .2],seed=1234)

In [23]:
train_data.select("features").rdd.take(10)

[Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0])),
 Row(features=DenseVector([21.0, 0.0]))]

In [24]:
# Number of Classes
nb_classes = train_data.select("Temerrut_flag").distinct().count()

# Number of Inputs or Input Dimensions
input_dim = len(train_data.select("features").first()[0])

In [33]:
input_dim

2

In [25]:
# Set up Deep Learning Model / Architecture
model = Sequential()
model.add(Dense(256, input_shape=(input_dim,), activity_regularizer=regularizers.l2(0.01)))
model.add(Activation('relu'))
model.add(Dropout(rate=0.3))
model.add(Dense(256, activity_regularizer=regularizers.l2(0.01)))
model.add(Activation('relu'))
model.add(Dropout(rate=0.3))
model.add(Dense(nb_classes))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam')

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


In [26]:
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 256)               768       
_________________________________________________________________
activation_1 (Activation)    (None, 256)               0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 256)               65792     
_________________________________________________________________
activation_2 (Activation)    (None, 256)               0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 2)                 514       
__________

In [27]:
from systemml.mllearn import Keras2DML
sysml_model = Keras2DML(spark, model, weights='weights_dir')
# sysml_model.setConfigProperty("sysml.native.blas", "auto")
# sysml_model.setGPU(True).setForceGPU(True)
sysml_model.fit(X_train, y_train)
sysml_model.score(X_test, y_test)

Exception: The data format channels_last is not supported. Please use keras.backend.set_image_data_format("channels_first")

In [32]:
model.compile(optimizer='adam',#'sgd'
             loss='sparse_categorical_crossentropy',
             metrics=['accuracy']) 
             
             
# Train your model, adjust batch size and epochs iteratively. Optionally time your training.
 
import time
start = time.time()
model.fit(X_train, y_train, epochs=5, batch_size=100) 
end=time.time()
print("training time:", (end-start))

# Test your model on the secluded test set

model.evaluate(X_test, y_test)

# Make predictions and Reshape your npndarrays to be able to verify your predictions by plotting out the image

predictions = model.predict([X_test])




NameError: name 'X_train' is not defined

In [None]:
X_t