In [12]:
from azureml.core import Workspace

# Coordinates of the Azure ML workspace this notebook attaches to.
workspace_settings = {
    "name": "vij-workspace-1",
    "subscription_id": "1a1d0c91-b4c7-49a1-a6ef-2cea4ddaadb3",
    "resource_group": "vij-resource-group-1",
}
ws = Workspace.get(**workspace_settings)

print("Connected to Azure ML Workspace:", ws.name)


Connected to Azure ML Workspace: vij-workspace-1


In [13]:
from sklearn.datasets import load_diabetes
import pandas as pd

# Build the working frame in one chain: feature columns, then the regression
# target, then keep only the first 100 rows.
diabetes = load_diabetes()
data = (
    pd.DataFrame(data=diabetes.data, columns=diabetes.feature_names)
    .assign(target=diabetes.target)
    .head(100)  # Curtail to 100 rows
)

# Write without the index so the CSV holds only the feature/target columns.
data.to_csv("diabetes.csv", index=False)
print("Dataset saved as diabetes.csv")


Dataset saved as diabetes.csv


In [14]:
from azureml.core import Datastore, Dataset
# Push the local CSV to the workspace's default datastore, replacing any
# previous copy under the same target path.
datastore = ws.get_default_datastore()
local_files = ["diabetes.csv"]
datastore.upload_files(local_files, target_path="diabetes-data", overwrite=True)

# Describe the uploaded file as a tabular dataset (created here, not registered).
uploaded_csv = (datastore, "diabetes-data/diabetes.csv")
dataset = Dataset.Tabular.from_delimited_files(path=uploaded_csv)
print("Dataset uploaded to Azure ML")

Uploading an estimated of 1 files
Uploading diabetes.csv
Uploaded diabetes.csv, 1 files out of an estimated total of 1
Uploaded 1 files
Dataset uploaded to Azure ML


In [15]:
from azureml.core.compute import ComputeTarget, AmlCompute

compute_name = "Vij-Compute"

# Reuse the compute target when the workspace already has one with this name;
# otherwise provision a new AmlCompute cluster and wait until it is ready.
known_targets = ws.compute_targets
if compute_name not in known_targets:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_E4DS_V4",
        max_nodes=4,
    )
    compute_target = ComputeTarget.create(ws, compute_name, compute_config)
    compute_target.wait_for_completion(show_output=True)
else:
    compute_target = known_targets[compute_name]

print("Compute Target Ready:", compute_target.name)


Compute Target Ready: Vij-Compute


In [16]:
from azureml.core import Experiment, ScriptRunConfig, Environment
experiment = Experiment(workspace=ws, name="diabetes-experiment")

# Training environment: add each pip dependency the training script needs.
env = Environment(name="diabetes-env")
for pip_package in ("scikit-learn", "pandas", "joblib"):
    env.python.conda_dependencies.add_pip_package(pip_package)

# Run training.py from the current directory on the compute target.
src = ScriptRunConfig(
    source_directory=".",
    script="training.py",
    compute_target=compute_target,
    environment=env,
)
run = experiment.submit(src)
# Block until the run finishes, streaming its logs into the cell output.
run.wait_for_completion(show_output=True)

RunId: diabetes-experiment_1747400453_7cf23727
Web View: https://ml.azure.com/runs/diabetes-experiment_1747400453_7cf23727?wsid=/subscriptions/1a1d0c91-b4c7-49a1-a6ef-2cea4ddaadb3/resourcegroups/vij-resource-group-1/workspaces/vij-workspace-1&tid=d25ccddb-b863-4a84-8923-dbc1177436a0

Streaming user_logs/std_log.txt

