In [2]:
import kfp
from kfp import dsl
from kfp import compiler
from kfp.dsl import (component, pipeline, Input, Output, Dataset, Model, Artifact, Metrics)

@component
def ingest(raw_data: Output[Dataset]):
    """Fetch the raw Black Friday training CSV from GCS and emit it as a Dataset.

    Reads gs://final_demo2_blackfriday/train.csv into pandas and writes it back
    out, without the index, to ``raw_data.path``.

    Args:
        raw_data: output Dataset artifact receiving the CSV.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'google-cloud-storage', 'pandas'], check=True)

    from io import BytesIO

    import pandas as pd
    from google.cloud import storage

    source_blob = storage.Client().bucket("final_demo2_blackfriday").blob("train.csv")
    csv_bytes = source_blob.download_as_bytes()

    # Parse then re-serialise so the artifact is a pandas-written CSV.
    pd.read_csv(BytesIO(csv_bytes)).to_csv(raw_data.path, index=False)

@component
def preprocessing(
    raw_data: Input[Dataset],
    preprocessed_df: Output[Dataset]
):
    """Component for data preprocessing, including dropping User_ID and Product_ID.

    Fills missing Product_Category_2 / Product_Category_3 values with 0 and
    drops the identifier columns, which are not needed for demographic analysis.

    Args:
        raw_data: CSV Dataset produced by ``ingest``.
        preprocessed_df: output CSV Dataset (written without the index).
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'], check=True)

    import pandas as pd

    black_friday_df = (
        pd.read_csv(raw_data.path)
        # Missing secondary/tertiary categories become 0 so they can be cast to int later.
        .fillna({'Product_Category_2': 0, 'Product_Category_3': 0})
        # Identifier columns are not needed for demographic analysis.
        .drop(columns=['User_ID', 'Product_ID'])
    )

    black_friday_df.to_csv(preprocessed_df.path, index=False)

