In [None]:
from azureml.core import Workspace, Dataset, Datastore, Experiment
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline, PipelineRun

# Connect to the workspace described by the local workspace config (Workspace.from_config)
# and echo its identity, so it is obvious which subscription / resource group the
# rest of the notebook talks to.
workspace = Workspace.from_config()
workspace_summary = (
    f'WS name: {workspace.name}\n'
    f'Region: {workspace.location}\n'
    f'Subscription id: {workspace.subscription_id}\n'
    f'Resource group: {workspace.resource_group}'
)
print(workspace_summary)

# Registered parquet dataset; uncomment the download line to pull a local copy.
dataset = Dataset.get_by_name(workspace=workspace, name='offerallocation-parquet-default')
# dataset.download(target_path='.', overwrite=False)

In [62]:
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core.graph import PipelineParameter

# Blob datastore holding the parquet folder that the pipeline scans.
mydatastore = Datastore.get(workspace, 'workspaceblobstore')

# Expose the datastore folder as the "input_datapath" pipeline parameter so a caller
# can point the pipeline at another folder at submit time (default: parquet/offerpath2).
data_path = DataPath(datastore=mydatastore, path_on_datastore='parquet/offerpath2')
datapath1_pipeline_param = PipelineParameter(name="input_datapath", default_value=data_path)

# (parameter, binding) pair: the consuming step sees the folder mounted on the compute node.
mount_binding = DataPathComputeBinding(mode='mount')
datapath_input = (datapath1_pipeline_param, mount_binding)

In [63]:
# Step 1: walk the mounted input folder and write one small CSV per parquet file
# into <input>/FileToProcess; the parallel step later consumes those CSVs.
# NOTE(review): allow_reuse is left at its default; since FileToProcess is a side
# effect the pipeline does not track, confirm the step really re-runs when the
# parquet folder changes but the parameter value does not.
StepToWriteDateFile = PythonScriptStep(
    script_name="StepToWriteDateFile.py",
    source_directory='.',
    name='StepToWriteDateFile',
    compute_target='main-cluster',
    # The same (parameter, binding) tuple is both declared as an input and passed on
    # the command line, so the script receives the mounted path via --arg1.
    arguments=["--arg1", datapath_input],
    inputs=[datapath_input],
    # runconfig=compute_config,
)
print("StepToWriteDateFile created")

StepToWriteDateFile created


In [64]:
%%writefile StepToWriteDateFile.py

import argparse
import os
import pandas as pd
from azureml.core import Run
parser = argparse.ArgumentParser("train")
parser.add_argument("--arg1", type=str, help="sample datapath argument")
import shutil

args = parser.parse_args()

root_dir = args.arg1

file_set = set()

print(f'my root dir inside this script is: {root_dir}')

counter = 0
if os.path.exists(os.path.join(args.arg1,"FileToProcess")):
    shutil.rmtree(os.path.join(args.arg1,"FileToProcess"))
os.makedirs(os.path.join(args.arg1,"FileToProcess"))
    
for dir_, _, files in os.walk(root_dir):
    for file_name in files:
        # can do more fancy logic here and control what gets passed on for next step
        if(file_name[-7:] == "parquet"):
            rel_dir = os.path.relpath(dir_, root_dir)
            rel_file = os.path.join(rel_dir, file_name)
            mydf = pd.DataFrame({'dir':[rel_dir], 'fileName':[file_name],'fullFileName':[rel_file]})
            fileWriteLocation = os.path.join(args.arg1,"FileToProcess",f"file{counter}.csv")
            mydf.to_csv(fileWriteLocation)
            counter = counter + 1
            file_set.add(rel_file)
print(file_set)        

Overwriting StepToWriteDateFile.py


In [None]:
from azureml.core import Run,Environment
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.data import OutputFileDatasetConfig
# Output location that receives the rows appended by the parallel step.
output_dir = OutputFileDatasetConfig(name="scores")

# The per-parquet CSVs that StepToWriteDateFile writes into FileToProcess.
# NOTE(review): from_files validates the path by default; if FileToProcess does not
# exist yet (first ever run) this call may fail — consider validate=False if so.
dataset = Dataset.File.from_files(path=[(mydatastore, "parquet/offerpath2/FileToProcess/*")])

