In [None]:
from azureml.pipeline.core import Pipeline
from azureml.core import Datastore
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

In [None]:
# `os` is imported in this cell because the tenant id is read from the
# environment below; without it a fresh kernel fails with NameError.
import os

from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication

# Load the workspace described by the local config file, authenticating
# interactively against the tenant named by the AML_TENANT_ID environment
# variable (a missing variable raises KeyError).
ws = Workspace.from_config(
    auth=InteractiveLoginAuthentication(tenant_id=os.environ["AML_TENANT_ID"])
)
ws

In [None]:
from azureml.core import Dataset
from azureml.core.compute import AmlCompute, ComputeTarget

# Bind `compute` to the "cpu-compute2" compute target of this workspace and
# echo it so the reader can see what the steps will run on.
compute = AmlCompute(ws, "cpu-compute2")
print(compute)

In [None]:
# The workspace's default datastore; the PipelineData objects created later
# in this notebook are given this datastore.
datastore = ws.get_default_datastore()

In [None]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
# Run configuration shared by every step of the pipeline.
aml_run_config = RunConfiguration()
aml_run_config.target = compute

python_settings = aml_run_config.environment.python

# Dependencies are not user-managed: the environment is built from the
# conda specification assigned below.
python_settings.user_managed_dependencies = False

# Packages the step scripts need at run time.
python_settings.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas', 'scikit-learn'],
    pip_packages=['azureml-sdk', 'azureml-dataprep[fuse,pandas]'],
    pin_sdk_version=False,
)

## Step 0: Grab an open dataset and register it

This is the baseline data. If the `Dataset` is not already registered in the workspace, create and register it. This step is not part of the Pipeline.

In [None]:
# Register the tabular Titanic dataset the first time the notebook runs;
# later runs find it in the workspace and skip straight to the lookup.
if 'titanic_ds' not in ws.datasets.keys():
    # build a TabularDataset from the two public Titanic CSV files
    web_paths = [
        'https://dprepdata.blob.core.windows.net/demo/Titanic.csv',
        'https://dprepdata.blob.core.windows.net/demo/Titanic2.csv',
    ]
    titanic_ds = Dataset.Tabular.from_delimited_files(path=web_paths)
    titanic_ds.register(
        workspace=ws,
        name='titanic_ds',
        description='new titanic training data',
        create_new_version=True,
    )

# From here on, always work with the dataset as registered in the workspace.
titanic_ds = Dataset.get_by_name(ws, 'titanic_ds')

In [None]:
# Show the class of the dataset object bound to `titanic_ds`.
type(titanic_ds)

In [None]:
# Register a FileDataset over the same Titanic CSVs if the workspace does not
# have one yet.
#
# A dedicated variable is used on purpose: binding the FileDataset to
# `titanic_ds` would replace the TabularDataset that the dataprep step takes
# as input, and dataprep.py calls `to_pandas_dataframe()` on that input.
if 'titanic_files_ds' not in ws.datasets.keys():
    # create a FileDataset from the Titanic training files
    web_paths = ['https://dprepdata.blob.core.windows.net/demo/Titanic.csv',
                 'https://dprepdata.blob.core.windows.net/demo/Titanic2.csv']
    titanic_files_ds = Dataset.File.from_files(path=web_paths)

    titanic_files_ds.register(workspace=ws,
                              name='titanic_files_ds',
                              description='File Dataset of titanic training data',
                              create_new_version=True)

# Fetch the registered dataset so the name is defined whether or not the
# registration branch ran.
titanic_files_ds = Dataset.get_by_name(ws, 'titanic_files_ds')

## Step 1: Dataprep

In [None]:
%%writefile dataprep.py

import pandas as pd 
from azureml.core import Run
import numpy as np 
from sklearn.model_selection import train_test_split
import argparse

RANDOM_SEED=42

