# **Pipeline Development**

This repository serves as a scaffold for the interactive, experimentation-friendly development of data and ML pipelines. The development of this environment was guided by the following philosophy:

**Please read the README.md carefully before starting the development.**

### **Create Folder Structure for Pipeline Components**

In [None]:
from tensoryze_pipelines import MLPipelineSetup

# Scaffold one component folder per pipeline step.
step_names = ["ingestion", "train", "test", "publish"]
MLPipelineSetup.setup_pipeline_structure(pipeline_step_names=step_names)

# Load the environment variables the components rely on
# (presumably from the project's .env file — TODO confirm).
MLPipelineSetup.load_env_vars()

### **Data preparation: app.py**

In [None]:
#%%writefile components/ingestion/app.py
"""Ingestion step: split the data-lake dataset into train and test branches."""
# NOTE(review): glob, os and dotenv are currently unused by this script.
import glob, os, dotenv
from tensoryze_pipelines.io.interfaces import DataLakeInterface
from tensoryze_pipelines.io.datalake.clients import LakeFSClient
from tensoryze_pipelines.io.datalake.ml import MachineLearningDataLakeClient

# Wrap the plain LakeFS client in the ML-aware client, which provides the
# train/test branching helper used below.
client = MachineLearningDataLakeClient(LakeFSClient(interface=DataLakeInterface()))

# Create the train/test branches, sending 25% of the data to the test side.
branches = client.create_train_test_branch(test_ratio=0.25)

##### **dockerfile**

In [None]:
%%writefile components/ingestion/dockerfile

# Define base image
FROM python:3.10-slim as base

# Set environment variables
# Putting the venv's bin/ first on PATH makes every later `python`/`pip`
# call (including CMD) resolve to the virtualenv created below.
ENV VIRTUAL_ENV=/opt/venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Final (and run) stage: nothing is copied out of it with COPY --from, so this
# stage is the image that gets executed. It creates the venv and installs
# the dependencies and the app.
FROM base as builder

# git is needed for the `pip install git+https://...` step below.
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
RUN python3 -m venv $VIRTUAL_ENV

WORKDIR /code

# Token used to install the private tensoryze_pipelines repository.
# SECURITY: build-arg values used in RUN are visible via `docker history`;
# consider a BuildKit secret mount (RUN --mount=type=secret,...) instead.
ARG GITHUB_INSTALL_TOKEN

COPY requirements.txt .

# install dependencies
RUN pip install git+https://${GITHUB_INSTALL_TOKEN}@github.com/tensoryze-dev/tensoryze_pipelines.git#egg=tensoryze_pipelines && \
    pip install -r requirements.txt && \
    rm -rf ~/.cache/pip


COPY app.py .
# SECURITY: this bakes the local .env (and any credentials in it) into the
# image, so anyone who can pull the image can read it.
COPY .env .

CMD ["python", "./app.py"]

## **Model Training**

##### **inference_preprocessing.py**

In [2]:
%%writefile inference_artifacts/inference_preprocessing.py
import torch
from PIL import Image
from tensoryze_pipelines.modeling import OpticalInspectionTransformation, TensorPILImageConverter
from tensoryze_service.datamodel import TensoryzeEvent


def inference_preprocessing(event: TensoryzeEvent) -> Image.Image:
    """Preprocess an inference event into model input (used by the inference service).

    Applies ``OpticalInspectionTransformation(img_size=224)`` (the same
    settings the training step uses) to the event's data, then adds a leading
    batch dimension. The result is converted with ``TensorPILImageConverter``.
    """
    raw = event.get_data()
    # NOTE: no crop is configured here; an optional `crop=[y, x, w, w]`
    # argument was previously sketched out but is not used.
    transformation = OpticalInspectionTransformation(img_size=224)
    batch: torch.Tensor = transformation.transform(raw).unsqueeze(0)
    return TensorPILImageConverter.convert(batch)


Overwriting inference_artifacts/inference_preprocessing.py


##### **app.py**

In [3]:
%%writefile components/train/app.py

import pytorch_lightning as pl
import  os, glob
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint

