# KFP pipeline for ASN Model

In [1]:
from kfp import dsl
from kfp.dsl import (component, 
                     Metrics,
                     Dataset,
                     Input,
                     Model,
                     Artifact,
                     OutputPath,
                     Output)
from kfp import compiler
import google.cloud.aiplatform as aiplatform
import os

# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "uk-gap-proximity-dev-2acc049a288c.json"

In [2]:
BUCKET_NAME = "gs://my-data-classification/"
PROJECT_ID = "gcp-project-raj33342"
REGION = "europe-west1"

In [3]:
PIPELINE_ROOT = f"gs://my-data-classification/pipeline_root_demo"

In [9]:
from google.cloud.aiplatform import PipelineJob

# Parameters
project_id = PROJECT_ID
bucket_name = BUCKET_NAME
region = REGION

pipeline_job = PipelineJob(
    display_name="kfp_pipeline",
    template_path="training_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    location=region,
)
pipeline_job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/606038602781/locations/europe-west1/pipelineJobs/training-pipeline-20241022085029
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/606038602781/locations/europe-west1/pipelineJobs/training-pipeline-20241022085029')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/training-pipeline-20241022085029?project=606038602781
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/training-pipeline-20241022085029 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/training-pipeline-20241022085029 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/training-pipeline-20241022085029 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/606038602781/locations/europe-we

In [4]:
from google.cloud.aiplatform import PipelineJob

# Parameters
project_id = PROJECT_ID
bucket_name = BUCKET_NAME
region = REGION

pipeline_job = PipelineJob(
    display_name="kfp_batch_pipeline",
    template_path="batch_prediction_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    location=region,
)
pipeline_job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/606038602781/locations/europe-west1/pipelineJobs/batch-predict-pipeline-20241022123437
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/606038602781/locations/europe-west1/pipelineJobs/batch-predict-pipeline-20241022123437')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/batch-predict-pipeline-20241022123437?project=606038602781
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/batch-predict-pipeline-20241022123437 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/batch-predict-pipeline-20241022123437 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/606038602781/locations/europe-west1/pipelineJobs/batch-predict-pipeline-20241022123437 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/60

In [17]:
import pandas as pd
df = pd.read_csv("gs://my-data-classification/wines.csv")


In [18]:
df_columns = df.iloc[:,1:].columns.to_list() + ["type"]

In [19]:
df.iloc[:,1:]


Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6
1,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6
2,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6
3,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
4,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
...,...,...,...,...,...,...,...,...,...,...,...,...
6492,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5
6493,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,,11.2,6
6494,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6
6495,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5


In [21]:
df = df[df_columns]

In [22]:
df

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,type
0,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6,white
1,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6,white
2,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6,white
3,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6,white
4,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6,white
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6492,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5,red
6493,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,,11.2,6,red
6494,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6,red
6495,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5,red


In [23]:
sample_data = df.sample(500)

In [24]:
sample_data

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,type
582,7.3,0.300,0.22,6.40,0.056,44.0,168.0,0.99470,3.13,0.35,10.1,6,white
1279,6.6,0.270,0.30,1.90,0.025,14.0,153.0,0.99280,3.29,0.62,10.5,6,white
3461,6.7,0.240,0.30,3.85,0.042,105.0,179.0,0.99189,3.04,0.59,11.3,8,white
3427,5.8,0.240,0.26,10.05,0.039,63.0,162.0,0.99375,3.33,0.50,11.2,6,white
6274,8.2,0.885,0.20,1.40,0.086,7.0,31.0,0.99460,3.11,0.46,10.0,5,red
...,...,...,...,...,...,...,...,...,...,...,...,...,...
112,7.2,0.310,0.50,13.30,0.056,68.0,195.0,0.99820,3.01,0.47,9.2,5,white
5488,9.0,0.450,0.49,2.60,0.084,21.0,75.0,0.99870,3.35,0.57,9.7,5,red
71,6.8,0.300,0.23,4.60,0.061,50.5,238.5,0.99580,3.32,0.60,9.5,5,white
3071,6.5,0.360,0.38,10.20,0.028,20.0,82.0,0.99274,3.10,0.43,12.1,7,white


In [25]:
gd_df = pd.DataFrame(sample_data['type'])
gd_df.to_csv("gt_sample.csv", index=False)

In [26]:
sample_test_data = sample_data.drop(columns=["type"])

