# Adding pyspark libraries

In [None]:
import os

# Ask Spark to fetch the AWS SDK and the hadoop-aws connector when it launches;
# the s3a:// reads later in the notebook rely on these packages.
_spark_packages = ','.join([
    'com.amazonaws:aws-java-sdk:1.7.4',
    'org.apache.hadoop:hadoop-aws:2.7.3',
])
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + _spark_packages + ' pyspark-shell'

# Connect to ODH Ceph-Nano and List the Uploaded Credit Card Data

In [None]:
import boto3

# Connection settings for the S3-compatible object store are taken from the
# notebook's environment; only the bucket name is fixed here.
s3_endpoint_url = os.environ['ENDPOINT_URL']
s3_access_key = os.environ['AWS_ACCESS_KEY_ID']
s3_secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
s3_bucket = "TOMBUCKET"

s3 = boto3.client(
    service_name='s3',
    endpoint_url=s3_endpoint_url,
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
)

# Listing the bucket shows what is stored there and doubles as a connectivity check.
result = s3.list_objects(Bucket=s3_bucket)
print(result)


# Read Data Using Spark 

In [None]:
import pyspark
import time
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import boto3
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.decomposition import PCA
from sklearn.metrics import precision_recall_curve,\
                            average_precision_score,\
                            roc_auc_score, roc_curve
        
print("Getting a spark session with the distributed spark cluster running on Openshift ")

# Spark session
# The master URL is built from the resolved address of the in-cluster Spark
# service (an earlier variant read the host from os.environ['SPARK_CLUSTER']).
import socket
spark_address = socket.gethostbyname("spark-cluster-opentlc-mgr.00odh")
spark_cluster_url = "spark://{}:7077".format(spark_address)
print(spark_cluster_url)

session_builder = SparkSession.builder.appName("odh-pyspark")
spark = session_builder.master(spark_cluster_url).getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")
print(spark.sparkContext.version)
print("Spark Session Success")

# Hadoop settings that point the s3a connector at the Ceph S3 endpoint
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3a.path.style.access", "true"),
    ("fs.s3a.access.key", s3_access_key),
    ("fs.s3a.secret.key", s3_secret_key),
    ("fs.s3a.endpoint", s3_endpoint_url),
    ("fs.s3a.connection.ssl.enabled", 'false'),
):
    hadoopConf.set(conf_key, conf_value)

print("Spark reading transactional data")
csv_reader = spark.read.format("csv")
for opt_name, opt_value in (
    ("header", "true"),
    ("inferSchema", "True"),
    ("mode", "DROPMALFORMED"),
):
    csv_reader = csv_reader.option(opt_name, opt_value)
df = csv_reader.load(f"s3a://{s3_bucket}/OPEN/uploaded/creditcard.csv")

print("Total number of credit card transaction rows: %d" % df.count())
# Count the rows whose Class column is 1 (the fraud label)
print("Total number of rows with fraud")
print(df.filter(df['Class'] == 1).count())


In [None]:
# Manual override of the S3 endpoint: replace the placeholder below with the
# Rook/Ceph host (without a protocol prefix) before running this cell.
# NOTE(review): in notebook order this runs after the S3 client and the Hadoop
# configuration were already built from the previous value of s3_endpoint_url --
# confirm whether this cell is meant to be run earlier.
s3_endpoint_url= "[ROOK CEPH URL - NO PROTOCOL]"
s3_endpoint_url

# Train Sklearn Random Forest Model

In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
# sklearn.externals.joblib was deprecated and later removed from scikit-learn;
# prefer the standalone joblib package and fall back only on old environments.
try:
    import joblib
except ImportError:
    from sklearn.externals import joblib

# Order the credit card transactions by transaction time.
# DataFrame.orderBy returns a new DataFrame and leaves `df` untouched, so the
# ordered result has to be kept and used for the split below.
df_ordered = df.orderBy("Time")

# Number of rows in the dataset
n_samples = df.count()
print(n_samples)

# Split into train and test: the first 75% of the time-ordered rows are used
# for training, everything else is held out for testing.
train_size = 0.75

train_limit = int(n_samples * train_size)
df_train = df_ordered.limit(train_limit)
df_test = df_ordered.subtract(df_train)

# Data schema
print("Original Data Schema")
df_test.printSchema()

# Use % formatting: passing the count as a second print() argument would print
# the literal "%s" followed by the number.
print('Number of train transactions: %s' % df_train.count())
print('Number of test  transactions: %s' % df_test.count())