@component
def feature_engineering(
    preprocessed_df: Input[Dataset],
    engineered_df: Output[Dataset]
):
    """Component for feature engineering using encoding strategies.

    Converts the categorical columns to int64 codes so every non-target column
    is numeric:

    * Gender: F -> 0, M -> 1
    * Age: ordinal bucket index ('0-17' -> 0 ... '55+' -> 6)
    * City_Category: A -> 0, B -> 1, C -> 2
    * Stay_In_Current_City_Years: '4+' -> 4
    * Occupation, Marital_Status, Product_Category_1/2/3: cast to int64

    Args:
        preprocessed_df: CSV Dataset produced by ``preprocessing``.
        engineered_df: output CSV Dataset (written without the index).

    Raises:
        ValueError: if Gender, Age or City_Category contain a label that is not
            in the corresponding mapping.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'], check=True)

    import pandas as pd

    gender_mapping = {'F': 0, 'M': 1}
    age_mapping = {'0-17': 0, '18-25': 1, '26-35': 2, '36-45': 3, '46-50': 4, '51-55': 5, '55+': 6}
    city_category_mapping = {'A': 0, 'B': 1, 'C': 2}

    black_friday_df = pd.read_csv(preprocessed_df.path)

    def _encode(column, mapping):
        """Map labels in `column` to int64 codes, failing loudly on unknown labels."""
        # .map (rather than .replace) avoids pandas' deprecated replace-downcasting
        # and turns unknown labels into NaN, which we report explicitly below.
        encoded = black_friday_df[column].map(mapping)
        unknown = black_friday_df.loc[encoded.isna(), column].unique()
        if len(unknown):
            raise ValueError(f"Unexpected values in {column!r}: {list(unknown)}")
        return encoded.astype('int64')

    black_friday_df['Gender'] = _encode('Gender', gender_mapping)
    black_friday_df['Age'] = _encode('Age', age_mapping)
    black_friday_df['City_Category'] = _encode('City_Category', city_category_mapping)

    # Strip the literal '+' ('4+' -> '4'). astype(str) keeps this working if pandas
    # parsed the column as integers; regex=False avoids treating '+' as a regex.
    black_friday_df['Stay_In_Current_City_Years'] = (
        black_friday_df['Stay_In_Current_City_Years']
        .astype(str)
        .str.replace('+', '', regex=False)
        .astype('int64')
    )

    columns_to_int = ['Occupation', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3']
    black_friday_df[columns_to_int] = black_friday_df[columns_to_int].astype('int64')

    black_friday_df.to_csv(engineered_df.path, index=False)

@component
def feature_selection(
    engineered_df: Input[Dataset],
    finalized_features: Output[Dataset]
):
    """Keep the top-scoring predictors of Purchase and publish their names.

    Each feature is scored with a univariate F-test against the continuous
    ``Purchase`` target. The top ``N_FEATURES`` names are uploaded as a JSON
    list to gs://final_demo2_blackfriday/selected_features_names.json. The
    selected columns plus ``Purchase`` are written to ``finalized_features``.

    Args:
        engineered_df: all-numeric CSV Dataset produced by ``feature_engineering``.
        finalized_features: output CSV Dataset (written without the index).
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas','scikit-learn','google-cloud-storage'], check=True)

    import json

    import pandas as pd
    from google.cloud import storage
    from sklearn.feature_selection import SelectKBest, f_regression

    N_FEATURES = 9
    BUCKET_NAME = 'final_demo2_blackfriday'
    FEATURES_BLOB = 'selected_features_names.json'

    black_friday_df = pd.read_csv(engineered_df.path)
    X = black_friday_df.drop('Purchase', axis=1)
    y = black_friday_df['Purchase']

    # Purchase is a continuous amount, so use a regression score. chi2 would
    # treat every distinct amount as its own class. Clamp k because some
    # scikit-learn versions reject k > n_features.
    selector = SelectKBest(score_func=f_regression, k=min(N_FEATURES, X.shape[1]))
    selector.fit(X, y)
    selected_feature_names = X.columns[selector.get_support(indices=True)].tolist()

    # Publish the chosen feature names so serving code can reproduce the selection.
    storage.Client().bucket(BUCKET_NAME).blob(FEATURES_BLOB).upload_from_string(
        json.dumps(selected_feature_names), content_type='application/json'
    )

    black_friday_df[selected_feature_names + ['Purchase']].to_csv(finalized_features.path, index=False)

