In [1]:
#Import Libraries 
import azureml.core
from azureml.core import Workspace
from azureml.core import Experiment, Datastore, Dataset
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn import metrics
from sklearn import preprocessing
import os

# Load the workspace from the saved config file
workspace = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, workspace.name))

Ready to use Azure ML 1.41.0 to work with mtcist-mlws


In [2]:
# Import Data to Datastore
blob_datastore_name='azblobsdk' # Name of the datastore to workspace
container_name=os.getenv("BLOB_CONTAINER", "mtcsynapse") # Name of Azure blob container
account_name=os.getenv("BLOB_ACCOUNTNAME", "mtcsynapse") # Storage account name
account_key=os.getenv("BLOB_ACCOUNT_KEY", "T9HOwGaOZtNJZTmOGcudpr3hND/2h6rlURq2BIqwMIIIu2oHPfURj2PbNYPdNvdDJLaWdD6uzImEUjtlUU0/uw==") # Storage account access key

blob_datastore = Datastore.register_azure_blob_container(workspace=workspace, 
                                                         datastore_name=blob_datastore_name, 
                                                         container_name=container_name, 
                                                         account_name=account_name,
                                                         account_key=account_key)

csv_path=[(blob_datastore, './FinanceDemo/dbo.taxidata.csv')]
tab_ds = Dataset.Tabular.from_delimited_files(path=csv_path)
df = tab_ds.to_pandas_dataframe()
#Copy the dataset
dfb = df.copy()
#Explore the dataset
dfb.head()

Unnamed: 0,congestion_surcharge,DOLocationID,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,PULocationID,RatecodeID,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,VendorID
0,2.5,1C340B97-68C4-45D7-838B-CF2F69D78F19,0.0,5.0,0.3,0.5,1.0,2.0,DD889A14-C004-47B7-8217-1E5DB1A7377C,1.0,False,0.0,0.0,8.3,2020-06-17 06:20:34,2020-06-17 06:17:04,1.02,4D5C74F3-EB53-4677-8CC1-40DE172232AA
1,2.5,1C340B97-68C4-45D7-838B-CF2F69D78F19,0.0,5.0,0.3,0.5,1.0,2.0,DD889A14-C004-47B7-8217-1E5DB1A7377C,1.0,False,0.0,0.0,8.3,2020-06-17 06:20:34,2020-06-17 06:17:04,1.02,AE0E18EF-99DC-490C-9F30-93FF65776262
2,2.5,1C340B97-68C4-45D7-838B-CF2F69D78F19,0.0,5.0,0.3,0.5,1.0,2.0,DD889A14-C004-47B7-8217-1E5DB1A7377C,1.0,False,0.0,0.0,8.3,2020-06-17 06:20:34,2020-06-17 06:17:04,1.02,BCEA53DA-E187-4958-8EE3-1350B41EE9DC
3,2.5,1C340B97-68C4-45D7-838B-CF2F69D78F19,0.0,5.0,0.3,0.5,1.0,2.0,070B8A4E-8385-412B-B46A-05F381F6863D,1.0,False,0.0,0.0,8.3,2020-06-17 06:20:34,2020-06-17 06:17:04,1.02,4D5C74F3-EB53-4677-8CC1-40DE172232AA
4,2.5,1C340B97-68C4-45D7-838B-CF2F69D78F19,0.0,5.0,0.3,0.5,1.0,2.0,070B8A4E-8385-412B-B46A-05F381F6863D,1.0,False,0.0,0.0,8.3,2020-06-17 06:20:34,2020-06-17 06:17:04,1.02,AE0E18EF-99DC-490C-9F30-93FF65776262


In [3]:
#get ready for train

#cleanse data
df = df.drop_duplicates()
def impute_missing_for_cats(dataset):
    for i in dataset.columns:
        if (dataset[i].isnull().sum()>0):
            dataset.loc[dataset[i].isnull(),i] = dataset[i].value_counts().index[0]
    return dataset

df = impute_missing_for_cats(df)

#copy to generate datasets to be used for prediction
dft = df.copy()

