# 在 Azure Machine Learning 上使用 LoRA 微调 Qwen-VL

本笔记演示如何在 Azure Machine Learning (Azure ML) 上对 Qwen-VL 系列多模态大模型执行 LoRA 指令微调，并在训练完成后注册模型及验证推理结果。流程涵盖环境配置、数据准备、训练脚本编写、作业提交与监控等关键步骤。

## 前提条件

- Azure 订阅以及已创建的 Azure ML 工作区。
- 工作区所在区域已经申请到足够的 GPU 配额（推荐 `Standard_NC24ads_A100_v4`）。
- 本地或计算实例已安装 `azure-ai-ml>=1.15.0` 与 `azure-identity>=1.16.0`。
- 若使用托管身份或服务主体，请确保具备对工作区的访问权限。
- 准备图文对训练数据，推荐整理为 JSON Lines 格式（示例见后文）。

## 操作文档概览

执行本指南时建议按以下顺序完成：
1. 在本地整理数据（图片 + 标注 JSONL），并根据示例检查字段。
2. 将数据上传/注册到 Azure ML 数据存储或数据资产。
3. 检查或创建 GPU 计算集群。
4. 运行 Notebook 前半部分生成并注册训练环境、上传脚本。
5. 根据实际资源修改命令作业参数并提交训练。
6. 监控日志、下载输出，最后注册模型并本地验证。
7. （可选）基于注册模型继续部署或批量推理。

## 连接到 Azure ML 工作区

填写订阅 ID、资源组与工作区名称。以下示例优先使用 `DefaultAzureCredential`，若本地无可用身份则回退到交互式浏览器登录。

In [23]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
import os

# 读取工作区基本信息，优先从环境变量中获取
SUBSCRIPTION_ID = os.getenv("AZURE_SUBSCRIPTION_ID", "7a03e9b8-18d6-48e7-b186-0ec68da9e86f")
RESOURCE_GROUP = os.getenv("AZURE_RESOURCE_GROUP", "aml-rg")
WORKSPACE_NAME = os.getenv("AZUREML_WORKSPACE_NAME", "aml-hu-east-west-us2")

# 交互式登录
# credential = InteractiveBrowserCredential(tenant_id="16b3c013-d300-468d-ac64-7eda0820b6d3")
credential = DefaultAzureCredential()

# 初始化 MLClient 以便后续操作工作区资源
ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION_ID,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
 )

print(f"Connected to workspace: {ml_client.workspace_name}")
workspace = ml_client.workspaces.get(ml_client.workspace_name)  # 读取工作区以获取区域信息
print(f"Location: {workspace.location}")

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


Connected to workspace: aml-hu-east-west-us2
Location: westus2


## 准备目录结构与环境文件

在工作目录下创建源码与环境配置文件夹，并写入将用于训练作业的 Conda 依赖。环境基于 Azure ML 官方 PyTorch CUDA 12.1 镜像。

In [24]:
import pathlib

# 在本地构建源码和环境目录，用于组织训练脚本与依赖文件
workspace_dir = pathlib.Path.cwd() / "qwen_vl_lora"
src_dir = workspace_dir / "src"
env_dir = workspace_dir / "env"
workspace_dir.mkdir(exist_ok=True)
src_dir.mkdir(parents=True, exist_ok=True)
env_dir.mkdir(parents=True, exist_ok=True)

print(f"Source directory: {src_dir}")
print(f"Environment directory: {env_dir}")

Source directory: /home/azureuser/qwen_vl_lora/qwen_vl_lora/src
Environment directory: /home/azureuser/qwen_vl_lora/qwen_vl_lora/env


In [25]:
%%writefile {env_dir}/conda.yaml
# Conda 环境描述，Azure ML 会据此在基础镜像上安装额外依赖
name: qwen-vl-lora-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pip=24.0
  - pip:
    - accelerate>=0.28.0
    - bitsandbytes>=0.43.0
    - datasets>=2.18.0
    - peft>=0.11.0
    - pillow>=10.2.0
    - sentencepiece>=0.2.0
    - timm>=0.9.12
    - torch>=2.3.0
    - torchvision>=0.18.0
    - transformers>=4.39.0
    - trl>=0.8.6
    - wandb>=0.16.0

Overwriting /home/azureuser/qwen_vl_lora/qwen_vl_lora/env/conda.yaml


In [26]:
from azure.ai.ml.entities import Environment

