In [1]:
import os

# Point Google client libraries (aiplatform, gcsfs, ...) at a service-account key file.
# A GOOGLE_APPLICATION_CREDENTIALS value that is already exported takes precedence, so
# other users/environments are not forced onto this notebook's local key path.
# NOTE(review): avoid committing service-account key files or their paths to VCS.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS", "../fleet-anagram-244304-449e515c8d17.json"
)

# Verify the key file is actually there. Merely checking that the variable is set right
# after assigning it could never fail, and a missing file would otherwise only surface
# later as an opaque auth error when the pipeline is submitted.
credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
if not os.path.isfile(credentials_path):
    raise FileNotFoundError(f"Credentials file not found: {credentials_path}")
print("GOOGLE_APPLICATION_CREDENTIALS is set to:", credentials_path)

GOOGLE_APPLICATION_CREDENTIALS is set to: ../fleet-anagram-244304-449e515c8d17.json


In [14]:
from kfp import dsl, compiler
from kfp.dsl import (
    component,
    Dataset,
    Input,
    Output,
    InputPath,
    OutputPath,
    Metrics
)
from google.cloud.aiplatform import pipeline_jobs

In [13]:
@dsl.component(packages_to_install=["pandas", "pandera", "loguru", "gcsfs"])
def validate_input_ds(
    file_path: str,
    dataset: Output[Dataset]
):
    """Validate the raw rent-apartments CSV against a pandera schema.

    Reads ``file_path`` with pandas (``gcsfs`` is installed so ``gs://`` URIs work).
    It checks column types, nullability and value rules, then writes the validated
    frame to the ``dataset`` output artifact as CSV.

    Args:
        file_path: Location of the input CSV.
        dataset: Output artifact that receives the validated CSV.

    Raises:
        pandera.errors.SchemaError: When a column or check fails (logged, then re-raised).
    """
    # Imports live inside the body because KFP lightweight components ship only
    # this function's source to the container.
    import datetime
    import pandas as pd
    import pandera as pa
    from loguru import logger
    from pandera.typing import Series

    # Lower bound enforced for `constraction_year`.
    # NOTE(review): the previous error message claimed 1800 while the code enforced
    # 1000; the enforced value is kept and the message now matches it. Confirm intent.
    min_construction_year = 1000

    # Additionally log to a file inside the component container.
    logger.add("validate_input_ds.log")

    df = pd.read_csv(file_path)

    class DfSchema(pa.DataFrameModel):
        address: Series[str] = pa.Field(nullable=False)
        area: Series[float] = pa.Field(gt=0, nullable=False)
        # Spelling ("constraction") must match the CSV header, so it is kept as-is.
        constraction_year: Series[int] = pa.Field(gt=0)
        rooms: Series[int] = pa.Field(gt=0, nullable=False)
        bedrooms: Series[int] = pa.Field(gt=0, nullable=False)
        bathrooms: Series[int] = pa.Field(gt=0, nullable=False)
        balcony: Series[str] = pa.Field(isin=["yes", "no"], nullable=False)
        storage: Series[str] = pa.Field(isin=["yes", "no"], nullable=False)
        parking: Series[str] = pa.Field(isin=["yes", "no"], nullable=False)
        furnished: Series[str] = pa.Field(isin=["yes", "no"], nullable=False)
        garage: Series[str] = pa.Field(isin=["yes", "no"], nullable=False)
        garden: Series[str] = pa.Field(nullable=False)
        energy: Series[str] = pa.Field(nullable=True)
        facilities: Series[str] = pa.Field(nullable=True)
        zip: Series[str] = pa.Field(nullable=False)
        neighborhood: Series[str] = pa.Field(nullable=False)
        rent: Series[int] = pa.Field(gt=0, nullable=False)

        @pa.check("garden", error="Must start with 'Present' or 'Not present'")
        def check_garden(cls, series: Series[str]) -> Series[bool]:
            return series.str.startswith(("Present", "Not present"))

        @pa.check("energy", error="Must start with A-G")
        def check_energy(cls, series: Series[str]) -> Series[bool]:
            # na=True lets missing labels pass (the column is nullable) and keeps the
            # result a plain boolean mask instead of OR-ing bools with object NaNs.
            return series.str.startswith(tuple("ABCDEFG"), na=True)

        @pa.check("zip", error="Must start with numeric")
        def check_zip(cls, series: Series[str]) -> Series[bool]:
            return series.str.match(r"^\d")

        @pa.check(
            "constraction_year",
            error=f"Year must be between {min_construction_year} and next year",
        )
        def check_contraction_year(cls, series: Series[int]) -> Series[bool]:
            current_year = datetime.datetime.now().year
            return (series >= min_construction_year) & (series <= current_year + 1)

        @pa.check("rent", error="Rent must be positive")
        def check_rent_plausibility(cls, series: Series[int]) -> Series[bool]:
            # NOTE(review): only positivity is enforced here (same as Field(gt=0)); the
            # old message promised a comparison against property features that was
            # never implemented.
            return series > 0

    try:
        validated_df = DfSchema.validate(df)
        logger.info("Data validation passed")
    except pa.errors.SchemaError as e:
        logger.error(f"Data validation failed: {e}")
        raise

    validated_df.to_csv(dataset.path, index=False)