from tensoryze_pipelines.modeling import (
    OpticalInspectionDataloader, OpticalInspectionDataset,  OpticalInspectionTransformation, 
    TemperatureScaledActiveLearner, ResNet18
)
from tensoryze_pipelines.utils.logging import logger
from tensoryze_pipelines.io import (
    DataLakeInterface, LakeFSClient, MachineLearningDataLakeClient,
    write_dict_to_yaml, read_pickle, write_json, image_folder_to_dataset,
    ExperimentTrackingInterface, MLFlowExperimentTracking, 
)

from tensoryze_pipelines.entities.execution import ModelArtifact

# Environment variables are always strings; pl.Trainer needs an int for
# max_epochs, so convert explicitly (the default 10 is used when unset).
N_EPOCHS = int(os.environ.get('N_EPOCHS', 10))

log = logger
experiment_tracker = MLFlowExperimentTracking(ExperimentTrackingInterface())
client = MachineLearningDataLakeClient(
    LakeFSClient(interface=DataLakeInterface())
)
# Download the dataset into /tmp; it is expected to contain the "test-data"
# and "train-data" subfolders read below.
local_path = client.download_dataset(folder="/tmp")

X_test, y_test = image_folder_to_dataset(local_path, subfolder="test-data")
X_train, y_train = image_folder_to_dataset(local_path, subfolder="train-data")

log.info("🛠️   preparing training data")
# Same transformation settings as inference_artifacts/inference_preprocessing.py.
TR = OpticalInspectionTransformation(img_size=224)
DS_TRAIN = OpticalInspectionDataset(X_train, y_train, sensor="vision_line", transform=TR.transform)
DL_TRAIN = OpticalInspectionDataloader(DS_TRAIN, log=log, test_split=False)

DS_TEST = OpticalInspectionDataset(X_test, y_test, sensor="vision_line", transform=TR.transform)
DL_TEST = OpticalInspectionDataloader(DS_TEST, log=log, test_split=False, val_split=False)

DL_TEST.prepare_dataloaders()
DL_TRAIN.prepare_dataloaders()

log.info("✔️   prepared training data")

# Render sample images so the input data can be checked visually; the figure
# is logged to the experiment tracker as verify_data.png below.
img = DL_TRAIN.show_images(DL_TEST.testloader, n=10)

with experiment_tracker as experiment_tracker:
    experiment_tracker.log_figure(img, file_name="verify_data.png")

    # Stop after 50 epochs without val_loss improvement, and keep only the
    # single best checkpoint (lowest val_loss) on disk.
    early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=0.00, patience=50, verbose=False, mode="min")
    checkpoint_callback = ModelCheckpoint(dirpath='/tmp/best_after_fit', save_top_k=1, verbose=True, monitor='val_loss', mode='min')

    trainer = pl.Trainer(
        max_epochs=N_EPOCHS,
        accelerator="gpu",
        devices=1,
        callbacks=[
            early_stop_callback,
            checkpoint_callback,
        ],
    )

    log.info("[ ] Starting training")

    trainer.fit(
        model=ResNet18(),
        train_dataloaders=DL_TRAIN.trainloader,
        val_dataloaders=DL_TRAIN.valloader,
    )

    log.info("[x]  Training completed")

    # best_model_path is an empty string when no checkpoint was written
    # (e.g. "val_loss" was never logged). Fail with a clear message instead
    # of an opaque error from load_from_checkpoint("").
    best_model_checkpoint = checkpoint_callback.best_model_path
    if not best_model_checkpoint:
        raise RuntimeError(
            "No checkpoint was saved during training; "
            "check that the model logs 'val_loss'."
        )
    log.info(f"Best checkpoint: {best_model_checkpoint}")

    # Reload the best (still uncalibrated) model for temperature scaling.
    loaded_model = ResNet18.load_from_checkpoint(best_model_checkpoint)
    loaded_model.eval()

    active_learner = TemperatureScaledActiveLearner(loaded_model, DL_TRAIN)
    al_infer_config = active_learner.get_config(manual_treshold=0.25)
    write_json(al_infer_config, './inference_artifacts/al_infer_config.json')

    log.info("[ ] logging inference artifacts...")
    for file in glob.glob(os.path.join("./inference_artifacts", "*.*")):
        experiment_tracker.log_artifact(file_name=file)
        log.info(f"[x] Logged to mlflow: {file}")

    # NOTE(review): trainer.model presumably holds the weights from the final
    # training epoch, not the best checkpoint reloaded above (loaded_model).
    # Confirm which one should be shipped as model.pkl.
    log_names = ["/tmp/model.pkl", "/tmp/transform.pkl"]
    objects = [trainer.model, TR]

    for f_name, obj in zip(log_names, objects):
        experiment_tracker.log_artifact(file_name=f_name, artifact=obj)

    # Record the "<experiment_id>/<run_id>" of this run so later steps can
    # locate the logged model via ModelArtifact.read().
    artifact = ModelArtifact(
        artifact_name=ResNet18.__name__,
        run_id=f"{experiment_tracker.experiment_id}/{experiment_tracker.run_id}",
    )
    artifact.write()



