# Azure ML v2 — Batch Endpoint: 3 modele (PL/EN/DE) na 1 endpoint + 3 compute clustery (min=0)

Ten notebook robi:
1. Logowanie do Azure ML (v2 SDK) i utworzenie `MLClient`
2. Utworzenie **3 AmlCompute clusterów** (`min_instances=0`) — brak kosztów, gdy nie ma jobów
3. Utworzenie (lub update) **jednego Batch Endpointu**
4. Utworzenie (lub update) **3 Batch Deploymentów** (PL/EN/DE), każdy przypięty do osobnego clustra
5. (Opcjonalnie) Uruchomienie 3 batch jobów równolegle (po jednym na język)

## Ważne o kosztach i opóźnieniach
- `min_instances=0` na clustrze oznacza: **0 VM działających bez jobów → 0 kosztu idle**.
- Pierwszy job po okresie bezczynności ma **cold start** (provisioning node’ów). To jest nieuniknione przy `min=0`.
- Żeby użytkownicy nie „czekali na siebie” przy równoległych językach, robimy **osobne clustery per język**.
- Dodatkowo ustawiamy `idle_time_before_scale_down` na **niski** poziom, aby VM szybko się wyłączały po jobie.

> Uwaga: Nie da się mieć jednocześnie: (a) zerowego kosztu idle i (b) absolutnie zerowego cold startu. Można tylko zbalansować.


## 0) Instalacja / wersje
Upewnij się, że masz `azure-ai-ml` i `azure-identity`.
Jeśli uruchamiasz lokalnie, odkomentuj komendę instalacji.

In [None]:
# %pip install -U azure-ai-ml azure-identity


## 1) Konfiguracja: Workspace, nazwy zasobów, modele, env, ścieżki kodu
Wypełnij pola poniżej.

In [None]:
# --- WORKSPACE ---
SUBSCRIPTION_ID = "<SUBSCRIPTION_ID>"
RESOURCE_GROUP  = "<RESOURCE_GROUP>"
WORKSPACE_NAME  = "<WORKSPACE_NAME>"

# --- BATCH ENDPOINT ---
BATCH_ENDPOINT_NAME = "doc-classifier-batch"  # nazwa jednego batch endpointu

# --- ENVIRONMENT (zarejestrowane w AML) ---
ENV_NAME    = "model-x-env"
ENV_VERSION = "5"

# --- MODELE (zarejestrowane w AML) ---
# Jeśli masz konkretne wersje - podaj je (zalecane w prod).
MODELS = {
    "pl": {"name": "model-x-pl", "version": "1"},
    "en": {"name": "model-x-en", "version": "1"},
    "de": {"name": "model-x-de", "version": "1"},
}

# --- CODE + SCORING SCRIPT ---
# Folder z kodem (np. zawiera score.py oraz dodatkowe pliki)
CODE_DIR = "./src"
SCORING_SCRIPT = "score.py"

# --- COMPUTE: 3 clustery (po jednym na język) ---
# Size dobierz do potrzeb; to jest przykład CPU.
COMPUTE_SIZE = "Standard_DS3_v2"

# min_instances=0 -> brak kosztów bez jobów
MIN_NODES = 0

# max_instances -> limit równoległości (ile VM może maksymalnie wystartować na klastrze)
# Ustaw tak, aby obsłużyć równoległe joby / wolumen dokumentów.
MAX_NODES_PER_LANG = {
    "pl": 4,
    "en": 4,
    "de": 4,
}

# Jak szybko klaster ma gasić VM po bezczynności (sekundy).
# Niższa wartość = mniej kosztu po jobie, ale częstsze cold starty.
IDLE_TIME_BEFORE_SCALE_DOWN = 120

# Równoległość wewnątrz node’a (ile „porcji pracy” naraz na jednej VM)
MAX_CONCURRENCY_PER_INSTANCE = 2


## 2) Logowanie i MLClient
Używamy `DefaultAzureCredential` (działa w Azure ML Compute Instance, VS Code + az login, Managed Identity itd.).

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id=SUBSCRIPTION_ID,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
)

print("Connected to:", ml_client.workspaces.get(WORKSPACE_NAME).name)


## 3) Utworzenie 3 AmlCompute clusterów (min=0)
Każdy język dostaje własny cluster, co zapewnia izolację capacity:
- job DE nie może „zabrać” node’ów EN
- oba mogą skalować się niezależnie do `max_nodes`

Uwaga: cluster jako zasób może istnieć stale, ale **koszt generują tylko działające node’y**.


In [None]:
from azure.ai.ml.entities import AmlCompute

compute_names = {
    lang: f"cpu-batch-{lang}" for lang in MODELS.keys()
}

for lang, cname in compute_names.items():
    compute = AmlCompute(
        name=cname,
        size=COMPUTE_SIZE,
        min_instances=MIN_NODES,                    # 0 -> brak kosztów, gdy nie ma jobów
        max_instances=MAX_NODES_PER_LANG[lang],     # limit ile VM może wystartować
        idle_time_before_scale_down=IDLE_TIME_BEFORE_SCALE_DOWN,  # szybkie gaszenie po jobie
    )
    print(f"Creating/updating compute: {cname} (min={MIN_NODES}, max={MAX_NODES_PER_LANG[lang]})")
    ml_client.compute.begin_create_or_update(compute).result()

