In [1]:
from pathlib import Path
from typing import NamedTuple

import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, Condition
from kfp.registry import RegistryClient
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
# Report the versions actually imported into this kernel. A `!python3` or
# `!pip3` subprocess may resolve a different interpreter/environment.
print(f"KFP SDK version: {kfp.__version__}")
print(f"google-cloud-aiplatform=={aiplatform.__version__}")

KFP SDK version: 2.7.0
google-cloud-aiplatform==1.58.0


In [2]:
PROJECT_ID = "demoespecialidadgcp"
REGION = "us-central1"
BUCKET_URI = f"gs://demo_ts"
SERVICE_ACCOUNT = "502688298240-compute@developer.gserviceaccount.com"
PIPELINE_ROOT = f"{BUCKET_URI}/pipelines"
DATASET_ID = "demo_ts"
TABLE_TRAIN = "train"
TABLE_TEST = "test"

! gcloud config set project {PROJECT_ID}
BQ_REGION = REGION.split("-")[0].upper()

Updated property [core/project].


In [3]:
# Pass REGION explicitly so the SDK does not silently fall back to its
# default location if REGION is ever changed.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [4]:
@component(
    packages_to_install=["google-cloud-bigquery[pandas]==3.25.0"],
    base_image="python:3.10.4"
)
def export_datasets(
    project_id: str,
    dataset_id: str,
    table_train: str,
    table_test: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
):
    """Export the BigQuery train and test tables to CSV dataset artifacts.

    Args:
        project_id: The Project ID.
        dataset_id: The BigQuery Dataset ID. Must be pre-created in the project.
        table_train: The BigQuery train table name.
        table_test: The BigQuery test table name.
        dataset_train: Output artifact; the train rows are written to
            ``dataset_train.path + '.csv'`` (no index column).
        dataset_test: Output artifact; the test rows are written to
            ``dataset_test.path + '.csv'`` (no index column).
    """
    # KFP lightweight components run only this function's source, so all
    # imports and helpers must live inside it.
    from google.cloud import bigquery
    import pandas as pd

    client = bigquery.Client(project=project_id)

    def _read_table(table: str) -> pd.DataFrame:
        """Return ``SELECT *`` over one table in ``project_id.dataset_id``."""
        # Backticks are required when the project ID contains hyphens
        # (e.g. `my-project.ds.t`) and are harmless otherwise.
        query = f"SELECT * FROM `{project_id}.{dataset_id}.{table}`"
        query_job = client.query(query=query, job_config=bigquery.QueryJobConfig())
        return query_job.result().to_dataframe()

    df_train = _read_table(table_train)
    df_test = _read_table(table_test)

    # Concatenate-then-split aligns both frames to the union of their columns,
    # so the train and test CSVs always share the same header.
    dataset = pd.concat(
        [df_train.assign(source='train'), df_test.assign(source='test')]
    )
    # drop() returns new frames; avoids inplace mutation of .loc slices.
    train = dataset.loc[dataset['source'] == 'train'].drop(columns='source')
    test = dataset.loc[dataset['source'] == 'test'].drop(columns='source')

    train.to_csv(dataset_train.path + '.csv', index=False)
    test.to_csv(dataset_test.path + '.csv', index=False)

## Model Version 1

