In [1]:
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
PROJECT_ID

'tottus-413614'

In [2]:
BUCKET_NAME ="gs://" + 'tottus-413623' + "-pipeline"
BUCKET_NAME

'gs://tottus-413623-pipeline'

In [3]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath
from typing import Optional
from google.cloud import aiplatform

from google.cloud import aiplatform_v1
import warnings
warnings.filterwarnings('ignore')

  from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath


In [4]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath
from typing import Optional
from google.cloud import aiplatform

from google.cloud import aiplatform_v1
import warnings
warnings.filterwarnings('ignore')

In [5]:
PATH =%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-east1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://tottus-413623-pipeline/pipeline_root/'

In [6]:
@component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],
    base_image ="python:3.10",
    output_component_file="dataset_creating_1.yaml"
)

def get_data_from_bq(
    output_data_path: OutputPath("Dataset"),
    bq_table: Optional[str] = None
):
    
    import pandas as pd
    from sklearn.datasets import load_iris

    
    if bq_table is not None:
        print("¡Atención! Este componente no utiliza la tabla de BigQuery proporcionada.")
        print("Se cargará el conjunto de datos Iris y se escribirá en un archivo CSV.")
    
    # Cargar el conjunto de datos Iris
    iris = load_iris()
    X, Y = iris.data, iris.target

    # Crear un DataFrame de Pandas con las características y las etiquetas
    df = pd.DataFrame(X, columns=iris.feature_names)
    df['target'] = Y
    
    df.to_csv(output_data_path, index=False)


In [7]:
@component(
    packages_to_install=["scikit-learn", "pandas", "joblib", "scikit-learn"],
    base_image="python:3.10",
    output_component_file="model_training.yaml",
)
def training_classmod(
    data: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model],
    predictions: OutputPath("Dataset")
):
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.metrics import accuracy_score
    from joblib import dump
    from sklearn.datasets import load_iris
    
    iris = load_iris()
    data_encoded=pd.read_csv(data.path)
    
    X_train, X_test, y_train, y_test = train_test_split(data_encoded[iris.feature_names], data_encoded['target'],test_size = 0.3)
    dt = DecisionTreeClassifier()
    dt.fit(X_train, y_train)
    y_pred=dt.predict(X_test)
    score=dt.score(X_test, y_test)
    print('accuracy is:', score)
    
    metrics.log_metric("accuracy", (score*100.0))
    metrics.log_metric("model", "Tree Class")
    
    predictions_df = pd.DataFrame({'predicted': y_pred})
    predictions_df.to_csv(predictions, index=False)
    
    dump(dt, model.path + ".joblib")
    

In [8]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.10",
    output_component_file="model_deployment.yaml",
)
def model_deployment(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    from google.cloud import aiplatform
    import os
    model_uri = os.path.dirname(model.uri) + '/'
    print(model_uri)
    aiplatform.init(project=project, location=region)
    
    deployed_model = aiplatform.Model.upload(
        display_name="custom-model-pipeline",
        artifact_uri=model_uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest"
    )
    endpoint=deployed_model.deploy(machine_type="n1-standard-8", min_replica_count=1)
    
    vertex_endpoint.uri= endpoint.resource_name
    vertex_model.uri=deployed_model.resource_name

In [9]:
@pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="custom-pipeline",
)
def pipeline(
    bq_table: str ="",
    output_data_path: str="data.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    # Tarea para obtener los datos de BigQuery
    dataset_task = get_data_from_bq(bq_table=bq_table)
    
    # Tarea para entrenar el modelo, calcular predicciones y guardarlas
    training_task = training_classmod(data=dataset_task.output)
        
    # Tarea para implementar/deploy el modelo
    deploy_task = model_deployment(model=training_task.outputs["model"],
                                   project=project,
                                   region=region)


In [10]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="custom-pipeline-classifier.json")

In [11]:
run1 = aiplatform.PipelineJob(
    display_name="custom-training-vertex-ai-pipeline",
    template_path="custom-pipeline-classifier.json",
    job_id="custom-pipeline-ef-25",
    enable_caching=False,
)

In [12]:
run1.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/601827927420/locations/us-central1/pipelineJobs/custom-pipeline-ef-24
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/601827927420/locations/us-central1/pipelineJobs/custom-pipeline-ef-24')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-pipeline-ef-24?project=601827927420