#transform 
#change data types
dft['tpep_pickup_datetime']  = dft['tpep_pickup_datetime'].astype(str)
dft['tpep_dropoff_datetime'] = dft['tpep_dropoff_datetime'].astype(str)

#apply label encoder
le = preprocessing.LabelEncoder()
df['VendorID']=le.fit_transform(df['VendorID'])
df['PULocationID']=le.fit_transform(df['PULocationID'])
df['DOLocationID']=le.fit_transform(df['DOLocationID'])
df['store_and_fwd_flag']=le.fit_transform(df['store_and_fwd_flag'])

#tip amount should be greater than zero
df = df.loc[df['tip_amount']>=0,:]
df['duration'] = (df.tpep_dropoff_datetime-df.tpep_pickup_datetime).dt.total_seconds()/60.0

#extract date and time values 
df['pickup_year'] = df['tpep_pickup_datetime'].dt.year
df['pickup_month'] = df['tpep_pickup_datetime'].dt.month
df['pickup_weekday'] = df['tpep_pickup_datetime'].dt.weekday
df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
df['dropoff_year'] = df['tpep_dropoff_datetime'].dt.year
df['dropoff_month'] = df['tpep_dropoff_datetime'].dt.month
df['dropoff_weekday'] = df['tpep_dropoff_datetime'].dt.weekday
df['dropoff_hour'] = df['tpep_dropoff_datetime'].dt.hour

#drop unneccessary coulumns
df = df.drop(columns='tpep_pickup_datetime')
df = df.drop(columns='tpep_dropoff_datetime')
df = df.loc[df.pickup_year==2020,:]
df = df.loc[df.dropoff_year==2020,:] 
df = df.drop(columns='pickup_year')
df = df.drop(columns='dropoff_year')
df = df.drop(columns='mta_tax')
df = df.drop(columns='dropoff_hour')
df = df.drop(columns='dropoff_weekday')
df = df.drop(columns='dropoff_month')
df = df.drop(columns='tolls_amount')
df = df.drop(columns='improvement_surcharge')
df = df.drop(columns='extra')

#split data set
def split(dataset):
    target_col = "tip_amount"
    X = dataset.drop(columns=target_col)
    y = dataset[target_col]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    return X_train, X_test, y_train, y_test


In [4]:
# Create an Azure ML experiment in your workspace
experiment = Experiment(workspace=workspace, name='batch-taxi-data')
run = experiment.start_logging(snapshot_directory=None)
print("Starting experiment:", experiment.name)


# Prepare X & Y
#Execute function 
X_train, X_test, y_train, y_test = split(df)

# Train a linear regression tree model
print('Training a linear regression model')
reg = LinearRegression()
reg.fit(X_train,y_train)



# calculate accuracy
y_pred = reg.predict(X_test)

run.log('R^2=',metrics.explained_variance_score(y_test,y_pred))
run.log('MAE:',metrics.mean_absolute_error(y_test,y_pred))
run.log('MSE:',metrics.mean_squared_error(y_test,y_pred))
run.log('RMSE:',np.sqrt(metrics.mean_squared_error(y_test,y_pred)))

print('R^2=',metrics.explained_variance_score(y_test,y_pred))
print('MAE:',metrics.mean_absolute_error(y_test,y_pred))

# Save the trained model

# Complete the run
model_file = 'reg.pkl'
joblib.dump(value=reg, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

run.complete()

# Register the model
run.register_model(model_path='outputs/reg.pkl', model_name='reg_model',
                   tags={'Training context':'Inline Training'},
                   properties={"R^2=": run.get_metrics()["R^2="], 'MAE:': run.get_metrics()['MAE:']})
print('Model trained and registered.')

Starting experiment: batch-taxi-data
Training a linear regression model
R^2= 0.8089338839866168
MAE: 0.5383633262366414
Model trained and registered.


In [5]:
# Generate and upload batch data - sample data generated simulating future data to be used for prediction
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# Import Data
# Set default data store



workspace.set_default_datastore('azblobsdk')
default_ds = workspace.get_default_datastore()

# Enumerate all datastores, indicating which is the default
for ds_name in workspace.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)                            