def prepare_age(df):
    """Replace the ``Age`` column with a 5-bucket ``Age_Group`` column.

    Missing ages are filled with random integers drawn by
    ``np.random.randint`` using (mean - std) and (mean + std) of the observed
    ages as bounds. Ages are then cast to int and quantile-binned into five
    groups (integer labels). ``df`` is modified in place and also returned.
    """
    ages = df["Age"].copy()
    missing = np.isnan(ages)
    center = df["Age"].mean()
    spread = df["Age"].std()

    # One random draw per missing entry.
    fill_values = np.random.randint(
        center - spread, center + spread, size=df["Age"].isnull().sum()
    )
    ages[missing] = fill_values
    df["Age"] = ages.astype(int)

    # Bucket by quantile, then discard the raw column.
    df['Age_Group'] = pd.qcut(df['Age'], 5, labels=False)
    df.drop(columns=['Age'], inplace=True)
    return df

def prepare_fare(df):
    """Replace the ``Fare`` column with a 5-bucket ``Fare_Group`` column.

    Missing fares are treated as 0 before quantile-binning into five groups
    (integer labels). ``df`` is modified in place and also returned.
    """
    # Assign the filled column back instead of calling fillna(inplace=True) on
    # the `df['Fare']` selection: the chained in-place form does not update
    # `df` when pandas Copy-on-Write is active.
    df['Fare'] = df['Fare'].fillna(0)
    df['Fare_Group'] = pd.qcut(df['Fare'], 5, labels=False)
    df.drop(['Fare'], axis=1, inplace=True)
    return df

def prepare_genders(df):
    """Encode the ``Sex`` column as integers.

    ``male`` -> 0, ``female`` -> 1; missing or unrecognized values fall into
    the ``unknown`` code, 2. ``df`` is modified in place and also returned.
    """
    genders = {"male": 0, "female": 1, "unknown": 2}
    # Build the encoded column in one expression and assign it back; a chained
    # `df['Sex'].fillna(..., inplace=True)` does not update `df` when pandas
    # Copy-on-Write is active.
    df['Sex'] = df['Sex'].map(genders).fillna(genders["unknown"]).astype(int)
    return df

def prepare_embarked(df):
    """Encode the ``Embarked`` column as integers.

    ``S`` -> 0, ``C`` -> 1, ``Q`` -> 2. Empty, missing, or unrecognized values
    are encoded as ``U`` (unknown) -> 3, mirroring how ``prepare_genders``
    treats unknown values. ``df`` is modified in place and also returned.
    """
    ports = {"S": 0, "C": 1, "Q": 2, "U": 3}
    # Avoid chained `df['Embarked'].replace/fillna(..., inplace=True)`: those
    # operate on a selection and do not update `df` under Copy-on-Write.
    embarked = df['Embarked'].replace('', 'U').fillna('U')
    # Any code missing from `ports` would map to NaN; fold it into "unknown"
    # so the column stays integer-typed.
    df['Embarked'] = embarked.map(ports).fillna(ports['U']).astype(int)
    return df
    
# `os` is used for directory handling below and is not imported at the top of
# this script, so bring it into scope here.
import os

# --- Script entry point -----------------------------------------------------
# Reads the 'titanic_ds' input dataset of the current run, applies the
# feature-preparation helpers above, and writes a train/test split as CSV to
# the two paths given on the command line.
parser = argparse.ArgumentParser()
parser.add_argument('--train_path', dest='train_path', required=True)
parser.add_argument('--test_path', dest='test_path', required=True)
args = parser.parse_args()

# Seed NumPy's global RNG so the random age imputation in prepare_age gives
# the same result on every run.
np.random.seed(RANDOM_SEED)

titanic_ds = Run.get_context().input_datasets['titanic_ds']
df = titanic_ds.to_pandas_dataframe().drop(['PassengerId', 'Name', 'Ticket', 'Cabin'], axis=1)
df = prepare_embarked(prepare_genders(prepare_fare(prepare_age(df))))

