In [None]:
## This code was written by Mehdi Touyserkani (ir-bestpro)
## Email : ir_bestpro@yahoo.com
## Website : https://www.ir-bestpro.com

In [None]:
import os

import boto3
import sagemaker
from sagemaker.processing import ProcessingInput,ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import (ParameterInteger,ParameterString,ParameterFloat)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import (LocalPipelineSession , PipelineSession)
from sagemaker.workflow.steps import (ProcessingStep,TrainingStep)


In [None]:
#________________________Global Variables_____________________________________________

session = PipelineSession() #LocalPipelineSession()
role = sagemaker.get_execution_role()
default_bucket = sagemaker.session.Session().default_bucket() # Get default bucket of user's aws space

bucket_name = 'customers-dataset' # source bucket holding the raw Customers.csv
local_path = './dataset/Customers.csv' # local copy of the raw CSV, relative to the notebook's working directory

#__Download the dataset from its source bucket to the local notebook path_____

# boto3's download_file does not create missing parent directories, so make sure
# ./dataset exists (it does not on a fresh notebook instance) before downloading.
os.makedirs(os.path.dirname(local_path), exist_ok=True)
s3 = boto3.resource('s3')
s3.Bucket(bucket_name).download_file("Customers.csv", local_path)

#__________Upload dataset to default s3 bucket_____________________

base_uri = f"s3://{default_bucket}/loyalty"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path = local_path,
    desired_s3_uri = base_uri,
)

#__create workflow parameters for pipeline object instance_______

input_dataset = ParameterString( # S3 URI of the raw CSV; defaults to the copy uploaded above
    name = "InputData",
    default_value = input_data_uri,
)

processing_instance_count = ParameterInteger( # pipeline parameter for the preprocessing job's instance count
    name = "ProcessingInstanceCount",
    default_value = 1
)

In [None]:
%%writefile preprocessing.py

import math
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
from scipy.stats import entropy
from sklearn.model_selection import train_test_split

def Entropy(X):
    """Select the lower-entropy half of the feature columns of ``X``.

    For every column the Shannon entropy (base 10) of its empirical value
    distribution (counts of each unique value) is computed. Columns are ranked
    from lowest to highest entropy (ties keep column order) and the first
    ``floor(n_columns * 0.5)`` indices are returned. At least one column is
    kept whenever ``X`` has any column.

    Parameters
    ----------
    X : numpy.ndarray
        2-D array, rows are samples and columns are features.

    Returns
    -------
    list[int]
        Selected column indices, ordered by ascending entropy.
    """
    n_features = X.shape[1]
    column_entropies = [
        entropy(np.unique(X[:, col], return_counts=True)[1], base=10)
        for col in range(n_features)
    ]
    ranked = sorted(range(n_features), key=lambda col: column_entropies[col])
    # Keep half of the features, but never drop all of them: a single column
    # would otherwise give floor(0.5) == 0 and an empty feature matrix downstream.
    n_keep = max(1, math.floor(n_features * .5)) if n_features else 0
    return ranked[:n_keep]