In [14]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.5.1",
        "scipy==1.13.1",
        "numpy==1.26.4",
    ],
    base_image="python:3.10.4"
)
def train_model(
    dataset_train: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
):
    """Train a LinearRegression model for demo_ts and log hold-out metrics.

    Rows are sorted by ``Month`` and the last 10% (no shuffling) are held out
    for validation, so the validation slice always follows training in time.

    Args:
        dataset_train: Dataset artifact; reads ``dataset_train.path + '.csv'``,
            which must contain a ``Month`` date column and a ``y`` target column.
        model: Output artifact; the fitted estimator is pickled to
            ``<model.path>/model.pkl``.
        metrics: Output metrics; receives framework name, split sizes, and
            RMSE / R2 on the hold-out slice (rounded to 2 decimals).
    """
    import os
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error, r2_score
    from sklearn.model_selection import train_test_split

    # Use a separate name so the input artifact is not shadowed by the frame.
    with open(dataset_train.path + '.csv', "r") as train_data:
        train_df = pd.read_csv(
            train_data,
            parse_dates=['Month'],
            index_col='Month',
        ).to_period('M')
    # Chronological order matters: shuffle=False below takes the tail as validation.
    train_df = train_df.sort_index()

    X = train_df.drop('y', axis=1)
    y = train_df['y']

    X_train, X_test, Y_train, Y_test = train_test_split(X, y, test_size=0.1, shuffle=False)

    # fit_intercept=False: the regression is forced through the origin.
    lrg = LinearRegression(fit_intercept=False)
    lrg.fit(X_train, Y_train)

    Y_pred_lrg = lrg.predict(X_test)

    r2 = r2_score(Y_test, Y_pred_lrg)
    rmse = np.sqrt(mean_squared_error(Y_test, Y_pred_lrg))

    metrics.log_metric("Framework", "LinearRegression")
    metrics.log_metric("Train_samples_size", len(X_train))
    metrics.log_metric("Validation_samples_size", len(X_test))
    metrics.log_metric("RMSE", round(rmse, 2))
    metrics.log_metric("R2 score", round(r2, 2))

    print("Linear Regression:")
    print("RMSE:", rmse)
    print("R2 score:", r2)

    # Export the model; the evaluation/inference components unpickle
    # <model.path>/model.pkl.
    os.makedirs(model.path, exist_ok=True)
    model_file_path = os.path.join(model.path, "model.pkl")
    with open(model_file_path, 'wb') as f:
        pickle.dump(lrg, f)
    
    
    

In [15]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.5.1",
        "joblib==1.4.2",
        "numpy==1.26.4",
    ],
    base_image="python:3.10.4"
)
def evaluate_model(
    dataset_test: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics]
) -> NamedTuple('EvaluationOutput', [('r2', float), ('rmse', float)]):
    """Score the trained model on a labelled dataset and log RMSE / R2.

    Args:
        dataset_test: Dataset artifact; ``dataset_test.path + '.csv'`` is read
            with ``Month`` as a monthly period index and ``y`` as the target.
        model: Model artifact whose directory holds ``model.pkl``.
        metrics: Output metrics; receives "RMSE" and "R2 score", each rounded
            to 2 decimals.

    Returns:
        EvaluationOutput(r2, rmse) as plain Python floats.

    Raises:
        Any exception from loading, predicting or scoring is printed and
        re-raised so the pipeline step fails.
    """
    import pickle
    from collections import namedtuple

    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_squared_error, r2_score

    try:
        with open(dataset_test.path + '.csv', "r") as csv_handle:
            raw = pd.read_csv(csv_handle, parse_dates=['Month'], index_col='Month')
        labelled = raw.to_period('M').reindex().sort_values('Month')

        features = labelled.drop("y", axis=1)
        target = labelled["y"]

        with open(model.path + "/model.pkl", 'rb') as model_handle:
            estimator = pickle.load(model_handle)
        print(f"Model loaded successfully: {type(estimator)}")

        predicted = estimator.predict(features)
        r2 = r2_score(target, predicted)
        rmse = np.sqrt(mean_squared_error(target, predicted))

        for metric_name, metric_value in (("RMSE", rmse), ("R2 score", r2)):
            metrics.log_metric(metric_name, round(metric_value, 2))

        print("Evaluation results:")
        print("RMSE:", rmse)
        print("R2 score:", r2)

        EvaluationOutput = namedtuple('EvaluationOutput', ['r2', 'rmse'])
        return EvaluationOutput(r2=float(r2), rmse=float(rmse))
    except Exception as e:
        print(f"Error during model evaluation: {str(e)}")
        raise