#df = pd.read_csv('transformed_taxidata.csv')
dft = dft.drop(columns=[ 'tip_amount'])
# Get a 100-item sample of the feature columns (not the diabetic label)
sample = dft.sample(n=100).values

# Create a folder
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = workspace.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=workspace, 
                                             name='batch-data',
                                             description='batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")


azureml - Default = False
azblobsdk - Default = True
workspaceworkingdirectory - Default = False
workspacefilestore - Default = False
workspaceartifactstore - Default = False
workspaceblobstore - Default = False
Folder created!
Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 103 files
Uploading batch-data/.amlignore
Uploaded batch-data/.amlignore, 1 files out of an estimated total of 103
Uploading batch-data/.amlignore.amltmp
Uploaded batch-data/.amlignore.amltmp, 2 files out of an estimated total of 103
Uploading batch-data/1.csv
Uploaded batch-data/1.csv, 3 files out of an estimated total of 103
Uploading batch-data/10.csv
Uploaded batch-data/10.csv, 4 files out of an estimated total of 103
Uploading batch-data/100.csv
Uploaded batch-data/100.csv, 5 files out of an estimated total of 103
Uploading batch-data/11.csv
Uploaded batch-data/11.csv, 6 files out of an estimated total of 103
Uploading batch-data/12.csv
Uploaded batch-data/12.csv, 7 files

"Datastore.upload" is deprecated after version 1.0.69. Please use "Dataset.File.upload_directory" to upload your files             from a local directory and create FileDataset in single method call. See Dataset API change notice at https://aka.ms/dataset-deprecation.