In [17]:
@dsl.component(packages_to_install=["pandas", "gcsfs"])
def process_validated_data(
    dataset: Input[Dataset],
    dataset_out: Output[Dataset]
):
    """One-hot encode the yes/no amenity columns and convert ``garden`` to a number.

    Args:
        dataset: Validated CSV artifact (produced by ``validate_input_ds`` in this
            pipeline).
        dataset_out: Output artifact receiving the processed CSV. It is declared as a
            ``Dataset`` because it holds tabular data, not metrics.
    """
    # Imports stay inside the body: KFP ships only this function's source.
    import re
    import pandas as pd

    def _garden_size(value: str):
        """Map a garden description to an int: 0 if absent, first number otherwise."""
        # Validation upstream only guarantees a "Present"/"Not present" prefix.
        if value.startswith("Not present"):
            return 0
        number = re.search(r"\d+", value)
        # A "Present" value without digits used to raise IndexError; emit <NA> instead.
        return int(number.group()) if number else pd.NA

    validated_df = pd.read_csv(dataset.path)

    # These columns are validated upstream to hold "yes"/"no"; drop_first keeps one
    # indicator column per original column.
    processed_df = pd.get_dummies(
        validated_df,
        columns=["balcony", "parking", "furnished", "garage", "storage"],
        drop_first=True,
    )

    # Nullable Int64 so rows without a parsable number can hold <NA>.
    processed_df["garden"] = processed_df["garden"].map(_garden_size).astype("Int64")

    # Save the processed data
    processed_df.to_csv(dataset_out.path, index=False)

In [19]:
@dsl.pipeline(
    pipeline_root="gs://hy-storage-bucket",
    name="test-pipeline",
)
def pipeline(
    project: str = "fleet-anagram-244304",
    region: str = "us-central1",
    file_path: str = "gs://hy-storage-bucket/rent_apartments.csv",
):
    """Validate the rent-apartments CSV, then encode it for downstream use.

    Args:
        project: GCP project ID. NOTE(review): not passed to any step.
        region: GCP region. NOTE(review): not passed to any step.
        file_path: CSV to validate. The default is the path that was previously
            hard-coded, so existing runs behave the same. It can now be overridden
            per run via pipeline parameters.
    """
    input_validation_task = validate_input_ds(file_path=file_path)

    # Feed the validated CSV artifact into the processing step.
    process_validated_data(dataset=input_validation_task.outputs["dataset"])

In [20]:
# Compile the pipeline to a spec file and submit it to Vertex AI Pipelines.
# `compiler` and `pipeline_jobs` come from the imports cell (`kfp` / `google.cloud.aiplatform`).
# The deprecated `kfp.v2` namespace is not re-imported, so those names are not shadowed.

# One path for both steps so the compiled file and the submitted template cannot drift.
# YAML is the KFP v2 spec format; compiling to .json is deprecated.
PIPELINE_SPEC_PATH = "test-pipeline.yaml"

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_SPEC_PATH,
)

# Run the pipeline; .run() blocks and reports state until the job finishes.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="simple-kubeflow-pipeline",
    template_path=PIPELINE_SPEC_PATH,
    enable_caching=False,
    location="us-central1",
)

start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-20250202144556
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-20250202144556')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/test-pipeline-20250202144556?project=465835940757
PipelineJob projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-20250202144556 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-20250202144556 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-20250202144556 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/465835940757/locations/us-central1/pipelineJobs/test-pipeline-