@component
def train_validation_test_split(
    finalized_features: Input[Dataset],
    train_data: Output[Dataset],
    validation_data: Output[Dataset],
    test_data: Output[Dataset]
):
    """Split the selected-feature dataset into train / validation / test CSVs.

    10% of rows are held out as the test set. 20% of the remainder becomes the
    validation set, giving roughly a 72 / 18 / 10 split. Both splits use
    random_state=42, so reruns produce the same partition.

    Args:
        finalized_features: CSV Dataset produced by ``feature_selection``.
        train_data: output CSV for the training rows.
        validation_data: output CSV for the validation rows.
        test_data: output CSV for the test rows.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas', 'scikit-learn'], check=True)

    import pandas as pd
    from sklearn.model_selection import train_test_split

    seed = 42
    full_df = pd.read_csv(finalized_features.path)

    remainder_df, test_df = train_test_split(full_df, test_size=0.1, random_state=seed)
    train_df, validation_df = train_test_split(remainder_df, test_size=0.2, random_state=seed)

    outputs = (
        (train_df, train_data),
        (validation_df, validation_data),
        (test_df, test_data),
    )
    for frame, artifact in outputs:
        frame.to_csv(artifact.path, index=False)

@component
def hyperparameter_tuning(
    validation_data: Input[Dataset],
    best_params: Output[Artifact]
):
    """Search XGBoost hyperparameters with Optuna on the validation split.

    The validation split is divided again into a fitting part and a held-out
    scoring part, so each trial is ranked by RMSE on rows it was not trained on.
    The winning parameters are written as JSON to ``best_params.path + ".json"``.
    That is the path ``model_building`` reads them from.

    Args:
        validation_data: CSV Dataset with the selected features plus a
            ``Purchase`` target column.
        best_params: output Artifact; the JSON file is written next to its path.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas', 'scikit-learn', 'xgboost', 'optuna'], check=True)

    import json
    from functools import partial

    import optuna
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import mean_squared_error
    from sklearn.model_selection import train_test_split

    N_TRIALS = 2            # unchanged from the original; raise for a meaningful search
    HOLDOUT_FRACTION = 0.25
    SEED = 42

    validation_df = pd.read_csv(validation_data.path)
    X_val = validation_df.drop('Purchase', axis=1)
    y_val = validation_df['Purchase']

    # Scoring a trial on the rows it was fitted on rewards overfitting (deep trees,
    # many estimators). Hold out part of the split to get an honest RMSE.
    X_fit, X_holdout, y_fit, y_holdout = train_test_split(
        X_val, y_val, test_size=HOLDOUT_FRACTION, random_state=SEED
    )

    def objective(trial, X_fit, y_fit, X_holdout, y_holdout):
        """Train one candidate model and return its hold-out RMSE (lower is better)."""
        params = {
            "objective": "reg:squarederror",
            "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
            "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
            "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
            "max_depth": trial.suggest_int("max_depth", 3, 9),
            "eta": trial.suggest_float("eta", 1e-8, 1.0, log=True),
            "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
            "grow_policy": trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"]),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
            "colsample_bylevel": trial.suggest_float("colsample_bylevel", 0.5, 1.0),
            "colsample_bynode": trial.suggest_float("colsample_bynode", 0.5, 1.0),
            "min_child_weight": trial.suggest_float("min_child_weight", 1e-8, 1.0, log=True),
            "max_delta_step": trial.suggest_int("max_delta_step", 0, 10),
            "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
            # Removed from the search space:
            # - sampling_method: XGBoost supports "gradient_based" only on GPU, so
            #   trials that picked it failed in this CPU-only step.
            # - scale_pos_weight: only meaningful for binary classification.
        }

        model = xgb.XGBRegressor(**params)
        model.fit(X_fit, y_fit)
        y_pred = model.predict(X_holdout)
        # mean_squared_error(..., squared=False) was removed in scikit-learn 1.6.
        # Taking the square root works on every version.
        return mean_squared_error(y_holdout, y_pred) ** 0.5

    study = optuna.create_study(
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=SEED),
    )
    study.optimize(
        partial(objective, X_fit=X_fit, y_fit=y_fit, X_holdout=X_holdout, y_holdout=y_holdout),
        n_trials=N_TRIALS,
    )

    with open(best_params.path + ".json", 'w') as f:
        json.dump(study.best_params, f)
        