if __name__ == '__main__':

    import os  # only the script entry point needs filesystem helpers

    base_dir = "/opt/ml/processing" # container folder where the processing inputs are staged and outputs are collected

    print('Starting Preprocessing Step')

    #_________Reading the input csv file at first______________

    Dataset = pd.read_csv(f"{base_dir}/input/Customers.csv" , header=0) # raw CSV staged by the ProcessingInput
    X = Dataset.iloc[:,:-1].copy() # every column but the last is a feature; copy so the edits below never touch Dataset
    Gender = {"Female" : 1 , "Male" : 0}
    X['Gender'] = X['Gender'].map(Gender) # numeric encoding; values outside the mapping become NaN and are filled below
    Y = Dataset.iloc[:,-1].to_numpy() # the last column is the label

    #_______________Data Smoothing________________________________

    # Estimate missing values from neighbouring rows: linear interpolation down each
    # column, extended to leading/trailing gaps. Anything interpolation cannot fill
    # (e.g. an entirely empty column) falls back to the column mean.
    X = X.interpolate(method='linear', axis=0, limit_direction='both')
    X = X.fillna(X.mean(numeric_only=True))

    X = X.to_numpy()

    #_________________Noise Detection and removal___________________

    # Heuristic DBSCAN radius: mean over rows of ||row - X|| (norm against the whole
    # matrix), scaled by the sample count and by 5% of the feature count.
    dist = (np.average([np.linalg.norm(row - X) for row in X]) / X.shape[0]) / (0.05 * X.shape[1])
    clustering = DBSCAN(eps=dist, min_samples=3).fit(X)
    is_clean = clustering.labels_ != -1 # DBSCAN labels noise samples with -1

    X = X[is_clean , :] # drop noise rows from X
    Y = Y[is_clean] # ...and the matching labels
    Entropy_features = Entropy(X) # column indices chosen by the entropy criterion

    #__________Save Preprocessed data in separate files_____________

    X = X[: , Entropy_features] # feature selection
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1)

    train_dir = f"{base_dir}/output/Train"
    test_dir = f"{base_dir}/output/Test"
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Written without header/index: the last column is the label, the rest are features.
    pd.DataFrame(np.column_stack((X_train , y_train))).to_csv(f"{train_dir}/Train.csv", header=False, index=False)
    pd.DataFrame(np.column_stack((X_test , y_test))).to_csv(f"{test_dir}/Test.csv", header=False, index=False)

    print('The first step is executed now!')


In [None]:
# Processing I/O. The input is the S3 URI held by the `InputData` pipeline parameter,
# which SageMaker stages into the container before the script runs. Each output
# directory is uploaded from the container to S3 when the job finishes.
inputs  = [ProcessingInput(source = input_dataset, destination = '/opt/ml/processing/input')]
outputs = [ProcessingOutput(output_name = 'Train', source = '/opt/ml/processing/output/Train'),
           ProcessingOutput(output_name = 'Test' , source = '/opt/ml/processing/output/Test')]

processing_job_instance = SKLearnProcessor(framework_version ='1.0-1',
                                           role = role,
                                           sagemaker_session = session,
                                           base_job_name = "customer-loyalty",
                                           instance_type = 'ml.t3.medium',
                                           # Use the `ProcessingInstanceCount` pipeline parameter so
                                           # overriding it at pipeline start actually takes effect.
                                           instance_count = processing_instance_count)

# With a PipelineSession, run() does not launch a job; it returns the request
# arguments that ProcessingStep embeds in the pipeline definition.
preprocessing_step = ProcessingStep(name = "PreProcessingStep",
                                    step_args = processing_job_instance.run(inputs = inputs, outputs = outputs, code = "preprocessing.py"))

In [None]:
%%writefile ./model/train-model.py

#_________Import Libraries_______________________

import numpy as np
import pandas as pd
import os
import torch.optim
import torch.nn as nn
import torch
import boto3
import sagemaker
import pickle

class Dense_Network(nn.Module):
    """Fully connected network: three hidden blocks plus a linear output layer.

    Each hidden block is Linear -> BatchNorm1d -> ReLU, widening the
    representation to 256, 512 and then 1024 units. The final Linear layer maps
    to ``number_of_clesses`` raw scores; no output activation is applied.
    """

    def __init__(self , features_length , number_of_clesses):
        super().__init__()

        def hidden(n_in, n_out):
            # Linear projection followed by batch normalisation and ReLU.
            return nn.Sequential(nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU())

        # Attribute names FC1..FC4 form the state/pickle keys, so they stay fixed.
        self.FC1 = hidden(features_length, 256)
        self.FC2 = hidden(256, 512)
        self.FC3 = hidden(512, 1024)
        self.FC4 = nn.Sequential(nn.Linear(1024, number_of_clesses))

    def forward(self , x):
        """Pass ``x`` through FC1, FC2, FC3 and FC4 in order; return the raw scores."""
        for stage in (self.FC1, self.FC2, self.FC3, self.FC4):
            x = stage(x)
        return x