Overwriting components/train/app.py


In [4]:
%%writefile components/train/dockerfile
# PyTorch 2.2 runtime image (CUDA 12.1, cuDNN 8) as the base.
FROM pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime AS base


# Final (and run) stage: installs the pipeline library and the training app.
FROM base AS builder

WORKDIR /code

# libglib2.0-0 / libgl1-mesa-glx are presumably required by an image-processing
# dependency (e.g. OpenCV) — confirm; git is needed for the pip install from
# GitHub. One update+install layer avoids a redundant second `apt-get update`
# and an extra image layer.
RUN apt-get update && \
    apt-get install -y libglib2.0-0 libgl1-mesa-glx git && \
    rm -rf /var/lib/apt/lists/*


# Token used to install the private tensoryze_pipelines repository.
# SECURITY: build-arg values used in RUN are visible via `docker history`;
# consider a BuildKit secret mount (RUN --mount=type=secret,...) instead.
ARG GITHUB_INSTALL_TOKEN
COPY requirements.txt .
# install dependencies
# The URL is quoted so the shell cannot treat the `[ml]` extras as a glob.
RUN pip install "git+https://${GITHUB_INSTALL_TOKEN}@github.com/tensoryze-dev/tensoryze_pipelines.git#egg=tensoryze_pipelines[ml]" && \
    pip install -r requirements.txt && \
    rm -rf ~/.cache/pip


COPY inference_artifacts/ ./inference_artifacts
COPY app.py .
# SECURITY: this bakes the local .env (and any credentials in it) into the image.
COPY .env .

# command to run on container start
CMD [ "python", "./app.py" ]

Overwriting components/train/dockerfile


## **Model Testing**

##### **app.py**

In [5]:
%%writefile components/test/app.py
import pickle, os
from tensoryze_pipelines.entities.execution import ModelArtifact, DeploymentRule
from tensoryze_pipelines.deployment import read_local_model_id, DeploymentSolver
from tensoryze_pipelines.ml_testing import (
    TestFactory, MetricFactory, RunManager, TransformationFactory
)
from tensoryze_pipelines.modeling import OpticalInspectionDataloader, OpticalInspectionDataset, OpticalInspectionTransformation
from tensoryze_pipelines.io import (
    image_folder_to_dataset, ExperimentTrackingInterface, LakeFSClient, DataLakeInterface, MachineLearningDataLakeClient,
)
from tensoryze_pipelines.utils.logging import logger


# Identify the model produced by the train step. run_id is expected to look
# like "<experiment_id>/<run_id>"; parsed_model_id below takes the second part.
model_artifact = ModelArtifact.read()
NEW_MODEL_ID = model_artifact.run_id

TEST_SPECIFICATION = r"testing_artifacts/test_specification.yaml"


client = LakeFSClient(interface=DataLakeInterface())
client = MachineLearningDataLakeClient(client)

# Instantiated only for its side effects (presumably configuring the
# experiment-tracking connection) — TODO confirm.
_ = ExperimentTrackingInterface()

parsed_model_id = NEW_MODEL_ID.split("/")[1]

# SECURITY: pickle.load can execute arbitrary code; only load model files
# produced by this pipeline.
# NOTE(review): /tmp/model.pkl is written by the train container; confirm it
# is made available in this container before this step runs.
with open("/tmp/model.pkl", 'rb') as f:
    model = pickle.load(f)

local_path = client.download_dataset(folder="/tmp", branch_name=client.test_branch_name)
X_test, y_test = image_folder_to_dataset(local_path, subfolder=client.test_branch_name)

# This step only evaluates, so the data prepared here is test data.
logger.info("[ ] preparing test data")
TR = OpticalInspectionTransformation(img_size=224)
DS_TEST = OpticalInspectionDataset(X_test, y_test, sensor="vision_line", transform=TR.transform)
# NOTE(review): DL_TEST is not used below; the TransformationFactory receives
# the dataloader class instead.
DL_TEST = OpticalInspectionDataloader(DS_TEST, log=logger, test_split=False, val_split=False)

logger.info("[x] prepared test data")

transformation_factory = TransformationFactory(224, None, DS_TEST, OpticalInspectionDataloader)
metric_factory = MetricFactory()
test_factory = TestFactory(model, transformation_factory, metric_factory)

# Run the tests declared in the YAML specification against the loaded model.
run_manager = RunManager(specification_path=TEST_SPECIFICATION, model=model, test_factory=test_factory)
run_manager.execute_tests(run_id=parsed_model_id)

# Decide on deployment from the test results and the test hierarchy.
solver = DeploymentSolver(
    test_results=run_manager.test_results,
    hierarchy=run_manager.hierarchy_list,
)
solver.solve()

Overwriting components/test/app.py


##### **dockerfile**

In [7]:
%%writefile components/test/dockerfile

# PyTorch 2.2 runtime image (CUDA 12.1, cuDNN 8) as the base.
FROM pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime AS base

# Final (and run) stage: installs the pipeline library and the test app.
FROM base AS builder

WORKDIR /code

# libglib2.0-0 / libgl1-mesa-glx are presumably required by an image-processing
# dependency (e.g. OpenCV) — confirm; git is needed for the pip install from
# GitHub. One update+install layer avoids a redundant second `apt-get update`
# and an extra image layer.
RUN apt-get update && \
    apt-get install -y libglib2.0-0 libgl1-mesa-glx git && \
    rm -rf /var/lib/apt/lists/*


# Token used to install the private tensoryze_pipelines repository.
# SECURITY: build-arg values used in RUN are visible via `docker history`;
# consider a BuildKit secret mount (RUN --mount=type=secret,...) instead.
ARG GITHUB_INSTALL_TOKEN
COPY requirements.txt .
# install dependencies
# The URL is quoted so the shell cannot treat the `[ml]` extras as a glob.
RUN pip install "git+https://${GITHUB_INSTALL_TOKEN}@github.com/tensoryze-dev/tensoryze_pipelines.git#egg=tensoryze_pipelines[ml]" && \
    pip install -r requirements.txt && \
    rm -rf ~/.cache/pip

# copy the app, its test specification and the inference artifacts into the working directory
COPY app.py .
COPY testing_artifacts/ ./testing_artifacts/
COPY inference_artifacts/ ./inference_artifacts/
# SECURITY: this bakes the local .env (and any credentials in it) into the image.
COPY .env .


# command to run on container start
CMD [ "python", "./app.py" ]

Overwriting components/test/dockerfile


## **CT Pipeline Definition**

In [None]:
#%%writefile pipeline/manifest.json
{
    "name": "pcb-demo-pipeline",                                 
    "kind": "ml-job",                                                        
    "pipeline_steps": {
        "ingestion": {
            "name": "ingest_pcb_data", "image": "pcb-demo-data/ingestion:latest"
        },
        "train": {
            "name": "train_resnet", "image": "pcb-demo-data/train:latest"
        },
        "test": {
            "name": "test_robustness", "image":"pcb-demo-data/test:latest"
        }
    },
    "pipeline_dag": {
        "root": ["ingestion"],
        "ingestion": ["train"],
        "train": ["test"],
        "test": ["end"]
    },
    "scheduler": {
        "kind": "TimeRESTScheduler",
        "condition": "* * *1 * *"
    }
}

## Deploying the Pipeline
1. `make local_run`
2. `make push_to_registry`
3. `make register`