In [27]:
sample_test_data.to_csv("batch_test_sample.csv", index=False)

In [28]:
sample_data['type'].value_counts()

type
white    377
red      123
Name: count, dtype: int64

In [32]:
sample_data.columns.to_list()

['type',
 'fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [22]:
import joblib
model = joblib.load(path)

# Assuming you have a sample input batch
sample_input = [[7.0, 0.27, 0.36, 20.7, 0.045, 45.0, 170.0, 1.001, 3.0, 0.45, 8.8, 6.0]]
prediction = model.predict(sample_input)
print("Prediction:", prediction)

FileNotFoundError: [Errno 2] No such file or directory: 'gs://sky-mlops-dev/kfp_mlops/pipeline_root_demo/213412891337/training-pipeline-20240929200747/training_5423951882714349568/ml_model/model.joblib'

In [14]:
import scikit-learn

SyntaxError: invalid syntax (464634962.py, line 1)

In [13]:
import gcsfs
import joblib

fs = gcsfs.GCSFileSystem(project='sky-mlops-dev')
with fs.open('gs://sky-mlops-dev/kfp_mlops/pipeline_root_demo/213412891337/training-pipeline-20240929200747/training_5423951882714349568/ml_model/model.joblib', 'rb') as f:
    model = joblib.load(f)


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


ValueError: node array from the pickle has an incompatible dtype:
- expected: {'names': ['left_child', 'right_child', 'feature', 'threshold', 'impurity', 'n_node_samples', 'weighted_n_node_samples', 'missing_go_to_left'], 'formats': ['<i8', '<i8', '<i8', '<f8', '<f8', '<i8', '<f8', 'u1'], 'offsets': [0, 8, 16, 24, 32, 40, 48, 56], 'itemsize': 64}
- got     : [('left_child', '<i8'), ('right_child', '<i8'), ('feature', '<i8'), ('threshold', '<f8'), ('impurity', '<f8'), ('n_node_samples', '<i8'), ('weighted_n_node_samples', '<f8')]

In [5]:
from google.cloud import storage

storage_client = storage.Client()
# List buckets
for bucket in storage_client.list_buckets():
    print(bucket.name)

artifacts.uk-gap-proximity-dev.appspot.com
asn_prediction_data
cdn-logs-config-dev
cdn-suspicious-asn-dev
cdn_logs_test
cloud-ai-platform-6e01d309-9443-4e6e-a71b-b118006bee93
cloud-ai-platform-7f322054-1127-4f3f-a8b6-a013fa86c892
cloud_logger_test
dataflow-staging-europe-west1-4b5ff1d7df0f5e4dab4a63d14ebc9c75
eu.artifacts.uk-gap-proximity-dev.appspot.com
europe-west1-uk-gap-proximi-03905d7a-bucket
europe-west1-uk-gap-proximi-06909d24-bucket
europe-west1-uk-gap-proximi-11d508c9-bucket
europe-west1-uk-gap-proximi-158eabff-bucket
europe-west1-uk-gap-proximi-20ac0f86-bucket
europe-west1-uk-gap-proximi-298c9a82-bucket
europe-west1-uk-gap-proximi-3625c588-bucket
europe-west1-uk-gap-proximi-4af96600-bucket
europe-west1-uk-gap-proximi-4cd63500-bucket
europe-west1-uk-gap-proximi-4ee0444f-bucket
europe-west1-uk-gap-proximi-586a4af1-bucket
europe-west1-uk-gap-proximi-65e5278a-bucket
europe-west1-uk-gap-proximi-740e1bbd-bucket
europe-west1-uk-gap-proximi-9f3b269c-bucket
europe-west1-uk-gap-proximi

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import recall_score
import warnings
warnings.simplefilter("ignore")
# matplotlib and seaborn are used for visualizations and warnings; we can ignore all the warnings we encounter.

# Import the dataset from your local desktop. Use pandas for it. Enter the path to the dataset file in the read_csv method. It will import the iris dataset.
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
# iris = datasets.load_iris()
#Import iris dataset
# df=pd.read_csv(r'C:UsersAdminDownloadsIris.csv')

In [19]:
df = pd.read_csv("wines.csv")

In [20]:
df['type'].value_counts()

white    4898
red      1599
Name: type, dtype: int64

In [21]:
df

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,white,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6
1,white,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6
2,white,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6
3,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
4,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6492,red,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5
6493,red,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,0.76,11.2,6
6494,red,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6
6495,red,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5


In [22]:
df['type']= df['type'].apply(lambda x: 0 if x == 'white' else 1)

In [23]:
x = df.drop('type', axis=1)
y = df['type']

trainX, testX, trainY, testY = train_test_split(x, y, test_size = 0.2)
sc=StandardScaler()

scaler = sc.fit(trainX)
trainX_scaled = scaler.transform(trainX)
testX_scaled = scaler.transform(testX)

In [25]:
trainX_scaled = pd.DataFrame(trainX_scaled)
trainY = pd.DataFrame(trainY)

In [26]:
trainY.name = 'type'
# y_test.name = target_column

In [27]:
trainY.shape

(5197, 1)

In [28]:
trainX_scaled.shape

(5197, 12)

In [29]:
trainX_scaled

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11
0,-0.172164,1.413670,-2.196148,-0.732069,3.863664,-1.043760,-1.254779,0.698204,0.580553,0.466874,-0.922598,0.214418
1,1.810298,0.494706,0.208332,-0.710832,0.779261,-0.762982,-1.378792,0.515484,0.081585,0.874173,0.503746,0.214418
2,-0.629656,-0.179201,1.238824,0.074966,-0.517918,0.640903,-0.085513,-0.949659,-0.230270,-0.008307,1.174966,1.365588
3,-1.010898,-0.791844,-0.616061,0.520959,-0.662048,-0.482205,0.251094,-0.100350,1.017150,0.398991,0.335941,0.214418
4,-1.620887,0.494706,-2.196148,-0.774545,-0.344960,-0.706827,-0.492984,-1.057938,2.888279,1.349355,2.601310,1.365588
...,...,...,...,...,...,...,...,...,...,...,...,...
5192,-0.629656,-0.362994,-0.616061,0.117441,-0.143177,1.090147,0.818011,-0.235698,-0.417383,-0.076191,-0.251377,-0.936751
5193,-0.400910,-1.159430,-0.203864,-0.094937,0.058606,-0.145273,-0.191810,-0.032676,1.890344,-0.619256,-0.251377,1.365588
5194,-0.477159,3.067806,-1.990050,-0.668356,0.087433,0.079349,-1.201630,0.170346,1.890344,0.195342,1.510577,1.365588
5195,0.285327,-0.730580,0.139633,0.903238,-0.604396,-0.257583,0.481404,0.610228,-0.853980,-0.279840,-0.586988,0.214418


In [30]:
trainY

Unnamed: 0,type
5678,1
6051,1
4803,0
4558,0
6126,1
...,...
4505,0
945,0
5319,1
954,0


In [33]:
trainX_scaled.reset_index(drop=True, inplace=True)
trainY.reset_index(drop=True, inplace=True)
X_train = pd.concat([trainX_scaled, trainY], axis=1)


In [34]:
X_train

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,type
0,-0.172164,1.413670,-2.196148,-0.732069,3.863664,-1.043760,-1.254779,0.698204,0.580553,0.466874,-0.922598,0.214418,1
1,1.810298,0.494706,0.208332,-0.710832,0.779261,-0.762982,-1.378792,0.515484,0.081585,0.874173,0.503746,0.214418,1
2,-0.629656,-0.179201,1.238824,0.074966,-0.517918,0.640903,-0.085513,-0.949659,-0.230270,-0.008307,1.174966,1.365588,0
3,-1.010898,-0.791844,-0.616061,0.520959,-0.662048,-0.482205,0.251094,-0.100350,1.017150,0.398991,0.335941,0.214418,0
4,-1.620887,0.494706,-2.196148,-0.774545,-0.344960,-0.706827,-0.492984,-1.057938,2.888279,1.349355,2.601310,1.365588,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
5192,-0.629656,-0.362994,-0.616061,0.117441,-0.143177,1.090147,0.818011,-0.235698,-0.417383,-0.076191,-0.251377,-0.936751,0
5193,-0.400910,-1.159430,-0.203864,-0.094937,0.058606,-0.145273,-0.191810,-0.032676,1.890344,-0.619256,-0.251377,1.365588,0
5194,-0.477159,3.067806,-1.990050,-0.668356,0.087433,0.079349,-1.201630,0.170346,1.890344,0.195342,1.510577,1.365588,1
5195,0.285327,-0.730580,0.139633,0.903238,-0.604396,-0.257583,0.481404,0.610228,-0.853980,-0.279840,-0.586988,0.214418,0


In [37]:
trainX_scaled.isnull().sum()

0     0
1     0
2     0
3     0
4     0
5     0
6     0
7     0
8     0
9     0
10    0
11    0
dtype: int64

In [14]:
rfc = RandomForestClassifier(n_estimators=100, random_state=42)
# Cross Validation
rf_score = cross_val_score(estimator = rfc,
                               X = trainX_scaled, y= trainY,
                               scoring = 'recall',cv = 10,
                               verbose = 3, n_jobs=-1)
# Fit data training
rfc.fit(trainX_scaled, trainY)
# Predict data test
y_pred = rfc.predict(testX_scaled)
print('Avarage Recall score', np.mean(rf_score))
print('Test Recall score', recall_score(testY, y_pred))

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    3.9s remaining:    1.7s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    4.5s finished


Avarage Recall score 0.984375
Test Recall score 0.9780564263322884
[CV] END ................................ score: (test=0.984) total time=   0.9s
[CV] END ................................ score: (test=0.961) total time=   0.9s


### Get Data Component

In [2]:
df = pd.read_csv("wines.csv")

In [3]:
from src.preprocess import get_preprocessing

df = get_preprocessing(df)

In [4]:
from src.train_test_splits import get_train_test_splits

X_train, X_test = get_train_test_splits(
    df, 'type', 0.2
)

In [5]:
from src.model_training import training_asn

training_asn(X_train, X_test, 'type')

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=-1)]: Done   7 out of  10 | elapsed:    3.8s remaining:    1.6s
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    4.4s finished