@component
def model_building(
    train_data: Input[Dataset],
    best_params: Input[Artifact],
    model_output: Output[Model]
):
    """Fit an XGBRegressor on the training split using the tuned hyperparameters.

    Reads hyperparameters from ``best_params.path + ".json"`` and fits on the
    training split. Saves the fitted model with joblib and uploads it to
    gs://final_demo2_blackfriday/model.joblib. Finally points
    ``model_output.uri`` at that object.

    Args:
        train_data: CSV Dataset with the selected features plus ``Purchase``.
        best_params: Artifact whose ``.path + ".json"`` holds the parameter dict.
        model_output: output Model artifact; only its ``uri`` is set.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas','scikit-learn', 'xgboost', 'joblib', 'google-cloud-storage'], check=True)

    import json

    import joblib
    import pandas as pd
    import xgboost as xgb
    from google.cloud import storage

    training_frame = pd.read_csv(train_data.path)
    features = training_frame.drop('Purchase', axis=1)
    target = training_frame['Purchase']

    with open(best_params.path + ".json", 'r') as params_file:
        tuned_params = json.load(params_file)

    regressor = xgb.XGBRegressor(**tuned_params)
    regressor.fit(features, target)

    # NOTE(review): xgboost is installed unpinned here, but the notebook's
    # serving_image is xgboost-cpu.1-6. A model pickled by a newer xgboost may
    # not load in that container; consider pinning xgboost to match.
    local_path = "model.joblib"
    joblib.dump(regressor, local_path)

    gcs_bucket = storage.Client().bucket('final_demo2_blackfriday')
    gcs_bucket.blob('model.joblib').upload_from_filename(local_path)

    # Re-point the artifact at the uploaded GCS object instead of writing to model_output.path.
    model_output.uri = f"gs://{gcs_bucket.name}/model.joblib"
        

@component
def upload_model_to_vertex_ai(
    model_output: Input[Model],
    project_id: str,
    region: str,
    display_name: str,
    serving_image: str,
    parent_model: str
):
    """Register the trained model in the Vertex AI Model Registry.

    Args:
        model_output: Model artifact whose ``uri`` points at the model file
            (e.g. gs://bucket/model.joblib). The file's parent directory is
            passed to Vertex as ``artifact_uri``.
        project_id: GCP project to register the model in.
        region: Vertex AI region.
        display_name: display name for the registered model.
        serving_image: prediction container image URI.
        parent_model: resource name of an existing model to add this upload to
            as a new version. The string 'None' (any case) or a blank value
            registers a brand-new model.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'google-cloud-aiplatform'], check=True)

    import posixpath

    from google.cloud import aiplatform

    # Vertex expects the directory that contains the model file, not the file itself.
    artifact_dir = posixpath.dirname(model_output.uri)

    # Pipeline parameters are strings, so "no parent" arrives as 'None' or blank.
    parent = parent_model.strip()
    if parent.lower() in ('', 'none'):
        parent = None

    aiplatform.init(project=project_id, location=region)
    aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_dir,
        serving_container_image_uri=serving_image,
        parent_model=parent,
    )

@component
def model_evaluation(
    train_data: Input[Dataset],
    test_data: Input[Dataset],
    model_output: Input[Model],
    metrics: Output[Metrics]
):
    """Score the trained model on the train and test splits and log RMSE / R2.

    The model is loaded from the ``model_output`` input artifact. When its URI
    is a gs:// object, that object is downloaded; otherwise ``model_output.path``
    is read. Logs the metrics "Train RMSE", "Train R2", "Test RMSE" and
    "Test R2".

    Args:
        train_data: CSV Dataset with features plus ``Purchase``.
        test_data: CSV Dataset with features plus ``Purchase``.
        model_output: Model artifact produced by ``model_building``.
        metrics: output Metrics artifact.
    """
    import subprocess
    subprocess.run(['pip', 'install', 'pandas', 'xgboost', 'joblib', 'scikit-learn', 'google-cloud-storage'], check=True)

    import os
    import tempfile

    import joblib
    import pandas as pd
    from google.cloud import storage
    from sklearn.metrics import mean_squared_error, r2_score

    def _split_xy(path):
        """Read a split CSV and separate the features from the Purchase target."""
        frame = pd.read_csv(path)
        return frame.drop('Purchase', axis=1), frame['Purchase']

    def _load_model():
        """Load the model referenced by the input artifact rather than a fixed bucket path."""
        # joblib.load unpickles, so only point this at artifacts this pipeline produced.
        if model_output.uri.startswith('gs://'):
            local_path = os.path.join(tempfile.mkdtemp(), 'model.joblib')
            blob = storage.Blob.from_string(model_output.uri, client=storage.Client())
            blob.download_to_filename(local_path)
            return joblib.load(local_path)
        return joblib.load(model_output.path)

    model = _load_model()

    for split_name, split_path in (('Train', train_data.path), ('Test', test_data.path)):
        X, y = _split_xy(split_path)
        y_pred = model.predict(X)
        # mean_squared_error(..., squared=False) was removed in scikit-learn 1.6.
        rmse = mean_squared_error(y, y_pred) ** 0.5
        metrics.log_metric(f"{split_name} RMSE", float(rmse))
        metrics.log_metric(f"{split_name} R2", float(r2_score(y, y_pred)))

