In [1]:
import pandas as pd

# Copy the training labels into the folder whose CSV files are uploaded to
# BigQuery by the next cell. The file is round-tripped through pandas and
# written back without the index column.
YTRAIN_SOURCE_CSV = 'data/y_train/ytrain.csv'
YTRAIN_UPLOAD_CSV = 'data/pipeline_train_model_v1-1/ytrain.csv'

df = pd.read_csv(YTRAIN_SOURCE_CSV)
df.to_csv(YTRAIN_UPLOAD_CSV, index=False)

In [2]:
from google.cloud import bigquery
import os

# Destination settings for the upload.
project_id = "diabetes-datapath1"
dataset_id = "diabetes"
folder_path = "data/pipeline_train_model_v1-1"

# BigQuery client bound explicitly to the target project instead of relying on
# whatever default project the ambient credentials carry.
client = bigquery.Client(project=project_id)

# Destination dataset. Client.dataset() is deprecated, so the reference is
# built directly from the project and dataset IDs.
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)

# The load configuration is identical for every file, so build it once.
# WRITE_TRUNCATE makes the cell idempotent: with the default (append)
# disposition, re-running the notebook would duplicate every row in the
# training tables.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Load every CSV in the folder into a table named after the file.
# sorted() gives a deterministic processing order across runs.
for file_name in sorted(os.listdir(folder_path)):
    if not file_name.endswith(".csv"):
        continue

    # Table name = file name without the ".csv" extension.
    table_id = os.path.splitext(file_name)[0]
    table_ref = dataset_ref.table(table_id)
    csv_file_path = os.path.join(folder_path, file_name)

    # Stream the local CSV to BigQuery as a load job.
    with open(csv_file_path, "rb") as source_file:
        load_job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    print(f"Starting job {load_job.job_id} for {file_name}")
    load_job.result()  # Block until the job finishes; raises if the load failed.
    print(f"Job {load_job.job_id} finished for {file_name}")

    # Re-fetch the table to report how many rows it now holds.
    destination_table = client.get_table(table_ref)
    print(f"Loaded {destination_table.num_rows} rows into {table_ref}.")


Starting job 466f79a0-1f62-47a4-b45c-a20aa06b6993 for ytrain.csv
Job 466f79a0-1f62-47a4-b45c-a20aa06b6993 finished for ytrain.csv
Loaded 2340 rows into diabetes-datapath1.diabetes.ytrain.
Starting job 93b18b6b-8478-4ca3-8675-8838854d67f5 for selected_features.csv
Job 93b18b6b-8478-4ca3-8675-8838854d67f5 finished for selected_features.csv
Loaded 50 rows into diabetes-datapath1.diabetes.selected_features.
Starting job 09a5dd87-76c6-46db-92a8-65f355726e97 for xtrain.csv
Job 09a5dd87-76c6-46db-92a8-65f355726e97 finished for xtrain.csv
Loaded 2340 rows into diabetes-datapath1.diabetes.xtrain.


In [2]:
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component,Dataset)

In [3]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "db-dtypes",
        "pyarrow"
    ],
)
def process_data(
    project: str,
    source_x_train_table: str,
    features_table: str,
    dataset: Output[Dataset],
):
    """Read the training features from BigQuery and keep only the selected columns.

    The filtered frame is written as Parquet to ``<dataset.path>.parquet``.

    Args:
        project: GCP project ID used to create the BigQuery client.
        source_x_train_table: BigQuery table reference holding the candidate
            feature columns; it is interpolated into the FROM clause.
        features_table: BigQuery table reference whose ``string_field_0``
            column lists the feature names to keep.
        dataset: Output artifact; its path (plus ``.parquet``) receives the data.

    Raises:
        KeyError: If ``features_table`` has no ``string_field_0`` column or a
            listed feature is not a column of ``source_x_train_table``.
    """
    # Imports live inside the function because the component body is executed
    # in its own container, not in the notebook kernel.
    from google.cloud import bigquery
    import pandas as pd

    client = bigquery.Client(project=project)

    # Full feature matrix.
    X_train = client.query(
        '''SELECT * FROM `{dsource_table}`
        '''.format(dsource_table=source_x_train_table)).to_dataframe()

    # Table listing the selected feature names, one per row.
    features_df = client.query(
        '''SELECT * FROM `{dsource_table}`
        '''.format(dsource_table=features_table)).to_dataframe()

    # NOTE(review): "string_field_0" looks like the column name BigQuery
    # generates when schema autodetection finds no header row - confirm
    # against the actual table schema.
    selected_features = features_df["string_field_0"].tolist()

    # Keep only the selected feature columns, in the listed order.
    X_train = X_train[selected_features]

    # The ".parquet" suffix must stay in sync with the reader in train_model,
    # which loads f'{inputd.path}.parquet'.
    X_train.to_parquet(f'{dataset.path}.parquet', engine='pyarrow', index=False)