Avarage Recall score 0.9864952380952381
Test Recall score 0.9676470588235294
[CV] END ................................ score: (test=0.984) total time=   0.9s
[CV] END ................................ score: (test=0.992) total time=   0.9s
[CV] END ................................ score: (test=1.000) total time=   0.9s
[CV] END ................................ score: (test=0.992) total time=   0.9s
[CV] END ................................ score: (test=0.992) total time=   0.9s
[CV] END ................................ score: (test=0.952) total time=   0.9s
[CV] END ................................ score: (test=0.976) total time=   0.6s
[CV] END ................................ score: (test=1.000) total time=   0.9s
[CV] END ................................ score: (test=0.992) total time=   0.9s
[CV] END ................................ score: (test=0.984) total time=   0.6s


In [7]:
@component(base_image="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest")
def get_data(filepath: str, input_data: Output[Dataset]):
    import pandas as pd
    import logging

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    
    df_train = pd.read_csv(filepath + '/model_data_20240410.csv')
    logger.info(f"Input Data shape: {df_train.shape}")
    df_train.to_csv(input_data.path, index=False)
    # dataset_train.path = filepath + '/model_data_20240410.csv'

### Preprocessing Component

In [8]:
@component(base_image="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest")
def preprocessing(train_df: Input[Dataset], input_data_preprocessed: Output[Dataset]):
    import pandas as pd
    import numpy as np
    import logging

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    df = pd.read_csv(train_df.path)
    df=df[df.asn_number.notnull()]
    df['asn_number'] = df['asn_number'].astype(int)
    df['illegal']=np.where((df.asn_lk_blocked==True) | (df.cidr_block=='yes'),1,0)
    df['asn_country'] = df['asn_country'].fillna('UNK')
    df['asn_type'] = df['asn_type'].fillna('UNK')
    df['resp_type_total']= df.resp_type_txt_xml+df.resp_type_app_xml+df.resp_type_app_oct+df.resp_type_txt_html+df.resp_type_app_dash+df.resp_type_txt_plain+df.resp_type_unk
    df['resp_type_txt_xml_pct'] = df.resp_type_txt_xml/df['resp_type_total']
    df['resp_type_app_xml_pct'] = df.resp_type_app_xml/df['resp_type_total']
    df['resp_type_app_oct_pct'] = df.resp_type_app_oct/df['resp_type_total']
    df['resp_type_txt_html_pct'] = df.resp_type_txt_html/df['resp_type_total']
    df['resp_type_app_dash_pct'] = df.resp_type_app_dash/df['resp_type_total']
    df['resp_type_txt_plain_pct'] = df.resp_type_txt_plain/df['resp_type_total']
    df['resp_type_unk_pct'] = df.resp_type_unk/df['resp_type_total']
    # http_status_code
    df['http_status_total'] = df.http_200+df.http_206+df.http_404+df.http_403+df.http_512+df.http_400+df.http_502+df.http_416+df.http_304+df.http_499+df.http_503+df.http_0+df.http_504+df.http_500+df.http_302
    df['http_200_pct']=df.http_200/df['http_status_total']
    df['http_206_pct']=df.http_206/df['http_status_total']
    df['http_404_pct']=df.http_404/df['http_status_total']
    df['http_403_pct']=df.http_403/df['http_status_total']
    df['http_512_pct']=df.http_512/df['http_status_total']
    df['http_400_pct']=df.http_400/df['http_status_total']
    df['http_502_pct']=df.http_502/df['http_status_total']
    df['http_416_pct']=df.http_416/df['http_status_total']
    df['http_304_pct']=df.http_304/df['http_status_total']
    df['http_499_pct']=df.http_499/df['http_status_total']
    df['http_503_pct']=df.http_503/df['http_status_total']
    df['http_0_pct']=df.http_0/df['http_status_total']
    df['http_504_pct']=df.http_504/df['http_status_total']
    df['http_500_pct']=df.http_500/df['http_status_total']
    df['http_302_pct']=df.http_302/df['http_status_total']
    # cache_status
    df['cache_total']=df.cache_3+df.cache_1+df.cache_0+df.cache_unk
    df['cache_3_pct']=df.cache_3/df['cache_total']
    df['cache_1_pct']=df.cache_1/df['cache_total']
    df['cache_0_pct']=df.cache_0/df['cache_total']
    df['cache_unk_pct']=df.cache_unk/df['cache_total']
    # create URN as a few instances of ASN number are sometime not unique: there can be some with our without c3ri during the same day
    df['urn']=df.index
    # Drop Columns 
    to_drop=['asn_number','asn_lk_blocked','cidr_block','c3ri_present','urn','asn_country_1']
    df.drop(to_drop, axis=1, inplace=True)           
    logger.info(f"Preprocessed Data shape: {df.shape}")
    df.to_csv(input_data_preprocessed.path, index=False)