@pipeline(
    name='black_friday_sales_model_training_pipeline',
    description='A pipeline that processes data from Black Friday sales and builds a predictive model.',
)
def black_friday_sales_model_training_pipeline(
    project_id: str,
    region: str,
    display_name: str,
    serving_image: str,
    parent_model: str
):
    """Chain ingest -> preprocess -> engineer -> select -> split -> tune -> train,
    then evaluate the model and register it in Vertex AI.

    Args:
        project_id: GCP project used for model registration.
        region: Vertex AI region used for model registration.
        display_name: display name for the registered model.
        serving_image: prediction container image URI.
        parent_model: existing model resource name, or 'None' for a new model.
    """
    ingest_task = ingest()
    preprocessed_data = preprocessing(raw_data=ingest_task.output)
    engineered_data = feature_engineering(preprocessed_df=preprocessed_data.output)
    selected_features_data = feature_selection(engineered_df=engineered_data.output)

    split_data = train_validation_test_split(
        finalized_features=selected_features_data.output
    )

    tuned_params = hyperparameter_tuning(
        validation_data=split_data.outputs['validation_data']
    )

    trained_model = model_building(
        train_data=split_data.outputs['train_data'],
        best_params=tuned_params.output
    )

    evaluation_task = model_evaluation(
        train_data=split_data.outputs['train_data'],
        test_data=split_data.outputs['test_data'],
        model_output=trained_model.outputs['model_output']
    )

    # Register only after evaluation succeeds, so a model whose evaluation step
    # fails never reaches the Model Registry.
    upload_model_to_vertex_ai(
        model_output=trained_model.outputs['model_output'],
        project_id=project_id,
        region=region,
        display_name=display_name,
        serving_image=serving_image,
        parent_model=parent_model
    ).after(evaluation_task)

# Compile the pipeline definition into the YAML package submitted to Vertex AI below.
PIPELINE_PACKAGE_PATH = 'black_friday_sales_model_training_pipeline.yaml'
compiler.Compiler().compile(
    black_friday_sales_model_training_pipeline,
    PIPELINE_PACKAGE_PATH,
)

In [3]:
import os

from google.cloud import aiplatform

PROJECT_ID = 'brldi-gcpcapabilities-ai-audit'
REGION = 'us-central1'
PIPELINE_ROOT = 'gs://final_demo2_blackfriday/root'

# Service account the pipeline steps run as. When this is None, Vertex uses the
# project's default Compute Engine account (…-compute@developer.gserviceaccount.com).
# The submitting identity needs roles/iam.serviceAccountUser on whichever account
# is used. Without it, submission fails with
# "400 You do not have permission to act as service_account".
SERVICE_ACCOUNT = os.environ.get('PIPELINE_SERVICE_ACCOUNT') or None

# Initialize the Vertex AI client
aiplatform.init(project=PROJECT_ID, location=REGION)

# Create a pipeline job with all required parameters
job = aiplatform.PipelineJob(
    display_name="black_friday_sales_model_training_pipeline",
    template_path='black_friday_sales_model_training_pipeline.yaml',
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project_id': PROJECT_ID,
        'region': REGION,
        'display_name': 'black_friday_model',
        'serving_image': 'us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest',
        'parent_model': 'None'
    },
    enable_caching=True
)

# Run the pipeline job (blocks until it finishes)
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob


InvalidArgument: 400 You do not have permission to act as service_account: 971203737354-compute@developer.gserviceaccount.com. (or it may not exist).