# Define features and target variables for convenience.
drop_time_class = ['_c0', 'Time', 'Class']
drop_class=['Class']

# Create train datasets
features_train = df_train.drop(*drop_time_class)
target_train = df_train.select("Class")

# Create test datasets
features_test = df_test.drop(*drop_time_class)
target_test = df_test.select("Class")

# Create a RandomForest classifier model (RandomForestClassifier is imported in
# the Spark cell above). A fixed random_state makes the fitted forest, and the
# feature importances derived from it, reproducible across runs.
model = RandomForestClassifier(n_estimators=100, max_depth=4, n_jobs=10, random_state=42)

# Convert to pandas: scikit-learn trains on in-memory data, not Spark DataFrames
features_test_pd = features_test.toPandas()
target_test_pd = target_test.toPandas()

features_train_pd = features_train.toPandas()
target_train_pd = target_train.toPandas()

# ravel() turns the single-column label frame into the 1-D array fit() expects
model.fit(features_train_pd, target_train_pd.values.ravel())

pred_train = model.predict(features_train_pd)
pred_test = model.predict(features_test_pd)

pred_train_prob = model.predict_proba(features_train_pd)
pred_test_prob = model.predict_proba(features_test_pd)

print("Number of features")
print(len(model.feature_importances_))

# Confusion Matrix

In [None]:
%%bash
# Install matplotlib for the plotting cells that follow.
# NOTE(review): this uses whichever pip3 is first on the shell PATH -- confirm it
# installs into the same environment the notebook kernel runs in.
pip3 install matplotlib

In [None]:
import matplotlib.pylab as plt
import matplotlib.colors
from sklearn.metrics import precision_recall_curve,\
                            average_precision_score,\
                            roc_auc_score, roc_curve,\
                            confusion_matrix, classification_report

def plot_confusion_matrix(train_labels, train_pred):
    """Draw a confusion matrix for true labels versus predictions.

    train_labels: pandas DataFrame with a 'Class' column holding the true labels.
    train_pred:   predicted labels, in the same row order as train_labels.

    The label set and its order come from train_labels['Class'].value_counts().
    Cells are coloured by log(count + 1.001) and annotated with the raw count.
    Returns the matplotlib figure.
    """
    fig = plt.figure(figsize=(6,6))
    ax = plt.subplot()

    labels = list(train_labels['Class'].value_counts().index)
    print(labels)

    confusion = confusion_matrix(train_labels, train_pred, labels=labels)
    # Colour on a log scale; the offset keeps empty cells finite.
    ax.matshow(np.log(confusion + 1.001))

    tick_positions = range(len(labels))
    ax.set_xticks(tick_positions)
    ax.set_yticks(tick_positions)
    ax.set_xticklabels(labels, rotation=90)
    ax.set_yticklabels(labels)

    # Write each count into its cell: matrix row -> y position, column -> x position.
    for (row_idx, col_idx), count in np.ndenumerate(confusion):
        ax.text(col_idx, row_idx, count, va='center', ha='center')

    ax.set_xlabel('predicted')
    ax.set_ylabel('true')

    return fig

# Class balance of the training labels
print(target_train_pd['Class'].value_counts())

# Confusion matrices for the training set first, then the held-out test set
for labels_pd, features_pd in (
    (target_train_pd, features_train_pd),
    (target_test_pd, features_test_pd),
):
    _ = plot_confusion_matrix(labels_pd, model.predict(features_pd))

# Check the Important Features

In [None]:

%matplotlib inline
import operator
import matplotlib.pylab as plt

# Pair every feature name with its importance and rank from most to least important
feat_imp = sorted(
    zip(features_train_pd.columns, model.feature_importances_),
    key=lambda pair: pair[1],
    reverse=True,
)
feature_names = [name for name, _score in feat_imp]
feature_scores = [score for _name, score in feat_imp]

plt.figure(figsize=(10,10))
plt.plot(feature_names, feature_scores, 'p-')
_ = plt.xticks(rotation=90)

##### Re-create the model with Important Features

In [None]:
# Define features and target variables for convenience.
# From the importance graph we keep seven V-features (V3, V4, V10, V11, V12,
# V14, V17); Amount is not in the drop list below, so the model is trained on
# eight columns in total.
drop_time_class = ['_c0', 'Time', 'Class','V1','V2','V5','V6','V7','V8','V9','V13','V15','V16','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28']
drop_class=['Class']


features_train = df_train.drop(*drop_time_class)
target_train = df_train.select("Class")