### Train Test Split Component

In [9]:
@component(
    base_image="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest", packages_to_install=["imblearn"]
)

def train_test_data_split(dataset_in: Input[Dataset],
                     target_column: str,
                     dataset_train: Output[Dataset],
                     dataset_test: Output[Dataset],
                     test_size: float = 0.2):
    
    from sklearn.utils import shuffle
    from imblearn.under_sampling import RandomUnderSampler
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    import pandas as pd
    import numpy as np
    import logging

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    # Shuffle data
    data = pd.read_csv(dataset_in.path)
    data = shuffle(data, random_state=42)
    X = data.drop(columns=[target_column])
    y = data[target_column]
    
    logger.info(f"train_test_data_split Input data shape before Undersampling : {X.shape}")
    
    class_distribution_before = y.value_counts()
    logger.info(f'Class distribution before undersampling: {class_distribution_before.to_dict()}')

    # Undersampling positive class
    sampler = RandomUnderSampler(sampling_strategy=0.5, random_state=42)
    X_resampled, y_resampled = sampler.fit_resample(X, y)
    
    logger.info(f"train_test_data_split Input data shape after Undersampling: {X_resampled.shape}")
   
    class_distribution_after = y_resampled.value_counts()
    logger.info(f'Class distribution after undersampling: {class_distribution_after.to_dict()}')
    
    
    # Split data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X_resampled, y_resampled, test_size=test_size, random_state=42)
    logger.info(f"Shapes after train_test_split X_train shape: {X_train.shape}, X_test shape: {X_test.shape}, y_train shape: {y_train.shape}, y_test shape: {y_test.shape}")
    # Standardize numeric features
    numeric_features = X_train.select_dtypes(include=[np.number]).columns.tolist()
    scaler = StandardScaler()
    X_train[numeric_features] = scaler.fit_transform(X_train[numeric_features])
    X_test[numeric_features] = scaler.transform(X_test[numeric_features])

    # One-hot encode categorical features
    categorical_features = X_train.select_dtypes(exclude=[np.number]).columns.tolist()
    encoder = OneHotEncoder(handle_unknown='ignore')
    X_train_encoded = pd.DataFrame(encoder.fit_transform(X_train[categorical_features]).toarray())
    X_test_encoded = pd.DataFrame(encoder.transform(X_test[categorical_features]).toarray())
    X_train.reset_index(drop=True, inplace=True)
    X_train_encoded.reset_index(drop=True, inplace=True)
    X_train = pd.concat([X_train.drop(columns=categorical_features), X_train_encoded], axis=1)
    X_test.reset_index(drop=True, inplace=True)
    X_test_encoded.reset_index(drop=True, inplace=True)
    X_test = pd.concat([X_test.drop(columns=categorical_features), X_test_encoded], axis=1)
    X_train = X_train.reset_index(drop=True)
    y_train = y_train.reset_index(drop=True)
    X_test = X_test.reset_index(drop=True)
    y_test = y_test.reset_index(drop=True)
    
    X_train = pd.concat([X_train,y_train], axis=1)
    X_test = pd.concat([X_test,y_test], axis=1)
    logger.info(f"Shapes after Catogorical encoding X_train shape: {X_train.shape}, X_test shape: {X_test.shape}, y_train shape: {y_train.shape}, y_test shape: {y_test.shape}")
    logger.info(f"X_train columns: {list(X_train.columns)}")
    logger.info(f"X_test columns: {list(X_test.columns)}")

    X_train.to_csv(dataset_train.path, index=False)
    X_test.to_csv(dataset_test.path, index=False)