In [4]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "google-cloud-storage",
        "pandas",
        "scikit-learn",
        "joblib",
        "db-dtypes",
        # pd.read_parquet needs a Parquet engine; declare it explicitly (as
        # process_data does) instead of relying on a transitive install.
        "pyarrow"
    ],
)
def train_model(
    project: str,
    source_y_train_table: str,
    inputd: Input[Dataset],
):
    """Fit a logistic regression on the prepared features and upload it to GCS.

    Args:
        project: GCP project ID used for the BigQuery and Cloud Storage clients.
        source_y_train_table: BigQuery table reference holding the training
            target; it is interpolated into the FROM clause.
        inputd: Input artifact; the features are read from
            ``<inputd.path>.parquet``.
    """
    # Imports live inside the function because the component body is executed
    # in its own container, not in the notebook kernel.
    from google.cloud import bigquery
    from google.cloud import storage
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    import joblib

    # BigQuery client.
    client = bigquery.Client(project=project)

    # Fetch the training target from BigQuery.
    Y_train = client.query(
        '''SELECT * FROM `{dsource_table}`
        '''.format(dsource_table=source_y_train_table)).to_dataframe()

    # Read the feature matrix written by the upstream step.
    X_train = pd.read_parquet(f'{inputd.path}.parquet')

    # NOTE(review): X_train and Y_train originate from two separate BigQuery
    # reads without ORDER BY or a join key, so matching row order between them
    # is not guaranteed - confirm alignment, ideally by joining on an ID column.

    # scikit-learn expects a 1-D target. A single-column frame is squeezed to
    # a Series; a multi-column frame is left as is, so fit() still rejects it
    # exactly as before.
    y_train = Y_train.squeeze(axis="columns")

    # Configure the logistic regression model.
    log_model = LogisticRegression(random_state=0, max_iter=1000)

    # Train the model.
    log_model.fit(X_train, y_train)

    # Persist the fitted model to a local file before uploading it.
    model_filename = 'lasso_model.joblib'
    joblib.dump(log_model, model_filename)

    # Upload the serialized model to Google Cloud Storage.
    storage_client = storage.Client(project=project)
    bucket_name = "diabetes-bucket1"
    destination_blob_name = "proyectoD/data/model/model.joblib"
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(model_filename)

In [5]:
@kfp.dsl.pipeline(
    name="pipeline-training-model",
    description="intro",
    pipeline_root="gs://diabetes-bucket1/proyectoD",
)
def main_pipeline(
    project: str,
    source_x_train_table: str,
    source_y_train_table: str,
    features_table: str,
    gcp_region: str = "us-central1",
):
    # Two-step pipeline: process_data prepares the feature dataset and
    # train_model consumes it to fit the model.
    # gcp_region is part of the pipeline signature but is not passed to any step.

    # Step 1: build the feature dataset.
    process_task = process_data(
        project=project,
        source_x_train_table=source_x_train_table,
        features_table=features_table,
    )
    process_task.set_display_name("PROCESS_DATA")

    # Step 2: train on the dataset produced by step 1.
    train_task = train_model(
        project=project,
        source_y_train_table=source_y_train_table,
        inputd=process_task.output,
    )
    # Explicit ordering in addition to the data dependency on the output above.
    train_task.after(process_task)
    train_task.set_display_name("TRAIN_MODEL")

In [6]:
# Compile the pipeline function into the JSON spec that the PipelineJob
# later loads through template_path.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=main_pipeline, package_path="pipeline_training.json")



In [7]:
# Initialise the Vertex AI SDK defaults. The project ID matches the one used
# everywhere else in this notebook ("diabetes-datapath1"); the previous value
# lacked the trailing "1".
aiplatform.init(project="diabetes-datapath1", location="us-central1")

In [8]:
# Runtime parameters handed to main_pipeline: the project and the three
# BigQuery tables read by the components.
pipeline_parameters = {
    "project": "diabetes-datapath1",
    "source_x_train_table": "diabetes-datapath1.diabetes.xtrain",
    "source_y_train_table": "diabetes-datapath1.diabetes.ytrain",
    "features_table": "diabetes-datapath1.diabetes.selected_features",
}

# Pipeline run built from the compiled JSON spec.
job = aiplatform.PipelineJob(
    display_name="pipeline de prueba",
    template_path="pipeline_training.json",
    enable_caching=True,
    project="diabetes-datapath1",
    location="us-central1",
    parameter_values=pipeline_parameters,
    # Optional resource labels, currently disabled:
    # labels={"module": "ml", "application": "app", "chapter": "mlops", "company": "datapat", "environment": "dev", "owner": "xxxx"}
)

# Submit the run under the dedicated service account.
print('submit pipeline job ...')
job.submit(service_account="diabetes@diabetes-datapath1.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/830795959507/locations/us-central1/pipelineJobs/pipeline-training-model-20241017233125
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/830795959507/locations/us-central1/pipelineJobs/pipeline-training-model-20241017233125')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-training-model-20241017233125?project=830795959507
