# Wprowadzenie

Ten notatnik przeprowadza użytkownika przez proces refaktoryzacji kodu z monolitycznego notatnika analitycznego (penguins-exploration.ipynb) do postaci zautomatyzowanych potoków uczenia maszynowego (ML) przy użyciu Kubeflow Pipelines (KFP) i Vertex AI. Celem jest transformacja ręcznego procesu w zautomatyzowany, powtarzalny i wersjonowalny przepływ pracy, co jest fundamentem praktyk MLOps.

Wprowadzamy tu następujące zasady MLOps:

1. Modułowość i Ponowne Użycie: Dzielimy kod z notatnika na niezależne, samodzielne komponenty, z których każdy odpowiada za jeden logiczny krok (np. pobieranie danych, trening). Komponenty te mogą być łatwo ponownie wykorzystywane w innych potokach.

2. Automatyzacja: Tworzymy dwa kluczowe potoki, które automatyzują cały cykl życia modelu:

    - Potok Treningowy: Zbudowanie, wytrenowanie, ocena i warunkowa rejestracja modelu w Vertex AI Model Registry.

    - Potok Wdrożeniowy: Wdrożenie zarejestrowanego modelu na punkcie końcowym (Endpoint) w Vertex AI w celu serwowania predykcji.

3. Powtarzalność i Wersjonowanie: Każdy komponent jest uruchamiany w izolowanym środowisku kontenerowym, a cały potok jest kompilowany do statycznego pliku (JSON). Ten plik może być wersjonowany w systemie kontroli wersji (np. Git), co gwarantuje, że każde uruchomienie potoku jest w pełni powtarzalne.

4. Przygotowanie pod CI/CD: Skompilowany plik definicji potoku staje się artefaktem, który można włączyć do procesów ciągłej integracji i ciągłego wdrażania (CI/CD), umożliwiając automatyczne testowanie i wdrażanie nowych wersji modeli.


# Część 1: Potok Treningowy
W tej części skupimy się na stworzeniu potoku, który automatyzuje cały proces trenowania modelu klasyfikacyjnego.

Na samym początku musimy zaimportować wszystkie wymagane biblioteki.
Najwazniejsze z nich to kfp dp definiowania potoków i komponentów oraz google.cloud.aiplatform do interakcji z Vertex AI 

In [None]:
import kfp
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        InputPath,
                        Model,
                        Output,
                        OutputPath,
                        component,
                        Metrics)
from kfp import compiler
from google.cloud import aiplatform
from typing import NamedTuple
import logging

### Faza 1 Definicja Komponentów

Kazdy krok w potoku jest reprezentowany przez komponent. JEst to samodzielna funckja napisana w języku python z dekoratorem @component. 
Dekorator przekształca funckję w specyfikację, którą biblioteka KFP moze wykonać w izolowanym środowisku.
W tym celu definiowane są potrzebne zalezności jak biblioteki i podstawowy obraz. 
Do definiowania wejścia i wyjścia komponentów słuzą typy Input[] i Output [], ktore stają się artefaktami lub parametrami. 

In [None]:

# --- Komponent 1: Przygotowanie danych ---
@component(
    packages_to_install=["pandas==2.2.2", "gcsfs", "fsspec", "pyarrow"],
    base_image="python:3.9"
)
def get_data(gcs_input_path: str
            , input_data : Output[Dataset]):

    import pandas as pd 
    #wczytanie danych
    df = pd.read_csv(gcs_input_path)
    df.to_csv(input_data.path, index=False)


Kazdy krok z notebooka jest przekształcony w niezalezny komponent

In [None]:
# --- Komponent 2: Przygotowanie danych ---
@component(
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0", "pyarrow"],
    base_image="python:3.9"
)
def preprocess_data(
    input_data: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_split_ratio: float,
):

    import pandas as pd
    from sklearn.model_selection import train_test_split
    pd.options.mode.chained_assignment = None

    """Czyści, imputuje, dzieli i zapisuje dane jako artefakty treningowe/testowe."""

    df = pd.read_csv(input_data.path)
    df.loc[336, 'sex'] = 'FEMALE'
    numerical_cols = ['culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g']
    for col in numerical_cols:
        df[col] = df[col].fillna(df[col].median())
    df['sex'] = df['sex'].fillna(df['sex'].mode()[0])
    df['sex'] = df['sex'].map({'MALE': 0, 'FEMALE': 1})
    df_processed = pd.get_dummies(df, columns=['island'], drop_first=True)
    X = df_processed.drop('species', axis=1)
    y = df_processed['species']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_split_ratio, random_state=42, stratify=y)
    train_df = pd.concat([X_train, y_train], axis=1)
    test_df = pd.concat([X_test, y_test], axis=1)
    # zapis do artefaktów: 
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