### Training Component

In [16]:
@component(
    base_image="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest"
)
def training(
    dataset_train: Input[Dataset],
    dataset_test: Input[Dataset],
    target: str,
    best_params: dict,
    metrics: Output[Metrics],
    param_artifact: Output[Artifact],
    ml_model: Output[Model]):
    
    import pandas as pd
    from sklearn.metrics import confusion_matrix, classification_report,accuracy_score, precision_score, recall_score, f1_score
    import tensorflow as tf
    import json
    from tensorflow.keras.layers import Dense, Input as Ip, Concatenate
    from tensorflow.keras.models import Model

    from sklearn.metrics import confusion_matrix, accuracy_score
    from keras.models import Sequential
    from keras.optimizers import Adam
    import logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    
    def create_model(X_train, params):
        # Use a single input layer
        input_shape=X_train.shape[1]
        input_layer = Ip(shape=(input_shape,))
        hidden_layers = []
        # Create hidden layers from given units
        x = input_layer
        for units in params['hidden_units']:
            x = Dense(units, activation=params['activation'])(x)
            hidden_layers.append(x)

        # Concatenate the outputs of hidden layers (if necessary)
        if len(hidden_layers) > 1:
            merge_layer = Concatenate()(hidden_layers)
        else:
            merge_layer = hidden_layers[0]

        output_layer = Dense(1, activation='sigmoid')(merge_layer)

        model = Model(inputs=input_layer, outputs=output_layer)
        model.compile(optimizer=params['optimizer'], loss='binary_crossentropy', metrics=['accuracy'])
        return model
    
    X_train = pd.read_csv(dataset_train.path)
    X_test = pd.read_csv(dataset_test.path)
    y_train = X_train[target]
    y_test =  X_test[target]
    X_train = X_train.drop(target, axis=1)
    X_test = X_test.drop(target, axis=1)
    logger.info(f"Training data:: X_train: {X_train.shape}, X_test: {X_test.shape}, y_train: {y_train.shape}, y_test: {y_test.shape}")
    logger.info(f"X_train columns: {list(X_train.columns)}")
    logger.info(f"X_test columns: {list(X_test.columns)}")
    asn_model = create_model(X_train, best_params)
    history = asn_model.fit([X_train] * len(best_params['hidden_units']), y_train, epochs=best_params['epochs'], 
                        batch_size=best_params['batch_size'], verbose=0)

    loss, acc = asn_model.evaluate([X_test] * len(best_params['hidden_units']), y_test, verbose=0)
    train_loss = history.history['loss'][-1] # Using training loss
    y_pred_prob = asn_model.predict(X_test)
    y_pred = (y_pred_prob > 0.5).astype(int)
    
    # Save model
    asn_model.save(ml_model.path)
    
    # Log param
    with open(param_artifact.path, 'w') as f:
        json.dump(best_params, f)
    
    # Save metrics
    accuracy = round(accuracy_score(y_test, y_pred), 2)
    precision = round(precision_score(y_test, y_pred), 2)
    recall = round(recall_score(y_test, y_pred), 2)
    f1 = round(f1_score(y_test, y_pred), 2)
    metrics.log_metric("train_loss",  float(round(train_loss,2)))
    metrics.log_metric("loss",  float(round(loss,2)))
    metrics.log_metric("accuracy",  float(accuracy))
    metrics.log_metric("precision",  float(precision))
    metrics.log_metric("recall",  float(recall))
    metrics.log_metric("f1",  float(f1))
    
    