# 将自定义 Conda 依赖与官方基础镜像组合成 Azure ML 环境
# 使用 HuggingFace NLP 专用镜像（推荐用于 Transformers + LoRA 微调）
qwen_env = Environment(
    name="qwen-vl-lora-env-demo-03",
    description="LoRA finetuning environment for Qwen-VL demo 03",
    conda_file=str((env_dir / "conda.yaml").as_posix()),
    # 方案 1：HuggingFace NLP GPU 专用镜像（推荐）
    # 已预装 transformers, CUDA 12.x, 针对 A100 优化
    image="mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest",
)
registered_env = ml_client.environments.create_or_update(qwen_env)
print(f"Environment registered: {registered_env.name}:{registered_env.version}")

Environment registered: qwen-vl-lora-env-demo-03:1


### 注册训练环境

创建并注册包含所需依赖的 Azure ML 环境。根据您的需求选择合适的基础镜像。

#### 🎯 镜像选择建议

**当前配置（推荐）：HuggingFace NLP GPU 专用镜像**
```python
image="mcr.microsoft.com/azureml/curated/acft-hf-nlp-gpu:latest"
```
✅ **优点：**
- 专门为 HuggingFace Transformers 优化
- 预装 CUDA 12.x，完美支持最新的 bitsandbytes
- 针对 A100 GPU 优化
- 包含 NLP 和多模态任务的常用库

**其他可用选项：**

| 镜像 | CUDA | 适用场景 | 推荐度 |
|------|------|---------|--------|
| `acft-hf-nlp-gpu:latest` | 12.x | HuggingFace + LoRA 微调 | ⭐⭐⭐⭐⭐ |
| `pytorch-2.0-cuda11.7-cudnn8-ubuntu22.04:latest` | 11.7 | 通用 PyTorch 训练 | ⭐⭐⭐ |
| `openmpi4.1.0-cuda11.8-cudnn8-ubuntu22.04:latest` | 11.8 | 分布式训练 | ⭐⭐⭐ |

#### ⚠️ 重要提示
- **bitsandbytes 0.43.0+** 需要 CUDA 12.x 以获得最佳性能
- **Qwen-VL 多模态模型** 在 HuggingFace 镜像上运行更稳定
- **A100 GPU** 在 CUDA 12.x 上性能更优

### 计算资源准备
1. 推荐使用带 A100 80GB 的 GPU（如 `Standard_NC24ads_A100_v4`）。
2. 若尚未创建计算集群，可在 Azure ML Studio → 计算 → 计算集群 创建，或使用 Azure CLI：
   ```bash
   az ml compute create --name gpu-a100-cluster --type amlcompute \
       --resource-group <RG> --workspace-name <WS> \
       --min-instances 0 --max-instances 2 --size Standard_NC24ads_A100_v4
   ```
3. 训练前确认集群处于“空闲”或“已分配”状态，确保配额足够。

In [27]:
# 检查现有计算资源并在需要时创建 GPU 集群
from azure.ai.ml.entities import AmlCompute

# 列出所有现有的计算资源
print("现有计算资源:")
for compute in ml_client.compute.list():
    print(f"  - {compute.name} (类型: {compute.type}, 大小: {getattr(compute, 'size', 'N/A')}, 状态: {compute.provisioning_state})")

# 定义计算集群名称
COMPUTE_NAME = "qwen-fine-tune-H100"

# 检查计算集群是否存在，如果不存在则创建
try:
    compute_target = ml_client.compute.get(COMPUTE_NAME)
    print(f"\n计算集群 '{COMPUTE_NAME}' 已存在，状态: {compute_target.provisioning_state}")
except Exception as e:
    print(f"\n计算集群 '{COMPUTE_NAME}' 不存在，正在创建...")
    
    # 创建计算集群配置
    # 注意：Standard_NC24ads_A100_v4 在某些区域可能不可用
    # 可替换为其他 GPU SKU，如 Standard_NC6s_v3, Standard_NC12s_v3 等
    compute_config = AmlCompute(
        name=COMPUTE_NAME,
        type="amlcompute",
        size="Standard_NC80adis_H100_v5",  # A100 80GB GPU
        min_instances=0,
        max_instances=1,
        idle_time_before_scale_down=300,  # 5分钟后自动缩减
        tier="dedicated",
    )
    
    try:
        compute_target = ml_client.compute.begin_create_or_update(compute_config).result()
        print(f"计算集群 '{COMPUTE_NAME}' 创建成功！")
    except Exception as create_error:
        print(f"创建失败: {create_error}")
        print("\n可能的原因:")
        print("1. 所选 GPU SKU 在当前区域不可用")
        print("2. GPU 配额不足")
        print("\n建议:")
        print("- 在 Azure Portal 中检查可用的 VM SKU")
        print("- 请求增加 GPU 配额")
        print("- 或使用现有的计算资源（如果上面列出了可用的计算）")
        raise

