In [1]:
import azureml.core
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication  
from azureml.core.compute import ComputeTarget, AmlCompute  
from azureml.core.compute_target import ComputeTargetException  
from azureml.core import Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import DockerConfiguration
from azureml.widgets import RunDetails

from azureml.core import Dataset
from azureml.data.datapath import DataPath

import pandas as pd
import matplotlib.pyplot as plt
import urllib  

import warnings
warnings.filterwarnings('ignore')

**Connect to your workspace**

In [2]:
ws = Workspace.from_config(path= './config.json')
print(f'Version of Azure ML {azureml.core.VERSION} to work with {ws.name}')

Version of Azure ML 1.53.0 to work with wks-pablo-ts


**Upload the datastore and create the dataset**

In [3]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath



# Get the default datastore
default_ds = ws.get_default_datastore()

if 'sp500_dataset' not in  ws.datasets:

    Dataset.File.upload_directory(src_dir='data',
                                target=DataPath(default_ds, 'sp500-data/'),
                                overwrite= True
                                )
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'sp500-data/*.csv'))

    data_set =(Dataset
                .Tabular
                .from_delimited_files(path=(default_ds,'sp500-data/*.csv'))
                ) 

    try:
        data_set = data_set.register( workspace=ws,
                                    name =  'sp500 dataset',
                                    description =  'sp500 data',
                                    tags = {'format':'CSV'},
                                    create_new_version=True
                                    )
    except Exception as ex:
        print(ex)



Validating arguments.
Arguments validated.
'overwrite' is set to True. Any file already present in the target will be overwritten.
Uploading files from 'c:/Users/pablo.prieto/Desktop/TS_Azure/data' to 'sp500-data/'
Creating new dataset


**Create scripts for pipeline steps**

In [4]:
import os

preparing_folder = 'sp500_prep'
os.makedirs(preparing_folder,exist_ok=True)
print(preparing_folder, 'folder created')

sp500_prep folder created


**The pipeline contains two scripts:**

The first script named *prep_sp500.py* loads and prepares the data that will be exported to the next script.

The second script so-called *training_sp500.py*  imports the preprocesed data in order to train and to register a model based on SARIMAX structure. 

In [5]:
%%writefile $preparing_folder/prep_sp500.py

import os
import argparse
import pandas as pd
from azureml.core import Run

# Get_parameter

parser = argparse.ArgumentParser()
parser.add_argument('--input-data', type = str, dest = 'raw_dataset_id' , help = 'raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get run context

run = Run.get_context()

print("Loading data.....")

df = run.input_datasets['raw_data'].to_pandas_dataframe()

df["DATE"] =  pd.to_datetime(df["DATE"])
df['SP500']= pd.to_numeric(df['SP500'], errors='coerce')
df = pd.DataFrame(df)
df = df.dropna()               
               
row_sp500 = len(df)

run.log('Number_rows', row_sp500)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'SP500.csv')
df.to_csv(save_path, index = False, header=True)

# End the run
run.complete()


Overwriting sp500_prep/prep_sp500.py


In [6]:
training_folder = 'sp500_training'
os.makedirs(training_folder,exist_ok=True)
print(training_folder, 'folder created')

sp500_training folder created


In [7]:
%%writefile $training_folder/training_sp500.py


from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller


from sklearn.metrics import mean_squared_error
from azureml.core import Run, Model
import argparse
import joblib
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from typing import Union

import warnings
warnings.filterwarnings("ignore")




def optimize_ARIMA(endog: Union[pd.Series, list], order_list: list, d: int) -> pd.DataFrame:
    
    results = []
    
    for order in order_list:
        try: 
            model = SARIMAX(endog, order=(order[0], d, order[1]), simple_differencing=False).fit(disp=False)
        except:
            continue
            
        aic = model.aic
        results.append([order, aic])
        
    result_df = pd.DataFrame(results)
    result_df.columns = ['(p,q)', 'AIC']
    #Sort in ascending order, lower AIC is better
    result_df = result_df.sort_values(by='AIC', ascending=True).reset_index(drop=True)
    
    return result_df


