In [None]:
import os
#from azure.ai.ml import MLClient, Input, MpiDistribution, command
from azure.ai.ml import MLClient, Input, Output, PyTorchDistribution, command
from azure.ai.ml.entities import (
    AmlCompute, Environment, BuildContext, Data,
    ManagedOnlineEndpoint, ManagedOnlineDeployment, CodeConfiguration, OnlineRequestSettings
)
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import AssetTypes
import datetime

from dotenv import load_dotenv
load_dotenv(override=True)

# Azure ML workspace configuration (values come from the environment / .env file)
SUBSCRIPTION_ID = os.getenv("SUBSCRIPTION_ID")
RESOURCE_GROUP = os.getenv("RESOURCE_GROUP")
WORKSPACE_NAME = os.getenv("WORKSPACE_NAME")
COMPUTE_CLUSTER = "demo-gpucluster01"
HF_TOKEN = os.getenv("HF_TOKEN")  # optional; forwarded to the training job

# Fail fast on missing workspace settings. Without this check the compute
# lookup below fails for an unrelated reason and is reported as a missing cluster.
_missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ("SUBSCRIPTION_ID", SUBSCRIPTION_ID),
        ("RESOURCE_GROUP", RESOURCE_GROUP),
        ("WORKSPACE_NAME", WORKSPACE_NAME),
    )
    if not setting_value
]
if _missing_settings:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# authentication via managed identity or service principal (no hard-coded creds)
ml_client = MLClient(DefaultAzureCredential(), SUBSCRIPTION_ID, RESOURCE_GROUP, WORKSPACE_NAME)

# Check that the compute cluster can be retrieved. This cell does NOT create it;
# the lookup is best-effort, so a failure is reported and the notebook continues.
try:
    ml_client.compute.get(COMPUTE_CLUSTER)
except Exception as exc:
    # Include the underlying error so auth/network problems are not mistaken
    # for a cluster that simply does not exist.
    print(f"{COMPUTE_CLUSTER} was not found ({type(exc).__name__}: {exc})")

<h5>Prepare Environment</h5>

In [None]:
# Create the folders that the %%writefile cells below write into.
for _folder in ("environment", "src"):
    os.makedirs(_folder, exist_ok=True)

In [None]:
%%writefile ./environment/Dockerfile
FROM mcr.microsoft.com/aifx/acpt/stable-ubuntu2204-cu118-py310-torch271:biweekly.202509.1

# Install pip dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir
# Upgrade known vulnerable packages again just to be safe
RUN pip install --upgrade \
    requests==2.32.4 \
    urllib3==2.5.0 \
    pillow==11.3.0 || true
#ENV MAX_JOBS=32
#RUN pip install flash-attn==2.4.2 --no-build-isolation

# Repeat for other envs if applicable
RUN /opt/conda/bin/pip install --upgrade \
    requests==2.32.4 \
    urllib3==2.5.0 \
    pillow==11.3.0 || true

RUN /opt/conda/envs/ptca/bin/pip install --upgrade \
    requests==2.32.4 \
    urllib3==2.5.0 \
    pillow==11.3.0 || true

# Inference requirements
COPY --from=mcr.microsoft.com/azureml/o16n-base/python-assets:20230419.v1 /artifacts /var/
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        libcurl4 \
        liblttng-ust1 \
        libunwind8 \
        libxml++2.6-2v5 \
        nginx-light \
        psmisc \
        rsyslog \
        runit \
        unzip && \
    apt-get clean && rm -rf /var/lib/apt/lists/* && \
    cp /var/configuration/rsyslog.conf /etc/rsyslog.conf && \
    cp /var/configuration/nginx.conf /etc/nginx/sites-available/app && \
    ln -sf /etc/nginx/sites-available/app /etc/nginx/sites-enabled/app && \
    rm -f /etc/nginx/sites-enabled/default

RUN apt-get update && \
    apt-get install -y --only-upgrade \
        libpython3.10-stdlib \
        python3.10 \
        libpython3.10-minimal \
        python3.10-minimal \
        libpam0g \
        libpam-modules-bin \
        libpam-modules \
        libpam-runtime \
        sudo && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

ENV SVDIR=/var/runit
ENV WORKER_TIMEOUT=400
EXPOSE 5001 8883 8888

# support Deepspeed launcher requirement of passwordless ssh login
# Update and install in ONE layer: a separately cached `apt-get update` layer can
# hold a stale package index and make the install fail or pull outdated packages.
# The apt lists are removed afterwards, as in the RUN steps above.
RUN apt-get update && \
    apt-get install -y openssh-server openssh-client && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

In [None]:
%%writefile ./environment/requirements.txt
azureml-core==1.60.0.post1
azureml-dataset-runtime==1.60.0
azureml-defaults==1.60.0
azure-ml==0.0.1
azure-ml-component==0.9.18.post2
azureml-mlflow==1.60.0.post1
azureml-contrib-services==1.60.0
azureml-inference-server-http
inference-schema
MarkupSafe==2.1.2
regex
pybind11
urllib3==2.5.0
requests==2.32.4
pillow==11.3.0
transformers==4.54.1
cryptography>=42.0.4
aiohttp>=3.12.14
py-spy==0.3.12
debugpy~=1.6.3
ipykernel~=6.0
tensorboard
psutil~=5.8.0
matplotlib~=3.5.0
tqdm~=4.66.3
py-cpuinfo==5.0.0
torch-tb-profiler~=0.4.0
peft==0.11.1
datasets
#accelerate==0.30.1
accelerate
#deepspeed==0.15.1
Levenshtein
av>=12
ninja
#bitsandbytes==0.43.1
trl>=0.20.0
trackio
pynvml

In [None]:
# Build (or update) the Azure ML environment from the Docker context written above.
env_name = "env-phi35v-03"
docker_dir = "./environment"

env_docker_image = Environment(
    name=env_name,
    description="Environment created from a Docker context.",
    build=BuildContext(path=docker_dir),
)
env_asset = ml_client.environments.create_or_update(env_docker_image)

<h3>Prepare Dataset</h3>
<br>Run the following command

```bash
python convert_ucf101.py --out_dir /path/to/converted_ucf101
```
<br>Then upload the output folder to the workspace's default blob datastore.
<br>This notebook refers to it by the URI
"azureml://datastores/workspaceblobstore/paths/converted_ucf101/"
<br>The "Register Dataset" cell below can also upload a local folder directly (see the commented-out local <code>data_uri</code>), but that upload tends to time out.

<h5>Preview Data</h5>

In [None]:
# Preview converted UCF101 dataset (jsonl structure) and build label map
import os, json, itertools
from collections import Counter
from pathlib import Path

DATA_DIR = Path("./converted_ucf101")  # local relative (mounted in AML job as well)
if not DATA_DIR.exists():
    # Nothing is substituted here: the cell only warns and carries on with empty results.
    print("[WARN] Local converted_ucf101 not found. Ensure this path exists when running locally.")

train_file = DATA_DIR / "ucf101_train.jsonl"
val_file = DATA_DIR / "ucf101_val.jsonl"
for _title, _path in (("Train file:", train_file), ("Val file:", val_file)):
    print(_title, _path, "Exists:", _path.exists())

# Parse at most the first three records for a quick look at the schema.
example_lines = []
if train_file.exists():
    with open(train_file, 'r', encoding='utf-8') as f:
        for raw in itertools.islice(f, 3):
            example_lines.append(json.loads(raw))

print("Sample converted entries (truncated):")
for ex in example_lines:
    first_turn = ex['conversations'][0]
    # Every top-level field except the conversation itself; long strings are cut to 80 chars.
    meta = {}
    for key, value in ex.items():
        if key == 'conversations':
            continue
        meta[key] = (str(value)[:80] + '...') if isinstance(value, str) else value
    print(meta)
    print(" images:", first_turn['images'][:3], " user:", first_turn['user'][:60], " label:", first_turn['assistant'])

# Build class label list (label = assistant answer of the first turn of each record)
all_labels = []
if train_file.exists():
    with open(train_file, 'r', encoding='utf-8') as f:
        for raw in f:
            if raw.strip():
                all_labels.append(json.loads(raw)['conversations'][0]['assistant'])
label_freq = Counter(all_labels)
class_labels = sorted(label_freq)
label2id = {name: idx for idx, name in enumerate(class_labels)}
id2label = dict(enumerate(class_labels))
print(f"Classes ({len(class_labels)}):", class_labels)
print("Label freq (top 5):", label_freq.most_common(5))


<h5>Register Dataset</h5>

In [None]:
# Register the uploaded folder as a versioned URI_FOLDER data asset.
#data_uri = "./converted_ucf101/"
data_uri = "azureml://datastores/workspaceblobstore/paths/converted_ucf101/"

data = Data(
    name="phi35vft-ucf101-02",
    version='1',
    type=AssetTypes.URI_FOLDER,
    path=data_uri,
    description="phi35vft-ucf101",
)
ml_client.data.create_or_update(data)

<h5>Data Collators</h5>

In [None]:
%%writefile ./src/phi3v_dataset.py
import copy
import json
from pathlib import Path

import torch
from PIL import Image
from torch.utils.data import Dataset

IGNORE_INDEX = -100


def pad_sequence(sequences, padding_side='right', padding_value=0):
    """Pad a list of 1D tensors to the same length and stack them into one 2D tensor.

    Args:
        sequences: Non-empty list of 1D tensors. The dtype and device of the
            result are taken from the first tensor.
        padding_side: 'right' (pad after the data) or 'left' (pad before it).
        padding_value: Fill value for the padded positions.

    Returns:
        Tensor of shape (len(sequences), longest sequence length).

    Raises:
        ValueError: If ``sequences`` is empty.
    """
    assert padding_side in ['right', 'left']
    if len(sequences) == 0:
        raise ValueError('pad_sequence() requires at least one sequence')
    max_len = max(seq.size(0) for seq in sequences)
    first = sequences[0]
    out = torch.full((len(sequences), max_len), padding_value, dtype=first.dtype, device=first.device)
    for i, seq in enumerate(sequences):
        length = seq.size(0)
        if padding_side == 'right':
            out[i, :length] = seq
        else:
            # Index from the front: `-length:` selects the WHOLE row when
            # length == 0, which made empty sequences fail under left padding.
            out[i, max_len - length:] = seq
    return out


class Phi3VDataset(Dataset):
    """
    A minimal implementation that stays as close as possible to the original PhiCookBook structure.
    - Processes one example in __getitem__ (image → prompt → tokenization)
    - Converts input_ids / labels to list[int] (to avoid tensor-based truth value errors in TRL's truncate_dataset)
    - If max_seq_length is specified, performs a simple trim keeping the right end
    - map() is a no-op to satisfy TRL's .map call (within truncate_dataset)
    → Column-based operations are disabled since this is not a Hugging Face datasets.Dataset
    """
    def __init__(self, jsonl_file: str, image_dir: str, processor, max_seq_length: int | None = None):
        self.image_dir = Path(image_dir)
        with open(jsonl_file, 'r', encoding='utf-8') as f:
            self.examples = [json.loads(line) for line in f]
        self.processor = processor
        self.max_seq_length = max_seq_length

    def __len__(self):
        return len(self.examples)

    def shard(self, num_shards, shard_id):
        sharded = copy.deepcopy(self)
        sharded.examples = [self.examples[i] for i in range(shard_id, len(self.examples), num_shards)]
        return sharded

    def _get_inputs(self, user_text, image_paths):
        images = [Image.open(self.image_dir / image_path) for image_path in image_paths]
        image_tag_text = ''.join([f'<|image_{i}|>' for i in range(1, len(images) + 1)])
        prompt_message = {'role': 'user', 'content': f'{image_tag_text}\n{user_text}'}
        prompt = self.processor.tokenizer.apply_chat_template(
            [prompt_message], tokenize=False, add_generation_prompt=True
        )
        inputs = self.processor(prompt, images, return_tensors='pt')
        return inputs

    def _truncate_pair(self, ids: torch.Tensor, labels: torch.Tensor):
        if self.max_seq_length is None:
            return ids, labels
        if ids.size(0) <= self.max_seq_length:
            return ids, labels
        return ids[-self.max_seq_length:], labels[-self.max_seq_length:]

    def __getitem__(self, idx):
        example = self.examples[idx]
        all_input_ids = []
        all_labels = []
        all_pixel_values = []
        all_image_sizes = []

        for turn in example['conversations']:
            inputs = self._get_inputs(turn['user'], turn['images'])
            prompt_input_ids = inputs['input_ids']  # (1, P)
            assistant_text = turn['assistant']
            response = f'{assistant_text}<|end|>\n<|endoftext|>'
            response_input_ids = self.processor.tokenizer(
                response, add_special_tokens=False, return_tensors='pt'
            )['input_ids']  # (1, R)

            input_ids = torch.cat([prompt_input_ids, response_input_ids], dim=1).squeeze(0)
            labels = torch.cat([
                torch.full((prompt_input_ids.size(1),), IGNORE_INDEX, dtype=torch.long),
                response_input_ids.squeeze(0)
            ], dim=0)

            all_input_ids.append(input_ids)
            all_labels.append(labels)
            all_pixel_values.append(inputs['pixel_values'])
            all_image_sizes.append(inputs['image_sizes'])

        input_ids = torch.cat(all_input_ids, dim=0)
        labels = torch.cat(all_labels, dim=0)
        input_ids, labels = self._truncate_pair(input_ids, labels)
        pixel_values = torch.cat(all_pixel_values, dim=0)
        image_sizes = torch.cat(all_image_sizes, dim=0)

        return {
            'id': example['id'],
            'input_ids': input_ids.tolist(),   # list[int]
            'labels': labels.tolist(),         # list[int]
            'pixel_values': pixel_values,      # tensor
            'image_sizes': image_sizes,        # tensor
        }

    # Required for TRL's truncate_dataset which calls dataset.map(truncate, ...).
    # Since this is not a Hugging Face datasets.Dataset, simply return self without doing anything.
    def map(self, function, *args, **kwargs):  # pragma: no cover
        return self


class Phi3VDataCollator:
    """Collate training examples into a right-padded batch.

    Args:
        pad_token_id: Token id used to fill padded ``input_ids`` positions.
    """
    def __init__(self, pad_token_id: int):
        self.pad_token_id = pad_token_id

    def __call__(self, examples):
        """Pad and stack a list of example dicts.

        Each example must provide 'input_ids' and 'labels' (lists of ints) and
        'pixel_values' / 'image_sizes' (tensors that are concatenated on dim 0).

        Returns:
            dict with 'input_ids', 'labels', 'attention_mask' (bool),
            'pixel_values' and 'image_sizes'.
        """
        batch_input_ids = [torch.tensor(ex['input_ids'], dtype=torch.long) for ex in examples]
        batch_label_ids = [torch.tensor(ex['labels'], dtype=torch.long) for ex in examples]
        batch_pixel_values = [ex['pixel_values'] for ex in examples]
        batch_image_sizes = [ex['image_sizes'] for ex in examples]

        input_ids = pad_sequence(batch_input_ids, padding_side='right', padding_value=self.pad_token_id)
        # Build the mask from the true sequence lengths. Comparing against
        # pad_token_id would also mask genuine tokens that share that id
        # (e.g. a terminal <|endoftext|> when the pad id equals the EOS id).
        lengths = torch.tensor([seq.size(0) for seq in batch_input_ids], dtype=torch.long)
        positions = torch.arange(input_ids.size(1)).unsqueeze(0)
        attention_mask = positions < lengths.unsqueeze(1)
        labels = pad_sequence(batch_label_ids, padding_side='right', padding_value=IGNORE_INDEX)
        pixel_values = torch.cat(batch_pixel_values, dim=0)
        image_sizes = torch.cat(batch_image_sizes, dim=0)

        return {
            'input_ids': input_ids,
            'labels': labels,
            'attention_mask': attention_mask,
            'pixel_values': pixel_values,
            'image_sizes': image_sizes,
        }


class Phi3VEvalDataset(Phi3VDataset):
    """Evaluation variant: the prompt holds the whole conversation except the final answer."""

    def __getitem__(self, idx):
        """Build the generation prompt for example ``idx``.

        Every turn contributes its user message; every turn but the last also
        contributes its assistant reply. The last turn's assistant text is
        returned separately as 'answer'.
        """
        example = self.examples[idx]
        turns = example['conversations']
        last_turn_index = len(turns) - 1

        messages = []
        all_images = []
        for turn_index, turn in enumerate(turns):
            turn_images = [Image.open(self.image_dir / rel_path) for rel_path in turn['images']]
            # Placeholder tags are numbered from 1 within each turn.
            image_tags = ''.join(f'<|image_{n}|>' for n in range(1, len(turn_images) + 1))
            messages.append({'role': 'user', 'content': f"{image_tags}\n{turn['user']}"})
            all_images.extend(turn_images)
            if turn_index == last_turn_index:
                # The final reply is the target, so it must not enter the prompt.
                break
            messages.append({'role': 'assistant', 'content': f"{turn['assistant']}<|end|>\n<|endoftext|>"})

        prompt = self.processor.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = self.processor(prompt, all_images, return_tensors='pt')
        answer = example['conversations'][-1].get('assistant')
        return {
            'id': example['id'],
            'input_ids': inputs['input_ids'].squeeze(0).tolist(),
            'pixel_values': inputs['pixel_values'],
            'image_sizes': inputs['image_sizes'],
            'answer': answer,
        }


class Phi3VEvalDataCollator(Phi3VDataCollator):
    """Collate evaluation examples into a left-padded batch."""

    def __call__(self, examples):
        """Pad and stack a list of evaluation example dicts.

        Each example must provide 'id', 'input_ids' (list of ints),
        'pixel_values', 'image_sizes' (tensors concatenated on dim 0) and 'answer'.

        Returns:
            dict with 'unique_ids', 'input_ids', 'attention_mask' (bool),
            'pixel_values', 'image_sizes' and 'answers'.
        """
        unique_ids = [ex['id'] for ex in examples]
        batch_input_ids = [torch.tensor(ex['input_ids'], dtype=torch.long) for ex in examples]
        batch_pixel_values = [ex['pixel_values'] for ex in examples]
        batch_image_sizes = [ex['image_sizes'] for ex in examples]
        answers = [ex['answer'] for ex in examples]

        input_ids = pad_sequence(batch_input_ids, padding_side='left', padding_value=self.pad_token_id)
        # Build the mask from the true sequence lengths (padding sits on the left).
        # Comparing against pad_token_id would also mask genuine prompt tokens that
        # share that id, e.g. <|endoftext|> inside earlier assistant turns.
        lengths = torch.tensor([seq.size(0) for seq in batch_input_ids], dtype=torch.long)
        positions = torch.arange(input_ids.size(1)).unsqueeze(0)
        attention_mask = positions >= (input_ids.size(1) - lengths).unsqueeze(1)
        pixel_values = torch.cat(batch_pixel_values, dim=0)
        image_sizes = torch.cat(batch_image_sizes, dim=0)

        return {
            'unique_ids': unique_ids,
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'pixel_values': pixel_values,
            'image_sizes': image_sizes,
            'answers': answers,
        }

<h5>Processor Patch</h5>

In [None]:
%%writefile ./src/processor_patch.py
"""
Processor patch utilities for robust save_pretrained behavior.

Why this exists:
- Upstream AutoProcessor for multimodal Phi models occasionally references optional
  attributes (e.g., chat_template, audio_tokenizer, video_processor) that may not be
  present depending on model / transformers version.
- During Trainer checkpoint saves, AttributeError would abort training unless we guard it.

Usage:
    from processor_patch import patch_processor_save
    processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
    patch_processor_save(processor)

Extension:
- If new optional attributes appear upstream, append them to OPTIONAL_ATTRS below.
- For silent mode (suppress warnings), set env PHI_PROC_PATCH_SILENT=1.
"""
from __future__ import annotations
import os
from typing import List, Optional

OPTIONAL_ATTRS: List[str] = [
    "chat_template",
    "audio_tokenizer",
    "video_processor",
]

WARN_PREFIX = "[PROC-PATCH]"


def _log(msg: str):  # minimal logging helper
    if os.getenv("PHI_PROC_PATCH_SILENT"):
        return
    print(f"{WARN_PREFIX} {msg}")


def ensure_optional_placeholders(processor, extra: Optional[List[str]] = None):
    """Give ``processor`` a ``None`` placeholder for each optional attribute it lacks.

    Args:
        processor: Object to patch in place.
        extra: Additional attribute names to check besides OPTIONAL_ATTRS.

    Returns:
        The same ``processor`` object.
    """
    names = OPTIONAL_ATTRS + (extra if extra else [])
    for name in names:
        if hasattr(processor, name):
            continue  # never overwrite an attribute that already exists
        setattr(processor, name, None)
    return processor


def safe_save_processor_components(processor, out_dir: str):
    """Save the tokenizer and any vision component of ``processor`` one by one.

    Components that are missing or ``None`` are skipped. A failing component is
    logged and does not stop the remaining ones from being saved.
    """
    # Tokenizer first, then the possible vision components.
    component_names = ("tokenizer", "image_processor", "feature_extractor", "vision_processor")
    for name in component_names:
        component = getattr(processor, name, None)
        if component is None:
            continue
        try:
            component.save_pretrained(out_dir)
        except Exception as e:
            _log(f"{name} save failed: {e}")


def patch_processor_save(processor, optional_attrs: Optional[List[str]] = None, verbose: bool = True):
    """Replace ``processor.save_pretrained`` with a version that never raises.

    Steps:
    1. Add ``None`` placeholders for missing optional attributes.
    2. Install a wrapper that calls the original ``save_pretrained`` and, if it
       raises (or does not exist), saves the individual components instead.

    Args:
        processor: Processor instance to patch in place.
        optional_attrs: Extra optional attribute names to add placeholders for.
        verbose: When True, log the patch installation and any fallback.

    Returns:
        The patched ``processor``.
    """
    ensure_optional_placeholders(processor, optional_attrs)
    original = getattr(processor, "save_pretrained", None)

    def _safe_save_pretrained(save_directory, *args, **kwargs):
        if original is not None:
            try:
                return original(save_directory, *args, **kwargs)
            except Exception as e:  # catch-all so training never aborts on save
                if verbose:
                    if isinstance(e, AttributeError):
                        _log(f"AttributeError caught: {e}; falling back to component saves.")
                    else:
                        _log(f"{type(e).__name__} during save_pretrained: {e}; falling back to component saves.")
        # Reached when there is no original method or when it raised.
        safe_save_processor_components(processor, save_directory)

    processor.save_pretrained = _safe_save_pretrained  # type: ignore
    if verbose:
        _log("processor.save_pretrained patched (resilient mode enabled)")
    return processor


<h5>Training code</h5>

In [None]:
%%writefile ./src/train.py
"""
Minimal Phi-3.5 Vision LoRA fine-tuning script (UCF101) aligned closely with PhiCookBook original.
- Uses TRL SFTTrainer
- Static LoRA target modules (language + vision)
- Optional bf16 (preferred) fallback to fp16
- Keeps remove_unused_columns=False for multimodal batches
- Merges LoRA adapter into base model after training (for simplified deployment)

Processor save resilience is externalized to processor_patch.py to keep this script focused.
"""

import argparse
from pathlib import Path
import torch
from transformers import AutoProcessor, AutoModelForCausalLM
from peft import LoraConfig, get_peft_model
from trl import SFTConfig, SFTTrainer

from processor_patch import patch_processor_save  # externalized monkeypatch


def parse_args():
    """Parse the command-line options of the training script from ``sys.argv``.

    ``--data_dir`` is required; every other option has a default.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_dir', type=str, required=True,
        help='Folder with ucf101_train.jsonl / ucf101_val.jsonl / images/',
    )

    # (flag, type, default) for each plain optional argument, in declaration order.
    optional_args = [
        ('--train_file', str, 'ucf101_train.jsonl'),
        ('--val_file', str, 'ucf101_val.jsonl'),
        ('--model_name', str, 'microsoft/phi-3.5-vision-instruct'),
        ('--output_dir', str, './outputs'),
        ('--epochs', int, 1),
        ('--lr', float, 5e-5),
        ('--warmup_ratio', float, 0.03),
        ('--batch_size', int, 1),
        ('--grad_accum', int, 16),
        ('--logging_steps', int, 25),
        ('--save_steps', int, 200),
        ('--lora_r', int, 16),
        ('--lora_alpha', int, 32),
        ('--lora_dropout', float, 0.05),
        ('--seed', int, 42),
    ]
    for flag, arg_type, default in optional_args:
        parser.add_argument(flag, type=arg_type, default=default)

    parser.add_argument(
        '--bf16', action='store_true',
        help='Use bfloat16 if supported, else fallback to fp16.',
    )
    return parser.parse_args()


def set_seed(seed: int):
    """Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) random generators."""
    import random
    import numpy as np

    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for seeder in seeders:
        seeder(seed)


def select_dtype(use_bf16: bool):
    """Pick the half-precision dtype for training.

    Returns ``torch.bfloat16`` when requested and supported by the GPU,
    otherwise ``torch.float16``.

    Raises:
        RuntimeError: If no CUDA device is available.
    """
    if not torch.cuda.is_available():
        raise RuntimeError('CUDA GPU required for this vision fine-tuning script.')
    bf16_ok = use_bf16 and torch.cuda.is_bf16_supported()
    return torch.bfloat16 if bf16_ok else torch.float16


def create_lora_config(rank: int, alpha: int, dropout: float):
    """Build the LoRA configuration with a fixed list of target modules.

    Args:
        rank: LoRA rank (``r``).
        alpha: LoRA scaling factor (``lora_alpha``).
        dropout: Dropout applied inside the LoRA layers.
    """
    # Close to cookbook: explicit module list (language + vision)
    language_modules = ['qkv_proj', 'o_proj', 'down_proj', 'gate_up_proj', 'lm_head']
    # vision side (attention + MLP + projection)
    vision_modules = [
        'q_proj', 'k_proj', 'v_proj', 'out_proj', 'fc1', 'fc2',
        'img_projection.0', 'img_projection.2',
    ]
    return LoraConfig(
        r=rank,
        lora_alpha=alpha,
        lora_dropout=dropout,
        target_modules=language_modules + vision_modules,
    )


def main():
    """Fine-tune the vision model with LoRA, then save the adapter and a merged model.

    Steps: parse arguments and seed the RNGs, load processor and base model,
    wrap the model with LoRA, build the train (and optional validation)
    datasets, train with TRL's SFTTrainer, save adapter + processor to
    ``output_dir`` and finally try to write a merged model to ``output_dir/merged``.
    A failed merge is reported but does not fail the run.
    """
    args = parse_args()
    set_seed(args.seed)

    torch_dtype = select_dtype(args.bf16)
    print(f"[INFO] Using dtype={torch_dtype}")

    # Load processor & base model
    processor = AutoProcessor.from_pretrained(args.model_name, trust_remote_code=True)
    patch_processor_save(processor)  # install resilient save behavior

    model = AutoModelForCausalLM.from_pretrained(
        args.model_name,
        attn_implementation='flash_attention_2',  # assumes flash-attn installed via environment or launcher
        torch_dtype=torch_dtype,
        trust_remote_code=True,
    )

    # Apply LoRA
    lora_cfg = create_lora_config(
        args.lora_r,
        args.lora_alpha,
        args.lora_dropout
        )
    print(f"[INFO] LoRA target modules ({len(lora_cfg.target_modules)}): {lora_cfg.target_modules}")

    model = get_peft_model(model, lora_cfg)
    model.print_trainable_parameters()

    # Datasets
    data_dir = Path(args.data_dir)
    train_path = data_dir / args.train_file
    val_path = data_dir / args.val_file

    from phi3v_dataset import Phi3VDataset, Phi3VDataCollator
    print(f"[INFO] Loading train dataset: {train_path}")
    train_ds = Phi3VDataset(str(train_path), str(data_dir / 'images'), processor)
    # The validation set is optional: it is only used when the file exists.
    eval_ds = Phi3VDataset(str(val_path), str(data_dir / 'images'), processor) if val_path.exists() else None

    # Fall back to the EOS id only when no pad id is defined. A plain `or`
    # would also discard a legitimate pad id of 0.
    pad_token_id = processor.tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = processor.tokenizer.eos_token_id
    collator = Phi3VDataCollator(pad_token_id=pad_token_id)

    training_args = SFTConfig(
        output_dir=args.output_dir,
        num_train_epochs=args.epochs,
        per_device_train_batch_size=args.batch_size,
        gradient_accumulation_steps=args.grad_accum,
        learning_rate=args.lr,
        warmup_ratio=args.warmup_ratio,
        logging_steps=args.logging_steps,
        save_steps=args.save_steps,
        save_total_limit=2,
        bf16=(torch_dtype == torch.bfloat16),
        fp16=(torch_dtype == torch.float16),
        gradient_checkpointing=True,
        gradient_checkpointing_kwargs={'use_reentrant': False},
        ddp_find_unused_parameters=False,
        report_to="trackio", # changed from none
        remove_unused_columns=False,  # keep pixel_values, image_sizes
    )

    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=train_ds,
        eval_dataset=eval_ds,
        data_collator=collator,
        processing_class=processor,  # safe due to patch
    )

    trainer.train()

    print('[INFO] Saving final adapter & processor ...')
    trainer.model.save_pretrained(args.output_dir)
    processor.save_pretrained(args.output_dir)

    # Merge LoRA adapter into base model for simplified deployment
    try:
        print('[INFO] Merging LoRA adapter into base model...')
        merged_model = trainer.model.merge_and_unload()
        merged_dir = Path(args.output_dir) / "merged"
        merged_dir.mkdir(parents=True, exist_ok=True)

        # Use PyTorch bin format to handle tied weights (embed_tokens <-> vision_embed_tokens.wte)
        merged_model.save_pretrained(str(merged_dir), safe_serialization=False)
        processor.save_pretrained(str(merged_dir))
        print(f'[INFO] Merged model saved at: {merged_dir}')
    except Exception as e:
        # Best effort: the adapter saved above stays usable when merging fails.
        import traceback
        print(f'[WARN] Could not merge model automatically: {e}')
        traceback.print_exc()
        print('[WARN] Adapter-only checkpoint still available in output_dir.')

    print('[INFO] Done.')


if __name__ == '__main__':
    main()


<h5>Training shell</h5>

In [None]:
%%writefile ./src/train.sh
#!/usr/bin/env bash
# Lightweight launcher that installs flash-attn (if not already present) then delegates to train.py
# Usage: bash train.sh --data_dir <path> --train_file <jsonl> --val_file <jsonl> [other train.py args]
# You can still run python train.py directly via AML command() if you do not need flash-attn.

set -euxo pipefail

# Adjust CUDA_TAG if you change the base image (e.g. cu121 for CUDA 12.1). Default aligns with current base image.
# It must be exported: the install step below runs in a child `bash -c` whose
# single-quoted script expands ${CUDA_TAG} itself. Without the export the default
# value never reaches the child and the wheel index URL ends in an empty tag.
export CUDA_TAG=${CUDA_TAG:-cu118}

# Limit parallel build jobs to reduce memory pressure on shared GPU nodes.
export MAX_JOBS=$(python -c "import os; n=min(8, os.cpu_count() or 8); print(n)")

echo "[INFO] Using CUDA_TAG=${CUDA_TAG} MAX_JOBS=${MAX_JOBS}" 

# Serialize the installs with an exclusive lock so that concurrent launches on the
# same node do not run pip at the same time.
# NOTE(review): confirm that the extra index URL used below is a trusted wheel source.
LOCK=/tmp/train_install.lock
flock -x "$LOCK" bash -c '
# Ensure recent pip & ninja (ninja accelerates any light builds triggered by wheel fallbacks)
python -m pip install -U pip ninja

# Install flash-attn (idempotent). Prefer pre-built wheels from vendor index.
python -m pip install --no-build-isolation --prefer-binary \
  --extra-index-url https://flash-attn.ai/whl/${CUDA_TAG} \
  flash-attn==2.4.2 || echo "[WARN] flash-attn install failed or already satisfied"

python -m pip install -U pip
python -m pip install --prefer-binary deepspeed==0.15.1 || true
'

echo "[INFO] Launching training (delegating to train.py)"
python train.py "$@"

<h5>Submit Job</h5>

In [None]:
# AML job configuration for Phi-3.5 Vision fine-tuning on UCF101
NUM_NODES = 1  # Start single node; scale after validation
NUM_GPU_PER_NODE = 1

# Distributed config (can expand to multi-node later)
dist = PyTorchDistribution(
    process_count_per_instance=NUM_GPU_PER_NODE,
    node_count=NUM_NODES
)

# Data asset registered in the "Register Dataset" cell: phi35vft-ucf101-02 version 1
# (jsonl + images). We mount the folder and point --data_dir to the mount root.

vision_job = command(
    code="./src",
    command=(
        "bash -lc "
        # Run the script through bash explicitly: %%writefile does not set the
        # executable bit, so "./train.sh" on its own can fail with "Permission denied".
        "'bash ./train.sh "
        #"python train.py "
        "--data_dir ${{inputs.data_dir}} "
        "--train_file ucf101_train.jsonl "
        "--val_file ucf101_val.jsonl "
        "--model_name microsoft/phi-3.5-vision-instruct "
        "--epochs 1 "
        "--batch_size 1 "
        "--grad_accum 16 "
        "--lr 5e-5 "
        "--warmup_ratio 0.03 "
        "--save_steps 200 "
        "--logging_steps 5 "
        "--bf16 '"  # model script enforces flash-attn with half precision"
    ),
    inputs={
        "data_dir": Input(
            type=AssetTypes.URI_FOLDER,
            # Must match the name/version used when the data asset was registered.
            path="phi35vft-ucf101-02:1"  # Explicit version
        )
    },
    outputs={                         # not used here
        "model_dir": Output(
            type=AssetTypes.URI_FOLDER,
            mode="rw_mount",
            path="azureml://datastores/workspaceblobstore/paths/models/phi35v-ucf101"
        )
    },
    # Use the most recent build instead of a hard-coded version number, which
    # only exists after the environment cell has been run a specific number of times.
    environment="env-phi35v-03@latest",  # Built earlier in this notebook
    compute=COMPUTE_CLUSTER,
    display_name="phi35v-vision-ucf101-ft",
    experiment_name="phi35v-vision-ft-exp",
    instance_count=NUM_NODES,
    distribution=dist,
    environment_variables={
        "NCCL_DEBUG": "WARN",
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
        # NOTE(review): the token is sent as a plain job environment variable — confirm this is acceptable.
        "HF_TOKEN": HF_TOKEN or "",
        "HF_HOME": "./outputs/hfhome",
        "TRACKIO_PROJECT": "ft-project",
    }
)
returned_job = ml_client.jobs.create_or_update(vision_job)
print(returned_job.studio_url)

<h5> Register Model </h5>

In [None]:
# Location of the merged model inside the outputs of the submitted training job.
merged_output_path = "artifacts/outputs/merged"
model_path_from_job = f"azureml://jobs/{returned_job.name}/outputs/{merged_output_path}"

print("path to register model: ", model_path_from_job)

In [None]:
#model_path_from_job = "azureml://jobs/dynamic_reggae_snc3rdfp9r/outputs/artifacts/outputs/merged"

In [None]:
from azure.ai.ml.entities import Model

# Register the merged model folder produced by the training job.
model = Model(
    name="phi-35-vision-ucf101",  # Model name in registry
    version="2",             # Explicit version (or omit for auto increment)
    type="custom_model",   # Hugging Face models are commonly registered as custom_model
    description="LoRA fine-tuned phi-35-vision model for UCF101 dataset",
    path=model_path_from_job,  # Reference to training job output
)

registered_model = ml_client.models.create_or_update(model)
print(f"Registered: {registered_model.name}:{registered_model.version}")

<h5> Score.py </h5>

In [None]:
%%writefile ./src/score.py
"""
Phi-3.5 Vision UCF101 Action Classification Inference Script
Handles multi-frame video classification using fine-tuned merged model.
"""
import os
import json
import base64
from io import BytesIO
import torch
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM


# UCF101 class list (10 classes as shown in test data)
UCF101_CLASSES = [
    "ApplyEyeMakeup", "ApplyLipstick", "Archery", "BabyCrawling", "BalanceBeam",
    "BandMarching", "BaseballPitch", "Basketball", "BasketballDunk", "BenchPress"
]


def init():
    """Load processor and merged model into the module-level globals.

    The model folder is ``$AZUREML_MODEL_DIR/$MODEL_SUBDIR`` (defaults: "." and
    "merged"). Everything is loaded from local files only.
    """
    global model, processor

    model_dir = os.path.join(
        os.environ.get("AZUREML_MODEL_DIR", "."),
        os.getenv("MODEL_SUBDIR", "merged"),
    )
    print(f"[INFO] Loading model from: {model_dir}")

    # Load processor (handles tokenizer + image_processor)
    processor = AutoProcessor.from_pretrained(
        model_dir, trust_remote_code=True, local_files_only=True
    )

    # bfloat16 only on a CUDA device that supports it, float16 in every other case.
    has_cuda = torch.cuda.is_available()
    if has_cuda and torch.cuda.is_bf16_supported():
        dtype = torch.bfloat16
    else:
        dtype = torch.float16

    # Load merged model (base + LoRA already fused)
    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        trust_remote_code=True,
        torch_dtype=dtype,
        device_map="auto" if has_cuda else "cpu",
        local_files_only=True,
    )
    model.eval()
    print(f"[INFO] Model loaded successfully on device: {model.device}")


def _load_images_from_request(data):
    """
    Load images from request payload.
    Supports:
    - "image_paths": list of file paths (for local testing)
    - "images": list of base64-encoded image strings
    - "image": single base64-encoded image string
    """
    images = []
    
    # Option 1: Multiple image paths (local file testing)
    if "image_paths" in data:
        for path in data["image_paths"]:
            images.append(Image.open(path).convert("RGB"))
    
    # Option 2: Multiple base64 images
    elif "images" in data:
        for img_b64 in data["images"]:
            img_bytes = base64.b64decode(img_b64)
            images.append(Image.open(BytesIO(img_bytes)).convert("RGB"))
    
    # Option 3: Single base64 image (fallback)
    elif "image" in data:
        img_bytes = base64.b64decode(data["image"])
        images.append(Image.open(BytesIO(img_bytes)).convert("RGB"))
    
    else:
        raise ValueError("Request must contain 'image_paths', 'images', or 'image' field")
    
    return images


def _extract_predicted_class(raw_output, class_names):
    """
    Map free-form model output to one of `class_names`.

    Matching is case-insensitive. An exact match wins; otherwise the longest
    class name contained in the output is returned, so a name embedded in a
    longer one (e.g. "Basketball" inside "BasketballDunk", when both are
    listed) does not shadow the more specific class. Returns None when no
    class name occurs in the output.
    """
    normalized = raw_output.strip().lower()
    for cls in class_names:
        if cls.lower() == normalized:
            return cls
    contained = [cls for cls in class_names if cls.lower() in normalized]
    # max() keeps the first of equally long names, so list order breaks ties.
    return max(contained, key=len) if contained else None


def run(raw_data):
    """
    Run inference on UCF101 action classification.

    Expected input (JSON text/bytes, or an already-parsed dict):
    {
        "images": ["<base64>", "<base64>", ...],  # or "image_paths" for local test
        "prompt": "Classify the video...",  # optional, uses default if not provided
        "max_new_tokens": 50  # optional
    }

    Returns a JSON string. On success:
    {
        "predicted_class": "ApplyLipstick",  # null when no class name is found
        "raw_output": "ApplyLipstick"
    }
    Exceptions are not raised to the caller; they are caught and reported as:
    {
        "error": "<message>",
        "traceback": "<formatted traceback>"
    }
    """
    try:
        # json.loads accepts str as well as UTF-8 bytes; other objects are
        # assumed to be already parsed.
        if isinstance(raw_data, (str, bytes, bytearray)):
            data = json.loads(raw_data)
        else:
            data = raw_data
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")

        # Load images
        images = _load_images_from_request(data)
        print(f"[INFO] Loaded {len(images)} images for inference")

        # Build prompt (default to UCF101 classification task)
        user_prompt = data.get(
            "prompt",
            f"Classify the video into one of the following classes: {', '.join(UCF101_CLASSES)}."
        )

        # Build image tags for multi-frame input (tags are 1-based)
        image_tag_text = ''.join([f'<|image_{i}|>' for i in range(1, len(images) + 1)])

        # Create user message with chat template
        messages = [
            {"role": "user", "content": f"{image_tag_text}\n{user_prompt}"}
        ]

        tokenizer = processor.tokenizer

        # Apply chat template
        prompt = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        # Preprocess (tokenize + vision encoding)
        inputs = processor(prompt, images, return_tensors="pt")

        # Move to device
        inputs = {k: v.to(model.device) if isinstance(v, torch.Tensor) else v
                  for k, v in inputs.items()}

        # Generation config
        max_new_tokens = int(data.get("max_new_tokens", 50))

        # Fall back to EOS only when no pad token is configured; a plain `or`
        # would also discard a legitimate pad id of 0.
        pad_token_id = tokenizer.pad_token_id
        if pad_token_id is None:
            pad_token_id = tokenizer.eos_token_id

        # Generate (use_cache=False to avoid DynamicCache.seen_tokens AttributeError)
        with torch.no_grad():
            output_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,  # Greedy for classification
                use_cache=False,  # Disable KV cache to avoid trust_remote_code compatibility issues
                pad_token_id=pad_token_id,
            )

        # Decode (remove prompt echo)
        input_len = inputs["input_ids"].shape[1]
        generated_ids = output_ids[0, input_len:]
        raw_output = tokenizer.decode(generated_ids, skip_special_tokens=True).strip()

        # Extract predicted class (exact match first, then longest contained name)
        predicted_class = _extract_predicted_class(raw_output, UCF101_CLASSES)

        return json.dumps({
            "predicted_class": predicted_class,
            "raw_output": raw_output
        }, ensure_ascii=False)

    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"[ERROR] {error_trace}")
        return json.dumps({
            "error": str(e),
            "traceback": error_trace
        }, ensure_ascii=False)


<h5> Create Endpoint </h5>

In [None]:
# Endpoint name carries a month/day/hour/minute suffix taken from the current time.
created_at = datetime.datetime.now()
endpoint_name = f"phi35v-jp-{created_at:%m%d%H%M}"

# 1) Create endpoint (key-based auth) and wait for the operation to finish
endpoint = ManagedOnlineEndpoint(
    name=endpoint_name,
    description="phi-3.5-vision UCF101 demo endpoint",
    auth_mode="key",
)
endpoint_poller = ml_client.begin_create_or_update(endpoint)
endpoint_poller.result()

<h5> Deploy Model </h5>

In [None]:
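# Optional: to reuse an existing endpoint instead of the one created above,
# uncomment the next line and set its name.
#endpoint_name="phi35v-jp-10150503"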

In [None]:
# 2) Deploy (refer to the already registered "merged" model in Model Registry)
registered = ml_client.models.get("phi-35-vision-ucf101", version="2")

# Scoring entry point: ./src/score.py
scoring_code = CodeConfiguration(code="./src", scoring_script="score.py")

# One request at a time per instance; request and queue timeouts are both 180 s
request_limits = OnlineRequestSettings(
    request_timeout_ms=180000,
    max_concurrent_requests_per_instance=1,
    max_queue_wait_ms=180000,
)

deploy = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=endpoint_name,
    model=registered.id,
    environment="env-phi35v-01a@latest",
    code_configuration=scoring_code,
    instance_type="Standard_NC40ads_H100_v5",
    instance_count=1,
    request_settings=request_limits,
)
deploy_poller = ml_client.begin_create_or_update(deploy)
deploy_poller.result()

<h5> Route Traffic to the Deployment </h5>

In [None]:
# 3) Traffic routing: send 100% of the endpoint's traffic to the "blue" deployment
ep = ml_client.online_endpoints.get(endpoint_name)
ep.traffic = dict(blue=100)
traffic_poller = ml_client.begin_create_or_update(ep)
traffic_poller.result()

print("endpoint:", endpoint_name)

<h5> Send Request </h5>

In [None]:
# Test UCF101 inference endpoint using test_payload format
import json
import base64
import requests
from pathlib import Path
from io import BytesIO
from PIL import Image

# Get endpoint credentials
keys = ml_client.online_endpoints.get_keys(name=endpoint_name).primary_key
scoring_url = ml_client.online_endpoints.get(endpoint_name).scoring_uri
headers = {"Authorization": f"Bearer {keys}", "Content-Type": "application/json"}

# Load test example from UCF101 test set
test_jsonl = Path("./converted_ucf101/ucf101_test.jsonl")
image_dir = Path("./converted_ucf101/images")

if test_jsonl.exists():
    # Only the first record of the JSONL file is used for this smoke test
    with open(test_jsonl, 'r', encoding='utf-8') as f:
        test_example = json.loads(f.readline())

    first_turn = test_example['conversations'][0]

    # Build test_payload (same format as local test)
    test_payload = {
        "image_paths": [str(image_dir / img_path) for img_path in first_turn['images']],
        "prompt": first_turn['user'],
        "max_new_tokens": 50
    }

    print(f"[INFO] Test payload prepared:")
    print(f"  - Images: {len(test_payload['image_paths'])} frames")
    print(f"  - Ground truth: {first_turn['assistant']}")

    # Convert image_paths to base64 for endpoint request
    encoded_images = []
    for img_path in test_payload["image_paths"]:
        # Context manager releases the file handle once the JPEG bytes exist
        with Image.open(img_path) as img:
            buffer = BytesIO()
            img.convert("RGB").save(buffer, format="JPEG")
        encoded_images.append(base64.b64encode(buffer.getvalue()).decode("utf-8"))

    # Build endpoint payload (convert image_paths -> images with base64)
    endpoint_payload = {
        "images": encoded_images,  # base64-encoded images instead of paths
        "prompt": test_payload["prompt"],
        "max_new_tokens": test_payload["max_new_tokens"]
    }

    print(f"\n[INFO] Sending request with {len(encoded_images)} frames")

    # Send request
    res = requests.post(scoring_url, headers=headers, json=endpoint_payload, timeout=300)
    print(f"[INFO] Status code: {res.status_code}")

    if res.status_code == 200:
        # run() returns a JSON string, so the body may decode to a str that
        # needs a second parse.
        result = res.json()
        if isinstance(result, str):
            result = json.loads(result)

        if isinstance(result, dict) and "error" in result:
            # score.py's run() catches its exceptions and returns them in the
            # response body, so a 200 response can still describe a failure.
            print(f"[ERROR] {result['error']}")
            if result.get("traceback"):
                print(result["traceback"])
        else:
            print("\n[RESULT]")
            print(f"  Predicted class: {result.get('predicted_class')}")
            print(f"  Raw output: {result.get('raw_output')}")
            print(f"  Match: {result.get('predicted_class') == first_turn['assistant']}")
    else:
        print(f"[ERROR] {res.text}")
else:
    print("[WARN] Test file not found. Cannot send request.")

<h5> Delete Endpoint </h5>

In [None]:
# Request deletion of the endpoint and block until the operation completes.
delete_poller = ml_client.online_endpoints.begin_delete(name=endpoint_name)
delete_poller.wait()

<h5> Problem Detection (run before deleting the endpoint) </h5>

In [None]:
# Inspect the most recent HTTP response held in `res`:
# status code, content type, and the first 1000 characters of the body.
for detail in (res.status_code, res.headers.get("content-type"), res.text[:1000]):
    print(detail)

In [None]:
# Fetch the last 200 log lines of the "blue" deployment.
# Printing the text (rather than relying on the cell's repr of the returned
# string) renders line breaks instead of literal "\n" escapes.
deployment_logs = ml_client.online_deployments.get_logs(
    name="blue", endpoint_name=endpoint_name, lines=200
)
print(deployment_logs)

<h2> End of Script </h2>

<h3>Appendix</h3>

<h5> Batch Evaluation </h5>

In [None]:
# Evaluate endpoint on multiple test samples
import json
import base64
import requests
from pathlib import Path
from io import BytesIO
from PIL import Image
import time

# NOTE: this cell reuses `scoring_url` and `headers` defined in the "Send Request" cell.


def _encode_image_b64(img_path):
    """Read an image file and return it as a base64-encoded JPEG string."""
    with Image.open(img_path) as img:
        buffer = BytesIO()
        img.convert("RGB").save(buffer, format="JPEG")
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def _failed_result(example_id, ground_truth, error):
    """Build the result record for a sample that could not be scored."""
    return {
        'id': example_id,
        'ground_truth': ground_truth,
        'predicted': None,
        'correct': False,
        'error': error
    }


# Load test dataset
test_jsonl = Path("./converted_ucf101/ucf101_test.jsonl")
image_dir = Path("./converted_ucf101/images")

if not test_jsonl.exists():
    print("[ERROR] Test file not found")
else:
    # Load all test examples (blank lines, e.g. a trailing newline, are skipped)
    with open(test_jsonl, 'r', encoding='utf-8') as f:
        test_examples = [json.loads(line) for line in f if line.strip()]

    print(f"[INFO] Loaded {len(test_examples)} test samples")

    # Evaluate on first N samples (adjust as needed)
    num_samples = min(10, len(test_examples))  # Start with 10 samples

    results = []
    correct = 0
    total_inference_time = 0
    errors_count = 0

    print(f"\n[INFO] Evaluating {num_samples} samples...")
    print(f"Progress: ", end='', flush=True)

    for i, example in enumerate(test_examples[:num_samples]):
        print(f"{i+1}/{num_samples}...", end=' ', flush=True)

        # Build payload
        turn = example['conversations'][0]
        image_paths = [str(image_dir / img_path) for img_path in turn['images']]
        ground_truth = turn['assistant']

        # Convert to base64
        try:
            encoded_images = [_encode_image_b64(img_path) for img_path in image_paths]
        except Exception as e:
            print(f"\n[ERROR] Sample {i}: Image loading failed - {str(e)}")
            errors_count += 1
            results.append(_failed_result(example['id'], ground_truth, f"Image loading: {str(e)}"))
            continue

        endpoint_payload = {
            "images": encoded_images,
            "prompt": turn['user'],
            "max_new_tokens": 50
        }

        # Send request
        start_time = time.time()
        try:
            res = requests.post(scoring_url, headers=headers, json=endpoint_payload, timeout=300)
            inference_time = time.time() - start_time

            if res.status_code != 200:
                print(f"\n[ERROR] Sample {i}: HTTP {res.status_code} - {res.text[:100]}")
                errors_count += 1
                results.append(_failed_result(
                    example['id'], ground_truth, f"HTTP {res.status_code}: {res.text[:200]}"
                ))
                continue

            # The scoring script returns a JSON string, so the body may need a second parse
            result = res.json()
            if isinstance(result, str):
                result = json.loads(result)

            # score.py's run() reports caught exceptions in the response body.
            # Count those as failed requests, not as wrong predictions, so
            # they do not distort the accuracy.
            if isinstance(result, dict) and 'error' in result:
                print(f"\n[ERROR] Sample {i}: Server error - {str(result['error'])[:100]}")
                errors_count += 1
                results.append(_failed_result(
                    example['id'], ground_truth, f"Server error: {result['error']}"
                ))
                continue

            predicted_class = result.get('predicted_class')
            is_correct = (predicted_class == ground_truth)

            if is_correct:
                correct += 1

            results.append({
                'id': example['id'],
                'ground_truth': ground_truth,
                'predicted': predicted_class,
                'correct': is_correct,
                'inference_time': inference_time
            })

            total_inference_time += inference_time

        except requests.exceptions.Timeout:
            inference_time = time.time() - start_time
            print(f"\n[ERROR] Sample {i}: Request timeout after {inference_time:.1f}s")
            errors_count += 1
            results.append(_failed_result(
                example['id'], ground_truth, f"Timeout after {inference_time:.1f}s"
            ))

        except Exception as e:
            print(f"\n[ERROR] Sample {i}: {type(e).__name__} - {str(e)}")
            errors_count += 1
            results.append(_failed_result(example['id'], ground_truth, str(e)))

    print("\n")  # New line after progress

    # Calculate metrics (accuracy and latency are computed over successful requests only)
    successful_samples = num_samples - errors_count
    accuracy = correct / successful_samples if successful_samples > 0 else 0
    avg_inference_time = total_inference_time / successful_samples if successful_samples > 0 else 0

    print(f"\n{'='*60}")
    print(f"EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Total samples: {num_samples}")
    print(f"Successful requests: {successful_samples}")
    print(f"Failed requests: {errors_count}")
    print(f"Correct predictions: {correct}")
    print(f"Accuracy: {accuracy:.2%} ({correct}/{successful_samples})")
    print(f"Average inference time: {avg_inference_time:.2f} seconds")
    print(f"{'='*60}")

    # Show sample results
    print(f"\nSample Results (first 5):")
    for r in results[:5]:
        status = "✓" if r['correct'] else "✗"
        # Failed samples store predicted=None, so check for the error marker
        # rather than relying on a .get() default that never applies.
        pred_str = 'ERROR' if 'error' in r else str(r['predicted'])
        time_str = f"{r.get('inference_time', 0):.1f}s" if 'inference_time' in r else "N/A"
        print(f"  {status} GT: {r['ground_truth']:20s} | Pred: {pred_str:20s} | Time: {time_str}")

    # Confusion analysis (scored samples only; failed requests are listed below)
    if correct < successful_samples:
        print(f"\nIncorrect Predictions:")
        for r in results:
            if not r['correct'] and 'error' not in r:
                print(f"  GT: {r['ground_truth']:20s} → Pred: {str(r.get('predicted', 'None'))}")

    # Error summary
    if errors_count > 0:
        print(f"\nError Summary:")
        error_types = {}
        for r in results:
            if 'error' in r:
                # Group by the text before the first ':' (capped at 30 chars)
                error_key = r['error'].split(':')[0][:30]
                error_types[error_key] = error_types.get(error_key, 0) + 1
        for error_type, count in error_types.items():
            print(f"  {error_type}: {count} occurrences")

    print(f"\n[INFO] Evaluation completed!")


<h5>Trackio</h5>

In [None]:
# Detailed Trackio data review (alternative to UI)
import sqlite3
import pandas as pd
import json

conn = sqlite3.connect("./ft-project.db")
try:
    # Retrieve all metrics
    metrics_df = pd.read_sql("SELECT * FROM metrics ORDER BY step;", conn)
finally:
    # Everything needed is now in the DataFrame; release the database handle
    # even when the query fails (e.g. missing table).
    conn.close()

print(f"[INFO] Total logged steps: {len(metrics_df)}")
print(f"[INFO] Run name: {metrics_df['run_name'].unique()}")
print(f"\n{'='*80}")
print("TRAINING METRICS")
print(f"{'='*80}")

# Key metrics to display for every step (when present)
important_keys = ['train/loss', 'train/learning_rate', 'train/epoch',
                  'train/global_step', 'train/grad_norm']

# Parse metrics
for idx, row in metrics_df.iterrows():
    step = row['step']
    timestamp = row['timestamp']
    metrics_blob = row['metrics']

    # Convert binary data to JSON (the column may hold bytes or text)
    try:
        metrics_dict = json.loads(metrics_blob.decode('utf-8') if isinstance(metrics_blob, bytes) else metrics_blob)

        print(f"\nStep {step} ({timestamp}):")

        # Display key metrics
        for key in important_keys:
            if key in metrics_dict:
                value = metrics_dict[key]
                if isinstance(value, (int, float)):
                    print(f"  {key:30s}: {value:.6f}")
                else:
                    # Non-numeric values (e.g. null) cannot take the float
                    # format; show them as-is instead of failing the step.
                    print(f"  {key:30s}: {value}")

        # GPU metrics (if pynvml is available)
        gpu_keys = [k for k in metrics_dict.keys() if 'gpu' in k.lower() or 'memory' in k.lower()]
        if gpu_keys:
            print(f"\n  GPU Metrics:")
            for key in gpu_keys[:5]:  # Display first 5 only
                print(f"    {key:28s}: {metrics_dict[key]}")

    except Exception as e:
        print(f"\n[ERROR] Step {step}: Cannot parse metrics - {str(e)[:100]}")

print(f"\n{'='*80}")
print("[INFO] Use trackio.show(project='ft-project') to view interactive UI")
print(f"[INFO] Or open browser: http://127.0.0.1:7860/?project=ft-project")
print(f"{'='*80}")