# A fixed random_state makes the split reproducible.
train, test = train_test_split(df, test_size=0.2, random_state=RANDOM_SEED)

for split, path in ((train, args.train_path), (test, args.test_path)):
    parent = os.path.dirname(path)
    if parent:  # os.makedirs('') raises, so skip when the path has no directory part
        os.makedirs(parent, exist_ok=True)
    # index=False: the row index is not a feature and must not end up as an
    # extra "Unnamed: 0" column when the file is read back for training.
    split.to_csv(path, index=False)

print(f"Wrote train to {args.train_path} and test to {args.test_path}")

In [None]:
# Intermediate outputs handed from the dataprep step to the train step.
train_pd = PipelineData(name="titanic_train", datastore=datastore)
test_pd = PipelineData(name="titanic_test", datastore=datastore)

In [None]:
# Step 1: run dataprep.py with the registered tabular dataset as its named
# input; the script writes the train/test splits to the two PipelineData paths.
dataprep_step = PythonScriptStep(
    script_name="dataprep.py",
    name="dataprep",
    inputs=[titanic_ds.as_named_input("titanic_ds")],
    outputs=[train_pd, test_pd],
    arguments=[
        "--train_path", train_pd,
        "--test_path", test_pd,
    ],
    runconfig=aml_run_config,
    compute_target=compute,
)

## Step 2: Train with SKLearn

In [None]:
%%writefile train.py

import pandas as pd 
from sklearn.ensemble import RandomForestClassifier
import argparse

# --- Script entry point -----------------------------------------------------
# Trains a random forest on the prepared training split and prints its
# accuracy on the test split.
parser = argparse.ArgumentParser()
parser.add_argument('--train_path', dest='train_path', required=True)
parser.add_argument('--test_path', dest='test_path', required=True)
args = parser.parse_args()

RANDOM_SEED = 42
LABEL = "Survived"


def load_split(path):
    """Read a prepared split from CSV, discarding any saved row-index column.

    A CSV written with its index has a header-less first column, which pandas
    reads back as "Unnamed: 0". That column is a row number, not a feature,
    so it is dropped when present.
    """
    frame = pd.read_csv(path)
    return frame.loc[:, ~frame.columns.str.startswith("Unnamed: ")]


train = load_split(args.train_path)
test = load_split(args.test_path)

train_X = train.drop(LABEL, axis=1)
train_y = train[LABEL]

# Fixed random_state so repeated runs on the same data give the same model.
random_forest = RandomForestClassifier(n_estimators=100, random_state=RANDOM_SEED)
random_forest.fit(train_X, train_y)

test_X = test.drop(LABEL, axis=1)
test_y = test[LABEL]

score = round(random_forest.score(test_X, test_y) * 100, 2)
print(f"Score of RandomForestClassifier was {score}%")

In [None]:
# Step 2: run train.py on the two splits produced by the dataprep step.
# The step declares no outputs of its own.
train_step = PythonScriptStep(
    script_name="train.py",
    name="train",
    inputs=[train_pd, test_pd],
    outputs=[],
    arguments=[
        "--train_path", train_pd,
        "--test_path", test_pd,
    ],
    runconfig=aml_run_config,
    compute_target=compute,
)

## Submit it

In [None]:
from azureml.core import Experiment 
# Constructing an Experiment by name returns the existing experiment or
# creates it, so the object can be used directly; there is no need to list
# the workspace's experiments before and after.
experiment = Experiment(workspace=ws, name='titanic_2step')

In [None]:
# Assemble the two steps into a pipeline in this workspace.
pipeline = Pipeline(workspace=ws, steps=[dataprep_step, train_step])

In [None]:
# Submit the pipeline to the experiment and keep the returned run handle.
# NOTE(review): confirm that `show_output` is honored by Experiment.submit for
# pipeline submissions; if it is not, stream logs with
# `run.wait_for_completion(show_output=True)` instead.
run = experiment.submit(pipeline, show_output=True)