features_test = df_test.drop(*drop_time_class)
target_test = df_test.select("Class")
features_test.printSchema()

# class_weight='balanced' re-weights the classes by their frequency in the
# training labels; random_state pins the forest so the model that is saved
# below can be reproduced on a re-run.
model = RandomForestClassifier(n_estimators=200, max_depth=6, n_jobs=10, class_weight='balanced', random_state=42)

# Convert to pandas: scikit-learn trains on in-memory data, not Spark DataFrames
features_test_pd = features_test.toPandas()
target_test_pd = target_test.toPandas()

features_train_pd = features_train.toPandas()
target_train_pd = target_train.toPandas()

# ravel() turns the single-column label frame into the 1-D array fit() expects
model.fit(features_train_pd, target_train_pd.values.ravel())

pred_train = model.predict(features_train_pd)
pred_test = model.predict(features_test_pd)

pred_train_prob = model.predict_proba(features_train_pd)
pred_test_prob = model.predict_proba(features_test_pd)

print("Number of features")
print(len(model.feature_importances_))

# Save the model to the local filesystem (uploaded to object storage later)
joblib.dump(model, 'model.pkl')

# Confusion Matrix

In [None]:
# Confusion matrices for the retrained model: training set, then test set
for labels_pd, features_pd in (
    (target_train_pd, features_train_pd),
    (target_test_pd, features_test_pd),
):
    _ = plot_confusion_matrix(labels_pd, model.predict(features_pd))

# Test New Model

In [None]:
import json
import pandas as pd
import numpy as np
import time


# Bring the held-out rows into pandas and separate them by label
df_test_pandas = df_test.toPandas()
is_fraud = df_test_pandas['Class'] == 1
is_not_fraud = df_test_pandas['Class'] == 0
fraudTest = df_test_pandas.loc[is_fraud]
notFraudTest = df_test_pandas.loc[is_not_fraud]

# Columns the reduced model was not trained on
unused_columns = ['Time','Class', '_c0','V1','V2','V5','V6','V7','V8','V9','V13','V15','V16','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28']
fraudTestFeatures = fraudTest.drop(columns=unused_columns)
notFraudTestFeatures = notFraudTest.drop(columns=unused_columns)

# Column names the model was fitted with, in training order
model_columns = ['V3','V4','V10','V11','V12','V14','V17','Amount']

# Score the known-fraud rows one at a time, pausing two seconds between rows
for index, row in fraudTestFeatures.iterrows():
    rowdf = pd.DataFrame([row.tolist()], columns=model_columns)
    print(model.predict(rowdf))
    time.sleep(2)




# Upload Model to Rook/Ceph

In [None]:
import boto3

# Upload the pickled model, then list the 'uploaded/' prefix of the bucket to
# confirm the object is there.
key = "uploaded/model.pkl"
prefix = 'uploaded/'
s3.upload_file(Filename="model.pkl", Bucket=s3_bucket, Key=key)

result = s3.list_objects(
    Bucket=s3_bucket,
    Prefix=prefix,
    Delimiter='/',
)
print(result)

In [None]:

# Shut down the Spark session created earlier in the notebook.
# NOTE(review): Spark DataFrames built from this session (df, df_train, df_test)
# presumably cannot be evaluated after this point -- cells that run later should
# rely on pandas copies taken before the session is stopped.
print("Disconnect from Spark")
spark.stop()

# Install OpenShift client

In [None]:
%%bash
# Download the OpenShift client (oc) tarball, unpack it in the working
# directory, and copy the binary into ~/../bin.
# Stop at the first failing command so a failed download or extraction does not
# go on to install a stale or missing binary.
set -e
# -f makes curl exit non-zero on an HTTP error instead of saving the error page
# as oc.tar.gz; -L follows redirects.
curl -f -o oc.tar.gz -L https://mirror.openshift.com/pub/openshift-v3/clients/4.0.22/linux/oc.tar.gz
tar xzf oc.tar.gz
cp oc ~/../bin/oc


# Log in to the Cluster

In [None]:
%%bash
# Log in to the OpenShift cluster and select the 00odh project.
#
# The password is read from the OC_PASSWORD environment variable (provided by
# the notebook environment, like the S3 credentials) instead of being stored in
# the notebook. OC_USER may override the default user name.
# Replace [OPENSHIFT MASTER API URL] with the API server host before running.
set -e

# Fail with a clear message when the password has not been provided.
: "${OC_PASSWORD:?Set OC_PASSWORD to the OpenShift password before running this cell}"