现有计算资源:
  - A100-swedCentral (类型: virtualmachine, 大小: N/A, 状态: Succeeded)
  - A100-centre-us (类型: virtualmachine, 大小: N/A, 状态: Succeeded)
  - multi-model-batch-vm (类型: computeinstance, 大小: Standard_D96ads_v5, 状态: Succeeded)
  - o1-performance-test-vm (类型: computeinstance, 大小: Standard_E4ds_v4, 状态: Succeeded)
  - slm-fine-tune-lab (类型: computeinstance, 大小: Standard_E4ds_v4, 状态: Succeeded)
  - gpu-cluster-a100 (类型: amlcompute, 大小: Standard_NC24ads_A100_v4, 状态: Succeeded)
  - agent-demo-ws (类型: computeinstance, 大小: Standard_D32d_v4, 状态: Succeeded)
  - notebook-llm-solu-vm (类型: computeinstance, 大小: Standard_D48a_v4, 状态: Succeeded)
  - gpu-phi4-predict-out (类型: computeinstance, 大小: Standard_NC8as_T4_v3, 状态: Succeeded)
  - Standard-NV12s-v3-m60 (类型: computeinstance, 大小: Standard_NV12s_v3, 状态: Succeeded)
  - multi-gpus-trainer (类型: computeinstance, 大小: Standard_NC64as_T4_v3, 状态: Succeeded)
  - a100-west-1 (类型: amlcompute, 大小: Standard_NC24ads_A100_v4, 状态: Succeeded)
  - qwen-fine-tune-A100 (类

### 检查并创建计算集群

运行下方代码单元格将：
1. 列出工作区中所有现有的计算资源
2. 检查 `gpu-a100-cluster` 是否存在
3. 如果不存在，自动创建该集群

**注意事项：**
- 如果创建失败（配额不足或 SKU 不可用），可以：
  - 在 Azure Portal 申请 GPU 配额
  - 修改代码使用其他可用的 GPU SKU（如 `Standard_NC6s_v3`）
  - 或使用上面列出的现有计算资源

## 准备数据集

> 目标：整理图文多轮对话数据，并转换为 LoRA 训练所需的 JSON Lines 文件。

### 1. 本地目录结构建议
- `dataset/`根目录
  - `images/`存放所有训练图片，可按需要再分子目录。
  - `train.jsonl`训练集标注文件。
  - `validation.jsonl`验证集（可选）。

### 2. JSONL 单条样本示例
```json
{
  "image": "images/sample_0001.jpg",
  "question": "描述图片中的主要场景。",
  "answer": "图片展示了一位骑行者在海边公路上骑行，阳光明媚。",
  "system": "你是一名图文理解助手。"
}
```
- `image`可以是相对路径（相对于数据集根目录）或绝对路径；
- `question`为用户指令/提问；
- `answer`为模型期望回应；
- `system`可选，用于提供系统提示或角色设定；
- 可扩展额外字段（如标签、难度）但需在训练脚本中自行处理。

### 3. 将其他标注格式转为 JSONL
若已有 CSV/Excel/JSON，可在本地运行以下示例脚本生成 `train.jsonl`：
```python
import csv, json, pathlib

input_csv = pathlib.Path("./raw.csv")
output_jsonl = pathlib.Path("./dataset/train.jsonl")

with input_csv.open("r", encoding="utf-8") as fin, output_jsonl.open("w", encoding="utf-8") as fout:
    reader = csv.DictReader(fin)
    for row in reader:
        record = {
            "image": f"images/{row['image_filename']}",
            "question": row["instruction"],
            "answer": row["response"],
            "system": row.get("system_prompt") or "你是一名视觉助手。"
        }
        fout.write(json.dumps(record, ensure_ascii=False) + "\n")
```
脚本执行后确认：
1. JSONL 每行均为合法 JSON；
2. 引用的图片文件均存在且可打开；
3. 验证集（若存在）与训练集结构一致即可。

转换完成后重复前述上传步骤，将新生成的 JSONL 文件和图片目录同步到 Azure ML 数据存储即可。

### 5. 从 Parquet/Arrow 数据源生成 JSONL
若你的训练数据保存在 `test-00000-of-00001.parquet` 等 Parquet 文件中，可按以下步骤转换：
1. 使用 `datasets` 或 `pyarrow` 读取 Parquet；
2. 逐行构建所需字段并写入 JSONL；
3. 同时批量导出引用到的图片（若存储为二进制或远程 URL）。

下面示例 Parquet 中包含字段 'id', 'category', 'images', 'question', 'question_text', 'answer', 'difficulty', 'metric_info', 'initial_state'，并且 `images` 已指向本地图片：

如 Parquet 中包含嵌入二进制图像，可先使用 `dataset[i]["image"].save(...)` 将其导出到 `images/` 目录，再在生成 JSONL 时写入相对路径。若 `image_path` 为远程 URL，需提前下载到本地后再写入。

In [None]:
from datasets import load_dataset
import json, pathlib
from PIL import Image

parquet_path = "./dataset/test-00000-of-00001.parquet"  # 修改为你的 parquet 路径
output_jsonl = pathlib.Path("./dataset/train.jsonl")
images_dir = pathlib.Path("./dataset/images")
images_dir.mkdir(parents=True, exist_ok=True)

dataset = load_dataset("parquet", data_files=parquet_path, split="train")

def save_image_as_png(image: Image.Image, image_path: pathlib.Path) -> str:
    """统一将图片保存为 PNG，保持 Alpha 通道并避免压缩损失。"""
    if image.mode in ("RGBA", "LA", "P"):
        image = image.convert("RGBA")
    else:
        image = image.convert("RGB")
    image.save(image_path, format="PNG")
    return f"images/{image_path.name}"

def resolve_image_field(example, sample_idx: int) -> str:
    """将 parquet 中的 images 字段转换为 JSONL 所需的图片路径。"""
    image_field = example.get("images")
    if isinstance(image_field, list):
        image_field = image_field[0] if image_field else None
    if isinstance(image_field, Image.Image):
        image_path = images_dir / f"{sample_idx:06d}.png"
        return save_image_as_png(image_field, image_path)
    if hasattr(image_field, "save"):
        pil_image = image_field
        image_path = images_dir / f"{sample_idx:06d}.png"
        return save_image_as_png(pil_image, image_path)
    if isinstance(image_field, dict):
        candidate = image_field.get("path") or image_field.get("image_path") or image_field.get("url")
        if candidate:
            return candidate
    if isinstance(image_field, str):
        return image_field
    raise ValueError("无法解析 parquet 中的 images 字段，请检查数据格式")

with output_jsonl.open("w", encoding="utf-8") as fout:
    for idx, example in enumerate(dataset):
        record = {
            "image": resolve_image_field(example, idx),
            "question": example.get("question_text") or example.get("question") or "",
            "answer": example.get("answer", ""),
            "system": example.get("initial_state") or "你是一名视觉助手。",
            "category": example.get("category"),
            "metadata": {"difficulty": example.get("difficulty"), "metric_info": example.get("metric_info")},
        }
        fout.write(json.dumps(record, ensure_ascii=False) + "\n")
print(f"Wrote {len(dataset)} samples to {output_jsonl}")

### 4. 将本地数据上传到 Azure ML
1. **确认默认数据存储**：运行下方代码查看 `workspaceblobstore` 或自定义数据存储的名称。
2. **上传/注册数据资产**：可使用 Notebook 中的 Python 代码，也可通过 Azure ML Studio 的“数据”页面手动上传。
3. **保持目录结构**：上传时确保 `images/` 等子目录与 JSONL 文件保持原有层级。
4. **数据更新**：若重新上传，可选择新版本号，避免覆盖历史数据。

In [28]:
# 查看当前工作区内可用的数据存储（默认通常为 workspaceblobstore）
for ds in ml_client.datastores.list():
    print(ds.name, "->", ds.type)

azureml_globaldatasets -> DatastoreType.AZURE_BLOB
workspaceblobstore -> DatastoreType.AZURE_BLOB
workspaceartifactstore -> DatastoreType.AZURE_BLOB
workspaceworkingdirectory -> DatastoreType.AZURE_FILE
workspacefilestore -> DatastoreType.AZURE_FILE


> **提示**：如果数据量较大，可使用 `azcopy` 或 Azure Storage Explorer 先上传到 Blob，再注册为数据资产。下方示例适合直接从本地文件夹上传。

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
from datetime import datetime
import pathlib

# 将本地文件夹上传并注册为数据资产
local_dataset_path = pathlib.Path("./dataset")  # 修改为你的本地数据集路径
if not local_dataset_path.exists():
    raise FileNotFoundError(f"本地数据目录不存在: {local_dataset_path}")

data_asset = Data(
    name="qwen-vl-instruction-data",
    version=datetime.utcnow().strftime("%Y%m%d%H%M"),
    path=local_dataset_path.as_posix(),
    type=AssetTypes.URI_FOLDER,
    description="Qwen-VL LoRA instruction dataset",
)

registered_data = ml_client.data.create_or_update(data_asset)
print(f"Data asset created: {registered_data.name}:{registered_data.version}")
print(f"Asset path: {registered_data.path}")

AttributeError: type object 'datetime.datetime' has no attribute 'timezone'

运行成功后，记下 `Data asset created` 输出的 ID/路径，并将其填写到后续 `DATASET_PATH` 变量中。

In [None]:
%%writefile {src_dir}/train_lora.py
import argparse
import json
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import torch
from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
from PIL import Image
from torch.utils.data import Dataset
from transformers import (
    AutoModelForVision2Seq,
    AutoProcessor,
    BitsAndBytesConfig,
    Trainer,
    TrainingArguments,
    )

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def parse_args() -> argparse.Namespace:
    """解析命令行参数，便于在 Azure ML 作业中灵活传参。"""
    parser = argparse.ArgumentParser(description="LoRA finetuning for Qwen-VL demo 03")
    parser.add_argument("--model-name", type=str, default="Qwen/Qwen2-VL-7B-Instruct")
    parser.add_argument("--dataset-dir", type=str, required=True)
    parser.add_argument("--train-file", type=str, default="train.jsonl")
    parser.add_argument("--validation-file", type=str, default=None)
    parser.add_argument("--output-dir", type=str, default="./outputs")
    parser.add_argument("--per-device-train-batch-size", type=int, default=1)
    parser.add_argument("--per-device-eval-batch-size", type=int, default=1)
    parser.add_argument("--gradient-accumulation-steps", type=int, default=8)
    parser.add_argument("--num-train-epochs", type=float, default=1.0)
    parser.add_argument("--learning-rate", type=float, default=2e-4)
    parser.add_argument("--weight-decay", type=float, default=0.0)
    parser.add_argument("--warmup-ratio", type=float, default=0.03)
    parser.add_argument("--logging-steps", type=int, default=10)
    parser.add_argument("--save-strategy", type=str, default="epoch")
    parser.add_argument("--eval-strategy", type=str, default="epoch")
    parser.add_argument("--lora-rank", type=int, default=64)
    parser.add_argument("--lora-alpha", type=int, default=128)
    parser.add_argument("--lora-dropout", type=float, default=0.05)
    parser.add_argument("--target-modules", type=str, default="q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj")
    parser.add_argument("--bf16", action="store_true", default=True)
    parser.add_argument("--trust-remote-code", action="store_true", default=True)
    parser.add_argument("--report-to", type=str, default="none")
    return parser.parse_args()

def load_records(dataset_dir: str, file_name: str) -> List[Dict[str, Any]]:
    """读取 JSONL 文件，并返回样本列表。"""
    file_path = os.path.join(dataset_dir, file_name)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Dataset file not found: {file_path}")
    records: List[Dict[str, Any]] = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            records.append(json.loads(line))
    logger.info("Loaded %d samples from %s", len(records), file_path)
    return records

def resolve_image_path(dataset_dir: str, image_path: str) -> str:
    """统一处理相对路径，便于在 Azure ML 计算节点上访问图片。"""
    return image_path if os.path.isabs(image_path) else os.path.join(dataset_dir, image_path)

@dataclass
class QwenRecord:
    """用 dataclass 存储单条样本，提升可读性。"""
    image: str
    question: str
    answer: str
    system: Optional[str] = None

class QwenVLDataset(Dataset):
    """将原始 JSON 样本转换为模型可直接使用的提示格式。"""
    def __init__(self, records: List[Dict[str, Any]], dataset_dir: str, processor: AutoProcessor):
        self.dataset_dir = dataset_dir
        self.processor = processor
        self.records: List[QwenRecord] = [
            QwenRecord(
                image=resolve_image_path(dataset_dir, item["image"]),
                question=item.get("question", ""),
                answer=item.get("answer", ""),
                system=item.get("system")
            )
            for item in records
        ]

    def __len__(self) -> int:
        return len(self.records)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        record = self.records[idx]
        if not os.path.exists(record.image):
            raise FileNotFoundError(f"Image not found: {record.image}")
        image = Image.open(record.image).convert("RGB")
        messages: List[Dict[str, Any]] = []
        if record.system:
            messages.append({"role": "system", "content": [{"type": "text", "text": record.system}]})
        user_content: List[Dict[str, Any]] = [{"type": "image", "image": image}]
        if record.question:
            user_content.append({"type": "text", "text": record.question})
        messages.append({"role": "user", "content": user_content})
        messages.append({"role": "assistant", "content": [{"type": "text", "text": record.answer}]})
        prompt = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
        return {"prompt": prompt, "image": image}

class QwenDataCollator:
    """自定义 collator，将批次中的文本和图片一起编码为张量。"""
    def __init__(self, processor: AutoProcessor):
        self.processor = processor

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        prompts = [feature["prompt"] for feature in features]
        images = [feature["image"] for feature in features]
        batch = self.processor(
            text=prompts,
            images=images,
            return_tensors="pt",
            padding=True
        )
        batch["labels"] = batch["input_ids"].clone()
        return batch

def create_model(args: argparse.Namespace) -> AutoModelForVision2Seq:
    """加载基础模型并应用 LoRA 配置，同时启用 4bit 量化以节省显存。"""
    quant_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )
    model = AutoModelForVision2Seq.from_pretrained(
        args.model_name,
        quantization_config=quant_config,
        torch_dtype=torch.bfloat16 if args.bf16 else torch.float16,
        trust_remote_code=args.trust_remote_code,
        device_map="auto"
    )
    model = prepare_model_for_kbit_training(model, use_gradient_checkpointing=True)
    target_modules = [module.strip() for module in args.target_modules.split(",") if module]
    lora_config = LoraConfig(
        r=args.lora_rank,
        lora_alpha=args.lora_alpha,
        target_modules=target_modules,
        lora_dropout=args.lora_dropout,
        bias="none",
        task_type=TaskType.CAUSAL_LM
    )
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()
    return model