In [16]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.5.1",
        "numpy==1.26.4",
    ],
    base_image="python:3.10.4"
)
def inference(
    dataset_test: Input[Dataset],
    model: Input[Model],
    predictions: Output[Dataset]
):
    """Score a dataset with the trained model and save the predictions.

    Args:
        dataset_test: Dataset artifact; ``dataset_test.path + '.csv'`` must have
            a ``Month`` date column plus the model's feature columns. A ``y``
            column is optional and is excluded from the features if present.
        model: Model artifact whose directory holds ``model.pkl``.
        predictions: Output artifact; ``predictions.path + '.csv'`` receives the
            input rows (``Month`` as the first column) plus ``Predicted_Sales``.
    """
    import os
    import pickle

    import pandas as pd

    with open(dataset_test.path + '.csv', "r") as test_data:
        test_dataset = pd.read_csv(
            test_data,
            parse_dates=['Month'],
            index_col='Month',
        ).to_period('M')
    # Same chronological ordering as the training/evaluation components.
    test_dataset = test_dataset.sort_index()

    # errors="ignore": the target is optional so unlabeled data can be scored.
    X_test = test_dataset.drop(columns="y", errors="ignore")

    # NOTE(review): pickle.load can execute arbitrary code; only load model
    # artifacts produced by this pipeline.
    with open(os.path.join(model.path, "model.pkl"), 'rb') as f:
        trained_model = pickle.load(f)

    predictions_df = test_dataset.assign(Predicted_Sales=trained_model.predict(X_test))
    # index=True keeps the Month index in the CSV; without it the predictions
    # cannot be matched back to their period.
    predictions_df.to_csv(predictions.path + '.csv', index=True)


In [17]:
@component(
    packages_to_install=[
        'google-cloud-aiplatform',
        'kfp==2.5.0',
        'click>=8.0.0,<9',
        'kfp-pipeline-spec==0.2.2',
        'kfp-server-api>=2.0.0,<2.1.0',
        'kubernetes>=8.0.0,<27',
        'PyYAML>=5.3,<7',
        'requests-toolbelt>=0.8.0,<1',
        'tabulate>=0.8.6,<1',
        'protobuf>=3.13.0,<4',
        'urllib3<2.0.0',
        "numpy==1.26.4",
        "joblib==1.3.2",
        "scikit-learn==1.5.1",
    ],
    base_image='python:3.10.4'
)
def deploy_model_to_endpoint(
    project: str,
    location: str,
    model_display_name: str,
    model: Input[Model],
    endpoint_display_name: str,
    machine_type: str = 'n1-standard-2',
    min_replica_count: int = 1,
    max_replica_count: int = 1,
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
):
    """Upload the trained model to Vertex AI and deploy it to a new endpoint.

    Args:
        project: Google Cloud project ID.
        location: Vertex AI region.
        model_display_name: Display name for the uploaded Vertex AI Model.
        model: Model artifact whose directory holds ``model.pkl``.
        endpoint_display_name: Display name for the endpoint that is created.
        machine_type: Machine type for the deployed model.
        min_replica_count: Minimum number of serving replicas.
        max_replica_count: Maximum number of serving replicas.
        serving_container_image_uri: Prediction container image.
            NOTE(review): the default is the sklearn 1.3 container while the
            model is trained with scikit-learn 1.5.1; confirm the pickle loads
            in that runtime or pick a matching image.

    Raises:
        Any error from unpickling ``model.pkl`` is re-raised so a broken model
        is never uploaded or deployed.
    """
    from google.cloud import aiplatform
    import os
    import pickle

    aiplatform.init(project=project, location=location)

    # model.path is the local (mounted) view of the artifact; model.uri is the
    # gs:// location of the same directory.
    model_dir = model.path
    model_file = os.path.join(model_dir, "model.pkl")

    print(f"Model directory: {model_dir}")
    print(f"Model file: {model_file}")
    print(f"Model URI: {model.uri}")

    # Verify the artifact before deploying it; fail the step on error.
    try:
        with open(model_file, 'rb') as f:
            loaded_model = pickle.load(f)
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        raise
    print(f"Model loaded successfully: {type(loaded_model)}")

    # Point Vertex AI at the Cloud Storage directory that contains model.pkl.
    uploaded_model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model.uri,
        serving_container_image_uri=serving_container_image_uri,
    )

    # NOTE(review): a new endpoint is created on every run; consider reusing an
    # existing endpoint with the same display name.
    endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)

    endpoint.deploy(
        model=uploaded_model,
        machine_type=machine_type,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
    )

    print(f"Model deployed to endpoint {endpoint.resource_name}")