### Deploy Component

In [17]:
@component(base_image="europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest")
def deploy_model(ml_model: Input[Model]):
    from google.cloud.aiplatform import Model

    model = Model.upload(
        display_name="ASN_model_demo",
        artifact_uri=ml_model.path,
        serving_container_image_uri="europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-13:latest"
    )

### Pipeline Defination

In [18]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="pipeline-asn"
    
)
def pipeline(
    data_filepath: str = f"{BUCKET_NAME}/data",
    project: str = PROJECT_ID,
    region: str = REGION):

    data_op = get_data(filepath=data_filepath)
    data_preprocess_op = preprocessing(train_df=data_op.outputs["input_data"])
    train_test_split_op = train_test_data_split(dataset_in=data_preprocess_op.outputs["input_data_preprocessed"], 
                                           target_column='illegal', test_size=0.2)
    train_model_op  = training(dataset_train=train_test_split_op.outputs["dataset_train"], 
                               dataset_test=train_test_split_op.outputs["dataset_test"],
                               target='illegal',
                               best_params= {
                                'learning_rate': 0.00052,
                                'hidden_units': [64],
                                'batch_size': 32,
                                'activation': 'relu',
                                'optimizer': 'adam',
                                'epochs': 30
                                })       
    deploy_model_op = deploy_model(ml_model=train_model_op.outputs["ml_model"])