def main() -> None:
    args = parse_args()
    os.makedirs(args.output_dir, exist_ok=True)
    processor = AutoProcessor.from_pretrained(args.model_name, trust_remote_code=args.trust_remote_code)
    train_records = load_records(args.dataset_dir, args.train_file)
    train_dataset = QwenVLDataset(train_records, args.dataset_dir, processor)
    eval_dataset = None
    if args.validation_file:
        eval_records = load_records(args.dataset_dir, args.validation_file)
        eval_dataset = QwenVLDataset(eval_records, args.dataset_dir, processor)
    model = create_model(args)
    collator = QwenDataCollator(processor)
    training_args = TrainingArguments(
        output_dir=args.output_dir,
        per_device_train_batch_size=args.per_device_train_batch_size,
        per_device_eval_batch_size=args.per_device_eval_batch_size,
        gradient_accumulation_steps=args.gradient_accumulation_steps,
        learning_rate=args.learning_rate,
        weight_decay=args.weight_decay,
        num_train_epochs=args.num_train_epochs,
        warmup_ratio=args.warmup_ratio,
        logging_steps=args.logging_steps,
        save_strategy=args.save_strategy,
        eval_strategy="no" if eval_dataset is None else args.eval_strategy,
        bf16=args.bf16 and torch.cuda.is_available(),
        dataloader_num_workers=4,
        report_to=[args.report_to] if args.report_to and args.report_to != "none" else [],
        run_name=os.getenv("AML_RUN_ID", "qwen-vl-lora"),
        remove_unused_columns=False
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=collator
    )
    trainer.train()
    if eval_dataset is not None:
        metrics = trainer.evaluate()
        logger.info("Evaluation metrics: %s", metrics)
    trainer.save_model(args.output_dir)
    processor.save_pretrained(os.path.join(args.output_dir, "processor"))
    logger.info("Training completed. Artifacts saved to %s", args.output_dir)