Python executable: /azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/bin/python
Python version: 3.8.13 (default, Oct 21 2022, 23:50:54) 
[GCC 11.2.0]
Python packages path: ['/mnt/azureml/cr/j/f960e4b41b0d4ca2b141b81053fba552/exe/wd', '', '/azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/lib/python38.zip', '/azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/lib/python3.8', '/azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/lib/python3.8/lib-dynload', '/azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/lib/python3.8/site-packages', '/mnt/azureml/cr/j/f960e4b41b0d4ca2b141b81053fba552/exe/wd', '/azureml-envs/azureml_e5f5c1ae3d2957fcce6534e509fda810/lib/p

{'runId': 'diabetes-experiment_1747400453_7cf23727',
 'target': 'Vij-Compute',
 'status': 'Completed',
 'startTimeUtc': '2025-05-16T13:01:01.593877Z',
 'endTimeUtc': '2025-05-16T13:01:16.391773Z',
 'services': {},
 'properties': {'_azureml.ComputeTargetType': 'amlcdsi',
  '_azureml.ClusterName': 'Vij-Compute',
  'ContentSnapshotId': '6b39ef9a-a519-424d-a2b3-d8eb3961faac',
  'ProcessInfoFile': 'azureml-logs/process_info.json',
  'ProcessStatusFile': 'azureml-logs/process_status.json'},
 'inputDatasets': [],
 'outputDatasets': [],
 'runDefinition': {'script': 'training.py',
  'command': '',
  'useAbsolutePath': False,
  'arguments': [],
  'sourceDirectoryDataStore': None,
  'framework': 'Python',
  'communicator': 'None',
  'target': 'Vij-Compute',
  'dataReferences': {},
  'data': {},
  'outputData': {},
  'datacaches': [],
  'jobName': None,
  'maxRunDurationSeconds': 2592000,
  'nodeCount': 1,
  'instanceTypes': [],
  'priority': None,
  'credentialPassthrough': False,
  'identity': N

In [17]:
# Register the artifact the training run wrote to outputs/model.pkl
# under the model name "diabetes-model".
registration = {
    "model_name": "diabetes-model",
    "model_path": "outputs/model.pkl",
}
model = run.register_model(**registration)
print("Model Registered Successfully:", model.name)

Model Registered Successfully: diabetes-model


<mark>**Version Control**</mark>

<mark>**It is better to run the following commands in a Bash terminal, without the leading `!`.**</mark>

In [22]:
# Check code quality locally
!pip install flake8
!flake8 train.py
!flake8 score.py

# Add code to git repo
!git init
!git add train.py score.py
!git commit -m "Initial ML scripts"
!git remote add origin <your-repo-url>
!git push -u origin main


train.py:0:1: E902 FileNotFoundError: [Errno 2] No such file or directory: 'train.py'
score.py:0:1: E902 FileNotFoundError: [Errno 2] No such file or directory: 'score.py'
[33mhint: Using 'master' as the name for the initial branch. This default branch name[m
[33mhint: is subject to change. To configure the initial branch name to use in all[m
[33mhint:[m
[33mhint: 	git config --global init.defaultBranch <name>[m
[33mhint:[m
[33mhint: Names commonly chosen instead of 'master' are 'main', 'trunk' and[m
[33mhint: 'development'. The just-created branch can be renamed via this command:[m
[33mhint:[m
[33mhint: 	git branch -m <name>[m
Initialized empty Git repository in /mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj/.git/
fatal: detected dubious ownership in repository at '/mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj'
To add an exception for this directory, call:

	git config --global --add safe.director

In [24]:
# OPEN Github make arepository and copy the URL here
!git remote add origin https://github.com/vijy24/Daibetes_deployment.git
!git push -u origin main


fatal: detected dubious ownership in repository at '/mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj'
To add an exception for this directory, call:

	git config --global --add safe.directory /mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj
fatal: detected dubious ownership in repository at '/mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj'
To add an exception for this directory, call:

	git config --global --add safe.directory /mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj


In [25]:
# Inspect the repository state (branch, staged and untracked files).
!git status


fatal: detected dubious ownership in repository at '/mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj'
To add an exception for this directory, call:

	git config --global --add safe.directory /mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj


In [27]:
# Mark the notebook's working directory as a safe git directory; this is the
# exact command git suggested in its "dubious ownership" error message.
!git config --global --add safe.directory /mnt/batch/tasks/shared/LS_root/mounts/clusters/vij-compute/code/Users/radha_thaj


In [28]:
# Re-check the repository state now that the directory is marked as safe.
!git status


On branch master

No commits yet

Untracked files:
  (use "git add <file>..." to include in what will be committed)
	[31m.amlignore[m
	[31m.amlignore.amltmp[m
	[31m.ipynb_aml_checkpoints/[m
	[31mVij-Notebook.ipynb[m
	[31mbatch_predict_async.py[m
	[31mbatch_predict_async.py.amltmp[m
	[31mdiabetes.csv[m
	[31mdiabetes_batch-100.csv[m
	[31mdiabetes_batch-1000.csv[m
	[31mdiabetes_predictions.csv[m
	[31mdiabetes_predictions_async.csv[m
	[31mdiabetes_predictions_async_concurrent.csv[m
	[31mscoring.py[m
	[31mscoring.py.amltmp[m
	[31mtraining.py[m
	[31mtraining.txt.amltmp[m
	[31mvij-notebook.ipynb.amltmp[m

nothing added to commit but untracked files present (use "git add" to track)


In [29]:
# Show the commit history of the current branch.
!git log


fatal: your current branch 'master' does not have any commits yet


In [30]:
# Stage only the notebook and the dataset CSV; other files in the folder
# (e.g. training.py, scoring.py) are not staged by this command.
!git add Vij-Notebook.ipynb diabetes.csv


In [31]:
# Commit the staged files.
# NOTE(review): git refuses to commit until user.name / user.email are set.
!git commit -m "Add notebook and diabetes dataset"


Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'azureuser@vij-compute.(none)')


In [33]:
# Set the author identity git uses for commits (global, i.e. for every
# repository of this user on this machine).
# NOTE(review): this stores a personal e-mail address in the notebook — consider
# redacting it before sharing.
!git config --global user.email "vijy_24@yahoo.in"
!git config --global user.name "Vijyant"


In [34]:
# Commit the staged notebook and dataset.
!git commit -m "Add notebook and diabetes dataset"


[master (root-commit) 17e20a4] Add notebook and diabetes dataset
 2 files changed, 1296 insertions(+)
 create mode 100644 Vij-Notebook.ipynb
 create mode 100644 diabetes.csv


In [35]:
# Push 'master' to the 'origin' remote and set it as the upstream branch.
# Requires a remote named 'origin' to be configured first.
!git push -u origin master


fatal: 'origin' does not appear to be a git repository
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.


In [36]:
# Register the GitHub repository as the 'origin' remote.
# NOTE(review): `git remote add` errors if 'origin' is already defined, so this
# cell is not re-runnable as-is.
!git remote add origin https://github.com/vijy24/Daibetes_deployment.git


In [37]:
# Push 'master' to 'origin' over HTTPS.
# NOTE(review): git asks for a username here; a `!` cell presumably cannot answer
# that prompt — run the push from a terminal or configure a credential helper.
!git push -u origin master


Username for 'https://github.com': 

In [39]:
# List the files currently tracked by git.
!git ls-files


Vij-Notebook.ipynb
diabetes.csv


## <mark>**DEPLOYMENT**</mark>

In [8]:
from azureml.core.model import Model
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig

# How to serve the model: scoring.py is the entry script, executed inside `env`.
inference_config = InferenceConfig(entry_script="scoring.py", environment=env)

# Size of the Azure Container Instance that hosts the web service.
deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

# Deploy the registered model as the "diabetes-service" web service and block
# until the deployment finishes.
service = Model.deploy(
    ws,
    "diabetes-service",
    [model],
    inference_config=inference_config,
    deployment_config=deployment_config,
)
service.wait_for_deployment(show_output=True)

print("Service Deployed at:", service.scoring_uri)


To leverage new model deployment capabilities, AzureML recommends using CLI/SDK v2 to deploy models as online endpoint, 
please refer to respective documentations 
https://docs.microsoft.com/azure/machine-learning/how-to-deploy-managed-online-endpoints /
https://docs.microsoft.com/azure/machine-learning/how-to-attach-kubernetes-anywhere 
For more information on migration, see https://aka.ms/acimoemigration 
  service = Model.deploy(ws, "diabetes-service", [model], inference_config, deployment_config)


Tips: You can try get_logs(): https://aka.ms/debugimage#dockerlog or local deployment: https://aka.ms/debugimage#debug-locally to debug if deployment takes longer than 10 minutes.
Running
2025-05-16 12:18:52+00:00 Creating Container Registry if not exists.
2025-05-16 12:18:52+00:00 Registering the environment.
2025-05-16 12:18:52+00:00 Use the existing image.
2025-05-16 12:18:52+00:00 Generating deployment configuration.
2025-05-16 12:18:54+00:00 Submitting deployment to compute.
2025-05-16 12:19:01+00:00 Checking the status of deployment diabetes-service..
2025-05-16 12:20:15+00:00 Checking the status of inference endpoint diabetes-service.
Succeeded
ACI service creation operation finished, operation "Succeeded"
Service Deployed at: http://d52f27c9-d0b2-4c55-94bf-e3a8109e67c6.uksouth.azurecontainer.io/score


In [9]:
import json
import requests

# Example input — replace with real feature values from your dataset
# One row of ten feature values, wrapped in the {"data": [[...]]} envelope.
feature_row = [0.038, 0.05, 0.061, 0.021, 0.046, 0.033, 0.026, 0.008, 0.012, 0.019]
sample_input = {"data": [feature_row]}

# Serialise once; the request cell sends this string as the POST body.
input_json = json.dumps(sample_input)


In [10]:
# Scoring endpoint of the deployed web service
scoring_uri = service.scoring_uri

# Declare the request body as JSON. This is not an authentication header; it is
# sent on every request.
headers = {"Content-Type": "application/json"}

# Make the request. The timeout stops the cell from hanging forever on an
# unresponsive endpoint, and raise_for_status() surfaces HTTP errors instead of
# printing an error body as if it were a prediction.
response = requests.post(scoring_uri, data=input_json, headers=headers, timeout=30)
response.raise_for_status()

print("Prediction:", response.json())


Prediction: [145.91014672025526]


In [38]:
# Show the endpoint URL of the deployed service (handy for copying into clients).
print(service.scoring_uri)


http://d52f27c9-d0b2-4c55-94bf-e3a8109e67c6.uksouth.azurecontainer.io/score


In [12]:
import pandas as pd
import json
import requests
from tqdm import tqdm  # For progress bar

# Step 1: Load your CSV file
df = pd.read_csv("diabetes_batch.csv")  # 👈 replace with your actual file
print(f"Loaded {len(df)} rows")

# Step 2: Define scoring URI and headers
scoring_uri = "http://d52f27c9-d0b2-4c55-94bf-e3a8109e67c6.uksouth.azurecontainer.io/score"  # 👈 replace with your actual URI
headers = {"Content-Type": "application/json"}

# Step 3: Send each row for prediction and collect results
# Exactly one entry is appended per row, so `predictions` always lines up with `df`.
predictions = []

for _, row in tqdm(df.iterrows(), total=len(df)):
    # `payload` (not `data`) so the DataFrame named `data` from the dataset cell
    # is not overwritten.
    payload = json.dumps({"data": [row.tolist()]})

    try:
        # Timeout: a stalled endpoint must not block the whole loop.
        response = requests.post(scoring_uri, data=payload, headers=headers, timeout=30)
        response.raise_for_status()
        # Extract inside the try: an unexpected body (dict, empty list, non-JSON)
        # becomes a per-row error instead of aborting the loop.
        prediction = response.json()[0]  # Assuming a single prediction
    except Exception as e:
        # Best effort: record the failure text in place of the prediction.
        prediction = str(e)

    predictions.append(prediction)

# Step 4: Save predictions to new CSV
df["prediction"] = predictions
df.to_csv("diabetes_predictions.csv", index=False)
print("✅ Predictions saved to diabetes_predictions.csv")





In [13]:
!pip install aiohttp




In [18]:
!pip install aiohttp
!pip show aiohttp



Name: aiohttp
Version: 3.11.16
Summary: Async http client/server framework (asyncio)
Home-page: https://github.com/aio-libs/aiohttp
Author: 
Author-email: 
License: Apache-2.0
Location: /anaconda/envs/azureml_py38/lib/python3.10/site-packages
Requires: aiohappyeyeballs, aiosignal, async-timeout, attrs, frozenlist, multidict, propcache, yarl
Required-by: adlfs, aiohttp-cors, datasets


In [21]:
import sys
# Print the interpreter that backs this kernel.
print(sys.executable)  # Confirm path is /anaconda/envs/azureml_py310_sdkv2/bin/python
# Install with that exact interpreter so the package lands in the kernel's environment.
!{sys.executable} -m pip install aiohttp


/anaconda/envs/azureml_py310_sdkv2/bin/python
Collecting aiohttp
  Downloading aiohttp-3.11.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.7 kB)
Collecting aiohappyeyeballs>=2.3.0 (from aiohttp)
  Downloading aiohappyeyeballs-2.6.1-py3-none-any.whl.metadata (5.9 kB)
Collecting async-timeout<6.0,>=4.0 (from aiohttp)
  Downloading async_timeout-5.0.1-py3-none-any.whl.metadata (5.1 kB)
Collecting multidict<7.0,>=4.5 (from aiohttp)
  Downloading multidict-6.4.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.3 kB)
Collecting propcache>=0.2.0 (from aiohttp)
  Downloading propcache-0.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting yarl<2.0,>=1.17.0 (from aiohttp)
  Downloading yarl-1.20.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (72 kB)
Downloading aiohttp-3.11.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [1]:
import aiohttp
# Version of aiohttp that the kernel actually imports.
print(aiohttp.__version__)


3.11.18


In [2]:
!pip show aiohttp

Name: aiohttp
Version: 3.11.16
Summary: Async http client/server framework (asyncio)
Home-page: https://github.com/aio-libs/aiohttp
Author: 
Author-email: 
License: Apache-2.0
Location: /anaconda/envs/azureml_py38/lib/python3.10/site-packages
Requires: aiohappyeyeballs, aiosignal, async-timeout, attrs, frozenlist, multidict, propcache, yarl
Required-by: adlfs, aiohttp-cors, datasets


In [3]:
import sys
# Show which Python interpreter this kernel is running on.
print(sys.executable)

/anaconda/envs/azureml_py310_sdkv2/bin/python


In [5]:
!pip install nest_asyncio




In [6]:
import nest_asyncio
import asyncio
# Patch asyncio so an event loop can be re-entered from inside the notebook's
# already-running loop.
nest_asyncio.apply()


In [8]:
# Sanity check: the number of predictions must equal the number of rows before
# they can be attached as a column.
# NOTE(review): `df` and `predictions` are not defined in this cell; the values
# come from whichever cells ran earlier in the kernel (hidden state).
print(f"Rows in DataFrame: {len(df)}")
print(f"Length of predictions: {len(predictions)}")


Rows in DataFrame: 1024
Length of predictions: 1627


## <mark>**Sequential Async Batch Prediction Script**</mark>

In [10]:
# Step 1: Import libraries
import pandas as pd
import aiohttp
import nest_asyncio
import asyncio

# Step 2: Let asyncio run inside the notebook's already-running event loop
nest_asyncio.apply()

# Step 3: Load and clean your data
# header=None: the first line of the file is data, not column names.
# dropna() with default arguments removes every row that has at least one NaN.
df = pd.read_csv("diabetes_batch-1000.csv", header=None).dropna()
print(f"[INFO] Cleaned DataFrame rows: {len(df)}")

# Step 4: Endpoint and batch settings
scoring_uri = "http://4d19bf24-c355-4b9e-87ed-dcf77f3deda1.uksouth.azurecontainer.io/score"   # <-- Put your endpoint here!
headers = {"Content-Type": "application/json"}
batch_size = 50  # rows sent per request; tune as needed

# Step 5: Define the async batch sending function
async def send_batch(session, batch_data, batch_num, uri=None):
    """
    Send one batch of rows to the scoring endpoint and return its predictions.

    Args:
        session: Client session used to POST (anything with an aiohttp-style
            ``post(url, json=...)`` async context manager).
        batch_data: List of rows (each a list of feature values).
        batch_num: 1-based batch number, used only in log messages.
        uri: Endpoint to call; defaults to the notebook-level ``scoring_uri``.

    Returns:
        A list with exactly ``len(batch_data)`` entries: the predictions on
        success, or the error message repeated once per row on any failure
        (transport error, HTTP error status, or a response that is not a list
        of the expected length). The fixed length keeps results aligned with
        the input rows.
    """
    target = scoring_uri if uri is None else uri
    payload = {"data": batch_data}
    try:
        async with session.post(target, json=payload) as resp:
            # Treat 4xx/5xx as failures instead of parsing the error body as predictions.
            resp.raise_for_status()
            result = await resp.json()
        # Anything but one prediction per row would silently shift later rows.
        if not isinstance(result, list) or len(result) != len(batch_data):
            raise ValueError(f"unexpected response for {len(batch_data)} rows: {result!r}")
        # Log batch info
        print(f"[BATCH {batch_num}] Sent {len(batch_data)} rows, received {len(result)} predictions")
        return result
    except Exception as e:
        print(f"[BATCH {batch_num}] ERROR: {e}")
        return [str(e)] * len(batch_data)  # Fill this batch with error message(s)

# Step 6: Define the main async batching runner
async def run_async_batch_predictions(df, batch_size):
    """
    Score a DataFrame in consecutive batches, one request at a time.

    Args:
        df: Frame whose rows are sent as lists of values.
        batch_size: Maximum number of rows per request.

    Returns:
        A flat list with one entry per row of ``df``, in row order.
    """
    predictions = []
    async with aiohttp.ClientSession(headers=headers) as session:
        for batch_num, start in enumerate(range(0, len(df), batch_size), 1):
            end = min(start + batch_size, len(df))
            batch_data = df.iloc[start:end].values.tolist()
            # Send this batch for prediction
            batch_predictions = await send_batch(session, batch_data, batch_num)
            # Defensive: a result that is not one-entry-per-row would shift every
            # later prediction, so replace it with a per-row marker of the right length.
            if not isinstance(batch_predictions, list) or len(batch_predictions) != len(batch_data):
                batch_predictions = [f"invalid batch response: {batch_predictions!r}"] * len(batch_data)
            predictions.extend(batch_predictions)
            print(f"[INFO] Accumulated {len(predictions)} predictions so far")
    return predictions

# Step 7: Run the async batch prediction and save results
predictions = await run_async_batch_predictions(df, batch_size)
print(f"[FINAL] Rows in DataFrame: {len(df)}; Total predictions: {len(predictions)}")

# Step 8: Attach predictions to the DataFrame and save to CSV
df["prediction"] = predictions
df.to_csv("diabetes_predictions_async_batch.csv", index=False)
print("✅ [DONE] Saved predictions to diabetes_predictions_async_batch.csv")


[INFO] Cleaned DataFrame rows: 1000
[BATCH 1] Sent 50 rows, received 50 predictions
[INFO] Accumulated 50 predictions so far
[BATCH 2] Sent 50 rows, received 50 predictions
[INFO] Accumulated 100 predictions so far
[BATCH 3] Sent 50 rows, received 50 predictions
[INFO] Accumulated 150 predictions so far
[BATCH 4] Sent 50 rows, received 50 predictions
[INFO] Accumulated 200 predictions so far
[BATCH 5] Sent 50 rows, received 50 predictions
[INFO] Accumulated 250 predictions so far
[BATCH 6] Sent 50 rows, received 50 predictions
[INFO] Accumulated 300 predictions so far
[BATCH 7] Sent 50 rows, received 50 predictions
[INFO] Accumulated 350 predictions so far
[BATCH 8] Sent 50 rows, received 50 predictions
[INFO] Accumulated 400 predictions so far
[BATCH 9] Sent 50 rows, received 50 predictions
[INFO] Accumulated 450 predictions so far
[BATCH 10] Sent 50 rows, received 50 predictions
[INFO] Accumulated 500 predictions so far
[BATCH 11] Sent 50 rows, received 50 predictions
[INFO] Accumula

## <mark>**Concurrent Async Batch Prediction Script**</mark>

In [1]:
# Step 1: Import required libraries
import pandas as pd
import aiohttp
import nest_asyncio
import asyncio

# Step 2: Let asyncio run inside the notebook's already-running event loop
nest_asyncio.apply()

# Step 3: Load and clean your data
# header=None: the first line of the file is data; dropna() removes rows with any NaN.
df = pd.read_csv("diabetes_batch-100.csv", header=None).dropna()
print(f"[INFO] Cleaned DataFrame rows: {len(df)}")  # Should print 100

# Step 4: Endpoint and batch settings
scoring_uri = "http://4d19bf24-c355-4b9e-87ed-dcf77f3deda1.uksouth.azurecontainer.io/score"  # <- Replace with your endpoint!
headers = {"Content-Type": "application/json"}
batch_size = 20  # rows sent per request

# Step 5: Define async function to send a batch
async def send_batch(session, batch_data, batch_num, uri=None):
    """
    Send one batch of rows to the scoring endpoint and return its predictions.

    Args:
        session: Client session used to POST (anything with an aiohttp-style
            ``post(url, json=...)`` async context manager).
        batch_data: List of rows (each a list of feature values).
        batch_num: 1-based batch number, used only in log messages.
        uri: Endpoint to call; defaults to the notebook-level ``scoring_uri``.

    Returns:
        A list with exactly ``len(batch_data)`` entries: the predictions on
        success, or the error message repeated once per row on any failure
        (transport error, HTTP error status, or a response that is not a list
        of the expected length).
    """
    target = scoring_uri if uri is None else uri
    payload = {"data": batch_data}
    try:
        async with session.post(target, json=payload) as resp:
            # Treat 4xx/5xx as failures instead of parsing the error body as predictions.
            resp.raise_for_status()
            result = await resp.json()
        # Anything but one prediction per row would corrupt the flattened output.
        if not isinstance(result, list) or len(result) != len(batch_data):
            raise ValueError(f"unexpected response for {len(batch_data)} rows: {result!r}")
        print(f"[BATCH {batch_num}] Sent {len(batch_data)} rows, received {len(result)} predictions")
        return result
    except Exception as e:
        print(f"[BATCH {batch_num}] ERROR: {e}")
        return [str(e)] * len(batch_data)

# Step 6: Define main concurrent async runner (all batches at once)
async def run_concurrent_batches(df, batch_size):
    """
    Score a DataFrame by sending all batches concurrently.

    Args:
        df: Frame whose rows are sent as lists of values.
        batch_size: Maximum number of rows per request.

    Returns:
        A flat list with one entry per row of ``df``, in row order
        (``asyncio.gather`` returns results in the order the tasks were given,
        regardless of which request finishes first).
    """
    batches = [
        df.iloc[start:start + batch_size].values.tolist()
        for start in range(0, len(df), batch_size)
    ]
    async with aiohttp.ClientSession(headers=headers) as session:
        # Run all batch requests concurrently!
        all_results = await asyncio.gather(
            *(send_batch(session, batch, batch_num)
              for batch_num, batch in enumerate(batches, 1))
        )
    # Flatten, but never trust the shape blindly: iterating a dict or string
    # result would yield keys/characters and mis-align every later row.
    predictions = []
    for batch, result in zip(batches, all_results):
        if not isinstance(result, list) or len(result) != len(batch):
            result = [f"invalid batch response: {result!r}"] * len(batch)
        predictions.extend(result)
    return predictions

# Step 7: Run concurrent async batch prediction and save results
predictions = await run_concurrent_batches(df, batch_size)
print(f"[FINAL] Rows in DataFrame: {len(df)}; Total predictions: {len(predictions)}")
df["prediction"] = predictions
df.to_csv("diabetes_predictions_async_concurrent.csv", index=False)
print("✅ [DONE] Saved predictions to diabetes_predictions_async_concurrent.csv")


[INFO] Cleaned DataFrame rows: 100
[BATCH 5] Sent 20 rows, received 20 predictions
[BATCH 4] Sent 20 rows, received 20 predictions
[BATCH 3] Sent 20 rows, received 20 predictions
[BATCH 2] Sent 20 rows, received 20 predictions
[BATCH 1] Sent 20 rows, received 20 predictions
[FINAL] Rows in DataFrame: 100; Total predictions: 100
✅ [DONE] Saved predictions to diabetes_predictions_async_concurrent.csv


## <mark>**Concurrent Processing (Thread-Based)**</mark>

In [2]:
import pandas as pd
import requests
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
# Input rows (header=None: the file's first line is data) and endpoint settings
df = pd.read_csv(
    "diabetes_batch-100.csv",
    header=None,
)
scoring_uri = "http://4d19bf24-c355-4b9e-87ed-dcf77f3deda1.uksouth.azurecontainer.io/score"
headers = {
    "Content-Type": "application/json",
}
# Function to send one row
def predict_single_row(row, timeout=30):
    """
    Score a single row against the notebook-level ``scoring_uri``.

    Args:
        row: Object with ``tolist()`` (e.g. a pandas Series) holding the features.
        timeout: Seconds to wait for the endpoint before giving up, so a stalled
            request cannot block a worker thread indefinitely.

    Returns:
        The first element of the JSON response on success; on any failure
        (timeout, HTTP error status, unexpected body) the error message as a string.
    """
    payload = json.dumps({"data": [row.tolist()]})
    try:
        response = requests.post(scoring_uri, data=payload, headers=headers, timeout=timeout)
        # Report 4xx/5xx as errors rather than indexing into an error body.
        response.raise_for_status()
        return response.json()[0]
    except Exception as e:
        # Best effort: the caller stores this text in place of the prediction.
        return str(e)
# Run predictions concurrently on a thread pool.
# executor.map returns results in the order the rows were submitted, so
# predictions[i] belongs to row i. Collecting with as_completed() orders results
# by finish time, which attaches predictions to the wrong rows.
rows = [row for _, row in df.iterrows()]
with ThreadPoolExecutor(max_workers=10) as executor:  # 🔁 change 10 to control parallelism
    predictions = list(executor.map(predict_single_row, rows))

# Attach results and save
df["prediction"] = predictions
df.to_csv("diabetes_predictions_async.csv", index=False)
print("✅ Saved predictions to diabetes_predictions_async.csv")


✅ Saved predictions to diabetes_predictions_async.csv


## <mark>**AIOHTTP + Async: Concurrent Batch Prediction Pipeline, Scheduled to Run Every Monday and Thursday**</mark>

**Step 2: Register the Script in a Pipeline Step (Jupyter Notebook)**

In [10]:
import azureml.core
# Version of the azureml-core SDK imported by this kernel.
print(azureml.core.VERSION)


1.60.0


In [5]:
import sys
# Show which Python interpreter this kernel is running on.
print(sys.executable)


/anaconda/envs/azureml_py310_sdkv2/bin/python


In [6]:
# Query pip through the kernel's own interpreter so the reported package
# metadata belongs to the environment this notebook actually uses.
!{sys.executable} -m pip show azureml-core


Name: azureml-core
Version: 1.60.0
Summary: Azure Machine Learning core packages, modules, and classes
Home-page: https://docs.microsoft.com/python/api/overview/azure/ml/?view=azure-ml-py
Author: Microsoft Corp
Author-email: 
License: https://aka.ms/azureml-sdk-license
Location: /anaconda/envs/azureml_py310_sdkv2/lib/python3.10/site-packages
Requires: adal, argcomplete, azure-common, azure-core, azure-graphrbac, azure-mgmt-authorization, azure-mgmt-containerregistry, azure-mgmt-keyvault, azure-mgmt-network, azure-mgmt-resource, azure-mgmt-storage, backports.tempfile, contextlib2, docker, humanfriendly, jmespath, jsonpickle, knack, msal, msal-extensions, msrest, msrestazure, ndg-httpsclient, packaging, paramiko, pathspec, pkginfo, PyJWT, pyopenssl, python-dateutil, pytz, requests, SecretStorage, urllib3
Required-by: azureml-pipeline-core, azureml-sdk, azureml-telemetry, azureml-train-automl-client, azureml-train-core


In [24]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.steps import PythonScriptStep

# Minimal environment + run configuration used only to check that a
# PythonScriptStep can be constructed with this SDK version.
# Note: this rebinds the notebook-level name `env`.
env = Environment("test-env")
env.python.conda_dependencies.add_pip_package("pandas")

run_config = RunConfiguration()
run_config.environment = env

step_settings = {
    "name": "TestStep",
    "script_name": "test.py",
    "source_directory": ".",
    "compute_target": "cpu-cluster",  # or any compute name you have
    "runconfig": run_config,
    "allow_reuse": False,
}
step = PythonScriptStep(**step_settings)
print("✅ PythonScriptStep created successfully!")


✅ PythonScriptStep created successfully!


In [25]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Environment for the batch-prediction step: add each pip package the step's
# script depends on.
env = Environment("async-batch-env")
for pip_package in ("pandas", "aiohttp", "nest_asyncio"):
    env.python.conda_dependencies.add_pip_package(pip_package)

# Run configuration that carries the environment into the pipeline step.
run_config = RunConfiguration()
run_config.environment = env


In [26]:
from azureml.pipeline.steps import PythonScriptStep

# Pipeline step that runs batch_predict_async.py on the notebook's compute
# target; allow_reuse=False forces a fresh execution on every pipeline run.
step_settings = {
    "name": "AsyncBatchPredictionStep",
    "script_name": "batch_predict_async.py",
    "source_directory": ".",  # Your script's folder
    "compute_target": compute_target,  # Already defined in your notebook
    "runconfig": run_config,
    "allow_reuse": False,
}
step = PythonScriptStep(**step_settings)


**Build and Submit the Pipeline**


In [38]:
from azureml.core import Dataset

# Describe the batch input CSV on `datastore` as a File dataset.
# NOTE(review): the upload cell earlier in this notebook writes to
# 'diabetes-data/', not 'mydata/' — confirm this path exists on the datastore.
dataset = Dataset.File.from_files(path=(datastore, 'mydata/diabetes_batch-100.csv'))
# Register it in the workspace as "diabetes-batch-input"; create_new_version=True
# requests a new version when the name is already registered.
# NOTE(review): nothing visible in this notebook passes `dataset` to the
# pipeline step — confirm how the step's script receives its input.
dataset = dataset.register(
    workspace=ws,
    name="diabetes-batch-input",
    create_new_version=True
)


In [27]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Assemble a one-step pipeline and validate it before submitting.
pipeline = Pipeline(workspace=ws, steps=[step])
pipeline.validate()

# Submit the pipeline under its own experiment and report the run id.
pipeline_experiment = Experiment(ws, "async-batch-prediction-pipeline")
pipeline_run = pipeline_experiment.submit(pipeline)
print("Pipeline submitted. Run ID:", pipeline_run.id)


Step AsyncBatchPredictionStep is ready to be created [91a42fc9]
Created step AsyncBatchPredictionStep [91a42fc9][b682c9b8-7039-42db-a82c-8b4c772075db], (This step will run and generate new outputs)
Submitted PipelineRun 046d04fd-0289-47be-9823-41dd275dfbbd
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/046d04fd-0289-47be-9823-41dd275dfbbd?wsid=/subscriptions/1a1d0c91-b4c7-49a1-a6ef-2cea4ddaadb3/resourcegroups/vij-resource-group-1/workspaces/vij-workspace-1&tid=d25ccddb-b863-4a84-8923-dbc1177436a0
Pipeline submitted. Run ID: 046d04fd-0289-47be-9823-41dd275dfbbd


**Schedule the Pipeline for Monday and Thursday**

In [30]:
# Publish the pipeline; the resulting id is what a schedule refers to.
publish_settings = {
    "name": "async-batch-prediction-pipeline",
    "description": "Batch prediction pipeline (async batch aiohttp)",
    "version": "1.0",
}
published_pipeline = pipeline.publish(**publish_settings)
print("Published pipeline ID:", published_pipeline.id)


Published pipeline ID: 1f92b03f-c6e2-44e2-8c42-8476038512aa


In [None]:
# Imported here because no other cell of this notebook brings these names into
# scope; without them this cell fails with NameError.
from azureml.pipeline.core.schedule import Schedule, ScheduleRecurrence

# Weekly recurrence that fires on Monday and Thursday.
# time_of_day is a placeholder ("hh:mm"); adjust it to the desired run time.
# NOTE(review): presumably interpreted as UTC when no time_zone is given — confirm.
recurrence = ScheduleRecurrence(
    frequency="Week",
    interval=1,
    week_days=["Monday", "Thursday"],
    time_of_day="00:00",
)

# Attach the recurrence to the published pipeline; runs are recorded under the
# given experiment name.
schedule = Schedule.create(
    workspace=ws,
    name="mon-thu-batch-prediction",
    pipeline_id=published_pipeline.id,  # Use the published pipeline's id!
    experiment_name="async-batch-prediction-pipeline",
    recurrence=recurrence,
    description="Run batch prediction every Monday and Thursday",
    wait_for_provisioning=True
)


In [32]:
# Re-create the batch-prediction step, this time passing command-line arguments
# to batch_predict_async.py.
# Note: this only rebinds the name `step`; a Pipeline object built before this
# cell still holds the previous step, so rebuild (and republish) the pipeline
# for the arguments to take effect.
step = PythonScriptStep(
    name="AsyncBatchPredictionStep",
    script_name="batch_predict_async.py",
    source_directory=".",
    compute_target=compute_target,
    runconfig=run_config,
    arguments=["--input_data", "dummy"],  # <-- Add this line
    allow_reuse=False
)
