## **Problem Statement**

### **Business Context**

An automobile dealership in Las Vegas specializes in selling luxury and non-luxury vehicles. They cater to diverse customer preferences with varying vehicle specifications, such as mileage, engine capacity, and seating capacity. However, the dealership faces significant challenges in maintaining consistency and efficiency across its pricing strategy due to reliance on manual processes and disconnected systems. Pricing evaluations are prone to errors, updates are delayed, and scaling operations are difficult as demand grows. These inefficiencies impact revenue and customer trust. Recognizing the need for a reliable and scalable solution, the dealership is seeking to implement a unified system that ensures seamless integration of data-driven pricing decisions, adaptability to changing market conditions, and operational efficiency.

### **Objective**

The dealership has hired you as an MLOps Engineer to design and implement an MLOps pipeline that automates the pricing workflow. This pipeline will encompass data cleaning, preprocessing, transformation, model building, training, evaluation, and registration with CI/CD capabilities to ensure continuous integration and delivery. Your role is to overcome challenges such as integrating disparate data sources, maintaining consistent model performance, and enabling scalable, automated updates to meet evolving business needs. The expected outcomes are a robust, automated system that improves pricing accuracy, operational efficiency, and scalability, driving increased profitability and customer satisfaction.

### **Data Description**

The dataset contains attributes of used cars sold in various locations. These attributes serve as key data points for CarOnSell's pricing model. The detailed attributes are:

- **Segment:** Describes the category of the vehicle, indicating whether it is a luxury or non-luxury segment.

- **Kilometers_Driven:** The total number of kilometers the vehicle has been driven.

- **Mileage:** The fuel efficiency of the vehicle, measured in kilometers per liter (km/l).

- **Engine:** The engine capacity of the vehicle, measured in cubic centimeters (cc). 

- **Power:** The power of the vehicle's engine, measured in brake horsepower (BHP). 

- **Seats:** The number of seats in the vehicle, which can influence its classification, usage, and pricing based on customer needs.

- **Price:** The price of the vehicle in lakhs (1 lakh = 100,000 currency units). It represents the cost to the consumer for purchasing the vehicle.
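
As a quick sanity check on the attributes above, the sketch below verifies that a CSV header contains the expected columns and converts a price in lakhs to base currency units. The column spelling follows the description and is an assumption about the real file (it may, for example, use lowercase `price`). The sample rows and the helper names `missing_columns` and `lakhs_to_units` are made up for illustration.

```python
import csv
import io

# Column names as listed in the data description (assumed spelling).
EXPECTED_COLUMNS = [
    "Segment", "Kilometers_Driven", "Mileage",
    "Engine", "Power", "Seats", "Price",
]

LAKH = 100_000  # one lakh is 100,000 currency units


def missing_columns(header):
    """Return the expected columns that are absent from a CSV header."""
    present = set(header)
    return [col for col in EXPECTED_COLUMNS if col not in present]


def lakhs_to_units(price_in_lakhs):
    """Convert a price expressed in lakhs to base currency units."""
    return price_in_lakhs * LAKH


# Hypothetical sample rows, not taken from the real dataset.
sample_csv = io.StringIO(
    "Segment,Kilometers_Driven,Mileage,Engine,Power,Seats,Price\n"
    "luxury,41000,15.2,1968,140.8,5,17.5\n"
    "non-luxury,72000,26.6,998,58.16,5,1.75\n"
)

reader = csv.DictReader(sample_csv)
print("Missing columns:", missing_columns(reader.fieldnames))
for row in reader:
    print(row["Segment"], lakhs_to_units(float(row["Price"])))
```

Running a check like this before registering the data asset can catch a renamed or missing column early, before a pipeline job fails on the cluster.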

## **1. AzureML Environment Setup and Data Preparation**

### **1.1 Connect to Azure Machine Learning Workspace**

In [8]:
# Install the Azure Machine Learning SDK
%pip install azure-ai-ml
# Optional FAISS-related utilities (not required for this pipeline)
# %pip install -U 'azureml-rag[faiss,hugging_face]>=0.2.36'


In [9]:
# Handle to the workspace
from azure.ai.ml import MLClient

# Authentication package
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()

from azureml.core import Workspace

In [10]:
%%writefile workspace.json
{
    "subscription_id": "aa382cca-fa09-4ae9-b74b-c63cd0b942e8",
    "resource_group":  "defualt_resource_group",
    "workspace_name": "azureai"  
}

Overwriting workspace.json


In [11]:
# Initialize credentials for Azure authentication
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

try:
    credential = DefaultAzureCredential()
    # Check whether the credential can obtain a token.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # Fall back to InteractiveBrowserCredential if DefaultAzureCredential does not work
    credential = InteractiveBrowserCredential()

In [12]:
# Initialize the MLClient to connect with AzureML
ml_client = MLClient.from_config(credential=credential, path="workspace.json")



# Create an AzureML Workspace object
ws = Workspace(
    subscription_id=ml_client.subscription_id,
    resource_group=ml_client.resource_group_name,
    workspace_name=ml_client.workspace_name,
)


# Verify the client and workspace details
print(ml_client)


Found the config file in: workspace.json
Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


MLClient(credential=<azure.identity._credentials.default.DefaultAzureCredential object at 0x7c210860d210>,
         subscription_id=aa382cca-fa09-4ae9-b74b-c63cd0b942e8,
         resource_group_name=defualt_resource_group,
         workspace_name=azureai)


### **1.2 Set Up Compute Cluster**

In [13]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure ML compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="Standard_D8_v3",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=1,
        # Seconds an idle node keeps running after its last job before scaling down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )

    # Now, we pass the object to MLClient's create_or_update method
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

print(
    f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
)

You already have a cluster named cpu-cluster, we'll reuse it as is.
AMLCompute with name cpu-cluster is created, the compute size is Standard_D8_v3


### **1.3 Register Dataset as Data Asset**

In [17]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes
from pathlib import Path

# Path to the local dataset (used_cars.csv)
local_csv = Path("/home/azureuser/cloudfiles/code/Users/Keya_1751545169843/car-price-mlops/used_cars.csv")
assert local_csv.exists(), f"File not found: {local_csv}"

# Set the version number of the data asset (for example: '1')
VERSION = "10"

# Create and register the dataset as an AzureML data asset
data_asset = Data(
    path=local_csv,
    type=AssetTypes.URI_FILE, 
    description="A dataset of used cars for price prediction",
    name="used-cars-data",
    version=VERSION,
)

In [18]:
print(local_csv)
# print(path)

/home/azureuser/cloudfiles/code/Users/Keya_1751545169843/car-price-mlops/used_cars.csv


In [19]:
# Create the data asset in the workspace
ml_client.data.create_or_update(data_asset)

Data({'path': 'azureml://subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourcegroups/defualt_resource_group/workspaces/azureai/datastores/workspaceblobstore/paths/LocalUpload/0b8e06a9f14bf45a52b1c21394f1cdf03017517cd48663b3e20a05882ff35cdd/used_cars.csv', 'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'used-cars-data', 'description': 'A dataset of used cars for price prediction', 'tags': {}, 'properties': {}, 'print_as_yaml': False, 'id': '/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourceGroups/defualt_resource_group/providers/Microsoft.MachineLearningServices/workspaces/azureai/data/used-cars-data/versions/10', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/autoproject/code', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7c21087d9600>, 'serialize': <m

### **1.4 Create and Configure Job Environment**

In [47]:
# Create a directory for the environment definition (conda.yml)
import os

src_dir_env = "./env"
os.makedirs(src_dir_env, exist_ok=True)

In [48]:
%%writefile {src_dir_env}/conda.yml
name: sklearn-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - pip=21.2.4
  - scikit-learn=0.23.2
  - scipy=1.7.1
  - pip:  
    - mlflow==2.8.1
    - azureml-mlflow==1.51.0
    - azureml-inference-server-http
    - azureml-core==1.49.0
    - cloudpickle==1.6.0

Writing ./env/conda.yml


In [22]:
from azure.ai.ml.entities import Environment, BuildContext

env_docker_conda = Environment(
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    conda_file="env/conda.yml",
    name="machine_learning_E2E",
    description="Environment created from a Docker image plus Conda environment.",
)
ml_client.environments.create_or_update(env_docker_conda)

Environment({'arm_type': 'environment_version', 'latest_version': None, 'image': 'mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04', 'intellectual_property': None, 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'machine_learning_E2E', 'description': 'Environment created from a Docker image plus Conda environment.', 'tags': {}, 'properties': {'azureml.labels': 'latest'}, 'print_as_yaml': False, 'id': '/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourceGroups/defualt_resource_group/providers/Microsoft.MachineLearningServices/workspaces/azureai/environments/machine_learning_E2E/versions/1', 'Resource__source_path': '', 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/autoproject/code', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7c21087db520>, 'serialize': <msrest.serialization.Serializer object at 0x7c21087dbca0>, 'version': '1', 'conda_file': {'channels': ['conda-forge'], 'dependencie

## **2. Model Development Workflow**

### **2.1 Data Preparation**

This **Data Preparation job** splits the input dataset into a training set and a test set. The script takes four arguments: the location of the input data (`used_cars.csv`), the fraction of rows to hold out for testing (`test_train_ratio`), and the output directories for the training (`train_data`) and testing (`test_data`) sets. It reads the CSV from the data asset URI, label-encodes the categorical `Segment` column, splits the rows with scikit-learn's `train_test_split` function, and writes `train.csv` and `test.csv` to the output directories. It also logs the number of records in each split with MLflow.
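
To make the role of `test_train_ratio` concrete, here is a minimal standard-library sketch of a seeded shuffle split. It illustrates the idea only and is not scikit-learn's implementation: the function name `split_rows` is hypothetical, and the shuffle order will differ from what `train_test_split(random_state=42)` produces.

```python
import math
import random


def split_rows(rows, test_ratio=0.2, seed=42):
    """Shuffle rows reproducibly and split them into (train, test) lists."""
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be between 0 and 1")
    shuffled = list(rows)                  # copy so the caller's data is untouched
    random.Random(seed).shuffle(shuffled)  # a fixed seed gives the same order every run
    n_test = math.ceil(len(shuffled) * test_ratio)  # round the test share up
    return shuffled[n_test:], shuffled[:n_test]


rows = list(range(10))                     # stand-in for 10 CSV records
train_rows, test_rows = split_rows(rows, test_ratio=0.2)
print(len(train_rows), len(test_rows))
```

Fixing the seed is what makes the split reproducible across pipeline runs, so that changes in model metrics can be attributed to the model rather than to a different partition of the data.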

In [50]:
import os 
# Users/Keya_1751545169843/car-price-mlops/src
src_dir_job_scripts = "Users/Keya_1751545169843/car-price-mlops/src/data_prep"
os.makedirs(src_dir_job_scripts, exist_ok=True)

In [51]:
%%writefile {src_dir_job_scripts}/data_prep.py

import os
import argparse
import logging
import mlflow
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

def main(): 
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="Path to input data")
    parser.add_argument("--test_train_ratio", type=float, default=0.2)
    parser.add_argument("--train_data", type=str, help="Path to save train data")
    parser.add_argument("--test_data", type=str, help="Path to save test data")
    args = parser.parse_args()

    # Start MLflow Run
    mlflow.start_run()

    # Log arguments
    logging.info(f"Input data path: {args.data}")
    logging.info(f"Test-train ratio: {args.test_train_ratio}")

    # Reading Data
    df = pd.read_csv(args.data)

    # Encode categorical feature
    le = LabelEncoder()
    df['Segment'] = le.fit_transform(df['Segment'])

    # Split Data into train and test datasets
    train_df, test_df = train_test_split(df, test_size=args.test_train_ratio, random_state=42)

    # Save train and test data
    os.makedirs(args.train_data, exist_ok=True)
    os.makedirs(args.test_data, exist_ok=True)
    train_df.to_csv(os.path.join(args.train_data, "train.csv"), index=False)
    test_df.to_csv(os.path.join(args.test_data, "test.csv"), index=False)

    # log the metrics
    mlflow.log_metric('train size', train_df.shape[0])
    mlflow.log_metric('test size', test_df.shape[0])
    
    mlflow.end_run()

if __name__ == "__main__":
    main()



Overwriting Users/Keya_1751545169843/car-price-mlops/src/data_prep/data_prep.py


#### **Define Data Preparation job**

For this AzureML job, we define the `command` object that takes input files and output directories, then executes the script with the provided inputs and outputs. The job runs in a pre-configured AzureML environment with the necessary libraries. The result will be two separate datasets for training and testing, ready for use in subsequent steps of the machine learning pipeline.
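
The command string of such a job contains `${{inputs.…}}` and `${{outputs.…}}` placeholders that AzureML resolves to concrete values when the job runs. The sketch below mimics that substitution with a regular expression, purely to show what the script ends up receiving on its command line. It is not how AzureML implements binding, and the mount paths and the helper `render_command` are hypothetical.

```python
import re

# Matches ${{inputs.name}} and ${{outputs.name}} placeholders.
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template, inputs, outputs):
    """Replace ${{inputs.x}} / ${{outputs.y}} placeholders with concrete values."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match):
        group, name = match.group(1), match.group(2)
        return str(values[group][name])    # raises KeyError if a binding is missing

    return PLACEHOLDER.sub(substitute, template)


template = (
    "python data_prep.py --data ${{inputs.data}} "
    "--test_train_ratio ${{inputs.test_train_ratio}} "
    "--train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}}"
)

# Hypothetical paths; the real ones are chosen by AzureML at runtime.
rendered = render_command(
    template,
    inputs={"data": "/mnt/in/used_cars.csv", "test_train_ratio": 0.2},
    outputs={"train_data": "/mnt/out/train", "test_data": "/mnt/out/test"},
)
print(rendered)
```

Because the script only ever sees plain paths and numbers, it can be tested locally with ordinary command-line arguments before it is wrapped in a job.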

### **2.2 Training the Model**

This Model Training job is designed to train a **Random Forest Regressor** on the dataset that was split into training and testing sets in the previous data preparation job. This job script accepts five inputs: the path to the training data (`train_data`), the path to the testing data (`test_data`), the number of trees in the forest (`n_estimators`, with a default value of 100), the maximum depth of the trees (`max_depth`, which is set to None by default), and the path to save the trained model (`model_output`).

The script begins by reading the training and testing data files, then separates the features (X) from the target (y). A Random Forest Regressor is initialized with the given `n_estimators` and `max_depth` and fitted on the training data. Its performance is evaluated on the test set using the Mean Squared Error (MSE), which is logged to MLflow along with the hyperparameters. Finally, the trained model is saved to the specified output location as an MLflow model, and the MLflow run is ended.
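
For reference, the MSE is the average of the squared differences between actual and predicted prices, $\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2$. The sketch below computes it by hand on three made-up values. The function name `mse` and the numbers are illustrative, and the training script itself relies on scikit-learn's `mean_squared_error`.

```python
import math


def mse(y_true, y_pred):
    """Mean of squared differences between actual and predicted values."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((a - p) ** 2 for a, p in zip(y_true, y_pred)) / len(y_true)


# Hypothetical prices in lakhs, not taken from the dataset.
actual = [5.0, 12.5, 3.0]
predicted = [4.0, 14.5, 3.0]

score = mse(actual, predicted)   # (1 + 4 + 0) / 3
rmse = math.sqrt(score)          # back in the same unit as the prices (lakhs)
print(f"MSE: {score:.2f}, RMSE: {rmse:.2f}")
```

Since prices are in lakhs, the MSE is in lakhs squared. Taking the square root (RMSE) gives an error in lakhs, which is often easier to interpret.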


In [52]:

import os

src_dir_job_scripts = "Users/Keya_1751545169843/car-price-mlops/src/model_train"
os.makedirs(src_dir_job_scripts, exist_ok=True)

In [53]:
%%writefile {src_dir_job_scripts}/model_train.py

# Required imports for training
import mlflow
import argparse
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

mlflow.start_run()  # Start the MLflow experiment run

os.makedirs("./outputs", exist_ok=True)  # Create the "outputs" directory if it doesn't exist

def select_first_file(path):
    """Selects the first file in a folder, assuming there's only one file.
    Args:
        path (str): Path to the directory or file to choose.
    Returns:
        str: Full path of the selected file.
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def main():
    parser = argparse.ArgumentParser("train")
    parser.add_argument("--train_data", type=str, help="Path to train dataset")
    parser.add_argument("--test_data", type=str, help="Path to test dataset")
    parser.add_argument("--model_output", type=str, help="Path of output model")
    parser.add_argument('--n_estimators', type=int, default=100,
                        help='The number of trees in the forest')
    parser.add_argument('--max_depth', type=int, default=None,
                        help='The maximum depth of the tree. If None, then nodes are expanded until all leaves are pure or until all leaves contain less than min_samples_split samples.')

    args = parser.parse_args()

    # Load datasets
    train_df = pd.read_csv(select_first_file(args.train_data))
    test_df = pd.read_csv(select_first_file(args.test_data))

    # Split the data into features(X) and target(y) 
    y_train = train_df['price']
    X_train = train_df.drop(columns=['price'])
    y_test = test_df['price']
    X_test = test_df.drop(columns=['price'])

    # Initialize and train a RandomForest Regressor
    model = RandomForestRegressor(n_estimators=args.n_estimators, max_depth=args.max_depth, random_state=42)
    model.fit(X_train, y_train)

    # Log model hyperparameters
    mlflow.log_param("model", "RandomForestRegressor")
    mlflow.log_param("n_estimators", args.n_estimators)
    mlflow.log_param("max_depth", args.max_depth)

    # Predict using the RandomForest Regressor on test data
    yhat_test = model.predict(X_test)

    # Compute and log mean squared error for test data
    mse = mean_squared_error(y_test, yhat_test)
    print('Mean Squared Error of RandomForest Regressor on test set: {:.2f}'.format(mse))
    mlflow.log_metric("MSE", float(mse))

    # Save the model
    mlflow.sklearn.save_model(sk_model=model, path=args.model_output)

    mlflow.end_run()  # Ending the MLflow experiment run

if __name__ == "__main__":
    main()

Overwriting Users/Keya_1751545169843/car-price-mlops/src/model_train/model_train.py


#### **Define Model Training Job**

For this AzureML job, we define the `command` object that takes the paths to the training and testing data, the number of trees in the forest (`n_estimators`), and the maximum depth of the trees (`max_depth`) as inputs, and outputs the trained model. The command runs in a pre-configured AzureML environment with all the necessary libraries. The job produces a trained **Random Forest Regressor model**, which can be used for predicting the price of used cars based on the given attributes.
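
The hyperparameters reach the training script as ordinary command-line flags. The sketch below reproduces only the two hyperparameter arguments of `model_train.py` to show how the defaults and explicit values are parsed. The helper `build_parser` is a hypothetical name introduced for this example.

```python
import argparse


def build_parser():
    """Parser with the same two hyperparameter flags as the training script."""
    parser = argparse.ArgumentParser("train")
    parser.add_argument("--n_estimators", type=int, default=100,
                        help="The number of trees in the forest")
    parser.add_argument("--max_depth", type=int, default=None,
                        help="Maximum tree depth; None means no limit")
    return parser


parser = build_parser()

# No flags: fall back to the defaults (100 trees, unlimited depth).
defaults = parser.parse_args([])
print(defaults.n_estimators, defaults.max_depth)

# Flags as one sweep trial might pass them.
tuned = parser.parse_args(["--n_estimators", "50", "--max_depth", "5"])
print(tuned.n_estimators, tuned.max_depth)
```

Note that `type=int` only accepts integer strings such as `"5"`. A value rendered as `"5.0"` would be rejected, so it is worth checking how numeric inputs are formatted when they are passed through the job.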

### **2.3 Registering the Best Trained Model**

The **Model Registration job** takes the best model from the hyperparameter tuning sweep job and registers it in MLflow as a versioned artifact for future use in the used car price prediction pipeline. The script accepts one input: the path to the trained model (`model`). It loads the model with `mlflow.sklearn.load_model()`, then calls `mlflow.sklearn.log_model()` with a registered model name (`used_cars_price_prediction_model`) and an artifact path (`random_forest_price_regressor`) where the model artifacts are stored. Logging the model this way records it together with its metadata and registers it in the MLflow model registry, so it can be tracked and retrieved for future evaluation, deployment, or retraining.

In [28]:
from azure.ai.ml import command, Input, Output

step_process = command(
    name="data_preparation",
    display_name="Data Preparation for Automated Vehicle Pricing",
    description="Prepare and split data into train and test sets",
    inputs={ 
        "data": Input(type="uri_file"),
        "test_train_ratio": Input(type="number"),
    },
    outputs={  
        "train_data": Output(type="uri_folder", mode="rw_mount"),
        "test_data": Output(type="uri_folder", mode="rw_mount"),
    },
    code="./data_prep",
    command="""python data_prep.py \
            --data ${{inputs.data}} \
            --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} \
            --test_data ${{outputs.test_data}}""",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
)

#### **Define Model Register Job**

For this AzureML job, a `command` object is defined to execute the `model_register.py` script. It accepts the best-trained model as input, runs the script in the `AzureML-sklearn-1.0-ubuntu20.04-py38-cpu` environment, and uses the same compute cluster as the previous jobs (`cpu-cluster`). This job plays a crucial role in the pipeline by ensuring that the best-performing model identified during hyperparameter tuning is systematically stored and made available in the MLflow registry for further evaluation, deployment, or retraining. Integrating this job into the end-to-end pipeline automates the process of registering high-quality models, completing the model development lifecycle and enabling the prediction of used car prices.

In [29]:
from azure.ai.ml import command, Input, Output

train_step = command(
    name="train_price_prediction_model",
    display_name="Train Price Prediction Model",
    description="Train a Random Forest Regressor for used car price prediction",
    inputs={
        "train_data": Input(type="uri_folder"),
        "test_data": Input(type="uri_folder"),
        "n_estimators": Input(type="number", default=100),
        "max_depth": Input(type="number", default=10),
    },
    outputs={
        "model_output": Output(type="mlflow_model"),
    },
    code="./model_train",
    command="""python model_train.py \
            --train_data ${{inputs.train_data}} \
            --test_data ${{inputs.test_data}} \
            --n_estimators ${{inputs.n_estimators}} \
            --max_depth ${{inputs.max_depth}} \
            --model_output ${{outputs.model_output}}""",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    compute="cpu-cluster",
)

In [54]:
# Create a directory for the model registration script
import os

src_dir_job_scripts = "Users/Keya_1751545169843/car-price-mlops/src/model_register"
os.makedirs(src_dir_job_scripts, exist_ok=True)

In [55]:
%%writefile {src_dir_job_scripts}/model_register.py
import os
import argparse
import logging
import mlflow
import pandas as pd
from pathlib import Path

mlflow.start_run()  # Starting the MLflow experiment run

def main():
    # Argument parser setup for command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, help="Path to the trained model")  # Path to the trained model artifact
    args = parser.parse_args()

    # Load the trained model from the provided path
    model = mlflow.sklearn.load_model(args.model)

    print("Registering the best trained used cars price prediction model")
    
    # Register the model in the MLflow Model Registry under the name "used_cars_price_prediction_model"
    mlflow.sklearn.log_model(
        sk_model=model,
        registered_model_name="used_cars_price_prediction_model",
        artifact_path="random_forest_price_regressor"
    )

    # End the MLflow run
    mlflow.end_run()

if __name__ == "__main__":
    main()

Overwriting Users/Keya_1751545169843/car-price-mlops/src/model_register/model_register.py


In [56]:
from azure.ai.ml import command, Input

model_register_component = command(
    name="register_model", 
    display_name="Register Best Model",
    description="Register the best trained model in MLflow Model Registry",
    inputs={
        "model": Input(type="mlflow_model"), 
    },
    code="./model_register",
    command="""python  model_register.py \
            --model ${{inputs.model}}""",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    # environment = "machine_learning_E2E"
    compute="cpu-cluster",
)

In [None]:
from azure.ai.ml.sweep import Choice
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import ModelType
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import Input

# Assemble the pipeline by chaining the jobs
@pipeline(
    compute="cpu-cluster",
    description="End-to-end MLOps pipeline for used car price prediction"
)
def complete_pipeline(input_data_uri, test_train_ratio, n_estimators, max_depth):
    
    # Step 1: Preprocess the data
    preprocess_step = step_process(
        data=input_data_uri,
        test_train_ratio=test_train_ratio,
    )
    
    # Step 2: Train the model using the preprocessed data.
    # This is the *template* job whose hyperparameter inputs the sweep will vary.
    # Define the training step with hyperparameters for tuning
    job_for_sweep = train_step(
        train_data=preprocess_step.outputs.train_data,
        test_data=preprocess_step.outputs.test_data,
        #  Hyperparameters as Choice search space
        n_estimators=Choice(values=[10, 20, 30, 50]),
        max_depth=Choice(values=[5, 10, 15, 20]),
    )

    # Define the sweep job
    sweep_job = job_for_sweep.sweep(
        compute="cpu-cluster",
        sampling_algorithm="random",
        primary_metric="MSE",
        goal="Minimize",
    )

    # Set the limits for the sweep job:
    # - max_total_trials: The maximum number of hyperparameter combinations to be evaluated (20 in this case).
    # - max_concurrent_trials: The maximum number of trials to run simultaneously (10 in this case).
    # - timeout: The maximum allowed duration for the sweep job in seconds (7200 seconds, or 2 hours).
    sweep_job.set_limits(max_total_trials=20, max_concurrent_trials=10, timeout=7200)
    
    # Step 3: Register the best model
    # After the sweep job, get the best model
    model_register_step = model_register_component(
        model=sweep_job.outputs.model_output,
    )

    # Return the outputs of the pipeline; the best model comes from the sweep job,
    # not from the template training job
    return {
        "pipeline_job_train_data": preprocess_step.outputs.train_data,
        "pipeline_job_test_data": preprocess_step.outputs.test_data,
        "pipeline_job_best_model": sweep_job.outputs.model_output,
    }


### **2.4 Assembling the End-to-End Workflow**

The end-to-end pipeline integrates all the previously defined jobs into a seamless workflow, automating the process of data preparation, model training, hyperparameter tuning, and model registration. The pipeline is designed using Azure Machine Learning's `@pipeline` decorator, specifying the compute target and providing a detailed description of the workflow.
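
The hyperparameter tuning step can be pictured as a random search: sample a combination from the discrete choices, train, record the MSE, and keep the trial with the lowest score. The sketch below shows that loop with the same search space and trial limit as the pipeline. The objective `fake_mse` is a made-up stand-in for a real training run, and `random_sweep` is a hypothetical helper. AzureML's own sampler may behave differently, for example in how it treats repeated combinations.

```python
import random

# Same discrete choices as the pipeline's sweep definition.
SEARCH_SPACE = {
    "n_estimators": [10, 20, 30, 50],
    "max_depth": [5, 10, 15, 20],
}


def fake_mse(params):
    """Hypothetical stand-in for a training run; returns a made-up MSE."""
    return 100.0 / params["n_estimators"] + abs(params["max_depth"] - 10) * 0.5


def random_sweep(objective, space, max_total_trials, seed=0):
    """Sample random combinations and return (best_trial, all_trials)."""
    rng = random.Random(seed)
    trials = []
    for _ in range(max_total_trials):
        params = {name: rng.choice(values) for name, values in space.items()}
        trials.append((objective(params), params))
    best = min(trials, key=lambda trial: trial[0])   # goal: minimize MSE
    return best, trials


best, trials = random_sweep(fake_mse, SEARCH_SPACE, max_total_trials=20)
print("Best trial:", best)
```

With 4 x 4 = 16 possible combinations and a budget of 20 trials, a random sampler that draws with replacement, as this sketch does, is likely to repeat some combinations and may still miss others.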

In [34]:
# The code retrieves a specific version of a registered data asset using the ml_client object.
data_path = ml_client.data.get("used-cars-data", version=VERSION).path

# Create pipeline instance
pipeline_instance = complete_pipeline(
    input_data_uri=Input(type="uri_file", path=data_path),
    test_train_ratio=0.2,
    n_estimators=50,
    max_depth=5
)

# Submit the pipeline to Azure ML
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_instance, 
    experiment_name="price_prediction_pipeline"
)

# Stream the output of the job for real-time logs
ml_client.jobs.stream(pipeline_job.name)

# Access pipeline outputs (optional, after job completion)
print(f"Train data location: {pipeline_job.outputs['pipeline_job_train_data']}")
print(f"Test data location: {pipeline_job.outputs['pipeline_job_test_data']}")
print(f"Best model location: {pipeline_job.outputs['pipeline_job_best_model']}")

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Uploading data_prep (0.0 MBs): 10

RunId: sad_car_w6hsrw6hv5
Web View: https://ml.azure.com/runs/sad_car_w6hsrw6hv5?wsid=/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourcegroups/defualt_resource_group/workspaces/azureai

Streaming logs/azureml/executionlogs.txt

[2025-11-25 12:44:09Z] Submitting 1 runs, first five are: 10714843:26aab7e2-d45e-424e-9b9e-f9d529dc8b3e
[2025-11-25 12:51:45Z] Completing processing run id 26aab7e2-d45e-424e-9b9e-f9d529dc8b3e.
[2025-11-25 12:51:46Z] Submitting 1 runs, first five are: 682e4838:1d1e01c7-72fd-46bf-a98e-708b66b0cd26
[2025-11-25 13:02:36Z] Completing processing run id 1d1e01c7-72fd-46bf-a98e-708b66b0cd26.
[2025-11-25 13:02:37Z] Submitting 1 runs, first five are: 20f825c8:62e7288b-ccf7-4b7a-8105-c4f2378834bf
[2025-11-25 13:03:28Z] Completing processing run id 62e7288b-ccf7-4b7a-8105-c4f2378834bf.

Execution Summary
RunId: sad_car_w6hsrw6hv5
Web View: https://ml.azure.com/runs/sad_car_w6hsrw6hv5?wsid=/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourcegroups/defualt_

In [35]:


final_job = ml_client.jobs.get(pipeline_job.name)
print(f"Train data location: {final_job.outputs['pipeline_job_train_data']}")
print(f"Test data location: {final_job.outputs['pipeline_job_test_data']}")
print(f"Best model location: {final_job.outputs['pipeline_job_best_model']}")


Train data location: ${{parent.outputs.pipeline_job_train_data}}
Test data location: ${{parent.outputs.pipeline_job_test_data}}
Best model location: ${{parent.outputs.pipeline_job_best_model}}


In [36]:
try:
    # Submit the pipeline
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_instance, 
        experiment_name="price_prediction_pipeline"
    )
    
    print(f"Pipeline submitted: {pipeline_job.name}")
    print(f"Web View: {pipeline_job.services['Studio'].endpoint}")
    
    # Wait for completion with error handling
    ml_client.jobs.stream(pipeline_job.name)
    
    # If successful, access outputs
    final_job = ml_client.jobs.get(pipeline_job.name)
    if final_job.status == "Completed":
        print(f"\n✓ Pipeline completed successfully!")
        print(f"Train data: {final_job.outputs['pipeline_job_train_data']}")
        print(f"Test data: {final_job.outputs['pipeline_job_test_data']}")
        print(f"Best model: {final_job.outputs['pipeline_job_best_model']}")
    else:
        print(f"\n✗ Pipeline failed with status: {final_job.status}")
        
except Exception as e:
    print(f"Error during pipeline execution: {e}")
    # Get job details for debugging
    if 'pipeline_job' in locals():
        job_details = ml_client.jobs.get(pipeline_job.name)
        print(f"Job status: {job_details.status}")

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored


Pipeline submitted: magenta_deer_cbxml5vf3c
Web View: https://ml.azure.com/runs/magenta_deer_cbxml5vf3c?wsid=/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourcegroups/defualt_resource_group/workspaces/azureai&tid=a2799098-ec71-4199-a883-6274017f5282
RunId: magenta_deer_cbxml5vf3c
Web View: https://ml.azure.com/runs/magenta_deer_cbxml5vf3c?wsid=/subscriptions/aa382cca-fa09-4ae9-b74b-c63cd0b942e8/resourcegroups/defualt_resource_group/workspaces/azureai

Streaming logs/azureml/executionlogs.txt

[2025-11-25 13:04:25Z] Completing processing run id ad7a4bda-01e8-43a2-8b69-84582b5cb97e.
[2025-11-25 13:04:26Z] Completing processing run id 33712cae-54de-4a08-bc05-3246cba85af3.
[2025-11-25 13:04:27Z] Completing processing run id 375b7663-7902-400f-9e29-60da687e00e7.
[2025-11-25 13:04:28Z] Finishing experiment: no runs left and nothing to schedule.

Execution Summary
RunId: magenta_deer_cbxml5vf3c
Web View: https://ml.azure.com/runs/magenta_deer_cbxml5vf3c?wsid=/subscriptions/aa382cca-f