In [6]:
#Create compute target for inference if it is not created 
from azureml.core import Workspace
workspace = Workspace.from_config()
# Create Compute
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=workspace, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(workspace, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


In [7]:

import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

batch_pipeline


In [8]:

%%writefile $experiment_folder/batch_taxi.py
import os
import numpy as np
import pandas as pd
from azureml.core import Model
import joblib
from sklearn import preprocessing

def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('reg_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        le = preprocessing.LabelEncoder()
        data=pd.read_csv(f, delimiter=',', header = None)
        data.columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
               'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'payment_type',
               'fare_amount', 'extra', 'mta_tax', 'tolls_amount',
               'improvement_surcharge', 'total_amount', 'congestion_surcharge',
               'VendorID', 'PULocationID', 'DOLocationID']
        data['tpep_pickup_datetime']  = pd.to_datetime(data['tpep_pickup_datetime'])
        data['tpep_dropoff_datetime'] = pd.to_datetime(data['tpep_dropoff_datetime']) 
        data['VendorID']=le.fit_transform(data['VendorID'])
        data['PULocationID']=le.fit_transform(data['PULocationID'])
        data['DOLocationID']=le.fit_transform(data['DOLocationID'])
        data['store_and_fwd_flag']=le.fit_transform(data['store_and_fwd_flag'])
        data['duration'] = (data.tpep_dropoff_datetime-data.tpep_pickup_datetime).dt.total_seconds()/60.0
        data['pickup_year'] = data['tpep_pickup_datetime'].dt.year
        data['pickup_month'] = data['tpep_pickup_datetime'].dt.month
        data['pickup_weekday'] = data['tpep_pickup_datetime'].dt.weekday
        data['pickup_hour'] = data['tpep_pickup_datetime'].dt.hour
        data['dropoff_year'] = data['tpep_dropoff_datetime'].dt.year
        data['dropoff_month'] = data['tpep_dropoff_datetime'].dt.month
        data['dropoff_weekday'] = data['tpep_dropoff_datetime'].dt.weekday
        data['dropoff_hour'] = data['tpep_dropoff_datetime'].dt.hour
        data = data.drop(columns='tpep_pickup_datetime')
        data = data.drop(columns='tpep_dropoff_datetime')
        data = data.loc[data.pickup_year==2020,:]
        data = data.loc[data.dropoff_year==2020,:] 
        data = data.drop(columns='pickup_year')
        data = data.drop(columns='dropoff_year')
        data = data.drop(columns='mta_tax')
        data = data.drop(columns='dropoff_hour')
        data = data.drop(columns='dropoff_weekday')
        data = data.drop(columns='dropoff_month')
        data = data.drop(columns='tolls_amount')
        data = data.drop(columns='improvement_surcharge')
        data = data.drop(columns='extra')
        data = data.values.flatten()
        #dataReshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Overwriting batch_pipeline/batch_taxi.py


In [9]:
%%writefile $experiment_folder/batch_environment.yml
name: batch_environment
dependencies:
- python=3.6.2
- scikit-learn
- pip
- pip:
  - azureml-defaults
  - pandas

Overwriting batch_pipeline/batch_environment.yml


In [10]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the experiment
batch_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


In [11]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig

output_dir = OutputFileDatasetConfig(name='inferences')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_taxi.py",
    mini_batch_size="1",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-taxi',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('taxi_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined


In [12]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=workspace, steps=[parallelrun_step])
pipeline_run = Experiment(workspace, 'mslearn-taxi-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Created step batch-score-taxi [6cd1ce3d][6bc4b7ce-9654-4d78-aca2-f89f9eb293d3], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 927696a7-e7fe-4566-a8b2-277ffb65b83d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/927696a7-e7fe-4566-a8b2-277ffb65b83d?wsid=/subscriptions/eeff915b-797e-4af4-b406-92c56702f072/resourcegroups/mtcsynapse/workspaces/mtcist-mlws&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRunId: 927696a7-e7fe-4566-a8b2-277ffb65b83d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/927696a7-e7fe-4566-a8b2-277ffb65b83d?wsid=/subscriptions/eeff915b-797e-4af4-b406-92c56702f072/resourcegroups/mtcsynapse/workspaces/mtcist-mlws&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: e9064c43-5fa0-4827-8058-44669fceaff6
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/e9064c43-5fa0-4827-8058-44669fceaff6?wsid=/subscriptions/eeff915b-797e-4af4-b406

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "User program failed with Exception: Run failed, please check logs for details. You can check logs/readme.txt for the layout of logs.",
        "messageParameters": {},
        "detailsUri": "https://aka.ms/azureml-run-troubleshooting",
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"User program failed with Exception: Run failed, please check logs for details. You can check logs/readme.txt for the layout of logs.\",\n        \"messageParameters\": {},\n        \"detailsUri\": \"https://aka.ms/azureml-run-troubleshooting\",\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('taxi-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='taxi-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('taxi-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
dfs = pd.read_csv(result_file, delimiter=":", header=None)
dfs.columns = ["File", "Prediction"]

# Display the first 20 results
dfs.head(20)

In [None]:
import glob
path =r'./batch-data'
filenames = glob.glob(path + "/*.csv")
dfall = []
for csv in filenames:
    frame = pd.read_csv(csv,delimiter=',', header = None)
    frame['filename'] = os.path.basename(csv)
    dfall.append(frame)
# Concatenate all data into one DataFrame
cols = list(dfb.columns)
cols.append("file_name")
cols.remove("tip_amount")
big_frame = pd.concat(dfall, ignore_index=True)
big_frame.columns = cols
dfj = pd.concat([big_frame,dfs],axis=1, join='inner')
df_final = dfj[['file_name','VendorID','PULocationID','DOLocationID','Prediction']]
df_final.file_name = df_final['file_name'].apply(lambda x: x.rpartition('.')[0]).astype(int)
df_final['Customer'] = df_final.file_name
df_final = df_final.drop(columns = 'file_name')
df_final = df_final[['Customer','VendorID','PULocationID','DOLocationID','Prediction']]
df_final = df_final.sort_values(by = 'Customer')

In [None]:
dft.head()

In [None]:
pd.concat(dfall).head()

In [None]:
df_final.reset_index(drop=True).head()

In [None]:
# Create a folder
batch_folder = './final-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

df_final.to_csv('./final-data/final2.csv', index=False)

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = workspace.get_default_datastore()
default_ds.upload(src_dir="final-data", target_path="final-data", overwrite=True, show_progress=True)

In [None]:
dfb.head()

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('taxi-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='taxi-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('taxi-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)