# Bug fix: Environment.from_conda_specification is a static factory that RETURNS a new
# Environment. The previous code called it on an empty Environment and discarded the
# result, so the dependencies in parallelenv.yml were never applied to the step.
env = Environment.from_conda_specification(name='parallelenv', file_path='parallelenv.yml')

parallel_run_config = ParallelRunConfig(
    source_directory='.',
    entry_script='ParallelModelTrain.py',
    mini_batch_size="1",            # with a file dataset input: number of files per run() call
    error_threshold=2,
    output_action="append_row",     # whatever run() returns is appended to append_row_file_name
    environment=env,
    compute_target='main-cluster',
    append_row_file_name="my_outputs.txt",
    run_invocation_timeout=1200,
    node_count=1)

parallelrun_step = ParallelRunStep(
    name="paralleltrainmodels",
    parallel_run_config=parallel_run_config,
    inputs=[dataset.as_named_input("inputds")],
    output=output_dir
    # models=[ model ]  # not needed: only relevant for batch inferencing
    # arguments=[ ],
    # allow_reuse=True
)

In [None]:
%%writefile ParallelModelTrain.py
import pandas as pd
import os
from azureml.core import Workspace, Datastore, Experiment,Dataset, Experiment, Run
from azureml.core.authentication import ServicePrincipalAuthentication


def init():
    global workspace
    global dataset
    global mounted_path
    print('beginning the process')
    workspace_name = 'tsi_ml_path'
    run = Run.get_context()
    client_secret = run.get_secret(name="SECRETNAME") #you insert the secret name you have created
    subscription_id = '' #your subscription id
    resource_group = ''   #name of the resource group of your workspace
    svc_pr = ServicePrincipalAuthentication(tenant_id="",service_principal_id="",service_principal_password="client_secret")
    workspace = Workspace(subscription_id, resource_group, workspace_name, auth=svc_pr)
    dataset = Dataset.get_by_name(workspace, name='offerallocation-parquet-default')
    import tempfile
    mounted_path = tempfile.mkdtemp()
    # mount dataset onto the mounted_path of a Linux-based compute
    mount_context = dataset.mount(mounted_path)
    mount_context.start()
 

def run(mini_batch):

    for file_path in mini_batch:
        print (f"in mini batch process {mounted_path}")
        input_data = pd.read_csv(file_path)
        for index, row in input_data.iterrows():
            print('row is as follows')
            print(row)
            print(os.path.join(mounted_path,row['fullFileName']))
            modelinputdata = pd.read_parquet(os.path.join(mounted_path,row['fullFileName']))
            print(f"model input data shape is {modelinputdata.shape}")
    return input_data

In [None]:
from azureml.pipeline.core import Pipeline, PipelineRun,StepSequence
# Only the file-listing step is in the sequence; parallelrun_step is currently left out.
# NOTE(review): presumably it should be appended after StepToWriteDateFile once the
# parallel step is ready — the explicit ordering is needed because there is no data
# dependency linking the two steps.
ordered_steps = [StepToWriteDateFile]  # , parallelrun_step]
step_sequence = StepSequence(steps=ordered_steps)
pipeline = Pipeline(workspace=workspace, steps=step_sequence)
print("pipeline with the train_steps created")

experiment_name = 'parallelstepwork'
source_directory  = '.'
experiment = Experiment(workspace, experiment_name)

# Pass the input folder explicitly for the "input_datapath" pipeline parameter.
submit_params = {"input_datapath": data_path}
pipeline_run = experiment.submit(pipeline, pipeline_parameters=submit_params)
# pipeline.publish(name='pipeline to parallel file processing', description='show parallel processing', version="1.0", continue_on_step_failure=None)


### Publish pipeline

In [None]:
# Publishing creates a REST endpoint for this pipeline that can be triggered
# externally, e.g. from Azure Data Factory (ADF). Uncomment both lines to publish.

#published_pipeline = pipeline.publish(name="ms-datapath-prs", description="Pipeline to test Datapath and PRS", version="1.0", continue_on_step_failure=True)
#published_pipeline