if __name__ == "__main__":
    main()

## 配置训练作业

定义训练数据位置、输出目录、计算集群等参数。若数据存放在默认数据存储中，可直接使用 `azureml://datastores/<datastore>/paths/...` URI。

In [None]:
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes

# 将数据路径替换为前一节注册成功的数据资产 ID 或数据存储 URI
# 例如：DATASET_PATH = registered_data.id
BASE_MODEL = "Qwen/Qwen2-VL-7B-Instruct"

# 使用注册的数据资产路径（如果前面已经执行了数据注册步骤）
# 或者使用默认数据存储路径
try:
    # 优先使用已注册的数据资产
    DATASET_PATH = registered_data.path
except NameError:
    # 如果没有注册数据资产，使用默认数据存储路径
    DATASET_PATH = "azureml://datastores/workspaceblobstore/paths/qwen-vl-dataset/"

TRAIN_FILE = "train.jsonl"
VALIDATION_FILE = None  # 若无验证数据可设置为 None

# 输出路径不需要预先创建，Azure ML 会自动处理
# 使用默认数据存储 workspaceblobstore
OUTPUT_PATH = f"azureml://datastores/workspaceblobstore/paths/qwen-vl-outputs/{ml_client.workspace_name}"

# COMPUTE_NAME 已在前面的计算资源准备单元格中定义
EXPERIMENT_NAME = "qwen-vl-lora-demo-03"

