# 01 - Azure ML + MLflow Quickstart

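This notebook connects to an Azure ML workspace and points MLflow tracking at the workspace. It then trains a simple spam classifier, logs metrics and artifacts, and registers the model. Finally it runs batch scoring, first as a command job and then through a batch endpoint. An optional section deploys a managed online endpoint for real-time inference.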

## Prereqs
- Azure subscription + access to the workshop resource group
- Azure ML workspace deployed (see `infra/main.bicep`)
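- Auth: `DefaultAzureCredential` (recommended), with interactive browser login as a fallback
- An AmlCompute cluster in the workspace for the batch scoring job and batch endpoint (auto-selected, or set `AML_BATCH_COMPUTE`)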

## Known Issue: NumPy 2.x Compatibility
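MLflow records the model's conda/pip dependencies when it logs a model. Suppose the model is logged from an environment running NumPy 2.x. The recorded `numpy` requirement then conflicts with Azure ML's curated environments, which ship NumPy 1.x, and loading the model there fails. This notebook therefore explicitly pins `numpy<2.0` in the requirements it passes to MLflow.

The pin only changes the recorded requirements, so train in a NumPy 1.x environment as well. A model pickled under NumPy 2.x may not unpickle under 1.x.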
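A quick check before logging can catch both halves of this problem: the pin itself and the NumPy version the model is trained with. The sketch below uses a hypothetical helper, `check_numpy_pin`, which is not part of MLflow or the Azure ML SDK. It relies on a simple regex heuristic rather than a full PEP 440 parser, so treat it as a sanity check only.

```python
import re
from typing import List

# Heuristic: specifiers that rule out NumPy 2.x, e.g. "<2", "<2.0", or "==1.26.4".
# "<2.1" is deliberately NOT matched, because it still allows 2.0.x.
_EXCLUDES_V2 = re.compile(r"<\s*2(?:\.0+)*(?![\d.])|==\s*1\.")
_NAME = re.compile(r"^\s*([A-Za-z0-9_.\-]+)")


def _package_name(req: str) -> str:
    """Return the lower-cased distribution name at the start of a requirement line."""
    match = _NAME.match(req)
    return match.group(1).lower() if match else ""


def check_numpy_pin(installed_version: str, pip_requirements: List[str]) -> List[str]:
    """Hypothetical helper: return a list of problems (empty list = looks safe)."""
    problems = []
    numpy_reqs = [r for r in pip_requirements if _package_name(r) == "numpy"]
    if not numpy_reqs:
        problems.append("no numpy requirement listed; the scoring environment may resolve NumPy 2.x")
    elif not any(_EXCLUDES_V2.search(r) for r in numpy_reqs):
        problems.append(f"numpy requirement {numpy_reqs} does not exclude 2.x")

    # The pickled model is written by the *training* environment, not the pinned one.
    if int(installed_version.split(".")[0]) >= 2:
        problems.append(
            f"training environment has numpy {installed_version}; "
            "the pickled model may not load under NumPy 1.x"
        )
    return problems
```

In this notebook it could be called as `check_numpy_pin(np.__version__, pip_requirements)` right before `mlflow.sklearn.log_model`. An empty list means both checks passed.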

In [None]:
# Install dependencies (uncomment if needed)
# %pip install -r ../requirements.txt

import os
import json
import time
import uuid
import sys
import site

# Avoid mixing user-site packages with the repo venv
try:
    user_site = site.getusersitepackages()
    sys.path = [p for p in sys.path if os.path.normcase(p) != os.path.normcase(user_site)]
except Exception:
    pass

import mlflow
import numpy as np
import pandas as pd

from sklearn.datasets import fetch_openml
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split

from azure.ai.ml import MLClient
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import Model, ManagedOnlineEndpoint, ManagedOnlineDeployment
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

print('Imports OK')
print('Python:', sys.executable)
print('mlflow:', mlflow.__version__, 'from', mlflow.__file__)

In [None]:
# Workshop configuration
import subprocess
import shutil
import platform

def _get_windows_persisted_env(var_name: str) -> str:
    """Read a persisted env var from Windows registry (HKCU/HKLM)."""
    try:
        import winreg
    except Exception:
        return ""

    for root, path in (
        (winreg.HKEY_CURRENT_USER, r"Environment"),
        (winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment"),
    ):
        try:
            with winreg.OpenKey(root, path) as key:
                value, _ = winreg.QueryValueEx(key, var_name)
                if isinstance(value, str) and value.strip():
                    return value.strip()
        except (FileNotFoundError, OSError):
            continue
    return ""

SUBSCRIPTION_ID = os.getenv('AZURE_SUBSCRIPTION_ID', '').strip()
RESOURCE_GROUP = os.getenv('AZURE_RESOURCE_GROUP', 'rg-dnd-mlops-demo').strip()
WORKSPACE_NAME = os.getenv('AZURE_ML_WORKSPACE', 'mlw-dndmlops2-dev').strip()

# Windows: try loading from registry if env var not set
if not SUBSCRIPTION_ID and platform.system() == 'Windows':
    persisted = _get_windows_persisted_env('AZURE_SUBSCRIPTION_ID')
    if persisted:
        SUBSCRIPTION_ID = persisted
        os.environ['AZURE_SUBSCRIPTION_ID'] = SUBSCRIPTION_ID
        print('Loaded AZURE_SUBSCRIPTION_ID from Windows registry.')

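# Fallback: Azure CLI. Call the resolved path (az.cmd on Windows) and keep stderr
# out of the captured output so CLI warnings can't end up in the subscription ID.
az_path = shutil.which('az')
if not SUBSCRIPTION_ID and az_path:
    try:
        SUBSCRIPTION_ID = subprocess.check_output(
            [az_path, 'account', 'show', '--query', 'id', '-o', 'tsv'],
            text=True, stderr=subprocess.DEVNULL,
        ).strip()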
        if SUBSCRIPTION_ID:
            os.environ['AZURE_SUBSCRIPTION_ID'] = SUBSCRIPTION_ID
            print('Using subscription from Azure CLI (az account show).')
    except Exception as e:
        print('Azure CLI fallback failed:', repr(e))

if not SUBSCRIPTION_ID:
    raise ValueError('Missing AZURE_SUBSCRIPTION_ID. Set it as an env var or update this cell.')

print('Subscription:', SUBSCRIPTION_ID)
print('Resource group:', RESOURCE_GROUP)
print('Workspace:', WORKSPACE_NAME)

In [None]:
# Authenticate and connect to Azure ML
try:
    credential = DefaultAzureCredential(exclude_interactive_browser_credential=True)
    credential.get_token('https://management.azure.com/.default')
except Exception:
    credential = InteractiveBrowserCredential()

ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION_ID,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
)

print('Connected to workspace:', ml_client.workspace_name)

In [None]:
# Configure MLflow to use Azure ML workspace tracking
tracking_uri = ml_client.workspaces.get(WORKSPACE_NAME).mlflow_tracking_uri
mlflow.set_tracking_uri(tracking_uri)

experiment_name = os.getenv('MLFLOW_EXPERIMENT_NAME', 'mlops-hackathon-demo')
mlflow.set_experiment(experiment_name)

print('MLflow tracking URI:', tracking_uri)
print('Experiment:', experiment_name)

In [None]:
# Load dataset (OpenML Spambase - 4,601 emails, 57 features)
print('Loading Spambase dataset from OpenML...')
spambase = fetch_openml(data_id=44, as_frame=True, parser='auto')
data = spambase.frame.rename(columns={'class': 'is_spam'})
data['is_spam'] = data['is_spam'].astype(int)

X = data.drop('is_spam', axis=1)
y = data['is_spam']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print('Train:', X_train.shape, 'Test:', X_test.shape)
X_train.head()

## Train Model with MLflow Tracking

We train a RandomForest classifier and log:
- Parameters
- Metrics (accuracy, precision, recall, F1, AUC)
- Feature importance artifact
- Model with **explicit `numpy<2.0`** requirement for Azure ML compatibility
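For reference, the five logged metrics can be computed by hand from three inputs: the test labels, the hard predictions, and the positive-class scores. The sketch below is plain Python for illustration only; the notebook itself uses `sklearn.metrics`. It assumes 0/1 labels with spam (`1`) as the positive class, which is sklearn's default. It computes ROC AUC through its pairwise interpretation. That costs quadratic time in the number of samples, so it only suits small inputs.

```python
from typing import Dict, Sequence


def binary_metrics(y_true: Sequence[int], y_pred: Sequence[int],
                   y_score: Sequence[float]) -> Dict[str, float]:
    """Accuracy, precision, recall, F1 and ROC AUC for 0/1 labels (illustrative sketch)."""
    labelled = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in labelled if t == 1 and p == 1)
    fp = sum(1 for t, p in labelled if t == 0 and p == 1)
    fn = sum(1 for t, p in labelled if t == 1 and p == 0)
    tn = sum(1 for t, p in labelled if t == 0 and p == 0)

    precision = tp / (tp + fp) if tp + fp else 0.0  # of predicted spam, share that is spam
    recall = tp / (tp + fn) if tp + fn else 0.0     # of actual spam, share that was caught
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    accuracy = (tp + tn) / len(labelled)

    # ROC AUC = probability that a random positive outscores a random negative (ties count 0.5).
    pos = [s for t, s in zip(y_true, y_score) if t == 1]
    neg = [s for t, s in zip(y_true, y_score) if t == 0]
    if not pos or not neg:
        raise ValueError("ROC AUC needs at least one positive and one negative label")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    roc_auc = wins / (len(pos) * len(neg))

    return {"accuracy": accuracy, "precision": precision, "recall": recall,
            "f1_score": f1, "roc_auc": roc_auc}


print(binary_metrics([0, 0, 1, 1], [0, 1, 1, 1], [0.1, 0.6, 0.7, 0.9]))
```

The dictionary keys match the metric names the training cell passes to `mlflow.log_metrics`.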

In [None]:
from mlflow.models import infer_signature

params = {
    'n_estimators': 100,
    'max_depth': 10,
    'min_samples_split': 5,
    'min_samples_leaf': 2,
    'random_state': 42,
}

# Explicit pip requirements for Azure ML compatibility
pip_requirements = [
    'numpy<2.0',
    'scikit-learn>=1.0,<2.0',
    'pandas',
    'mlflow',
]

with mlflow.start_run(run_name='spam-classifier-rf') as run:
    mlflow.log_params(params)
    mlflow.log_param('training_samples', len(X_train))
    mlflow.log_param('dataset', 'UCI Spambase')
    mlflow.log_param('numpy_constraint', '<2.0')

    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'f1_score': f1_score(y_test, y_pred),
        'roc_auc': roc_auc_score(y_test, y_pred_proba),
    }
    mlflow.log_metrics(metrics)

    feature_importance = (
        pd.DataFrame({'feature': X.columns, 'importance': model.feature_importances_})
        .sort_values('importance', ascending=False)
    )
    feature_importance.to_csv('feature_importance.csv', index=False)
    mlflow.log_artifact('feature_importance.csv')

    signature = infer_signature(X_train, model.predict(X_train))

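    # Registration happens in the next cell via MLClient so it carries governance tags;
    # also passing registered_model_name here would create a second, untagged version.
    mlflow.sklearn.log_model(
        model,
        artifact_path='model',
        signature=signature,
        pip_requirements=pip_requirements,
    )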

print('Run ID:', run.info.run_id)
print('Metrics:', {k: round(v, 4) for k, v in metrics.items()})
feature_importance.head(10)

In [None]:
# Register model in Azure ML with governance metadata
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes

model_uri = f'runs:/{run.info.run_id}/model'

registered_model = ml_client.models.create_or_update(
    Model(
        path=model_uri,
        name='spam-classifier',
        type=AssetTypes.MLFLOW_MODEL,
        description='Email spam classifier (numpy<2.0 compatible)',
        tags={
            'author': os.getenv('MODEL_AUTHOR', 'workshop-attendee'),
            'use_case': 'spam_detection',
            'dataset': 'UCI Spambase',
            'framework': 'sklearn',
            'numpy_constraint': '<2.0',
        },
        properties={k: str(round(v, 4)) for k, v in metrics.items()},
    )
)

print(f'Model registered: {registered_model.name}:{registered_model.version}')

---
## Batch Scoring Job

Run **asynchronous batch scoring** as an Azure ML command job:
- No online endpoint required
- Produces a CSV output you can download
- Uses the registered MLflow model
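The inline command in the batch scoring cell is built with a Python f-string. That means some braces have to be escaped, both those meant for Azure ML and those in the embedded script. In an f-string, `{{` renders as `{`. This is why `${{{{inputs.model}}}}` in the source becomes Azure ML's `${{inputs.model}}` binding, while `{batch_n_rows}` is substituted immediately. Below is a minimal reproduction; `render_command` is a hypothetical name used only here.

```python
def render_command(n_rows: int) -> str:
    # Four braces -> two in the output: Azure ML's ${{...}} binding syntax.
    # Two braces  -> one in the output: an ordinary dict literal in the embedded script.
    # Single braces are f-string fields and are replaced right now.
    return f"""model_dir = r"${{{{inputs.model}}}}"
out_dir = r"${{{{outputs.predictions}}}}"
n_rows = {n_rows}
cols = {{'class': 'is_spam'}}"""


print(render_command(100))
```

The printed text contains `${{inputs.model}}`, `${{outputs.predictions}}`, `n_rows = 100` and the dict literal `{'class': 'is_spam'}`. This is exactly what Azure ML receives before it resolves the bindings.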

In [None]:
# Batch scoring (1/3): resolve the model reference
from azure.ai.ml import Input, Output, command

model_name = registered_model.name
model_version = str(registered_model.version)
model_ref = f'azureml:{model_name}:{model_version}'

batch_n_rows = int(os.getenv('BATCH_N_ROWS', '100'))

print('Model reference:', model_ref)
print('Batch rows:', batch_n_rows)

In [None]:
# Batch scoring (2/3): submit job
from azure.ai.ml.entities import UserIdentityConfiguration

# Auto-select compute
batch_compute = os.getenv('AML_BATCH_COMPUTE', '').strip()
if not batch_compute:
    computes = list(ml_client.compute.list())
    aml_compute_names = [
        getattr(c, 'name', None) for c in computes
        if getattr(c, 'name', None)
        and ('amlcompute' in str(getattr(c, 'type', '')).lower())
    ]
    if not aml_compute_names:
        raise RuntimeError('No AmlCompute cluster found. Create one in Azure ML Studio.')
    batch_compute = aml_compute_names[0]
    print('Auto-selected compute:', batch_compute)
else:
    print('Using compute:', batch_compute)

batch_env = 'azureml://registries/azureml/environments/sklearn-1.5/labels/latest'

inline_command = f"""python - <<'PYSCRIPT'
import os
import pandas as pd
import mlflow
from sklearn.datasets import fetch_openml

model_dir = r"${{{{inputs.model}}}}"
out_dir = r"${{{{outputs.predictions}}}}"
n_rows = {batch_n_rows}

spambase = fetch_openml(data_id=44, as_frame=True, parser='auto')
data = spambase.frame.rename(columns={{'class': 'is_spam'}})
X = data.drop('is_spam', axis=1).head(n_rows)

model = mlflow.pyfunc.load_model(model_dir)
preds = model.predict(X)

out = pd.DataFrame({{'prediction': preds}})
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, 'predictions.csv')
out.to_csv(out_path, index=False)
print('Wrote:', out_path)
PYSCRIPT
"""

batch_job = command(
    command=inline_command,
    inputs={'model': Input(type='mlflow_model', path=model_ref, mode='download')},
    outputs={'predictions': Output(type='uri_folder')},
    environment=batch_env,
    compute=batch_compute,
    experiment_name=experiment_name,
    display_name='batch-score-spam-classifier',
    identity=UserIdentityConfiguration(),
)

submitted = ml_client.jobs.create_or_update(batch_job)
print('Submitted batch scoring job:', submitted.name)

In [None]:
# Batch scoring (3/3): wait + download output
from pathlib import Path

wait_seconds = int(os.getenv('BATCH_WAIT_SECONDS', '900'))
poll_seconds = 15

start = time.time()
status = None
while True:
    status = ml_client.jobs.get(submitted.name).status
    print('Job status:', status)
    if status in {'Completed', 'Failed', 'Canceled'}:
        break
    if time.time() - start > wait_seconds:
        print('Job still running; re-run this cell later.')
        break
    time.sleep(poll_seconds)

if status == 'Completed':
    download_dir = Path('batch_outputs') / submitted.name
    download_dir.mkdir(parents=True, exist_ok=True)
    ml_client.jobs.download(name=submitted.name, download_path=str(download_dir), output_name='predictions')
    print('Downloaded to:', download_dir.resolve())

    pred_path = download_dir / 'named-outputs' / 'predictions' / 'predictions.csv'
    if pred_path.exists():
        preds = pd.read_csv(pred_path)
        display(preds.head(10))
    else:
        print('Predictions file not found. Check Studio for job output.')
elif status == 'Failed':
    print('Job failed. Check logs in Azure ML Studio.')
    try:
        ml_client.jobs.stream(submitted.name)
    except Exception as e:
        print('Could not stream logs:', repr(e))

---
## Batch Endpoint (Production Pattern)

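A **Batch Endpoint** provides a durable REST endpoint for batch inference:
- No always-on compute: jobs run on the AmlCompute cluster, which can scale down to zero nodes when idle (cost-efficient)
- Multiple versioned deployments per endpoint, with a default deployment that receives invocations
- Each invocation runs as a tracked Azure ML job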
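The deployment settings in the next cells control how an invocation fans out. Two assumptions underlie this estimate. First, `mini_batch_size` counts input files rather than rows. Second, each instance runs up to `max_concurrency_per_instance` mini-batches at once. Under those assumptions the shape of a job can be estimated with simple arithmetic. `plan_batch_job` is a hypothetical helper, and its `waves` figure assumes every mini-batch takes about the same time.

```python
import math


def plan_batch_job(n_files: int, mini_batch_size: int, instance_count: int,
                   max_concurrency_per_instance: int) -> dict:
    """Hypothetical helper: idealized estimate of how a batch job splits its work."""
    if n_files < 0 or min(mini_batch_size, instance_count, max_concurrency_per_instance) < 1:
        raise ValueError("n_files must be >= 0 and the other settings >= 1")
    mini_batches = math.ceil(n_files / mini_batch_size)             # files grouped per mini-batch
    parallel_slots = instance_count * max_concurrency_per_instance  # mini-batches in flight at once
    waves = math.ceil(mini_batches / parallel_slots)                # rounds, if durations are equal
    return {"mini_batches": mini_batches, "parallel_slots": parallel_slots, "waves": waves}


print(plan_batch_job(n_files=1, mini_batch_size=10, instance_count=1, max_concurrency_per_instance=2))
```

Take the settings used below (1 instance, concurrency 2, mini-batch size 10) and the single CSV passed at invocation. Under the stated assumption, that gives one mini-batch, so only one of the two worker slots does any work. Splitting the input across several files is what would let the extra concurrency help.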

In [None]:
# Batch Endpoint (1/3): Create the endpoint
from azure.ai.ml.entities import BatchEndpoint

batch_endpoint_name = os.getenv('AML_BATCH_ENDPOINT_NAME', f'spam-batch-{uuid.uuid4().hex[:8]}')

batch_endpoint = BatchEndpoint(
    name=batch_endpoint_name,
    description='Spam classifier batch endpoint',
    tags={'environment': 'workshop'},
)

print(f'Creating batch endpoint: {batch_endpoint_name}')
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()
print('Batch endpoint created!')

In [None]:
# Batch Endpoint (2/3): Create deployment
from azure.ai.ml.entities import ModelBatchDeployment, ModelBatchDeploymentSettings
from azure.ai.ml.constants import BatchDeploymentOutputAction
from azure.ai.ml.entities import BatchRetrySettings

model_for_batch = f'azureml:{registered_model.name}:{registered_model.version}'
batch_endpoint_env = 'azureml://registries/azureml/environments/sklearn-1.5/labels/latest'
deployment_name = 'numpy1x'

batch_deployment = ModelBatchDeployment(
    name=deployment_name,
    endpoint_name=batch_endpoint_name,
    model=model_for_batch,
    environment=batch_endpoint_env,
    compute=batch_compute,
    settings=ModelBatchDeploymentSettings(
        instance_count=1,
        max_concurrency_per_instance=2,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name='predictions.csv',
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level='info',
    ),
)

print(f'Creating deployment: {deployment_name}')
ml_client.batch_deployments.begin_create_or_update(batch_deployment).result()

# Set as default
batch_endpoint = ml_client.batch_endpoints.get(batch_endpoint_name)
batch_endpoint.defaults.deployment_name = deployment_name
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()

print(f'Deployment "{deployment_name}" created and set as default')

In [None]:
# Batch Endpoint (3/3): Invoke with test data
from azure.ai.ml import Input
from pathlib import Path

batch_input_path = Path('batch_endpoint_input.csv')
X_test.head(50).to_csv(batch_input_path, index=False)
print(f'Created input file: {batch_input_path} (50 rows)')

job = ml_client.batch_endpoints.invoke(
    endpoint_name=batch_endpoint_name,
    inputs={'input': Input(path=str(batch_input_path.resolve()), type=AssetTypes.URI_FILE)},
)

print(f'Batch job submitted: {job.name}')
print(f'Monitor: https://ml.azure.com/runs/{job.name}?wsid=/subscriptions/{SUBSCRIPTION_ID}/resourcegroups/{RESOURCE_GROUP}/workspaces/{WORKSPACE_NAME}')

In [None]:
# Wait for batch endpoint job and download results
job_name = job.name
wait_seconds = 600

start = time.time()
while True:
    job_status = ml_client.jobs.get(job_name)
    status = job_status.status
    print(f'Batch job status: {status}')
    if status in {'Completed', 'Failed', 'Canceled'}:
        break
    if time.time() - start > wait_seconds:
        print('Job still running. Re-run later.')
        break
    time.sleep(15)

if status == 'Completed':
    output_dir = Path('batch_endpoint_outputs') / job_name
    output_dir.mkdir(parents=True, exist_ok=True)
    ml_client.jobs.download(name=job_name, download_path=str(output_dir), output_name='score')
    print(f'Downloaded to: {output_dir}')

    for pred_file in output_dir.rglob('predictions.csv'):
        preds_df = pd.read_csv(pred_file)
        print(f'\nPredictions from {pred_file}:')
        display(preds_df.head(10))
        break
else:
    print(f'Job status: {status}')

In [None]:
# (Optional) Get batch endpoint details
endpoint_info = ml_client.batch_endpoints.get(batch_endpoint_name)

print('=== Batch Endpoint Details ===')
print(f'Name: {endpoint_info.name}')
print(f'Scoring URI: {endpoint_info.scoring_uri}')
print(f'Default deployment: {endpoint_info.defaults.deployment_name}')
print(f'Studio: https://ml.azure.com/batchEndpoints/{batch_endpoint_name}?wsid=/subscriptions/{SUBSCRIPTION_ID}/resourcegroups/{RESOURCE_GROUP}/workspaces/{WORKSPACE_NAME}')

---
## Optional: Real-time Serving (Managed Online Endpoint)

Deploy a **managed online endpoint** for real-time inference.
- Takes 5-10 minutes to provision
- Requires VM quota for the chosen instance type

Skip this section if batch scoring meets your needs.
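An MLflow model served from a managed online endpoint accepts a JSON body with an `input_data` object. That object carries `columns` and `data` in pandas' `split` orientation, which is the shape the test cell below builds from `X_test`. Here is a stand-alone sketch of that payload builder. The helper name and the placeholder column names are hypothetical. It adds a column-count check that catches malformed rows before they reach the endpoint.

```python
import json
from typing import Any, Sequence


def build_mlflow_request(columns: Sequence[str], rows: Sequence[Sequence[Any]]) -> str:
    """Hypothetical helper: serialize rows into the input_data/columns/data JSON shape."""
    for i, row in enumerate(rows):
        if len(row) != len(columns):
            raise ValueError(f"row {i} has {len(row)} values, expected {len(columns)}")
    return json.dumps({"input_data": {"columns": list(columns), "data": [list(r) for r in rows]}})


# Placeholder column names and values, for illustration only.
print(build_mlflow_request(["feature_a", "feature_b"], [[0.0, 1.5], [2.0, 0.25]]))
```

`ml_client.online_endpoints.invoke` reads the request from a file path via `request_file`. So the resulting string would be written to a file first.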

In [None]:
# Create and deploy managed online endpoint
online_endpoint_name = os.getenv('AML_ENDPOINT_NAME', f'spam-clf-{uuid.uuid4().hex[:8]}')

online_endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description='Spam classification endpoint',
    auth_mode='key',
    tags={'environment': 'workshop'},
)

print(f'Creating endpoint: {online_endpoint_name}')
ml_client.online_endpoints.begin_create_or_update(online_endpoint).result()
print('Endpoint created!')

online_deployment = ManagedOnlineDeployment(
    name='blue',
    endpoint_name=online_endpoint_name,
    model=f'azureml:{registered_model.name}:{registered_model.version}',
    instance_type='Standard_DS3_v2',
    instance_count=1,
)

print('Creating deployment (5-10 min)...')
ml_client.online_deployments.begin_create_or_update(online_deployment).result()

online_endpoint = ml_client.online_endpoints.get(online_endpoint_name)
online_endpoint.traffic = {'blue': 100}
ml_client.online_endpoints.begin_create_or_update(online_endpoint).result()

print('Deployment complete!')
print(f'Scoring URI: {online_endpoint.scoring_uri}')

In [None]:
# Test the online endpoint
test_samples = X_test.head(5).to_dict(orient='split')
request_json = json.dumps({
    'input_data': {
        'columns': test_samples['columns'],
        'data': test_samples['data'],
    }
})

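# invoke() reads the request body from a file path, so write the payload to disk first
request_file = 'sample_request.json'
with open(request_file, 'w') as f:
    f.write(request_json)

response = ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name='blue',
    request_file=request_file,
)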

print('Predictions:')
print(json.loads(response))

In [None]:
# Studio link
studio_url = (
    f'https://ml.azure.com/experiments/{experiment_name}'
    f'?wsid=/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP}'
    f'/providers/Microsoft.MachineLearningServices/workspaces/{WORKSPACE_NAME}'
)
print('Open in Azure ML Studio:')
print(studio_url)

---
## Cleanup

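Delete the endpoints when you are done. The online endpoint bills for its VM instances for as long as the deployment exists. The batch endpoint only uses compute while jobs run, but deleting it keeps the workspace tidy.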

In [None]:
# Uncomment to delete endpoints

# Delete batch endpoint
# ml_client.batch_endpoints.begin_delete(name=batch_endpoint_name).result()
# print('Deleted batch endpoint:', batch_endpoint_name)

# Delete online endpoint (if created)
# ml_client.online_endpoints.begin_delete(name=online_endpoint_name).result()
# print('Deleted online endpoint:', online_endpoint_name)

---
## Key Takeaways

1. **MLflow Integration**: Each Azure ML workspace exposes an MLflow tracking URI, so standard `mlflow` logging calls record runs in the workspace
2. **NumPy Compatibility**: Pin `numpy<2.0` when logging models that Azure ML curated environments (NumPy 1.x) will load
3. **Batch Scoring**: Use command jobs for ad-hoc batch inference
4. **Batch Endpoints**: Use for production batch workloads with versioned deployments
5. **Online Endpoints**: Use for real-time inference (always-on compute, higher cost)

## Next Steps
- Run `01-automated-retraining/submit_pipeline.py` for automated retraining
- Run `02-observability/submit_drift_job.py` for drift detection
- Run `03-governance/run_audit_report.py` for compliance auditing