def rolling_forecast(df: pd.DataFrame, train_len: int, horizon: int, window: int, d: int, order: tuple) -> list:
    
    total_len = train_len + horizon
    pred_ARMA = []
    
    for i in range(train_len, total_len, window):
        model = SARIMAX(endog=df, order=(order[0],d,order[1]))
        res = model.fit(disp=False)
        predictions = res.get_prediction(0, i + window - 1)
        oos_pred = predictions.predicted_mean.iloc[-window:]
        pred_ARMA.extend(oos_pred)
        
    return pred_ARMA


parser = argparse.ArgumentParser()
parser.add_argument("--training-data",
                    type = str, 
                    dest = 'training_data', 
                    help = 'training data' 
                    )

args = parser.parse_args()
training_data = args.training_data


# Get the experiment run context

run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'SP500.csv')
df = (pd.read_csv(file_path, parse_dates = ["DATE"])
        .set_index("DATE")
     )

shape = df.shape
run.log("DF shape", shape)

d = 0
p_value = adfuller(df)[1]
for i in range(5):
    
    if p_value > 0.05:
        eps_diff = np.diff(df['SP500'], n = i + 1 )
        p_value = adfuller(eps_diff)[1]
        d = d  + 1
        
    else:
        break
print(f"The value of integration d is: {d}")


# Combination of multiple values for p and q from zero to five
n = 5
order_list = [(p,q) for p in range(0,n+1) for q in range(0,n+1)]


limit = np.int(0.2*len(df))
train = df.iloc[:-limit]
test = df.iloc[-limit:]

result_df = optimize_ARIMA(train, order_list, d)
order = result_df.iloc[0,0]

model = SARIMAX(train, order=(order[0],d,order[1]), simple_differencing=False)
model_fit = model.fit(disp=False)



TRAIN_LEN = len(train)
HORIZON = len(test)
WINDOW = 1
pred_ARMA = rolling_forecast(df, TRAIN_LEN, HORIZON, WINDOW,d,order)

df_test  = pd.DataFrame(test)
df_test["pred_fort"] = pred_ARMA

rmse = mean_squared_error(df_test['SP500'], df_test["pred_fort"])


fig, ax = plt.subplots()
ax.plot(df_test['SP500'].iloc[:30], label='Actual')
ax.plot(df_test["pred_fort"].iloc[:30], label='Test')
fig.autofmt_xdate()
plt.tight_layout()
run.log_image('Test', plot = fig)
print('RMSE:', rmse)
run.log('RMSE', np.float(rmse))

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'sp500_model.pkl')
joblib.dump(value=model, filename=model_file)


# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
            model_path = model_file,
            model_name = 'sp500_model',
            tags={'Training context':'Pipeline'},
            properties={'RMSE': np.float(rmse)})

run.complete()



Overwriting sp500_training/training_sp500.py


In [8]:

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_cmp = "prediction-sp500"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_cmp)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(workspace=ws, 
                                                name=cluster_cmp, 
                                                provisioning_configuration=compute_config
                                                )
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


In [9]:
%%writefile $training_folder/experiment_env.yml

name: experiment_env

dependencies:
- python=3.8.10
- scikit-learn
- ipykernel
- matplotlib
- statsmodels
- pandas
- typing
- pip
- pip:
   - azureml-defaults
   - pyarrow



Overwriting sp500_training/experiment_env.yml


In [10]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", training_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


**Create and Run Pipeline**

In [11]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
sp500_ds = ws.datasets.get("sp500 dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

prep_step = PythonScriptStep(name= "Prepare Data",
                             source_directory = preparing_folder,
                             script_name = "prep_sp500.py",
                             arguments = ['--input-data', sp500_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                            compute_target = pipeline_cluster,
                            runconfig = pipeline_run_config,
                            allow_reuse = True
                             )

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = training_folder,
                                script_name = "training_sp500.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-sp500-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

**Depict the metrics recorded.**

In [13]:

for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

Train and Register Model :
	 DF shape : (1258, 1)
	 Test : aml://artifactId/ExperimentRun/dcid.b81ed40e-dd54-4d45-85e2-31ec81cd6a75/Test_1698363614.png
	 RMSE : 1737.3741505628946
Prepare Data :
	 Number_rows : 1258


In [14]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="sp500-training-pipeline", description="Trains sp500 model", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
sp500-training-pipeline,626a1fc8-ea19-492c-a260-40949e7481b4,Active,REST Endpoint