In [18]:
@dsl.pipeline(
    name="demo-time-series",
)
def pipeline(
    PROJECT_ID: str,
    DATASET_ID: str,
    TABLE_TRAIN: str,
    TABLE_TEST: str,
):
    """Export data, train, evaluate on the test set, then predict and deploy if R2 > 0.8.

    Args:
        PROJECT_ID: Google Cloud project holding the BigQuery dataset.
        DATASET_ID: BigQuery dataset ID.
        TABLE_TRAIN: BigQuery table used for training.
        TABLE_TEST: BigQuery table used for evaluation and inference.
    """
    export_dataset_task = export_datasets(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_train=TABLE_TRAIN,
        table_test=TABLE_TEST,
    )

    train_model_task = train_model(
        dataset_train=export_dataset_task.outputs["dataset_train"],
    )

    # Evaluate on the held-out test table: scoring the training data would
    # inflate R2 and let the deployment gate below pass too easily.
    evaluate_model_task = evaluate_model(
        dataset_test=export_dataset_task.outputs["dataset_test"],
        model=train_model_task.outputs["model"],
    )

    # Only predict and deploy when the test-set R2 clears the threshold.
    with dsl.Condition(evaluate_model_task.outputs['r2'] > 0.8):
        inference_task = inference(
            dataset_test=export_dataset_task.outputs["dataset_test"],
            model=train_model_task.outputs["model"],
        )

        # REGION is the notebook constant, baked in at compile time.
        deploy_model_task = deploy_model_to_endpoint(
            project=PROJECT_ID,
            location=REGION,
            model=train_model_task.outputs['model'],
            model_display_name="demo-time-series",
            endpoint_display_name="demo-time-series-endpoint",
        )


In [19]:
# Compile the pipeline to a YAML package. Create the output folder first so the
# cell does not depend on it already existing.
PIPELINE_PACKAGE_PATH = "../src/pipe_comps/demo_ts_pipeline.yaml"
Path(PIPELINE_PACKAGE_PATH).parent.mkdir(parents=True, exist_ok=True)
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE_PATH)

In [20]:
# Submit the compiled pipeline to Vertex AI Pipelines in the configured region.
# NOTE(review): with enable_caching=True a step whose inputs are unchanged
# (e.g. export_datasets with the same table names) may be served from cache,
# so refreshed BigQuery data might not be re-read; disable caching after a
# data refresh.
job = aiplatform.PipelineJob(
    display_name="demo_ts",
    template_path="../src/pipe_comps/demo_ts_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'PROJECT_ID': PROJECT_ID,
        'DATASET_ID': DATASET_ID,
        'TABLE_TRAIN': TABLE_TRAIN,
        'TABLE_TEST': TABLE_TEST
    },
    enable_caching=True,
)

job.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/502688298240/locations/us-central1/pipelineJobs/demo-time-series-20240912134420
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/502688298240/locations/us-central1/pipelineJobs/demo-time-series-20240912134420')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/demo-time-series-20240912134420?project=502688298240


In [21]:
# Artifact Registry KFP repository "time-series-demo". The host is built from the
# notebook's REGION / PROJECT_ID so it follows the config cell.
client_registry = RegistryClient(host=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/time-series-demo")

In [22]:
# Publish the compiled template to the registry, tagged "latest".
# The description travels as an extra request header.
template_headers = {
    "description": "Pipeline para la transformacion de datos, entrenamiento de modelos, registro de metricas, predicciones y modelo, y despliegue del modelo",
}
templateName, versionName = client_registry.upload_pipeline(
    file_name="../src/pipe_comps/demo_ts_pipeline.yaml",
    tags=["latest"],
    extra_headers=template_headers,
)