### Installation
Install the packages required for executing this notebook.

In [None]:
# Install the KFP SDK (newer than 2.0), Google Cloud Pipeline Components (newer than 2.0)
# and the Vertex AI SDK. The kernel must be restarted afterwards (next cell) so the
# newly installed versions are importable.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

# Automated runs set IS_TESTING to a non-empty value and must not be interrupted;
# interactive sessions restart the kernel so the packages installed above are
# picked up on the next import.
running_tests = bool(os.getenv("IS_TESTING"))
if not running_tests:
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)  # True requests a restart rather than a plain shutdown

## Check the versions of the installed packages (the KFP SDK version should be 2.x or later).

In [1]:
# Print the installed versions to confirm the upgrade took effect after the restart
# (the KFP SDK should report a 2.x version).
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.14.4
google-cloud-aiplatform==1.118.0
google_cloud_pipeline_components version: 2.21.0


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [None]:
# --- Google Cloud settings used when compiling and submitting the pipeline ---
# Project that hosts the Vertex AI pipeline run.
PROJECT_ID = "agile-producer-471907-s7"
# Region in which the pipeline executes.
REGION = "us-central1"
# Pipeline root: a Cloud Storage URI readable/writable by the pipeline service
# account. The artifacts of every pipeline run are stored beneath it.
PIPELINE_ROOT = "gs://temp_spotify_2038050"

#### Pipeline Component: Data Ingestion