if __name__ == '__main__':

    #______________setting the model hyperparameters_______________

    LEARNING_RATE = 0.01
    EPOCHS = 100
    BATCH_SIZE = 32

    # SageMaker exposes the 'train' channel directory as SM_CHANNEL_TRAIN; fall back
    # to the conventional container path when the variable is not set.
    train_dir = os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")

    # The preprocessing step writes Train.csv without a header row. Read it with
    # header=None; the default (header=0) would silently consume the first sample.
    train_Data = pd.read_csv(f"{train_dir}/Train.csv", header=None).to_numpy()
    X_train = train_Data[:,:-1] # features
    y_train = train_Data[:,-1] # labels are in the last column

    #______________Create Dense_Network Instances_____________________

    model = Dense_Network( X_train.shape[1] , 1) # single output logit
    optimizer = torch.optim.Adam(model.parameters() , LEARNING_RATE) # Adam weight optimizer
    Loss = nn.BCEWithLogitsLoss() # binary cross entropy on raw logits

    #_______________Train Phase__________________

    model.train()
    for epoch in range(EPOCHS):
        temp_loss = 0 # accumulated loss in this epoch
        steps = 0 # number of batches that contributed to temp_loss
        for batch in range(0 , len(X_train) , BATCH_SIZE):
            X_batch = X_train[batch : batch + BATCH_SIZE, :]
            if len(X_batch) < 2:
                # BatchNorm1d cannot compute batch statistics from a single sample in
                # training mode (it raises), so skip a size-1 trailing batch.
                continue

            X = torch.from_numpy(X_batch).float()
            targets = torch.from_numpy(y_train[batch : batch + BATCH_SIZE]).float()
            output = model(X).squeeze(1) # (batch, 1) -> (batch,)
            loss = Loss(output, targets)
            temp_loss += loss.item()
            steps += 1

            model.zero_grad() # clear gradients from the previous batch
            loss.backward() # backpropagation
            optimizer.step() # update weights

        print('Train Phase - Epoch # ' , str(epoch + 1) , ', Loss : ' , str(temp_loss / max(steps, 1)))


    #____________Saving the model (SageMaker uploads the model dir to S3)___________________

    # Save in eval mode so BatchNorm uses its running statistics when the model is reloaded.
    model.eval()
    local_model_path = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    with open(f"{local_model_path}/model.sav", 'wb') as model_file: # close the handle deterministically
        pickle.dump(model, model_file)


In [None]:
#________________________create training step____________________________________

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.pytorch import PyTorch

# Training data channel 'train'. It points at the preprocessing step's Train
# output, resolved from the step properties at pipeline run time.
train_channel = TrainingInput(
    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs["Train"].S3Output.S3Uri,
    content_type="text/csv",
)

# PyTorch estimator that runs ./model/train-model.py as its entry point.
model_estimator = PyTorch(
    './model/train-model.py',
    instance_type='ml.m4.xlarge',
    instance_count=1,
    framework_version='1.8.0',
    py_version='py3',
    sagemaker_session=session,
    role=role,
    base_job_name="customer-loyalty",
)

# With a PipelineSession, fit() returns the job arguments instead of starting a job.
train_step_args = model_estimator.fit({'train': train_channel})

# NOTE: the step name "TrainginStep" (sic) is kept as-is; renaming it changes the pipeline definition.
training_step = TrainingStep(name="TrainginStep", step_args=train_step_args)


In [None]:
%%writefile ./model/evaluation.py

import json
import pathlib
import boto3
import sagemaker
import pickle
import tarfile
import os

import torch.optim
import torch.nn as nn
import torch
import joblib
import numpy as np
import pandas as pd


class Dense_Network(nn.Module):
    """Fully connected network: three hidden blocks plus a linear output layer.

    Each hidden block is Linear -> BatchNorm1d -> ReLU, widening the
    representation to 256, 512 and then 1024 units. The final Linear layer maps
    to ``number_of_clesses`` raw scores; no output activation is applied.
    This definition must match the training script's class so the pickled
    model can be restored.
    """

    def __init__(self , features_length , number_of_clesses):
        super().__init__()

        def hidden(n_in, n_out):
            # Linear projection followed by batch normalisation and ReLU.
            return nn.Sequential(nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU())

        # Attribute names FC1..FC4 form the state/pickle keys, so they stay fixed.
        self.FC1 = hidden(features_length, 256)
        self.FC2 = hidden(256, 512)
        self.FC3 = hidden(512, 1024)
        self.FC4 = nn.Sequential(nn.Linear(1024, number_of_clesses))

    def forward(self , x):
        """Pass ``x`` through FC1, FC2, FC3 and FC4 in order; return the raw scores."""
        for stage in (self.FC1, self.FC2, self.FC3, self.FC4):
            x = stage(x)
        return x

