In [1]:
from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment
from azure.ai.ml.constants import AssetTypes
from azure.identity import ClientSecretCredential
import os
from dotenv import load_dotenv
from pathlib import Path
import time

# Load environment variables from a .env file (if one is found) into the process environment
load_dotenv()

print("="*60)
print("🚀 FINAL PIPELINE SUBMISSION")
print("="*60)

# Configuration: service-principal credentials and workspace coordinates
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")
tenant_id = os.getenv("AZURE_TENANT_ID")
subscription_id = os.getenv("SUBSCRIPTION_ID")
resource_group = os.getenv("RESOURCE_GROUP")
workspace_name = os.getenv("AZUREML_WORKSPACE_NAME")

# Fail fast with a clear message: os.getenv returns None for unset variables, and a
# None value would otherwise only surface later, inside the Azure client constructors.
_required_settings = {
    "AZURE_CLIENT_ID": client_id,
    "AZURE_CLIENT_SECRET": client_secret,
    "AZURE_TENANT_ID": tenant_id,
    "SUBSCRIPTION_ID": subscription_id,
    "RESOURCE_GROUP": resource_group,
    "AZUREML_WORKSPACE_NAME": workspace_name,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise EnvironmentError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

# NOTE(review): the lines below are static messages; nothing in this cell performs
# the checks they describe. Confirm they still reflect the real workspace state.
print("✅ Checks completed:")
print("  • Storage keys synced")
print("  • Storage account verified: strategicaistuksdev02")
print("  • Service Principal has Storage Blob Data Contributor")
print("  • Default datastore confirmed: workspaceblobstore")
print()

🚀 FINAL PIPELINE SUBMISSION
✅ Checks completed:
  • Storage keys synced
  • Storage account verified: strategicaistuksdev02
  • Service Principal has Storage Blob Data Contributor
  • Default datastore confirmed: workspaceblobstore



In [2]:
# Service-principal credential built from the settings read from the environment
credential = ClientSecretCredential(
    client_secret=client_secret,
    client_id=client_id,
    tenant_id=tenant_id,
)

# Client scoped to the target subscription / resource group / workspace
ml_client = MLClient(
    credential=credential,
    workspace_name=workspace_name,
    resource_group_name=resource_group,
    subscription_id=subscription_id,
)

print("Connected to workspace:", workspace_name)

Connected to workspace: strategicai-mlw-uks-dev-01


In [3]:
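# Legacy alternative (not used): service-principal authentication through the
# Azure ML SDK v1 `ServicePrincipalAuthentication` class. This notebook authenticates
# with `ClientSecretCredential` instead; the snippet is kept for reference only.
# svc_pr_password = os.environ.get("AZURE_CLIENT_SECRET")

# svc_pr = ServicePrincipalAuthentication(
#        tenant_id=os.environ.get("AZURE_TENANT_ID"),
#        service_principal_id=os.environ.get("AZURE_CLIENT_ID"),
#        service_principal_password=svc_pr_password,
#        _enable_caching=True)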

In [4]:
# Locate the project root: when the kernel runs inside a "notebooks" folder the
# root is its parent, otherwise the working directory itself is the root.
current_dir = Path.cwd()
project_root = current_dir.parent if current_dir.name == "notebooks" else current_dir

print("Project root:", project_root)

Project root: /mnt/batch/tasks/shared/LS_root/mounts/clusters/vm-juan/code/Notebooks/repos/azure_ml_pipelines_pa


In [5]:
import os

# Folder (relative to the working directory) that holds the environment definition files
dependencies_dir = "./env"
Path(dependencies_dir).mkdir(parents=True, exist_ok=True)

In [6]:


# Resolve the job environment
#
# Reuse the pinned, already-registered version of the environment when it exists;
# otherwise register a new version built from the local conda specification.
custom_env_name = "pa-env"
custom_env_version = "3"  # pinned version to reuse when it is already registered

try:
    job_env = ml_client.environments.get(name=custom_env_name, version=custom_env_version)
except Exception:
    # Previously a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.
    # NOTE(review): the recorded run fails this lookup with ResourceNotFoundError;
    # consider catching only that type so authentication or network failures are
    # not mistaken for "environment missing".
    job_env = ml_client.environments.create_or_update(
        Environment(
            name=custom_env_name,
            description="Environment for Personal Agendas pipeline",
            conda_file=os.path.join(dependencies_dir, "conda.yaml"),
            image="mcr.microsoft.com/azureml/openmpi5.0-ubuntu24.04:20250601.v1",
        )
    )
    print(f"Created environment: {custom_env_name}:{job_env.version}")

# Always publish the resolved version. It used to be assigned only in the fallback
# branch, so after a successful lookup any code reading env_version hit a NameError.
env_version = job_env.version

print(
    f"Environment with name {job_env.name} is registered to workspace, the environment version is {job_env.version}"
)

ActivityCompleted: Activity=Environment.Get, HowEnded=Failure, Duration=558.54 [ms], Exception=ResourceNotFoundError, ErrorCategory=UserError, ErrorMessage=(UserError) No environment exists for name: pa-env, version: 3, label: 
Code: UserError
Message: No environment exists for name: pa-env, version: 3, label: 
ActivityCompleted: Activity=Environment.CreateOrUpdate, HowEnded=Failure, Duration=247.32 [ms], Exception=ResourceExistsError, ErrorCategory=UserError, ErrorMessage=(UserError) Environment pa-env with version 1 is already registered and cannot be changed.
Code: UserError
Message: Environment pa-env with version 1 is already registered and cannot be changed.


Created environment: pa-env:2
Environment with name pa-env is registered to workspace, the environment version is 2


In [7]:
from pathlib import Path
# Sanity-check the local project layout before anything is uploaded.
# The project root is the parent folder when the kernel runs inside notebooks/.
current_dir = Path.cwd()
running_in_notebooks = current_dir.name == "notebooks"
project_root = current_dir.parent if running_in_notebooks else current_dir
print(
    "Running from notebooks folder, using parent as project root"
    if running_in_notebooks
    else "Running from project root directly"
)

print("Current working directory:", current_dir)
print("Project root:", project_root)

# The PA package must already be present; it is only reported, never created
pa_dir = project_root.joinpath("PA")
if pa_dir.exists():
    print(f"✓ PA directory found at {pa_dir}")
else:
    print(f"ERROR: PA directory not found at {pa_dir}")
    print("Please ensure you're running from the correct directory")

# The pipeline scripts folder is created on demand
pipeline_dir = project_root.joinpath("azureml_pipeline")
if pipeline_dir.exists():
    print(f"✓ azureml_pipeline directory found at {pipeline_dir}")
else:
    print(f"Creating azureml_pipeline directory at {pipeline_dir}")
    pipeline_dir.mkdir(exist_ok=True)

# Warn (without failing) about any missing per-event configuration file
config_vet = pa_dir.joinpath("config", "config_vet.yaml")
config_ecomm = pa_dir.joinpath("config", "config_ecomm.yaml")

for config_label, config_path in (
    ("config_vet.yaml", config_vet),
    ("config_ecomm.yaml", config_ecomm),
):
    if not config_path.exists():
        print(f"WARNING: {config_label} not found at {config_path}")

Running from notebooks folder, using parent as project root
Current working directory: /mnt/batch/tasks/shared/LS_root/mounts/clusters/vm-juan/code/Notebooks/repos/azure_ml_pipelines_pa/notebooks
Project root: /mnt/batch/tasks/shared/LS_root/mounts/clusters/vm-juan/code/Notebooks/repos/azure_ml_pipelines_pa
✓ PA directory found at /mnt/batch/tasks/shared/LS_root/mounts/clusters/vm-juan/code/Notebooks/repos/azure_ml_pipelines_pa/PA
✓ azureml_pipeline directory found at /mnt/batch/tasks/shared/LS_root/mounts/clusters/vm-juan/code/Notebooks/repos/azure_ml_pipelines_pa/azureml_pipeline


In [8]:
# Define the data preparation component
#
# A single command step that runs azureml_pipeline/azureml_step1_data_prep.py against an
# input folder and writes four output folders (registration, scan, session, metadata).
#
# NOTE(review): `code` snapshots everything under project_root. If the .env file read by
# load_dotenv() lives there, make sure an ignore file excludes it from the upload - confirm.
# NOTE(review): `--incremental` receives the textual rendering of a boolean input;
# the step script is not visible here, so verify its argument parser accepts that form.
data_preparation_component = command(
    name="data_preparation",
    display_name="Data Preparation - PA Pipeline",
    description="Process registration, scan, and session data",
    inputs={
        "input_uri": Input(type="uri_folder"),
        "config_type": Input(type="string", default="ecomm"),
        "incremental": Input(type="boolean", default=False)
    },
    outputs={
        "registration_output": Output(type="uri_folder"),
        "scan_output": Output(type="uri_folder"),
        "session_output": Output(type="uri_folder"),
        "metadata_output": Output(type="uri_folder")
    },
    code=str(project_root),  # Upload entire project
    command="""python azureml_pipeline/azureml_step1_data_prep.py \
        --config PA/config/config_${{inputs.config_type}}.yaml \
        --input_uri ${{inputs.input_uri}} \
        --incremental ${{inputs.incremental}} \
        --output_registration ${{outputs.registration_output}} \
        --output_scan ${{outputs.scan_output}} \
        --output_session ${{outputs.session_output}} \
        --output_metadata ${{outputs.metadata_output}}
    """,
    # Built from the resolved job_env object so the component always points at the
    # environment that was actually fetched or registered.
    environment=f"{job_env.name}:{job_env.version}",
    compute="cpu-cluster",
    is_deterministic=False
)


In [9]:
# Define the pipeline
#
# Single-step pipeline: it invokes data_preparation_component once and re-exposes the
# step's four folder outputs under pipeline-level names.
#   pipeline_input_data   - forwarded to the step as `input_uri`
#   pipeline_config_type  - forwarded to the step as `config_type` (default "ecomm")
#   pipeline_incremental  - forwarded to the step as `incremental` (default False)
@pipeline(
    compute="cpu-cluster",
    description="Personal Agendas data processing pipeline",
)
def personal_agendas_pipeline(
    pipeline_input_data: Input,
    pipeline_config_type: str = "ecomm",
    pipeline_incremental: bool = False
):
    """
    Personal Agendas Pipeline
    Step 1: Data Preparation (Registration, Scan, Session)
    """
    
    # NOTE(review): the DSL presumably derives the node name from this variable
    # name - confirm before renaming `step1`.
    step1 = data_preparation_component(
        input_uri=pipeline_input_data,
        config_type=pipeline_config_type,
        incremental=pipeline_incremental
    )
    
    # Map each step output to the name it carries at pipeline level
    return {
        "registration_data": step1.outputs.registration_output,
        "scan_data": step1.outputs.scan_output,
        "session_data": step1.outputs.session_output,
        "metadata": step1.outputs.metadata_output
    }

In [10]:
# Long-form datastore URI for the landing/azureml/ folder on the landing_pa datastore,
# assembled from the workspace coordinates read from the environment.
input_data_uri = (
    f"azureml://subscriptions/{subscription_id}"
    f"/resourcegroups/{resource_group}"
    f"/workspaces/{workspace_name}"
    "/datastores/landing_pa/paths/landing/azureml/"
)

print("\nInput data URI:", input_data_uri)



Input data URI: azureml://subscriptions/b8d6d487-0bd2-4773-b318-12ab763ed178/resourcegroups/strategicai-rg-uks-dev-01/workspaces/strategicai-mlw-uks-dev-01/datastores/landing_pa/paths/landing/azureml/


In [11]:
print("\nInput data URI:", input_data_uri)

# Instantiate the pipeline for the ECOMM configuration, reading from the landing folder
pipeline_job = personal_agendas_pipeline(
    pipeline_input_data=Input(type=AssetTypes.URI_FOLDER, path=input_data_uri),
    pipeline_config_type="ecomm",
    pipeline_incremental=False,
)

# Job metadata shown in the studio: experiment grouping, display name and tags
pipeline_job.experiment_name = "personal_agendas_experiment"
pipeline_job.display_name = "Personal Agendas Pipeline - ECOMM"
pipeline_job.tags = dict(
    project="personal_agendas",
    event_type="ecomm",
    storage_fixed="true",
    environment="dev",
)


Input data URI: azureml://subscriptions/b8d6d487-0bd2-4773-b318-12ab763ed178/resourcegroups/strategicai-rg-uks-dev-01/workspaces/strategicai-mlw-uks-dev-01/datastores/landing_pa/paths/landing/azureml/


In [12]:
# Submit the pipeline job and report either the job details or troubleshooting hints
print("\n" + 60 * "=", "SUBMITTING PIPELINE", 60 * "=", sep="\n")

try:
    submitted_job = ml_client.jobs.create_or_update(pipeline_job)

    print("\n✅ SUCCESS! Pipeline submitted!")
    print("\n📊 Job Details:")
    print("  • Name:", submitted_job.name)
    print("  • Status:", submitted_job.status)
    print("  • Type:", submitted_job.type)
    print("\n🔗 Monitor your pipeline at:")
    print(" ", submitted_job.studio_url)

    for tip_line in (
        "\n💡 Tips:",
        "  • Click the link above to watch the pipeline progress",
        "  • Check 'Outputs + logs' for detailed execution logs",
        "  • The first run may take longer due to environment setup",
    ):
        print(tip_line)

except Exception as submit_error:
    # Best effort: report the failure and suggest next steps instead of raising
    error_text = str(submit_error)
    print(f"\n❌ Submission failed: {error_text}")
    print("\n🔧 Troubleshooting:")

    # Pick the hint list from keywords found in the error message
    if "AuthorizationFailure" in error_text:
        hint_lines = (
            "  Storage authorization issue still present. Try:",
            "  1. Wait 2 more minutes and retry",
            f"  2. Run: az ml workspace sync-keys --resource-group {resource_group} --workspace-name {workspace_name}",
            "  3. Use UserIdentityConfiguration instead (see previous examples)",
        )
    elif "compute" in error_text.lower():
        hint_lines = (
            "  Compute cluster issue. Check:",
            "  1. Cluster 'cpu-cluster' exists",
            "  2. Cluster is running (not stopped)",
            "  3. You have permissions to use it",
        )
    else:
        hint_lines = (
            "  Check the error message above for details",
            "  Verify all prerequisites are met",
        )
    for hint_line in hint_lines:
        print(hint_line)

print("\n" + 60 * "=")


SUBMITTING PIPELINE

✅ SUCCESS! Pipeline submitted!

📊 Job Details:
  • Name: placid_dog_jg7n9cgq46
  • Status: NotStarted
  • Type: pipeline

🔗 Monitor your pipeline at:
  https://ml.azure.com/runs/placid_dog_jg7n9cgq46?wsid=/subscriptions/b8d6d487-0bd2-4773-b318-12ab763ed178/resourcegroups/strategicai-rg-uks-dev-01/workspaces/strategicai-mlw-uks-dev-01&tid=3540e7dc-31b3-4057-9e31-43e9fe938179

💡 Tips:
  • Click the link above to watch the pipeline progress
  • Check 'Outputs + logs' for detailed execution logs
  • The first run may take longer due to environment setup



[32mUploading azure_ml_pipelines_pa (0.89 MBs): 100%|██████████| 890668/890668 [00:00<00:00, 2406521.49it/s]
[39m

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