# NOTE: --insecure-skip-tls-verify turns off certificate verification; remove it
# when the API server presents a trusted certificate.
oc login -u "${OC_USER:-opentlc-mgr}" -p "$OC_PASSWORD" --insecure-skip-tls-verify [OPENSHIFT MASTER API URL]:6443
oc project 00odh

# Serve Model With Seldon

In [None]:
%%bash
# Create the SeldonDeployment described by mymodel.json in the demo project,
# then list the Seldon deployments to show the result.
namespace=00odh
manifest_url=https://raw.githubusercontent.com/nakfour/odh-kubeflow/master/mymodel.json

oc project "$namespace"
oc create -n "$namespace" -f "$manifest_url"
oc get seldondeployments

# Test Served Full Model in Curl

In [None]:
%%bash
# Install the jq binary found in the working directory, request an OAuth token
# from the Seldon gateway, and send one sample prediction request.
# Replace [SELDON URL] with the gateway base URL before running.
cp jq ~/../bin/jq
# Executable by everyone but writable only by its owner; the previous mode 777
# left a binary on the PATH world-writable.
chmod 755 ~/../bin/jq
export TOKENJSON=$(curl -XPOST -u oauth-key:oauth-secret [SELDON URL]/oauth/token -d 'grant_type=client_credentials')
# Quote the expansions so the JSON reaches jq without word splitting or globbing.
export TOKEN=$(echo "$TOKENJSON" | jq ".access_token" -r)
echo "$TOKEN"

# strData carries the eight model inputs as one comma-separated string.
curl -v --header "Authorization: Bearer $TOKEN" [SELDON URL]/api/v0.1/predictions -d '{"strData": "0.365194527642578,0.819750231339882,-0.5927999453145171,-0.619484351930421,-2.84752569239798,1.48432160780265,0.499518887687186,72.98"}' -H "Content-Type: application/json"

# Test Served Full Model In Python

In [None]:
### Testing the served model from python using the test dataframe
import requests
import time

# Base URL of the Seldon API gateway; replace the placeholder before running.
seldon_url = '[SELDON URL]'

# Get the token
post_data = {"grant_type": "client_credentials"}
# Only the form-encoded `data` body is sent. The previous call also passed
# json={'grant_type=client_credentials'} -- a set literal, which is not
# JSON-serializable and which requests ignores whenever `data` is supplied.
requestOauth = requests.post(seldon_url + '/oauth/token', auth=('oauth-key', 'oauth-secret'), data=post_data)
# Surface authentication or routing failures here rather than as a KeyError below.
requestOauth.raise_for_status()

data = requestOauth.json()
# The token is printed for demo visibility; clear this output before sharing.
print(data['access_token'])
access_token = data['access_token']

headers = {'Content-type': 'application/json', 'Authorization': 'Bearer {}'.format(access_token)}

# Read the test dataframe and stream each row.
# The Spark session is stopped in an earlier cell, so reuse the pandas copy of
# the test set made in the "Test New Model" cell; only fall back to Spark when
# that copy does not exist (e.g. the cell was skipped).
if 'df_test_pandas' not in globals():
    df_test_pandas = df_test.toPandas()
fraudTest = df_test_pandas.loc[df_test_pandas['Class']== 1]
notFraudTest = df_test_pandas.loc[df_test_pandas['Class']== 0]

# Columns the served model was not trained on
unused_columns = ['Time','Class', '_c0','V1','V2','V5','V6','V7','V8','V9','V13','V15','V16','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28']
fraudTestFeatures = fraudTest.drop(columns=unused_columns)
notFraudTestFeatures = notFraudTest.drop(columns=unused_columns)

# Send each known-fraud row as a comma-separated string, pausing two seconds
# between requests.
for index, row in fraudTestFeatures.iterrows():
    data = row
    str1 = ','.join(str(e) for e in data)
    requestPrediction = requests.post(seldon_url + '/api/v0.1/predictions', headers=headers, json={"strData": str1 })
    # Stop on an HTTP error instead of failing while parsing an error body.
    requestPrediction.raise_for_status()
    predictionData = requestPrediction.json()
    datafield = predictionData['data']
    predictionArray = datafield['ndarray']
    print(predictionArray[0])
    time.sleep(2)

# Clear Demo

In [None]:
%%bash
# Tear down the demo: switch to the 00odh project and delete the
# SeldonDeployment named "mymodel".
oc project 00odh
oc delete seldondeployments mymodel