In [None]:
# --- Komponent 3: Trenowanie modelu SVC ---
@component(
    base_image="python:3.9",
    packages_to_install=["kfp" ,"pandas==2.2.2", "pyarrow", "scikit-learn==1.5.0", "gcsfs==2024.6.0", "fsspec", "click==8.1.7", "docstring-parser==0.16", "urllib3", "protobuf"]
)
def train_svc_model(
    train_dataset: Input[Dataset],
    model: Output[Model],
):
    """Trenuje klasyfikator SVC i zapisuje model."""
    import pandas as pd
    from sklearn.svm import SVC
    from sklearn.preprocessing import StandardScaler
    from sklearn.pipeline import Pipeline
    import pickle

    print(f"train_data : {train_dataset}")
    print(f"model : {model}")
    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop('species', axis=1)
    y_train = train_df['species']
    svc_pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('svc', SVC(kernel='rbf', probability=True, random_state=42))
    ])
    svc_pipeline.fit(X_train, y_train)

    file_name = model.path + f".pkl"
    with open(file_name, 'wb') as file:  
        pickle.dump(svc_pipeline, file)


In [None]:
# --- Komponent 4 : Weryfikacja modelu ---
@component(
    base_image="python:3.9",
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0", "joblib==1.4.2", "gcsfs==2024.6.0", "fsspec"],
)
def evaluate_svc_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
) -> NamedTuple("Outputs", [("accuracy", float)]):
    """Ocenia model, zapisuje metryki i zwraca dokładność."""
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report
    import pickle 

    print(f"model.path : {model.path}")
    file_name = model.path + f".pkl"
    print(f"file_name : {file_name}")
    
    with open(file_name, 'rb') as file:  
        model = pickle.load(file)

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop('species', axis=1)
    y_test = test_df['species']
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    metrics.log_metric("accuracy", (accuracy * 100.0))
    print(f"Accuracy: {accuracy}")

    report = classification_report(y_test, y_pred, output_dict=True)
        
    for class_label, class_metrics in report.items():
        if isinstance(class_metrics, dict):
            for metric_name, metric_value in class_metrics.items():
                metrics.log_metric(f"{class_label}_{metric_name}", metric_value)
    
    return (accuracy * 100.0,)

In [None]:

# --- Komponent 5: Rejestracja modelu w Vertex AI Model Registry ---
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "google-cloud-aiplatform==1.55.0", "gcsfs==2024.6.0", "fsspec", "pyarrow", "scikit-learn==1.5.0" ],
)
def register_model(
    model: Input[Model],
    project_id: str,
    region: str,
    model_display_name: str,
):
    """Rejestruje model w Vertex AI Model Registry."""
    from google.cloud import aiplatform
    import os

    print(f"project_id : {project_id}")
    print(f"region : {region}")
    print(f"model : {model}")

    aiplatform.init(project=project_id, location=region)
    serving_container_image = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest"
    model_path = '/'.join(model.uri.split('/')[:-1])
    # Przesłanie i rejestracja modelu
    registered_model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model_path,
        serving_container_image_uri=serving_container_image,
        sync=True,
        labels = {"model_type": "svc", "framework" : "scikit-learn"}
    )
    print(f"Zarejestrowano model: {registered_model.resource_name}")

### Faza 2 Definicja potoku  
Gdy mamy już zdefiniowane komponenty, możemy połączyć je w potok. Robimy to za pomocą dekoratora @kfp.dsl.pipeline.
Funkcja z tym dekoratorem definiuje cały przepływ pracy w potoku. W funkcji wywołujemy poszczególne komponenty. 
Zalezności pomiędzy poszczególnymi komponentami są tworzone niejawnie. KFP automatycznie tworzy graf zaleznośći na podstawie przepływu artefaktów między komponentami (input/output)
Kazdy komponent moze przyjąć artefakt z innego komponentu

In [None]:


