In [1]:
!pip3 install --upgrade  google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.44.0-py2.py3-none-any.whl.metadata (27 kB)
Collecting google-cloud-storage
  Downloading google_cloud_storage-2.16.0-py2.py3-none-any.whl.metadata (6.1 kB)
Collecting kfp
  Downloading kfp-2.7.0.tar.gz (441 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m441.8/441.8 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting google-cloud-pipeline-components
  Downloading google_cloud_pipeline_components-2.11.0-py3-none-any.whl.metadata (5.9 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1 (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.34.1->google-cloud-aiplatform)
  Downloading google_api_core-2.18.0-py3-none-any.whl.metadata (2.7 kB)
Collecting kfp-pipeline-spec==0.3.0 (from kfp)
  Downloading kfp_p

In [1]:
# Show the active gcloud configuration (account and project) before configuring the project below.
!gcloud config list

[core]
account = 617832854196-compute@developer.gserviceaccount.com
disable_usage_reporting = True
project = end2end-416809

Your active configuration is: [default]


In [2]:
# GCP project used both by the gcloud CLI (below) and by aip.init later in the notebook.
# The trailing `# @param` comment is a notebook form annotation (e.g. Colab); it does not affect execution.
PROJECT_ID = "end2end-416809"  # @param {type:"string"}

# Set the project id for the gcloud CLI (shell escape; {PROJECT_ID} is interpolated by IPython)
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [3]:
from typing import NamedTuple

import kfp
from kfp import dsl
from kfp import compiler
from kfp.dsl import InputPath, Model, OutputPath, component
from google.cloud import aiplatform as aip

In [4]:
# Configure the Vertex AI SDK for this project. BUCKET_URI is handed to aip.init
# as the GCS staging bucket for jobs submitted from this notebook.
BUCKET_URI = "gs://kfp-churn-bucket"
aip.init(staging_bucket=BUCKET_URI, project=PROJECT_ID)

In [5]:
@dsl.component(
    base_image='python:3.12',
    packages_to_install=["pandas==2.1.4", "fsspec==2024.3.1", "gcsfs"],
)
def prepare_data(output_csv: OutputPath(),
                 train1_csv_url: str) -> None:
    """Copy the source CSV into this component's ``output_csv`` artifact.

    Args:
        output_csv: KFP-provided local path; the dataset is written here
            without the pandas index column.
        train1_csv_url: location of the source CSV, anything ``pandas.read_csv``
            accepts (gcsfs is installed so ``gs://`` URLs can be read).
    """
    import pandas as pd

    print("---- Inside prepare_data component ----")
    source_df = pd.read_csv(train1_csv_url)

    with open(output_csv, 'w') as out_file:
        source_df.to_csv(out_file, index=False)

In [6]:
@dsl.component(base_image='python:3.12', 
               packages_to_install = ["pandas==2.1.4", "scikit-learn", "numpy", "fsspec==2024.3.1", "gcsfs"])
def train_test_split(cc_data_csv: InputPath(), 
                     train_csv: OutputPath(), 
                     test_csv: OutputPath(), 
                     target: str,
                     test_size: float = 0.3,
                     random_state: int = 47) -> None:
    """Split the prepared dataset into stratified train and test CSVs.

    Args:
        cc_data_csv: CSV produced by ``prepare_data``.
        train_csv: KFP-provided path for the training split (no index column).
        test_csv: KFP-provided path for the test split (no index column).
        target: label column; the split is stratified on it so both splits keep
            the same class proportions.
        test_size: fraction of rows placed in the test split (default 0.3).
        random_state: seed for the shuffle, so reruns give the same split (default 47).
    """
    import pandas as pd
    # Aliased because this component is itself named ``train_test_split``.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    print("---- Inside train_test_split component ----")
    final_data = pd.read_csv(cc_data_csv)

    train_df, test_df = sk_train_test_split(
        final_data,
        test_size=test_size,
        stratify=final_data[target],
        random_state=random_state,
    )

    with open(train_csv, 'w') as f:
        train_df.to_csv(f, index=False)

    with open(test_csv, 'w') as f:
        test_df.to_csv(f, index=False)
    

In [7]:
@dsl.component(base_image='python:3.12', 
               packages_to_install = ["pandas==2.1.4", "scikit-learn", "numpy", "fsspec==2024.3.1","gcsfs"])
def training_basic_classifier(train_csv: InputPath(), 
                              trained_model: OutputPath(), 
                              target_column: str) -> None:
    """Fit a logistic-regression classifier on the training split and pickle it.

    Preprocessing, in order:
    1. Every column (features and target) is label-encoded.
    2. The features go through a median ``SimpleImputer``.
    3. ``LogisticRegression(max_iter=500)`` is fit on the result.

    NOTE(review): only the classifier is saved. The fitted LabelEncoders and
    imputer are discarded, so a consumer that re-fits its own encoders on other
    data will not reproduce these feature codes. LabelEncoder also turns numeric
    columns into rank codes. It presumably assigns NaN its own code, which would
    make the imputer a no-op; verify. Consider persisting one preprocessing
    object together with the model.

    Args:
        train_csv: training split CSV from ``train_test_split``.
        trained_model: KFP-provided path; the fitted classifier is pickled here.
        target_column: name of the label column.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.impute import SimpleImputer
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import LabelEncoder

    print("---- Inside training_basic_classifier component ----")

    train_df = pd.read_csv(train_csv)
    # A single LabelEncoder instance is re-fitted independently on each column.
    encoded_df = train_df.apply(LabelEncoder().fit_transform)

    features = encoded_df.drop(target_column, axis=1)
    ytrain = encoded_df[target_column]

    imputer = SimpleImputer(missing_values=np.nan, strategy='median')
    xtrain = imputer.fit_transform(features)

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(xtrain, ytrain)

    with open(trained_model, 'wb') as f:
        pickle.dump(classifier, f)

    print("\n ---- Logistic regression classifier trained and pickled to the trained_model artifact ----")

In [8]:
@dsl.component(base_image='python:3.12', 
               packages_to_install = ["pandas==2.1.4", "scikit-learn", "numpy", "fsspec==2024.3.1","gcsfs"])
def predict_on_test_data(trained_model: InputPath(), 
                         test_csv: InputPath(),
                         y_pred_csv: OutputPath(), 
                         target:str) -> None:
    """Predict labels for the test split with the pickled classifier.

    The test frame goes through the same steps as in ``training_basic_classifier``:
    label-encode every column, drop ``target``, then median-impute. Predictions
    are written to ``y_pred_csv`` as a one-column CSV whose header is ``0``.

    NOTE(review): the LabelEncoders and the imputer are re-fitted on the test
    split rather than reused from training, so feature codes may not match the
    ones the model was trained on. Fixing this requires the training component
    to persist its preprocessing alongside the model.

    Args:
        trained_model: pickled classifier from ``training_basic_classifier``.
        test_csv: test split CSV (must contain ``target``, which is dropped).
        y_pred_csv: KFP-provided path for the predictions CSV.
        target: name of the label column.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import LabelEncoder

    print("---- Inside predict_on_test_data component ----")

    with open(test_csv, 'rb') as f:
        test_df = pd.read_csv(f)

    # The artifact comes from an upstream step of this pipeline. Unpickling can
    # execute code, so never point this at an untrusted file.
    with open(trained_model, 'rb') as f:
        logistic_reg_model = pickle.load(f)

    encoded_df = test_df.apply(LabelEncoder().fit_transform)
    features = encoded_df.drop(target, axis=1)

    imputer = SimpleImputer(missing_values=np.nan, strategy='median')
    xtest = imputer.fit_transform(features)

    y_pred = logistic_reg_model.predict(xtest)
    print(y_pred)

    with open(y_pred_csv, 'w') as f:
        pd.DataFrame(y_pred).to_csv(f, index=False)


In [9]:
@dsl.component(base_image='python:3.12', 
               packages_to_install = ["pandas==2.1.4", "scikit-learn", "numpy", "fsspec==2024.3.1","gcsfs"])
def get_metrics(trained_model: InputPath(), 
                test_csv: InputPath(),
                y_pred_csv: InputPath(), 
                target:str) -> None:
    """Print accuracy, precision, recall and a classification report for the test predictions.

    Args:
        trained_model: pickled classifier from ``training_basic_classifier``. It is
            kept in the signature for pipeline compatibility but is not needed to
            score precomputed predictions, so it is not unpickled.
        test_csv: test split CSV containing the ``target`` column.
        y_pred_csv: single-column CSV of predicted labels from ``predict_on_test_data``.
        target: name of the label column.
    """
    import pandas as pd
    from sklearn import metrics
    from sklearn.preprocessing import LabelEncoder

    print("---- Inside get_metrics component ----")

    test_df = pd.read_csv(test_csv)
    # Only the label column is needed. Encoding it alone yields the same codes as
    # label-encoding the whole frame column by column.
    y_test = LabelEncoder().fit_transform(test_df[target])
    # Flatten the (n, 1) frame into a 1-D label vector.
    y_pred = pd.read_csv(y_pred_csv).to_numpy().ravel()

    # With average='micro', precision and recall of a single-label problem both
    # equal accuracy. For two-class targets, score the positive class instead
    # (assumes labels are encoded 0/1, with 1 = positive, e.g. isFraud).
    # For more classes, fall back to a macro average.
    n_labels = len(set(y_test) | set(y_pred))
    average = "binary" if n_labels <= 2 else "macro"

    acc = metrics.accuracy_score(y_test, y_pred)
    prec = metrics.precision_score(y_test, y_pred, average=average, zero_division=0)
    recall = metrics.recall_score(y_test, y_pred, average=average, zero_division=0)

    print(metrics.classification_report(y_test, y_pred))

    # float() so the dict prints plain numbers rather than numpy scalar reprs.
    print("\n Model Metrics:", {'accuracy': round(float(acc), 2), 'precision': round(float(prec), 2), 'recall': round(float(recall), 2)})

In [10]:

# Define the pipeline
@dsl.pipeline(
   name='Credit Card Fraud Detection classifier Kubeflow Demo Pipeline',
   description='A sample pipeline that performs Credit Card Fraud Detection classifier task'
)
def credit_card_fraud_detection_pipeline(
    target: str = "isFraud",
    train_transaction: str = "gs://credit-card-bucket-v1/credit-card-fraud-detection-data.csv",
):
    """Wire the steps together: prepare -> split -> train -> predict -> metrics.

    Args:
        target: name of the label column, passed to every step.
        train_transaction: URL of the raw CSV that ``prepare_data`` reads.
    """
    prepare_data_task = prepare_data(train1_csv_url=train_transaction)

    # Passing outputs between tasks already creates the DAG edges; the explicit
    # .after() calls only make the ordering visible. They must be given the
    # *task* objects created here, not the component functions.
    train_test_split_data = train_test_split(
        cc_data_csv=prepare_data_task.outputs["output_csv"],
        target=target,
    ).after(prepare_data_task)

    classifier_training = training_basic_classifier(
        train_csv=train_test_split_data.outputs["train_csv"],
        target_column=target,
    ).after(train_test_split_data)

    log_predicted_class = predict_on_test_data(
        trained_model=classifier_training.outputs["trained_model"],
        test_csv=train_test_split_data.outputs["test_csv"],
        target=target,
    ).after(classifier_training)

    get_metrics(
        trained_model=classifier_training.outputs["trained_model"],
        test_csv=train_test_split_data.outputs["test_csv"],
        y_pred_csv=log_predicted_class.outputs["y_pred_csv"],
        target=target,
    ).after(log_predicted_class)


In [11]:
# Compile the pipeline function into the YAML package that the Vertex AI PipelineJob loads.
compiler.Compiler().compile(
    pipeline_func=credit_card_fraud_detection_pipeline,
    package_path='credit_card_fraud_detection.yaml',
)


In [12]:
# Build a Vertex AI pipeline run from the compiled YAML. Caching is disabled so
# every step re-executes on each run.
job = aip.PipelineJob(
    display_name="credit-card-fraud-detection",
    template_path="credit_card_fraud_detection.yaml",
    enable_caching=False,
)

In [13]:
# Submit the run. sync=True (the SDK default, spelled out here) waits for the
# job and logs its state while it runs.
job.run(sync=True)

Creating PipelineJob
PipelineJob created. Resource name: projects/617832854196/locations/us-central1/pipelineJobs/credit-card-fraud-detection-classifier-kubeflow-demo-pipeline-20240323101901
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/617832854196/locations/us-central1/pipelineJobs/credit-card-fraud-detection-classifier-kubeflow-demo-pipeline-20240323101901')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/credit-card-fraud-detection-classifier-kubeflow-demo-pipeline-20240323101901?project=617832854196
PipelineJob projects/617832854196/locations/us-central1/pipelineJobs/credit-card-fraud-detection-classifier-kubeflow-demo-pipeline-20240323101901 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/617832854196/locations/us-central1/pipelineJobs/credit-card-fraud-detection-classifier-kubeflow-demo-pipeline-20240323101901 current state:
PipelineState.PIPELINE_STATE_R