if __name__ == "__main__":

    #___________Load the trained model staged from S3_____________________

    model_path = "/opt/ml/processing/model/model.tar.gz" # model archive staged by the ProcessingInput
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # NOTE(review): pickle.load can execute arbitrary code. This is acceptable only because
    # the archive comes from this pipeline's own training step.
    with open("model.sav", "rb") as model_file:
        model = pickle.load(model_file)
    model.eval() # BatchNorm must use running statistics, not the test batch's statistics

    test_path = "/opt/ml/processing/test" # Test split staged from the preprocessing step
    # Test.csv is written without a header row; header=None keeps the first sample.
    test_Data = pd.read_csv(f"{test_path}/Test.csv", header=None).to_numpy()
    X_test = torch.from_numpy(test_Data[:,:-1]).float() # features
    y_test = torch.from_numpy(test_Data[:,-1]).float() # labels are in the last column

    #________________Test Phase______________________________

    with torch.no_grad(): # inference only, no autograd graph
        logits = model(X_test).squeeze(1)
    # The network was trained with BCEWithLogitsLoss, so it outputs logits. Map them to
    # probabilities before thresholding at 0.5 (equivalently, logits > 0).
    predicted = (torch.sigmoid(logits) > 0.5).float()
    true = (predicted == y_test).sum().item() # number of correct predictions
    acc = true / len(y_test)
    print('The Model Accuracy Is : ' , str(100 * acc) + '%')

    report_dict = {
        "Model Accuracy": {
            "acc": {"value": acc},
        },
    }

    #____________save acc value to json file__________________

    output_dir = "/opt/ml/processing/evaluation" # uploaded as the 'Evaluation' processing output
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
        

In [None]:
#________________Create evaluation processing Step_________________________________________

from sagemaker.pytorch.processing import PyTorchProcessor

# Inputs staged into the evaluation container: the trained model archive and the
# held-out Test split from preprocessing. Both are resolved from step properties at run time.
eval_inputs = [
    ProcessingInput(source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                    destination='/opt/ml/processing/model'),
    ProcessingInput(source=preprocessing_step.properties.ProcessingOutputConfig.Outputs["Test"].S3Output.S3Uri,
                    destination='/opt/ml/processing/test'),
]

# The container directory where evaluation.py writes evaluation.json.
eval_outputs = [ProcessingOutput(output_name='Evaluation', source='/opt/ml/processing/evaluation')]

# PyTorch framework processor that runs the evaluation script.
evaluation_job_instance = PyTorchProcessor(
    framework_version='1.8',
    role=role,
    instance_type='ml.t3.medium',
    instance_count=1,
    sagemaker_session=session,
    base_job_name='customer-loyalty',
)

eval_step_args = evaluation_job_instance.run(inputs=eval_inputs, outputs=eval_outputs, code="./model/evaluation.py")
evaluation_step = ProcessingStep(name="EvaluationStep", step_args=eval_step_args)



In [None]:
#______________create pipeline and set parameters_____________________________

from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

# Use each step's base_job_name as the prefix of the jobs it launches.
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

pipeline = Pipeline(
    name = "customer-loyalty-pipeline",
    steps = [preprocessing_step , training_step , evaluation_step],
    parameters = [processing_instance_count, input_dataset],
    sagemaker_session = session,
    pipeline_definition_config = definition_config,
)

# upsert() creates the pipeline on the first run and updates its definition on later
# runs. create() fails once the pipeline exists, which breaks re-running the notebook.
pipeline.upsert(
    role_arn=role,
    description="customer-loyalty-pipeline",
)

execution = pipeline.start() # start an execution with the default parameter values
# Lists the steps of the execution so far; call execution.wait() first to block until it finishes.
execution.list_steps()

In [None]:
#client = boto3.client('sagemaker')
#response = client.delete_pipeline(PipelineName='customer-loyalty-pipeline')