train_input = Input(type=AssetTypes.URI_FOLDER, path=DATASET_PATH)

print(f"配置摘要:")
print(f"  - 基础模型: {BASE_MODEL}")
print(f"  - 数据集路径: {DATASET_PATH}")
print(f"  - 训练文件: {TRAIN_FILE}")
print(f"  - 验证文件: {VALIDATION_FILE}")
print(f"  - 输出路径: {OUTPUT_PATH}")
print(f"  - 计算集群: {COMPUTE_NAME}")
print(f"  - 实验名称: {EXPERIMENT_NAME}")

## 创建并提交命令作业

在运行下方代码前请确认：
1. `DATASET_PATH`、`OUTPUT_PATH`、`COMPUTE_NAME` 等变量已替换为实际值；
2. 训练脚本 `train_lora.py` 已根据需要调整超参数；
3. 若要启用 WandB/MLflow 记录，请将 `--report-to` 设置为对应后端并配置凭据。

随后执行代码即可提交 Azure ML 命令作业。

In [None]:
from azure.ai.ml import command

# 根据是否存在验证集动态拼接训练脚本命令
validation_arg = f"--validation-file {VALIDATION_FILE}" if VALIDATION_FILE else ""
command_parts = [
    "python train_lora.py",
    f"--model-name {BASE_MODEL}",
    "--dataset-dir ${inputs.data}",
    f"--train-file {TRAIN_FILE}",
    "--output-dir ./outputs",
    "--per-device-train-batch-size 1",
    "--gradient-accumulation-steps 8",
    "--num-train-epochs 3",
    "--learning-rate 2e-4",
    "--logging-steps 5"
]
if validation_arg:
    command_parts.append(validation_arg)