In [19]:
{'activation': 0, 'batch_size': 1, 'epochs': 2, 'hidden_units': 2, 'l2_lambda': 0.0019446327983430817, 'learning_rate': 0.0005253293784967611, 'lr_decay': 1, 'optimizer': 0}

{'activation': 0,
 'batch_size': 1,
 'epochs': 2,
 'hidden_units': 2,
 'l2_lambda': 0.0019446327983430817,
 'learning_rate': 0.0005253293784967611,
 'lr_decay': 1,
 'optimizer': 0}

### Compile Pipeline

In [20]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="kfp_asn_pipeline.json")

### Run Kubeflow Pipeline

In [21]:
from google.cloud.aiplatform import PipelineJob

# Parameters
project_id = PROJECT_ID
bucket_name = BUCKET_NAME
region = REGION

pipeline_job = PipelineJob(
    display_name="kfp_asn_pipeline",
    template_path="kfp_asn_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    location=region,
)
pipeline_job.run()

Creating PipelineJob


2024-06-20 14:15:28,862 - google.cloud.aiplatform.pipeline_jobs - INFO - Creating PipelineJob


PipelineJob created. Resource name: projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528


2024-06-20 14:15:29,042 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob created. Resource name: projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528


To use this PipelineJob in another session:


2024-06-20 14:15:29,044 - google.cloud.aiplatform.pipeline_jobs - INFO - To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528')


2024-06-20 14:15:29,046 - google.cloud.aiplatform.pipeline_jobs - INFO - pipeline_job = aiplatform.PipelineJob.get('projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/pipeline-asn-20240620141528?project=213412891337


2024-06-20 14:15:29,047 - google.cloud.aiplatform.pipeline_jobs - INFO - View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/pipeline-asn-20240620141528?project=213412891337


PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


2024-06-20 14:15:34,308 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


2024-06-20 14:15:44,685 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


2024-06-20 14:16:05,337 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


2024-06-20 14:16:46,763 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


2024-06-20 14:18:09,669 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob run completed. Resource name: projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528


2024-06-20 14:19:42,932 - google.cloud.aiplatform.pipeline_jobs - INFO - PipelineJob run completed. Resource name: projects/213412891337/locations/europe-west1/pipelineJobs/pipeline-asn-20240620141528