# --- Definicja głównego potoku Vertex AI ---
@kfp.dsl.pipeline(
    name="pipeline-classification",
    description="Potok trenujący i rejestrujący model SVC."
)
def training_pipeline(
    gcs_data_path: str,
    project_id: str, 
    region: str ,
    model_name: str,
    test_split_ratio: float = 0.3,
    min_accuracy_threshold: float = 95.0,
):
    """Definiuje przepływ pracy w potoku z warunkową rejestracją."""
    get_data_task = get_data(gcs_input_path=gcs_data_path)
    
    transform_data_task = preprocess_data(
        input_data=get_data_task.outputs["input_data"],
        test_split_ratio=test_split_ratio
    )
    
    train_task = train_svc_model(
        train_dataset=transform_data_task.outputs["train_dataset"]
    )
    
    evaluate_task = evaluate_svc_model(
        test_dataset=transform_data_task.outputs["test_dataset"],
        model=train_task.outputs["model"],
    )

    # Warunek: zarejestruj model tylko, jeśli dokładność jest wystarczająco wysoka
    with dsl.If(
        evaluate_task.outputs["accuracy"] >= min_accuracy_threshold,
        name="accuracy-check",
    ):
        register_model(
            project_id=project_id,
            region=region,
            model_display_name=model_name,
            model=train_task.outputs["model"],
        )


Definicja parametrów potoku - nalezy podmienić parametry na własne

In [None]:
# Uzupełnij poniższe zmienne swoimi wartościami

PROJECT_ID = "mlops-2137"
REGION = "us-central1"

# Konfiguracja Konta Serwisowego ---
SERVICE_ACCOUNT_EMAIL = f"vertex-ai-runner@{PROJECT_ID}.iam.gserviceaccount.com"

# --- Ścieżki w Google Cloud Storage ---
# Automatycznie generowane nazwy bucketów na podstawie PROJECT_ID
VERTEX_AI_BUCKET = f"s25537-mlops-vertex-ai-bucket"
DATA_BUCKET = f"s25537-mlops-data-bucket"

PIPELINE_ROOT = f"gs://{VERTEX_AI_BUCKET}/training-pipeline"
GCS_DATA_PATH = f"gs://{DATA_BUCKET}/penguins.csv"

# Parametry potoku
TEST_SPLIT_RATIO = 0.3
MIN_ACCURACY_THRESHOLD = 0.95 

# Nazwy używane w Vertex AI
PIPELINE_NAME = "przykladowy-potok-klasyfikacji"
PIPELINE_JSON_NAME = f"{PIPELINE_NAME}.json"
PIPELINE_DISPLAY_NAME = f"run-{PIPELINE_NAME}"
MODEL_NAME = "SVC-Model"

### Faza 3 Kompilacja potoku

Definicja potoku napisana w Pythonie musi zostać przetłumaczona na statyczny format YAML lub JSON. Używamy do tego klasy kfp.compiler.Compiler. Skompilowany plik jest kompletną, przenośną definicją przepływu pracy, gotową do wersjonowania i wdrożenia.

In [None]:
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path=PIPELINE_JSON_NAME
)

print(f"Potok został skompilowany do pliku: {PIPELINE_JSON_NAME}")



### Faza 4 Uruchomienie Potoku w Vertex AI

Ostatnim krokiem jest przesłanie skompilowanego pliku do Vertex AI i uruchomienie go jako zadania (PipelineJob). Używamy do tego biblioteki google-cloud-aiplatform, przekazując zdefiniowane wcześniej parametry.

Aby zadanie zadziałało nalezby zalogowac sie do google cloud wykonując komende:

gcloud auth application-default login



In [None]:
from google.cloud import aiplatform

# Inicjalizacja klienta Vertex AI
aiplatform.init(project=PROJECT_ID, location=REGION)

# Utworzenie zadania na podstawie skompilowanego pliku
job = aiplatform.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path=PIPELINE_JSON_NAME,
    pipeline_root=PIPELINE_ROOT,
    # Przekazanie wartości do parametrów zdefiniowanych w potoku
    parameter_values={
        "gcs_data_path": GCS_DATA_PATH,
        "project_id": PROJECT_ID,
        "region": REGION,
        "model_name": MODEL_NAME,
        "test_split_ratio": TEST_SPLIT_RATIO,
        "min_accuracy_threshold": MIN_ACCURACY_THRESHOLD
    },
    enable_caching=True
)

# Uruchomienie zadania
print("Uruchamianie potoku w Vertex AI...")
job.run(service_account=SERVICE_ACCOUNT_EMAIL)

# Część 2: Potok Wdrożeniowy
Po pomyślnym uruchomieniu potoku treningowego i zarejestrowaniu modelu, następnym krokiem jest wdrożenie go na punkcie końcowym (Endpoint), aby mógł przyjmować żądania predykcji.



Biblioteka google_cloud_pipeline_components zapewnia gotowe komponenty do operacji na modelach i endppointach:

In [None]:
from google_cloud_pipeline_components.v1.model import ModelGetOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp

### Faza 1 : Definicja komponentów 

Tworzymy własny komponent, który sprawdzi, czy punkt końcowy o danej nazwie już istnieje. Jeśli nie, utworzy nowy.

In [None]:
# --- Komponent 4 : Definicja komponentu do zarządzania punktem końcowym ---
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform==1.55.0"] 
)
def get_or_create_endpoint(
    project: str,
    location: str,
    display_name: str,
    endpoint: Output[Artifact],
):
    """Sprawdza, czy endpoint o podanej nazwie istnieje. Jeśli nie, tworzy nowy."""
    import logging
    from google.cloud import aiplatform
    
    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project, location=location)

    endpoints = aiplatform.Endpoint.list(
        filter=f'display_name="{display_name}"',
        order_by="create_time desc"
    )

    if endpoints:
        endpoint_resource = endpoints[0]
        logging.info(f"Endpoint '{display_name}' już istnieje: {endpoint_resource.resource_name}")
    else:
        logging.info(f"Endpoint '{display_name}' nie znaleziony. Tworzę nowy.")
        endpoint_resource = aiplatform.Endpoint.create(display_name=display_name)

    endpoint.uri = f"https://{location}-aiplatform.googleapis.com/v1/{endpoint_resource.resource_name}"
    endpoint.metadata["resourceName"] = endpoint_resource.resource_name

### Faza 2 : Definicja Potoku Wdrożeniowego

Ten potok pobiera zarejestrowany model, tworzy (lub znajduje) punkt końcowy i wdraża na nim model.

In [None]:
@kfp.dsl.pipeline(
    name="deployment-pipeline",
    description="Potok tworzy endpoint i wdraża na nim podany model"
)
def deployment_pipeline(
    endpoint_name: str,
    model_resource_name: str,
    project_id: str,
    region: str,
):
    get_or_create_endpoint_task = get_or_create_endpoint(
        project=project_id,
        location=region,
        display_name=endpoint_name
    )

    # Pobranie artefaktu modelu na podstawie jego nazwy zasobu
    get_model_op = ModelGetOp(
        model_name=model_resource_name
    )

    model_deploy_op = ModelDeployOp(
        model=get_model_op.outputs["model"], # Użycie artefaktu modelu
        endpoint=get_or_create_endpoint_task.outputs["endpoint"],
        deployed_model_display_name=MODEL_NAME, 
        dedicated_resources_machine_type="n1-standard-2",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1
    )



### Faza 3  Konfiguracja i kompilacja  potoku wdrożeniowego

Teraz konfigurujemy i uruchamiamy potok wdrożeniowy. Ważne: Trzeba podać MODEL_RESOURCE_NAME zarejestrowanego modelu, który można znaleźć w konsoli Vertex AI w sekcji "Modele" po udanym przebiegu potoku treningowego.



In [None]:

# --- Parametry ---
# WAŻNE: Wklej tutaj "Resource name" swojego modelu z Vertex AI Model Registry
# Przykład: "projects/123456/locations/us-central1/models/7891011"
MODEL_RESOURCE_NAME = "" 
ENDPOINT_NAME = "model-endpoint"
PIPELINE_ROOT_DEPLOYMENT = ""

# --- Nazwy ---
PIPELINE_JSON_DEPLOYMENT = "deployment_pipeline.json"
PIPELINE_DISPLAY_NAME_DEPLOYMENT = "run-deployment-pipeline"

# --- Kompilacja ---
compiler.Compiler().compile(
    pipeline_func=deployment_pipeline,
    package_path=PIPELINE_JSON_DEPLOYMENT,
)
print(f"Potok wdrożeniowy skompilowany do: {PIPELINE_JSON_DEPLOYMENT}")


### Faza 4 Uruchomienie Potoku w Vertex AI


In [None]:
job_deployment = aiplatform.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME_DEPLOYMENT,
    template_path=PIPELINE_JSON_DEPLOYMENT,
    pipeline_root=PIPELINE_ROOT_DEPLOYMENT,
    parameter_values={
        "endpoint_name": ENDPOINT_NAME,
        "model_resource_name": MODEL_RESOURCE_NAME,
        "project_id": PROJECT_ID,
        "region": REGION,
    },
    enable_caching=False # Wyłączamy cache, aby zawsze próbować wdrożyć
)
print("Uruchamianie potoku wdrożeniowego...")
job_deployment.run(service_account=SERVICE_ACCOUNT_EMAIL)