lora_command = " ".join(command_parts)

# 创建 Azure ML 命令作业,挂载数据并保存 LoRA 结果到指定目录
command_job = command(
    code=str(src_dir),
    command=lora_command,
    inputs={"data": train_input},
    environment=registered_env,
    compute=COMPUTE_NAME,
    experiment_name=EXPERIMENT_NAME,
    display_name="qwen-vl-lora-train-demo-01",
    outputs={
        "trained_lora": Output(type=AssetTypes.URI_FOLDER, path=OUTPUT_PATH)
    },
    description="LoRA finetuning job for Qwen-VL"
)

command_job.inputs["data"].mode = "mount"
submitted_job = ml_client.jobs.create_or_update(command_job)
print(f"Job submitted: {submitted_job.name}")

## 监控训练日志

运行以下代码实时查看作业日志，或直接在 Azure ML Studio 门户中监控。

In [None]:
# 在命令行实时查看训练过程输出
ml_client.jobs.stream(submitted_job.name)

## 注册 LoRA 适配器模型

作业完成后，可将输出目录注册为模型资产，方便后续部署或批量推理。

In [None]:
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import ModelType

# 首先获取已完成作业的详细信息
completed_job = ml_client.jobs.get("serene_gas_4q7fphz2pz")

# 从已完成的作业中获取输出路径
output_path = None
if completed_job.outputs and "trained_lora" in completed_job.outputs:
    # 尝试不同的方式获取输出路径
    output_obj = completed_job.outputs["trained_lora"]
    if hasattr(output_obj, 'path'):
        output_path = output_obj.path
    elif hasattr(output_obj, 'uri'):
        output_path = output_obj.uri
    else:
        # 如果上述方法都不行，构造输出路径
        output_path = f"{OUTPUT_PATH}/{submitted_job.name}/trained_lora"