In [None]:
@dsl.component(
    packages_to_install=["pandas", "google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Download the raw CSV dataset from Cloud Storage into the output artifact.

    Args:
        project_id: Google Cloud project used to create the Storage client.
        bucket: Name of the bucket that holds the raw data.
        file_name: Object name (blob path) of the CSV inside the bucket.
        dataset: Output artifact; the blob is written to dataset.path + ".csv",
            which is the path downstream components read from.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    client = storage.Client(project=project_id)
    # Keep the bucket *name* (the str parameter) separate from the Bucket object.
    # Re-binding `bucket` made the log line below interpolate the Bucket object
    # instead of the bucket name.
    gcs_bucket = client.bucket(bucket)
    blob = gcs_bucket.blob(file_name)
    blob.download_to_filename(dataset.path + ".csv")
    logging.info(f'Downloaded {file_name} from gs://{bucket}/')

#### Pipeline Component: Train-Test Split

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def preprocess_and_split(
    raw_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    scaler_out: Output[Model]
):
    '''Select the numeric features, split 80/20 (stratified) and standardize.

    Reads raw_data.path + ".csv" and writes:
      * train_data.path + ".csv" and test_data.path + ".csv": the scaled
        feature columns followed by the is_churned label;
      * scaler_out.path + ".pkl": the StandardScaler fitted on the training
        rows only, tagged with metadata algo="scaler" and file_type=".pkl".
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    feature_cols = [
        "age",
        "listening_time",
        "songs_played_per_day",
        "skip_rate",
        "ads_listened_per_week",
    ]
    label_col = "is_churned"
    ignored_cols = ("user_id", "offline_listening")

    # Load the raw CSV produced by the download step.
    raw_df = pd.read_csv(raw_data.path + ".csv")
    logging.info(f"Raw data loaded: {raw_df.shape}")

    # Drop identifier/unused columns if they are present; absent ones are ignored.
    model_df = raw_df.drop(columns=[col for col in ignored_cols if col in raw_df.columns])
    features = model_df[feature_cols]
    labels = model_df[label_col]

    # Fixed seed + stratification keep the class balance identical across runs.
    feat_train, feat_test, label_train, label_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )
    logging.info(f"Train size: {feat_train.shape}, Test size: {feat_test.shape}")

    def _frame(scaled_values, label_values):
        # Rebuild a labelled frame from scaled values (index is reset to 0..n-1,
        # so labels are attached positionally via .values).
        frame = pd.DataFrame(scaled_values, columns=feature_cols)
        frame[label_col] = label_values
        return frame

    # Fit the scaler on training rows only so no test statistics leak into training.
    scaler = StandardScaler()
    train_frame = _frame(scaler.fit_transform(feat_train), label_train.values)
    test_frame = _frame(scaler.transform(feat_test), label_test.values)

    train_frame.to_csv(train_data.path + ".csv", index=False)
    logging.info("Train data saved")

    test_frame.to_csv(test_data.path + ".csv", index=False)
    logging.info("Test data saved")

    scaler_out.metadata["file_type"] = ".pkl"
    scaler_out.metadata["algo"] = "scaler"
    joblib.dump(scaler, scaler_out.path + ".pkl")
    logging.info("Scaler saved")

#### Pipeline Component: Training - Random Forest

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def train_rf(
    train_data: Input[Dataset],
    model_out: Output[Model]
) -> NamedTuple('outputs', metrics=dict):
    '''Fit a Random Forest classifier on the scaled training split.

    Reads train_data.path + ".csv" (features plus the is_churned label), fits a
    100-tree forest with a fixed seed and pickles it to model_out.path + ".pkl"
    with metadata algo="best_model", file_type=".pkl".

    Returns:
        metrics: in-sample accuracy and F1 on the training rows (optimistic;
        kept only for comparison with held-out scores).
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    label = "is_churned"
    train_df = pd.read_csv(train_data.path + ".csv")
    features, target = train_df.drop(columns=[label]), train_df[label]
    logging.info(f"Training RF on {features.shape[0]} samples")

    forest = RandomForestClassifier(n_estimators=100, random_state=42).fit(features, target)

    # In-sample predictions, for comparison against the test-set metrics.
    fitted = forest.predict(features)
    metrics_dict = {
        "train_accuracy": float(accuracy_score(target, fitted)),
        "train_f1_score": float(f1_score(target, fitted)),
    }
    logging.info(f"RF Training Metrics: {metrics_dict}")

    model_out.metadata["file_type"] = ".pkl"
    model_out.metadata["algo"] = "best_model"
    joblib.dump(forest, model_out.path + ".pkl")

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component: Training - Logistic Regression

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def train_lr(
    train_data: Input[Dataset],
    model_out: Output[Model]
) -> NamedTuple('outputs', metrics=dict):
    '''Fit a class-balanced Logistic Regression on the scaled training split.

    Reads train_data.path + ".csv" (features plus the is_churned label), fits the
    model (max_iter=500, class_weight="balanced") and pickles it to
    model_out.path + ".pkl" with metadata algo="best_model", file_type=".pkl".

    Returns:
        metrics: in-sample accuracy and F1 on the training rows (optimistic;
        kept only for comparison with held-out scores).
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score, f1_score

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    label = "is_churned"
    train_df = pd.read_csv(train_data.path + ".csv")
    features, target = train_df.drop(columns=[label]), train_df[label]
    logging.info(f"Training LR on {features.shape[0]} samples")

    # class_weight="balanced" reweights classes inversely to their frequency.
    logreg = LogisticRegression(max_iter=500, class_weight="balanced").fit(features, target)

    # In-sample predictions, for comparison against the test-set metrics.
    fitted = logreg.predict(features)
    metrics_dict = {
        "train_accuracy": float(accuracy_score(target, fitted)),
        "train_f1_score": float(f1_score(target, fitted)),
    }
    logging.info(f"LR Training Metrics: {metrics_dict}")

    model_out.metadata["file_type"] = ".pkl"
    model_out.metadata["algo"] = "best_model"
    joblib.dump(logreg, model_out.path + ".pkl")

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component: Prediction - Random Forest

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def predict_rf(
    model: Input[Model],
    test_data: Input[Dataset],
    predictions: Output[Dataset]
) -> NamedTuple('outputs', metrics=dict):
    '''Score the trained Random Forest on the held-out test split.

    Loads model.path + ".pkl", predicts on test_data.path + ".csv", logs a
    classification report and writes the test rows plus a "prediction" column
    to predictions.path + ".csv".

    Returns:
        metrics: {"test_accuracy", "test_f1_score"} on the test split.
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report, f1_score

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    label = "is_churned"
    test_df = pd.read_csv(test_data.path + ".csv")
    features, truth = test_df.drop(columns=[label]), test_df[label]

    # NOTE(review): joblib.load unpickles the file; only load artifacts produced
    # by this pipeline.
    model_rf = joblib.load(model.path + ".pkl")
    predicted = model_rf.predict(features)

    metrics_dict = {
        "test_accuracy": float(accuracy_score(truth, predicted)),
        "test_f1_score": float(f1_score(truth, predicted)),
    }
    logging.info(f"RF Test Metrics: {metrics_dict}")
    logging.info(f"\n{classification_report(truth, predicted)}")

    # Persist the test rows alongside the predicted class.
    scored = test_df.assign(prediction=predicted)
    scored.to_csv(predictions.path + ".csv", index=False)

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component: Prediction - Logistic Regression

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def predict_lr(
    model: Input[Model],
    test_data: Input[Dataset],
    predictions: Output[Dataset]
) -> NamedTuple('outputs', metrics=dict):
    '''Score the trained Logistic Regression on the held-out test split.

    Loads model.path + ".pkl", predicts on test_data.path + ".csv", logs a
    classification report and writes the test rows plus a "prediction" column
    to predictions.path + ".csv".

    Returns:
        metrics: {"test_accuracy", "test_f1_score"} on the test split.
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report, f1_score

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    label = "is_churned"
    test_df = pd.read_csv(test_data.path + ".csv")
    features, truth = test_df.drop(columns=[label]), test_df[label]

    # NOTE(review): joblib.load unpickles the file; only load artifacts produced
    # by this pipeline.
    model_lr = joblib.load(model.path + ".pkl")
    predicted = model_lr.predict(features)

    metrics_dict = {
        "test_accuracy": float(accuracy_score(truth, predicted)),
        "test_f1_score": float(f1_score(truth, predicted)),
    }
    logging.info(f"LR Test Metrics: {metrics_dict}")
    logging.info(f"\n{classification_report(truth, predicted)}")

    # Persist the test rows alongside the predicted class.
    scored = test_df.assign(prediction=predicted)
    scored.to_csv(predictions.path + ".csv", index=False)

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component: Algorithm Selection

In [None]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_models(rf_metrics: dict, lr_metrics: dict) -> str:
    '''Select the candidate with the higher held-out F1 score.

    Args:
        rf_metrics: test metrics of the Random Forest; a missing
            "test_f1_score" counts as 0.
        lr_metrics: test metrics of the Logistic Regression; a missing
            "test_f1_score" counts as 0.

    Returns:
        "RF" only when Random Forest is strictly better, otherwise "LR"
        (ties go to Logistic Regression).
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(f"RF Metrics: {rf_metrics}")
    logging.info(f"LR Metrics: {lr_metrics}")

    rf_f1 = rf_metrics.get("test_f1_score", 0)
    lr_f1 = lr_metrics.get("test_f1_score", 0)

    # Guard clause: anything other than a strict RF win selects LR.
    if not rf_f1 > lr_f1:
        logging.info(f"Winner: Logistic Regression (F1={lr_f1:.3f})")
        return "LR"

    logging.info(f"Winner: Random Forest (F1={rf_f1:.3f})")
    return "RF"

### Download Production Model from GCS

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_production_model(
    project_id: str,
    bucket_name: str,
    production_model: Output[Model]
) -> str:
    '''Fetch the currently deployed model ("best_model.pkl") from the model bucket.

    Args:
        project_id: Google Cloud project used to create the Storage client.
        bucket_name: model-repository bucket that may contain best_model.pkl.
        production_model: output artifact; receives the model at
            production_model.path + ".pkl" when a baseline exists.

    Returns:
        "BASELINE_EXISTS" after downloading the blob, or "NO_BASELINE" when the
        bucket has no best_model.pkl yet (first deployment). In the latter case
        nothing is written to production_model.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    baseline_blob = (
        storage.Client(project=project_id)
        .bucket(bucket_name)
        .blob("best_model.pkl")
    )

    if baseline_blob.exists():
        baseline_blob.download_to_filename(production_model.path + ".pkl")
        production_model.metadata["file_type"] = ".pkl"
        production_model.metadata["algo"] = "production_model"
        logging.info(f"✅ Downloaded production model from gs://{bucket_name}/best_model.pkl")
        return "BASELINE_EXISTS"

    logging.warning("No production model found. This is the first deployment.")
    return "NO_BASELINE"

### Evaluate Production Model

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'joblib'],
    base_image="python:3.10.7-slim"
)
def evaluate_production_model(
    model: Input[Model],
    test_data: Input[Dataset]
) -> NamedTuple('outputs', metrics=dict):
    '''Score the currently deployed model on this run's test split.

    NOTE(review): the test split is standardized with the scaler fitted in the
    current run, not with the scaler that was deployed alongside the production
    model. If the training data has drifted, the two may differ; confirm this is
    acceptable for the baseline comparison.

    Returns:
        metrics: {"test_accuracy", "test_f1_score"} of the production model.
    '''
    import logging
    import sys

    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report, f1_score

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    label = "is_churned"
    test_df = pd.read_csv(test_data.path + ".csv")
    features, truth = test_df.drop(columns=[label]), test_df[label]

    # NOTE(review): joblib.load unpickles the file from the model bucket; the
    # bucket must only be writable by trusted principals.
    production_model = joblib.load(model.path + ".pkl")
    predicted = production_model.predict(features)

    metrics_dict = {
        "test_accuracy": float(accuracy_score(truth, predicted)),
        "test_f1_score": float(f1_score(truth, predicted)),
    }
    logging.info(f"Production Model Test Metrics: {metrics_dict}")
    logging.info(f"\n{classification_report(truth, predicted)}")

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

### Compare Winner vs Production Baseline

In [None]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_with_baseline(
    new_model_metrics: dict,
    production_metrics: dict,
    baseline_status: str,
    metric_name: str = "test_f1_score",
    max_degradation: float = 0.02
) -> str:
    '''Gate deployment of the new model against the production baseline.

    The new model is accepted when its score is at least
    (1 - max_degradation) * production score. With the default, the new model
    may be up to 2% (relative) worse than production and still ship. Metrics are
    treated as higher-is-better; a missing key counts as 0.

    Args:
        new_model_metrics: test metrics of the candidate model.
        production_metrics: test metrics of the currently deployed model.
        baseline_status: "NO_BASELINE" short-circuits to "DEPLOY" (first deployment).
        metric_name: key compared in both metric dicts.
        max_degradation: allowed relative drop versus production, in [0, 1).

    Returns:
        "DEPLOY" or "REJECT"

    Raises:
        ValueError: if max_degradation is outside [0, 1).
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    if not 0.0 <= max_degradation < 1.0:
        raise ValueError(f"max_degradation must be in [0, 1), got {max_degradation}")

    # If no baseline exists, deploy new model
    if baseline_status == "NO_BASELINE":
        logging.info("DECISION: DEPLOY - No baseline model exists (first deployment)")
        return "DEPLOY"

    # Extract scores
    production_score = production_metrics.get(metric_name, 0)
    new_score = new_model_metrics.get(metric_name, 0)

    improvement = new_score - production_score
    improvement_pct = (improvement / production_score * 100) if production_score > 0 else 0

    # Lowest score still accepted; the default (1 - 0.02) equals 0.98 exactly.
    min_acceptable = (1 - max_degradation) * production_score

    logging.info("=" * 60)
    logging.info("BASELINE COMPARISON")
    logging.info("=" * 60)
    logging.info(f"Production Model {metric_name}: {production_score:.4f}")
    logging.info(f"New Model {metric_name}: {new_score:.4f}")
    logging.info(f"Improvement: {improvement:+.4f} ({improvement_pct:+.2f}%)")
    logging.info(
        f"Minimum acceptable {metric_name}: {min_acceptable:.4f} "
        f"(max degradation {max_degradation:.0%})"
    )
    logging.info("=" * 60)

    # Decision: new model must reach the tolerance-adjusted threshold, not
    # necessarily match production.
    if new_score >= min_acceptable:
        logging.info("DECISION: DEPLOY")
        if new_score >= production_score:
            logging.info("   New model performs equal to or better than production")
        else:
            logging.info("   New model is slightly worse than production but within the allowed tolerance")
        return "DEPLOY"

    logging.warning("DECISION: REJECT")
    logging.warning("   New model worse than production baseline beyond the allowed tolerance")
    logging.warning(f"   Required: >= {min_acceptable:.4f}")
    logging.warning(f"   Achieved: {new_score:.4f}")
    return "REJECT"

### Upload Model and Scaler to a Cloud Storage Bucket

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_to_gcs(project_id: str, bucket_name: str, artifact: Input[Model]):
    '''Copy a pickled artifact into the model-repository bucket.

    The object name is artifact.metadata["algo"] + artifact.metadata["file_type"]
    (for example "best_model.pkl" or "scaler.pkl"). The local file uploaded is
    artifact.path + artifact.metadata["file_type"], which matches how the
    producing components saved it.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    target_bucket = storage.Client(project=project_id).bucket(bucket_name)

    algo = artifact.metadata["algo"]
    suffix = artifact.metadata["file_type"]
    file_name = algo + suffix

    target_bucket.blob(file_name).upload_from_filename(artifact.path + suffix)

    logging.info(f"Uploaded {file_name} to gs://{bucket_name}/")

#### Define the Pipeline

In [None]:
@kfp.dsl.pipeline(name="spotify-churn-training-pipeline")
def pipeline(
    project_id: str,
    data_bucket: str,
    dataset_filename: str,
    model_repo: str
):
    '''Train RF and LR churn models, pick the better one and gate its deployment.

    Flow: download -> preprocess/split -> train RF and LR -> score both on the
    test split -> pick the winner by test F1 -> compare the winner with the
    current production model (if any) -> upload winner model + scaler to
    model_repo when approved.

    Args:
        project_id: Google Cloud project used by the Storage clients.
        data_bucket: bucket that holds the raw dataset.
        dataset_filename: object name of the raw CSV inside data_bucket.
        model_repo: bucket holding the production best_model.pkl / scaler.pkl.
    '''
    # Step 1: Download raw data
    download_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=dataset_filename
    )

    # Step 2: Preprocess and split data
    split_op = preprocess_and_split(
        raw_data=download_op.outputs["dataset"]
    )
    train_split = split_op.outputs["train_data"]
    test_split = split_op.outputs["test_data"]
    scaler_artifact = split_op.outputs["scaler_out"]

    # Step 3: Train both models (no dependency between them, so they run in parallel)
    train_rf_op = train_rf(train_data=train_split)
    train_lr_op = train_lr(train_data=train_split)

    # Step 4: Predict with both models on test data
    predict_rf_op = predict_rf(
        model=train_rf_op.outputs["model_out"],
        test_data=test_split
    )
    predict_lr_op = predict_lr(
        model=train_lr_op.outputs["model_out"],
        test_data=test_split
    )

    # Step 5: Compare RF vs LR models based on test performance ("RF" or "LR")
    compare_op = compare_models(
        rf_metrics=predict_rf_op.outputs["metrics"],
        lr_metrics=predict_lr_op.outputs["metrics"]
    ).after(predict_rf_op, predict_lr_op)

    # Step 6: Download current production model (if exists)
    download_prod_op = download_production_model(
        project_id=project_id,
        bucket_name=model_repo
    ).after(compare_op)
    baseline_status = download_prod_op.outputs["Output"]

    # (winner label returned by compare_models, its test-metrics task, its training task)
    candidates = (
        ("RF", predict_rf_op, train_rf_op),
        ("LR", predict_lr_op, train_lr_op),
    )

    def gate_winner_and_upload(production_metrics, status):
        # Emits, for each candidate, a branch that runs only if that candidate won:
        # compare it with the baseline, then upload model + scaler on "DEPLOY".
        # Every consumer is nested inside its producer's condition group. KFP v2
        # rejects dependencies on tasks that live in a *different* dsl.If/dsl.Else
        # group, and the original code had such dependencies.
        for algo, predict_op, train_op in candidates:
            with dsl.If(compare_op.output == algo):
                gate_op = compare_with_baseline(
                    new_model_metrics=predict_op.outputs["metrics"],
                    production_metrics=production_metrics,
                    baseline_status=status
                )
                with dsl.If(gate_op.output == "DEPLOY"):
                    upload_to_gcs(
                        project_id=project_id,
                        bucket_name=model_repo,
                        artifact=train_op.outputs["model_out"]
                    )
                    upload_to_gcs(
                        project_id=project_id,
                        bucket_name=model_repo,
                        artifact=scaler_artifact
                    )

    # Steps 7-8 (baseline exists): evaluate the production model on the same test
    # split, then gate the winner against it. The evaluation task lives in this
    # group, so its consumers must too.
    with dsl.If(baseline_status == "BASELINE_EXISTS"):
        eval_prod_op = evaluate_production_model(
            model=download_prod_op.outputs["production_model"],
            test_data=test_split
        )
        gate_winner_and_upload(eval_prod_op.outputs["metrics"], baseline_status)
    # Step 8 (first deployment): no baseline, so gate against dummy metrics;
    # compare_with_baseline short-circuits to "DEPLOY" for "NO_BASELINE".
    with dsl.Else():
        gate_winner_and_upload({"test_f1_score": 0.0}, "NO_BASELINE")

#### Compile the pipeline into a YAML file

In [None]:
# Serialize the pipeline definition into a KFP YAML package that Vertex AI can run.
from kfp import compiler

pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path='spotify_churn_pipeline.yaml',
    pipeline_func=pipeline,
)

#### Submit the pipeline run

In [None]:
# Submit the compiled pipeline to Vertex AI Pipelines and wait for it to finish.
import google.cloud.aiplatform as aip

aip.init(project=PROJECT_ID, location=REGION, staging_bucket=PIPELINE_ROOT)

# Runtime values for the pipeline's parameters.
run_parameters = {
    'project_id': PROJECT_ID,
    'data_bucket': 'data_spotify_2038050',
    'dataset_filename': 'spotify_churn_dataset.csv',
    'model_repo': 'models_spotify_2038050',
}

job = aip.PipelineJob(
    display_name="spotify-churn-pipeline",
    template_path="spotify_churn_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=run_parameters,
    enable_caching=False,  # always re-run every step
    location=REGION,
)
job.run()