# Faza 2 i 3: Budowa Potoku MLOps z Modularnych Komponentów

Ten notatnik stanowi kontynuację procesu ewolucji projektu MLOps. W Fazie 1 dokonano eksploracji danych i stworzono prototyp w notatniku Jupyter. Teraz, w Fazie 2, logika ta została wyodrębniona do samodzielnych, reużywalnych skryptów w języku Python, które pełnią rolę komponentów. W Fazie 3 skupiamy się na zdefiniowaniu **potoku (pipeline)** w **Vertex AI** przy użyciu biblioteki **Kubeflow Pipelines (KFP)**, który orkiestruje te komponenty.

Celem jest zademonstrowanie, jak luźno powiązane skrypty można zintegrować w spójny, zautomatyzowany i łatwy w utrzymaniu system. Potok będzie realizował następujące kroki:
1.  **Pobranie danych** z Google Cloud Storage.
2.  **Przetworzenie danych:** czyszczenie, imputacja braków i podział na zbiory treningowe/testowe.
3.  **Trenowanie modelu** klasyfikatora SVC.
4.  **Ewaluacja modelu** w oparciu o metrykę dokładności.
5.  **Pobranie informacji o poprzedniej wersji modelu** (jeśli istnieje).
6.  **Warunkowa rejestracja nowego modelu** w Vertex AI Model Registry, jeśli jego dokładność przekroczy zdefiniowany próg.

## 1. Definicje Komponentów

Poniżej znajdują się definicje poszczególnych komponentów potoku. Każdy komponent to funkcja w języku Python opakowana dekoratorem `@component` z biblioteki KFP. Dekorator ten definiuje środowisko wykonawcze (obraz bazowy i pakiety), w którym zostanie uruchomiony kod komponentu. Takie podejście zapewnia izolację i powtarzalność każdego kroku.

### Komponent 1: Pobieranie Danych (`get_data.py`)

In [None]:
from kfp.dsl import Artifact, Dataset, Input, Output, Component

@Component(
    packages_to_install=["pandas==2.2.2", "gcsfs", "fsspec", "pyarrow"],
    base_image="python:3.9"
)
def get_data(gcs_input_path: str, input_data: Output[Dataset]):
    import pandas as pd 
    # Wczytanie danych z podanej ścieżki GCS
    df = pd.read_csv(gcs_input_path)
    # Zapisanie danych jako artefakt wyjściowy
    df.to_csv(input_data.path, index=False)

### Komponent 2: Przetwarzanie Wstępne Danych (`preprocess_data.py`)