print("Compute clusters ready:", compute_names)


## 4) Pobranie Environment
Deployment batch potrzebuje środowiska (conda/docker) zarejestrowanego w AML.

In [None]:
env = ml_client.environments.get(name=ENV_NAME, version=ENV_VERSION)
print("Using env:", env.name, env.version)


## 5) Utworzenie (lub update) Batch Endpointu
Jeśli endpoint już istnieje, kod go zaktualizuje (bez niszczenia deploymentów).

In [None]:
from azure.ai.ml.entities import BatchEndpoint

endpoint = BatchEndpoint(
    name=BATCH_ENDPOINT_NAME,
    description="Batch endpoint for PL/EN/DE models (separate compute clusters, min=0).",
)

ml_client.batch_endpoints.begin_create_or_update(endpoint).result()
print("Batch endpoint ready:", BATCH_ENDPOINT_NAME)


## 6) Utworzenie 3 Batch Deploymentów (PL/EN/DE) na jednym endpoint
Każdy deployment:
- wskazuje inny model (PL/EN/DE)
- ma to samo `score.py`
- używa dedykowanego clustra (`compute=cpu-batch-<lang>`)

Dzięki temu równoległe joby na różne języki nie blokują się nawzajem (blokada byłaby tylko na poziomie ich własnego clustra).

In [None]:
from azure.ai.ml.entities import ModelBatchDeployment

deployment_names = {lang: f"deploy-{lang}" for lang in MODELS.keys()}

for lang, spec in MODELS.items():
    model = ml_client.models.get(name=spec["name"], version=spec["version"])
    dep = ModelBatchDeployment(
        name=deployment_names[lang],
        endpoint_name=BATCH_ENDPOINT_NAME,
        model=model,
        environment=env,
        code_configuration={
            "code": CODE_DIR,           # folder z kodem
            "scoring_script": SCORING_SCRIPT,  # entrypoint batch
        },
        compute=compute_names[lang],
        instance_count=1,  # zwykle zostawia się 1; klaster skaluje node’y wg potrzeby
        max_concurrency_per_instance=MAX_CONCURRENCY_PER_INSTANCE,  # równoległość per VM
    )

    print(f"Creating/updating deployment: {dep.name} -> model={model.name}:{model.version}, compute={compute_names[lang]}")
    ml_client.batch_deployments.begin_create_or_update(dep).result()

print("Deployments ready:", deployment_names)


## 7) (Opcjonalnie) Ustawienie default deployment
Batch endpoint nie ma traffic split jak online. Możesz ustawić deployment domyślny, jeśli czasem nie podajesz `deployment_name` przy jobie.
W praktyce przy routing po języku zwykle **zawsze podajesz deployment_name**.

In [None]:
endpoint = ml_client.batch_endpoints.get(BATCH_ENDPOINT_NAME)
endpoint.default_deployment_name = deployment_names["en"]  # np. EN jako default
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()
print("Default deployment set to:", endpoint.default_deployment_name)


## 8) (Opcjonalnie) Uruchomienie jobów równolegle: PL + EN + DE
Poniżej przykład jak odpalić trzy niezależne batch joby w tym samym czasie.

### Input/Output
- Ustaw `INPUT_*` na swoje ścieżki w datastore.
- `uri_folder` jest typowe przy dokumentach (PDF, itp.).


In [None]:
from azure.ai.ml.entities import BatchJob, Input
import time

INPUTS = {
    "pl": "azureml://datastores/workspaceblobstore/paths/in/pl/",
    "en": "azureml://datastores/workspaceblobstore/paths/in/en/",
    "de": "azureml://datastores/workspaceblobstore/paths/in/de/",
}

OUTPUTS = {
    "pl": "azureml://datastores/workspaceblobstore/paths/out/pl/",
    "en": "azureml://datastores/workspaceblobstore/paths/out/en/",
    "de": "azureml://datastores/workspaceblobstore/paths/out/de/",
}

submitted = {}

for lang in MODELS.keys():
    job = BatchJob(
        name=f"run-{lang}-{int(time.time())}",
        endpoint_name=BATCH_ENDPOINT_NAME,
        deployment_name=deployment_names[lang],  # routing po języku
        inputs={
            "input_data": Input(type="uri_folder", path=INPUTS[lang])
        },
        outputs={
            "output_data": Input(type="uri_folder", path=OUTPUTS[lang])
        },
    )

    created = ml_client.batch_jobs.begin_create_or_update(job).result()
    submitted[lang] = created.name
    print("Submitted:", lang, created.name)

print("All jobs submitted:", submitted)


## 9) Sprawdzenie statusu jobów
Compute ma `min=0`, więc po zakończeniu jobów VM zostaną automatycznie wygaszone po `idle_time_before_scale_down`.


In [None]:
def show_job(job_name: str):
    j = ml_client.batch_jobs.get(job_name)
    print(f"{job_name}: status={j.status}, created={j.creation_context.created_at}")

# Podmień na swoje job_id lub użyj 'submitted' ze wcześniejszej komórki.
# for lang, job_name in submitted.items():
#     show_job(job_name)