if not output_path:
    # 如果仍然无法获取路径，手动构造
    output_path = f"{OUTPUT_PATH}/{submitted_job.name}/trained_lora"

print(f"Using output path: {output_path}")

# 将 LoRA 输出路径注册为模型资产，便于后续部署或版本管理
# 注意：使用 ModelType.CUSTOM_MODEL 而不是 AssetTypes.URI_FOLDER
model_asset = Model(
    name="qwen-vl-lora-adapter",
    path=output_path,
    type="custom_model",  # 修改为支持的模型类型
    description="LoRA adapter fine-tuned from Qwen-VL",
    tags={"base_model": BASE_MODEL, "task": "instruction-following"}
)
registered_model = ml_client.models.create_or_update(model_asset)
print(f"Model registered: {registered_model.name}:{registered_model.version}")

## 本地验证 LoRA 效果

以下示例展示如何下载输出、合并 LoRA 权重并运行简单推理。请确保有可用 GPU。

In [None]:
from peft import PeftModel
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from PIL import Image
import pathlib
import tempfile
import torch

# 下载 Azure ML 作业输出，获取存放 LoRA 权重的本地临时目录
download_dir = pathlib.Path(tempfile.mkdtemp(prefix="qwen-lora-"))
ml_client.jobs.download("serene_gas_4q7fphz2pz", output_name="trained_lora", download_path=download_dir.as_posix())
print(f"Artifacts downloaded to {download_dir}")

# 加载基础模型并应用 LoRA 适配器
adapter_path = "/tmp/qwen-all-o2r7gmxz/artifacts/outputs/checkpoint-162"

# 使用具体的 Qwen2VLForConditionalGeneration 类而不是 AutoModelForVision2Seq
try:
    print("Loading base model...")
    base_model = Qwen2VLForConditionalGeneration.from_pretrained(
        BASE_MODEL, 
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto"
    )
    print("Base model loaded successfully!")
    
    print("Loading LoRA adapter...")
    model = PeftModel.from_pretrained(base_model, adapter_path, trust_remote_code=True)
    print("LoRA adapter loaded successfully!")
    
    print("Loading processor...")
    processor = AutoProcessor.from_pretrained(BASE_MODEL, trust_remote_code=True)
    print("Processor loaded successfully!")
    
except Exception as e:
    print(f"Error loading model: {e}")
    print("Trying alternative approach...")
    
    # 备用方案：直接使用 AutoModel
    from transformers import AutoModel
    base_model = AutoModel.from_pretrained(
        BASE_MODEL, 
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto"
    )
    model = PeftModel.from_pretrained(base_model, adapter_path, trust_remote_code=True)
    processor = AutoProcessor.from_pretrained(BASE_MODEL, trust_remote_code=True)

# 准备用于测试的单张图片，构造对话式输入模板
test_image_path = "./dataset/images/sample_000000.png"  # 替换为实际的本地图像路径
if not pathlib.Path(test_image_path).exists():
    print(f"Warning: Test image not found at {test_image_path}")
    print("Please update test_image_path to point to a valid image file")
else:
    test_image = Image.open(test_image_path).convert("RGB")
    messages = [
        {"role": "user", "content": [{"type": "image", "image": test_image}, {"type": "text", "text": "请描述这张图片。"}]}
    ]
    prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=prompt, images=test_image, return_tensors="pt")
    
    # 如果有 GPU 可用，将输入移动到 GPU
    if torch.cuda.is_available():
        inputs = {k: v.to("cuda") for k, v in inputs.items()}
    
    # 生成回复并打印，便于快速验证效果
    print("Generating response...")
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=256, do_sample=True, temperature=0.7)
    
    # 解码并打印响应
    response = processor.tokenizer.decode(outputs[0], skip_special_tokens=True)
    print("\n" + "="*50)
    print("Model Response:")
    print("="*50)
    print(response)
    print("="*50)