In [None]:
@Component(
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0", "pyarrow"],
    base_image="python:3.9"
)
def preprocess_data(
    input_data: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_split_ratio: float,
):
    """Czyści, imputuje, dzieli i zapisuje dane jako artefakty treningowe/testowe."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    pd.options.mode.chained_assignment = None

    df = pd.read_csv(input_data.path)
    df.loc[336, 'sex'] = 'FEMALE'
    numerical_cols = ['culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g']
    for col in numerical_cols:
        df[col] = df[col].fillna(df[col].median())
    df['sex'] = df['sex'].fillna(df['sex'].mode()[0])
    df['sex'] = df['sex'].map({'MALE': 0, 'FEMALE': 1})
    df_processed = pd.get_dummies(df, columns=['island'], drop_first=True)
    
    X = df_processed.drop('species', axis=1)
    y = df_processed['species']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_split_ratio, random_state=42, stratify=y)
    
    train_df = pd.concat([X_train, y_train], axis=1)
    test_df = pd.concat([X_test, y_test], axis=1)
    
    # Zapis do artefaktów wyjściowych
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

### Komponent 3: Trenowanie Modelu (`train_svc_model.py`)

In [None]:
@Component(
    base_image="python:3.9",
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0", "gcsfs==2024.6.0", "fsspec", "pyarrow"]
)
def train_svc_model(
    train_dataset: Input[Dataset],
    model: Output[Model],
):
    """Trenuje klasyfikator SVC i zapisuje model."""
    import pandas as pd
    from sklearn.svm import SVC
    from sklearn.preprocessing import StandardScaler
    from sklearn.pipeline import Pipeline
    import pickle

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop('species', axis=1)
    y_train = train_df['species']
    
    svc_pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('svc', SVC(kernel='rbf', probability=True, random_state=42))
    ])
    svc_pipeline.fit(X_train, y_train)

    # Zapisanie modelu do pliku .pkl w ścieżce artefaktu
    file_name = model.path + ".pkl"
    with open(file_name, 'wb') as file:  
        pickle.dump(svc_pipeline, file)

### Komponent 4: Ewaluacja Modelu (`evaluate_svc_model.py`)

In [None]:
from typing import NamedTuple

@Component(
    base_image="python:3.9",
    packages_to_install=["pandas==2.2.2", "scikit-learn==1.5.0", "gcsfs==2024.6.0", "fsspec"],
)
def evaluate_svc_model(
    test_dataset: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
) -> NamedTuple("Outputs", [("accuracy", float)]):
    """Ocenia model, zapisuje metryki i zwraca dokładność."""
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report
    import pickle 

    file_name = model.path + ".pkl"
    with open(file_name, 'rb') as file:  
        model = pickle.load(file)

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop('species', axis=1)
    y_test = test_df['species']
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    # Zapisanie metryk do artefaktu Vertex AI
    metrics.log_metric("accuracy", (accuracy * 100.0))
    print(f"Accuracy: {accuracy}")

    report = classification_report(y_test, y_pred, output_dict=True)
    for class_label, class_metrics in report.items():
        if isinstance(class_metrics, dict):
            for metric_name, metric_value in class_metrics.items():
                metrics.log_metric(f"{class_label}_{metric_name}", metric_value)
    
    # Zwrócenie dokładności jako wartości wyjściowej komponentu
    return ((accuracy * 100.0),)

### Komponent 5: Pobranie Modelu Nadrzędnego (`get_parent_model.py`)

Ten komponent służy do wersjonowania modeli. Sprawdza on w Vertex AI Model Registry, czy istnieje już model o podanej nazwie. Jeśli tak, zwraca jego identyfikator (resource name), który zostanie użyty do oznaczenia nowego modelu jako kolejnej wersji.

In [None]:
@Component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform"]
)
def get_parent_model(
    project: str,
    region: str,
    model_display_name: str,
) -> NamedTuple("Outputs", [("parent_model_resource_name", str)]):
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)
    
    models = aiplatform.Model.list(
        filter=f'display_name="{model_display_name}"'
    )
    
    parent_model_resource_name = ""
    if models:
        parent_model_resource_name = models[0].resource_name
        print(f"Znaleziono istniejący model: {parent_model_resource_name}")
    else:
        print(f"Nie znaleziono modelu o nazwie: {model_display_name}. Zostanie utworzony nowy wpis.")
        
    return (parent_model_resource_name,)

### Komponent 6: Rejestracja Modelu (`register_model.py`)

In [None]:
@Component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform==1.55.0"],
)
def register_model(
    model: Input[Model],
    project_id: str,
    region: str,
    model_display_name: str,
    parent_model: str = "",
    model_labels: str = '{}'
):
    """Rejestruje model w Vertex AI Model Registry."""
    from google.cloud import aiplatform
    import json

    aiplatform.init(project=project_id, location=region)

    try:
        labels = json.loads(model_labels)
    except json.JSONDecodeError:
        labels = {"model_type": "svc", "framework" : "scikit-learn"} # Domyślne etykiety

    serving_container_image = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest"
    model_path = '/'.join(model.uri.split('/')[:-1])
    
    # Przesłanie i rejestracja modelu
    registered_model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model_path,
        serving_container_image_uri=serving_container_image,
        sync=True,
        parent_model=parent_model,
        labels=labels
    )
    print(f"Zarejestrowano model: {registered_model.resource_name}")

## 2. Definicja i Kompilacja Głównego Potoku

Po zdefiniowaniu wszystkich komponentów, łączymy je w jeden, spójny potok. Funkcja `training_pipeline` orkiestruje cały proces, definiując zależności i przepływ danych. Na końcu kompilujemy potok do pliku `pipeline.json`, który jest gotowy do uruchomienia w Vertex AI.

In [None]:
from kfp import dsl, compiler

@dsl.pipeline(
    name="training-pipeline-from-components",
    description="Potok trenujący i rejestrujący model SVC z importowanych komponentów.",
    pipeline_root=PIPELINE_ROOT,
)
def training_pipeline(
    gcs_data_path: str = "gs://data-s25537/penguins.csv",
    project_id: str = PROJECT_ID,
    region: str = REGION,
    model_name: str = "penguin-svc-model",
    model_labels_str: str = '{}',
    test_split_ratio: float = 0.3,
    min_accuracy_threshold: float = 95.0,
):
    """Definiuje przepływ pracy w potoku z warunkową rejestracją modelu."""
    
    get_data_task = get_data(gcs_input_path=gcs_data_path)
    
    transform_data_task = preprocess_data(
        input_data=get_data_task.outputs["input_data"],
        test_split_ratio=test_split_ratio
    )
    
    train_task = train_svc_model(
        train_dataset=transform_data_task.outputs["train_dataset"]
    )
    
    evaluate_task = evaluate_svc_model(
        test_dataset=transform_data_task.outputs["test_dataset"],
        model=train_task.outputs["model"],
    )

    with dsl.If(
        evaluate_task.outputs["accuracy"] >= min_accuracy_threshold,
        name="accuracy-check",
    ):
        get_parent_model_task = get_parent_model(
            project=project_id,
            region=region,
            model_display_name=model_name,
        )
        
        register_model(
            project_id=project_id,
            region=region,
            model_display_name=model_name,
            model=train_task.outputs["model"],
            parent_model=get_parent_model_task.outputs["parent_model_resource_name"],
            model_labels=model_labels_str
        )

# Kompilacja potoku
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.json",
)

print("Potok został pomyślnie skompilowany do pliku 'training_pipeline.json'")