## Connection and Data Validation Notebook

## Table of Contents
* [Check for Training Data in Project Space](#DataCheck)
    * [Load the Training Data from COS if the file doesn't exist](#section_1_1)
    * [Check the connection and data loading](#section_1_2)
  
* [Data Validation](#chapter2)
    * [Split the Data](#Optional)

    * [Generate Training Stats on both Splits](#section_2_2)
    * [Infer Schema on both Splits](#section_2_3) 
    * [Check for anomalies](#section_2_4) 
    * [Return a boolean to validate the tests](#section_3_1) 


## Imports

In [1]:
from botocore.client import Config
from sklearn.model_selection import train_test_split
from dataclasses import dataclass
#import tensorflow_data_validation as tfdv
from ibm_watson_studio_pipelines import WSPipelines
import warnings
import numpy as np
import logging
import pandas as pd
import ibm_boto3
import os, types
import pandas as pd
warnings.filterwarnings("ignore")

These environment variables are set by Watson Studio (WS) Pipelines.

In [2]:
# Pipeline-supplied parameters; each one is None when the variable is not set.
cpd_api_key = os.environ.get("cpd_api_key")
training_file_name = os.environ.get("training_file_name")

### Load the Credentials


In [3]:
# Example of the COS service-credential JSON layout, kept for reference only (nothing
# in this cell is executed).
# NOTE(review): this comment previously embedded what looks like a real API key. The
# value is redacted here; if the key was live, rotate it, since earlier revisions of the
# notebook may still contain it.
# {
#     "apikey": "<REDACTED>",
#     "endpoints": "https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints",
#     "iam_apikey_description": "Auto-generated for key crn:v1:bluemix:public:cloud-object-storage:global:a/30af6f53bf2952b41e227b2cd22197b1:ec25505d-5429-4901-b328-8249cc621f3a:resource-key:05b5dfe4-d718-4121-b331-897d3dc4b6bb",
#     "iam_apikey_name": "uob-cos",
#     "iam_serviceid_crn": "crn:v1:bluemix:public:iam-identity::a/30af6f53bf2952b41e227b2cd22197b1::serviceid:ServiceId-a1d750d8-83f9-41b4-b61c-afd262f5fc36",
#     "resource_instance_id": "crn:v1:bluemix:public:cloud-object-storage:global:a/30af6f53bf2952b41e227b2cd22197b1:ec25505d-5429-4901-b328-8249cc621f3a::"
# }

In [4]:
# @hidden_cell

## MLOPS COS
# Connection settings for the MLOps Cloud Object Storage bucket. Every value can be
# overridden by an environment variable of the same name; the literals are fallbacks so
# the notebook keeps working unchanged where no override is provided.
# FIXME(security): a service API key is committed below as a fallback. Rotate it, supply
# it through the environment (or a secret store), and then delete the literal.
ENDPOINT_URL_MLOPS = os.getenv("ENDPOINT_URL_MLOPS", "https://s3.jp-tok.cloud-object-storage.appdomain.cloud")
AUTH_ENDPOINT = os.getenv("AUTH_ENDPOINT", "https://iam.cloud.ibm.com/oidc/token")
API_KEY_MLOPS = os.getenv("API_KEY_MLOPS", "W7libKUAsvOKR9sQaYBzpQiReuiVp4Vm9JWqugRsW6Rq")
CRN_MLOPS = os.getenv("CRN_MLOPS", "crn:v1:bluemix:public:cloud-object-storage:global:a/694a501fd56b4f6b9aa747128ab5267b:5e28c0cc-7d72-49b7-be0b-4e0606310e4e::")
BUCKET_MLOPS = os.getenv("BUCKET_MLOPS", "mlops-cpd-onprem")

# Honour the file name passed in through the environment and fall back to the default
# dataset only when it is unset or empty (previously the default always overwrote it).
training_file_name = os.getenv("training_file_name") or 'german_credit_risk.csv'

## Check for Training Data in Project Space

In [5]:
def check_for_file_in_project_cos(key):
    """
    Report whether a data asset is present in the project's local data directory.

    Args:
        key: File name to look for under ``/project_data/data_asset/``.

    Returns:
        bool: True when the path exists, False when it does not or when the check
        itself raised (the exception is printed, not propagated).
    """
    try:
        # Return the boolean directly so that a missing file yields False rather
        # than an implicit None.
        return os.path.exists(f"/project_data/data_asset/{key}")
    except Exception as e :
        print(e)
        return False
    
    
def read_data_from_mlops_cos(key):
    """
    Fetch an object from the MLOps COS bucket and parse it as CSV.

    Args:
        key: Object key inside ``BUCKET_MLOPS``.

    Returns:
        pandas.DataFrame: The result of ``pd.read_csv`` on the object's body.
    """
    client_options = dict(
        service_name='s3',
        ibm_api_key_id=API_KEY_MLOPS,
        ibm_service_instance_id=CRN_MLOPS,
        ibm_auth_endpoint=AUTH_ENDPOINT,
        config=Config(signature_version='oauth'),
        endpoint_url=ENDPOINT_URL_MLOPS,
    )
    cos_client = ibm_boto3.client(**client_options)

    response = cos_client.get_object(Bucket=BUCKET_MLOPS, Key=key)
    stream = response['Body']

    # pandas wants a file-like object; attach a stub __iter__ when the body lacks one.
    if not hasattr(stream, "__iter__"):
        def _iter_stub(self):
            return 0
        stream.__iter__ = types.MethodType(_iter_stub, stream)

    return pd.read_csv(stream)
    
    
def load_data_from_project(key):
    """
    Load a CSV data asset, preferring the project's local copy.

    Args:
        key: File name of the asset; also used as the object key in the MLOps COS
            bucket when the local copy is missing.

    Returns:
        pandas.DataFrame: The parsed CSV, read from ``/project_data/data_asset/`` when
        present there, otherwise downloaded via ``read_data_from_mlops_cos``.
    """
    # Local copy available: read it straight from the project data directory.
    if check_for_file_in_project_cos(key):
        return pd.read_csv(f"/project_data/data_asset/{key}")

    # Otherwise announce the fallback and pull the file from the MLOps bucket.
    print("\n")
    print(f"{key} file is probably not in project. Loading File from MLOps COS Bucket.")
    return read_data_from_mlops_cos(key)

## Load the Training Data from COS if the file doesn't exist

In [6]:
# Read the training set (local project copy first, MLOps COS bucket as fallback).
gcr_df = load_data_from_project(training_file_name)

# Encode the target as 1 / 0 for ease of use with OpenScale.
risk_encoding = {'Risk':1,'No Risk':0}
gcr_df['Risk'] = gcr_df['Risk'].map(risk_encoding)

gcr_df.head()



german_credit_risk.csv file is probably not in project. Loading File from MLOps COS Bucket.


Unnamed: 0,CheckingStatus,LoanDuration,CreditHistory,LoanPurpose,LoanAmount,ExistingSavings,EmploymentDuration,InstallmentPercent,Sex,OthersOnLoan,...,OwnsProperty,Age,InstallmentPlans,Housing,ExistingCreditsCount,Job,Dependents,Telephone,ForeignWorker,Risk
0,0_to_200,31,credits_paid_to_date,other,1889,100_to_500,less_1,3,female,none,...,savings_insurance,32,none,own,1,skilled,1,none,yes,0
1,less_0,18,credits_paid_to_date,car_new,462,less_100,1_to_4,2,female,none,...,savings_insurance,37,stores,own,2,skilled,1,none,yes,0
2,less_0,15,prior_payments_delayed,furniture,250,less_100,1_to_4,2,male,none,...,real_estate,28,none,own,2,skilled,1,yes,no,0
3,0_to_200,28,credits_paid_to_date,retraining,3693,less_100,greater_7,3,male,none,...,savings_insurance,32,none,own,1,skilled,1,none,yes,0
4,no_checking,28,prior_payments_delayed,education,6235,500_to_1000,greater_7,3,male,none,...,unknown,57,none,own,2,skilled,1,none,yes,1


## Data Validation 

In [7]:
@dataclass
class Datavalidation:
    """
    Helpers for splitting a dataframe, uploading splits to COS and validating them.

    Attributes:
        dataframe: The dataset to split and validate.
        mask_per: Fraction (e.g. 0.8) used as the per-row threshold for landing in the
            training split; rows are assigned independently at random, so the realised
            split sizes are only approximately this fraction.

    Note:
        The statistics / schema / anomaly methods use the module-level name ``tfdv``.
        It is not imported by this class, so the notebook must import
        ``tensorflow_data_validation as tfdv`` before calling them.
    """
    dataframe : pd.DataFrame
    mask_per : float


    def split_data(self,seed=32):
        """
        Split ``self.dataframe`` into train and test splits.

        Args:
            seed: Value passed to ``np.random.seed`` so the split is repeatable.

        Returns:
            tuple: ``(training_data, testing_data)`` dataframes; together they contain
            every row of ``self.dataframe`` exactly once.
        """
        np.random.seed(seed)
        # One uniform draw per row; rows whose draw is <= mask_per go to training.
        mask = np.random.rand(len(self.dataframe)) <= self.mask_per
        # Index the instance's own dataframe rather than a notebook-level global.
        training_data = self.dataframe[mask]
        testing_data = self.dataframe[~mask]

        print(f"No. of training examples: {training_data.shape[0]}")
        print(f"No. of testing examples: {testing_data.shape[0]}")

        return training_data, testing_data


    def save_data_in_cos(self,df,filename,key):
        """
        Write a dataframe to a local CSV and upload it to the MLOps COS bucket.

        Failures are reported on stdout and are not re-raised (best effort).

        Args:
            df: Dataframe to persist.
            filename: Local path the CSV is written to before uploading.
            key: Object key to store the file under in ``BUCKET_MLOPS``.
        """
        try:
            df.to_csv(filename,index=False)
            mlops_res = ibm_boto3.resource(
                service_name='s3',
                ibm_api_key_id=API_KEY_MLOPS,
                ibm_service_instance_id=CRN_MLOPS,
                ibm_auth_endpoint=AUTH_ENDPOINT,
                config=Config(signature_version='oauth'),
                endpoint_url=ENDPOINT_URL_MLOPS)

            mlops_res.Bucket(BUCKET_MLOPS).upload_file(filename,key)
            print(f"File {filename} uploaded successfully")
        except Exception as e:
            print(e)
            # f-string so the actual file name is shown instead of a literal placeholder.
            print(f"File upload for {filename} failed")


    def generate_statistics(self,df):
        """
        Generate and visualize TFDV statistics for a dataframe.

        Args:
            df: Dataframe to profile.

        Returns:
            The statistics object returned by ``tfdv.generate_statistics_from_dataframe``.
        """
        train_stats = tfdv.generate_statistics_from_dataframe(df)
        tfdv.visualize_statistics(train_stats)
        return train_stats

    def inferSchema(self,stats):
        """
        Infer and display a TFDV schema from previously generated statistics.

        Args:
            stats: Statistics object, e.g. the return value of ``generate_statistics``.

        Returns:
            The schema returned by ``tfdv.infer_schema``.
        """
        schema = tfdv.infer_schema(statistics=stats)
        tfdv.display_schema(schema=schema)
        return schema

    def compare_statistics(self,lhs,rhs):
        """
        Visualize two statistics objects side by side.

        Args:
            lhs: Statistics shown under the label ``TEST_DATASET``.
            rhs: Statistics shown under the label ``TRAIN_DATASET``.
        """
        # Compare evaluation data with training data
        tfdv.visualize_statistics(lhs_statistics=lhs, rhs_statistics=rhs,
                                  lhs_name='TEST_DATASET', rhs_name='TRAIN_DATASET')


    def check_for_anomalies(self,testable_stats,ref_schema):
        """
        Validate statistics against a reference schema and report anomalies.

        Args:
            testable_stats: Statistics of the dataset under test.
            ref_schema: Schema the statistics are validated against.

        Returns:
            bool: True when at least one anomaly was found, False otherwise.
        """
        anomalies = tfdv.validate_statistics(statistics=testable_stats, schema=ref_schema)
        tfdv.display_anomalies(anomalies)
        if len(anomalies.anomaly_info.items()) > 0:
            # Obtain the logger locally: no module-level ``logger`` is defined in the notebook.
            log = logging.getLogger(__name__)
            log.error("Anomalies found in dataset...")
            # Report the local result; the instance has no ``anomalies`` attribute.
            log.error(str(anomalies.anomaly_info.items()))
            return True
        else:
            return False

def check_if_file_exists(filename):
    """
    Check whether an object with exactly this key exists in the MLOps COS bucket.

    Args:
        filename: Object key to look for in ``BUCKET_MLOPS``.

    Returns:
        bool: True when a listed object's key equals ``filename``, otherwise False
        (including when the listing contains no objects at all).
    """
    mlops_client = ibm_boto3.client(
        service_name='s3',
        ibm_api_key_id=API_KEY_MLOPS,
        ibm_service_instance_id=CRN_MLOPS,
        ibm_auth_endpoint=AUTH_ENDPOINT,
        config=Config(signature_version='oauth'),
        endpoint_url=ENDPOINT_URL_MLOPS)

    # Restrict the listing to keys starting with the file name. A single list call
    # returns only one page of results, so an unfiltered listing can miss the key in
    # a large bucket.
    listing = mlops_client.list_objects(Bucket=BUCKET_MLOPS, Prefix=filename)

    # 'Contents' is absent from the response when nothing matches; treat as empty.
    return any(entry['Key'] == filename for entry in listing.get('Contents', []))

###  Split Data into Train and Eval Splits to Check for Consistency

In [8]:
# Build the validator with a 0.8 training threshold, then produce both splits
# (split_data uses its default seed).
classvalidate = Datavalidation(
    dataframe=gcr_df,
    mask_per=0.8,
)
training_data, testing_data = classvalidate.split_data()


No. of training examples: 3995
No. of testing examples: 1005


## Generate Training Stats on both Splits

In [9]:
#train_stats = classvalidate.generate_statistics(training_data)

In [10]:
#test_stats = classvalidate.generate_statistics(testing_data)

## Infer Training Data Schema

In [11]:
#train_schema = classvalidate.inferSchema(train_stats)

## Infer Test Data Schema

In [12]:
#test_schema = classvalidate.inferSchema(test_stats)

## Compare Eval and Train Data 

In [13]:
#classvalidate.compare_statistics(lhs=test_stats,rhs=train_stats)

## Check For Data Anomalies 

### Check eval data for errors by validating the eval data stats using the previously inferred schema.

In [14]:
# anomaly_status = classvalidate.check_for_anomalies(test_stats,train_schema)
# anomaly_status

In [15]:
# The TFDV anomaly check in the previous cell is commented out, so the status is
# hard-coded here instead of being computed.
# NOTE(review): Datavalidation.check_for_anomalies returns True when anomalies ARE found,
# and the save cell below uploads only when this flag is falsy, so True skips the
# upload. Confirm this is the intended placeholder value.
anomaly_status = True

## Save Train and Test Data for Data Preparation Stage

In [16]:
# Upload both splits only when no anomalies were flagged; each file is written locally
# and stored in COS under the same name.
if not anomaly_status:
    for split_df, csv_name in ((training_data, "train_gcr.csv"), (testing_data, "test_gcr.csv")):
        classvalidate.save_data_in_cos(df=split_df, filename=csv_name, key=csv_name)

## Check if Files Exist in COS

In [17]:
# True only when both split files are present in the bucket; the second lookup is
# skipped if the first one fails.
files_copied_in_cos = all(
    check_if_file_exists(csv_name) for csv_name in ("train_gcr.csv", "test_gcr.csv")
)
files_copied_in_cos

True

## Register a Boolean Variable in WS Pipeline

In [18]:
# Results handed to the pipeline: the anomaly flag and the upload check.
validation_params = {
    'anomaly_status': anomaly_status,
    'files_copied_in_cos': files_copied_in_cos,
}

In [19]:
# Authenticate with the user token from the environment (raises KeyError when it is
# not set) and store the validation results through the pipelines client.
user_access_token = os.environ['USER_ACCESS_TOKEN']
pipelines_client = WSPipelines.from_token(user_access_token)
pipelines_client.store_results(validation_params)

Running outside of Watson Studio Pipeline - storing results in the local filesystem for testing purposes...

  output paths:
    - "anomaly_status": .ibm_watson_studio_pipelines/results/anomaly_status
    - "files_copied_in_cos": .ibm_watson_studio_pipelines/results/files_copied_in_cos


<ibm_cloud_sdk_core.detailed_response.DetailedResponse at 0x7facfee73460>