# **This notebook is a proof of concept (POC) of a Bitcoin pipeline for data ingestion and training**

First, we get the workspace we are working in by importing the `Workspace` class from `azureml.core` and calling `Workspace.from_config()`.

In [None]:
from azureml.core import Workspace

# Resolve the AzureML workspace from the saved workspace configuration.
ws = Workspace.from_config()

Next, we attach the compute for the pipeline by importing the `ComputeTarget` and `AmlCompute` classes.
First we check whether the desired compute already exists in our workspace; if it does not, we create a new one.

In [None]:
from azureml.core.compute import ComputeTarget,AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name='ronku1'
try:
    # Reuse the compute target when one with this name already exists in the workspace.
    pipeline_cluster=ComputeTarget(workspace=ws,name=cluster_name)
    print('Found')
except ComputeTargetException:
    # No compute with that name: provision one and wait until provisioning has finished.
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_NC6', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        # Still print the error, but re-raise it: swallowing it can leave
        # `pipeline_cluster` undefined, which only surfaces later as a NameError
        # in the cells that build the run configuration and the pipeline steps.
        print(ex)
        raise


Next, we create the environment of the pipeline (the software stack of the VM behind it). Here we use Docker with a GPU base image to enable faster compute on our GPU.
Alternatively, we can use an existing environment that is already registered in the workspace.

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration,DockerConfiguration

#location of the pipeline folder in the azureml
location='/mnt/batch/tasks/shared/LS_root/mounts/clusters/ronku1/code/Users/ronku/bitcoin_pipeline/'

# --- Part 1: build and register the environment --------------------------------
# (this part can be skipped when 'bitcoin_env' is already registered in the workspace)
conda_spec_path=location+'/bitcoin_env.yml'
experiment_env=Environment.from_conda_specification(name='bitcoin_env',file_path=conda_spec_path)
# GPU base image for the environment. Other images that were tried:
#   mcr.microsoft.com/azureml/openmpi3.1.2-cuda10.1-cudnn7-ubuntu18.04
#   mcr.microsoft.com/azureml/openmpi3.1.2-cuda10.2-cudnn8-ubuntu18.04
#   mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.0.3-cudnn8-ubuntu18.04
experiment_env.docker.base_image = "mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.0.3-cudnn8-ubuntu18.04"
experiment_env.register(workspace=ws)

# --- Part 2: fetch the registered environment and assemble the run configuration ---
registered_env=Environment.get(workspace=ws,name='bitcoin_env')
# use_docker=True so that the docker base image set on the environment is used
docker_config=DockerConfiguration(use_docker=True)

pipeline_run_config=RunConfiguration()
pipeline_run_config.target=pipeline_cluster       # compute the pipeline runs on
pipeline_run_config.environment=registered_env    # python/conda environment
pipeline_run_config.docker=docker_config          # enable docker usage
print('pipeline config created')


In [None]:
%%writefile $location/bitcoin_env.yml
# Conda specification of the 'bitcoin_env' environment used by the pipeline steps.
# The file name matches the path passed to Environment.from_conda_specification.
name: bitcoin_env
channels:
- conda-forge
dependencies: 
- python==3.8.10
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
# The former conda entry "tensorflow=" had a dangling "=" with no version, which is
# not a valid package spec; TensorFlow is installed through the pip section below.
- keras
- pip: 
  - azureml-defaults
  - pyarrow
  - tensorflow
  

Next, we create the steps of the pipeline. Here I used two steps: one for data ingestion (with some preprocessing and prediction while ingesting) and one for training, because I wanted each of them to run on a different schedule.

Below is the data ingest step.

In [None]:
##creating the pipeline
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Data-ingest step: runs data_ingest.py from the pipeline folder on the pipeline compute.
# allow_reuse is False because the script pulls new candles from an external API on
# every run. With reuse enabled, a run whose script and settings are unchanged (such as
# each scheduled run of a published pipeline) can be skipped and answered from the
# cached result of an earlier run, so no new data would be ingested.
ingest_step=PythonScriptStep(name='Data Ingest',
                            source_directory=location,
                            script_name='data_ingest.py',
                            compute_target=pipeline_cluster,
                            runconfig=pipeline_run_config,
                            allow_reuse=False)


print('Pipeline steps ready')


After `%%writefile`, enter the path of the file to be written. Prefix a notebook variable with `$` to have it expanded, for example `%%writefile $location/data_ingest.py`.

In [None]:
%%writefile 
import pandas as pd
import numpy as np
import datetime
import requests
from azureml.core import Run
import os
import time
import tensorflow
from tensorflow import keras
from tensorflow.keras.models import load_model
from tensorflow.keras import models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,LSTM
from tensorflow.keras import layers
run=Run.get_context()

#getting the data from gateio API
def get_data(start,end):
    """Fetch the latest 15-minute BTC_USDT candlestick between `start` and `end`.

    Args:
        start: beginning of the window as a Unix timestamp in seconds
            (sent as the `from` query parameter).
        end: end of the window as a Unix timestamp in seconds
            (sent as the `to` query parameter).

    Returns:
        The last element of the JSON list returned by the API, or the string
        '-1' when the request did not succeed (HTTP status other than 200) or
        the response body is not a non-empty list.
    """
    host = "https://api.gateio.ws"
    prefix = "/api/v4"
    headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}

    url = '/spot/candlesticks'
    query_param = f'currency_pair=BTC_USDT&interval=15m&from={str(start)}&to={end}'
    # timeout so that a stalled connection cannot hang the pipeline step forever
    r = requests.request('GET', host + prefix + url + "?" + query_param, headers=headers, timeout=30)
    # The HTTP status lives in r.status_code; the JSON body never equals '200',
    # so comparing str(r.json()) with '200' reported every response as empty.
    if r.status_code != 200:
        return '-1'
    candles = r.json()
    #checking if the response is not empty
    if not isinstance(candles, list) or not candles:
        return '-1'
    return candles[-1]
#setting the current time and convert it to timestamp
current=int(datetime.datetime.now().timestamp())
filename='Bitcoin_days.csv'
preped_name='preped_data.csv'
prediction_name='prediction.csv'
location='/mnt/batch/tasks/shared/LS_root/mounts/clusters/ronku1/code/Users/ronku/bitcoin_pipeline/'
#getting our dataset from the folder
temp_dataset=pd.read_csv(location+filename,index_col=0)
#start time is the last timestamp in the dataset
# NOTE(review): iloc[-1,1] reads the SECOND data column of the last row, while the rows
# appended below keep the first field of the API response in column 0 -- confirm which
# column of Bitcoin_days.csv actually holds the timestamp.
start=temp_dataset.iloc[-1,1]
#moving in jumps of 900 because we are using 15 mins intervals
end=start+900
#previous predictions; the new (timestamp, predicted class) rows are appended to them below
predictions=pd.read_csv(location+prediction_name,index_col=0)

#loading the model that we trained
model=tensorflow.keras.models.load_model(location+'outputs/bitcoin-pred-model')
#placeholder first row for the new predictions (it is dropped again before saving)
temp_pred=np.zeros((1,2))
#checking that we don't go over the current time
if end<=current:
    #working on numpy arrays from the start, so that row indexing such as dataset[-1,:]
    #is valid on the first iteration as well (it is not valid on a DataFrame)
    dataset=np.array(temp_dataset)
    #getting the preped dataset from the same folder the other files are read from
    preped=np.array(pd.read_csv(location+preped_name,index_col=0))
    while end<=current:
        #getting new data point
        new_data=get_data(int(start),int(end))
        #the sentinel is checked before any array conversion: comparing an ndarray with
        #'-1' is elementwise and cannot be used as an if-condition
        if isinstance(new_data,str) and new_data=='-1':
            #if the response is empty copy the last row
            new_data=dataset[-1,:]
        else:
            new_data=np.array(new_data)
        #appending new row to the dataset
        new_data=np.reshape(new_data,(1,-1))
        dataset=np.concatenate((dataset,new_data),axis=0)
        #dividing column 2 of the newest row by column 2 of the previous row to get the rate of change
        temp_data=float(dataset[-1,2])/float(dataset[-2,2])
        #preprocessing the new point to predict the value:
        #all fields except the first one, followed by the rate of change -> shape (1,1,6)
        temp_data1=np.reshape(np.array(temp_data),(1,1))
        temp_data1=np.concatenate((new_data[:,1:],temp_data1),axis=1)
        temp_data1=np.reshape(temp_data1,(1,1,6)).astype('float64')
        #predicting the new outcome and thresholding it at 0.5
        pred=(model.predict(temp_data1))
        pred =1 if pred>=0.5 else 0
        pred=np.array([start,pred])
        pred=np.reshape(pred,(1,-1))
        #updating the temp_pred dataset
        temp_pred=np.concatenate((temp_pred,pred),axis=0)

        #setting the target to be 1 if the rate of change is at least 1
        target=1 if temp_data>=1 else 0
        temp_data=np.reshape(np.array([temp_data,target]),(1,-1))

        #the previous row gets the rate of change and the target appended
        second_last_row=np.reshape(np.array(dataset[-2,:]),(1,-1))
        second_last_row=np.concatenate((second_last_row,temp_data),axis=1)
        #updating the preped dataset
        preped=np.concatenate((preped,second_last_row),axis=0)
        start+=900
        end+=900

    #updating the final variables before saving as csv files
    predictions=np.array(predictions)
    predictions=np.concatenate((predictions,temp_pred[1:,:]),axis=0)
    predictions=pd.DataFrame(predictions)

    dataset=pd.DataFrame(dataset)
    preped=pd.DataFrame(preped)

    predictions.to_csv(location+prediction_name)
    dataset.to_csv(location+filename)
    preped.to_csv(location+preped_name)
    print('loading completed')
else:
    #this branch runs when the next interval ends in the future, so the message says so
    print(f'next interval end {end} is later than current time {current}, nothing to ingest')
run.complete()


Below is the training step.

In [None]:
##creating the pipeline
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Training step: runs train.py from the pipeline folder on the pipeline compute.
# allow_reuse is False because train.py reads its CSV data from a path at run time
# rather than through declared step inputs. With reuse enabled, a run whose script and
# settings are unchanged can be answered from the cached result of an earlier run,
# so the model would not be retrained on the newly ingested data.
train_step=PythonScriptStep(name='Train_step',
                            source_directory=location,
                            script_name='train.py',
                            compute_target=pipeline_cluster,
                            runconfig=pipeline_run_config,
                            allow_reuse=False)


print('Pipeline steps ready')


In [None]:
%%writefile 
from sklearn.preprocessing import StandardScaler,MinMaxScaler
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import tensorflow
import argparse
import joblib
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense,LSTM
from tensorflow.keras import layers
from azureml.core import Run,Model
from azureml.core.experiment import Experiment
import os
location='/mnt/batch/tasks/shared/LS_root/mounts/clusters/ronku1/code/Users/ronku/bitcoin_pipeline/'


# Handle of the run this script is executing in; used below for logging.
run = Run.get_context()

print('Loading Data...')
preped_frame=pd.read_csv(location+'preped_data.csv',index_col=0)
length=len(preped_frame)
# Plain array without the first column of the file.
data=preped_frame.to_numpy()[:,1:]
# The first 80% of the rows form the training part.
train_len=int(0.8*length)
# Scale every column to the range 0..1; the scaler is fitted on the training
# rows only and then applied to the whole data.
sc=MinMaxScaler(feature_range=(0,1)).fit(data[:train_len])
data=sc.transform(data)
#data generator for the LSTM model
def generator(data, lookback, delay, min_index, max_index,
    shuffle=False, batch_size=128, step=6):
    """Build one batch of look-back windows and their targets from `data`.

    Args:
        data: 2-D array; all columns but the last are features, the last is the target.
        lookback: number of rows each window reaches back.
        delay: number of rows after the window position at which the target is read.
        min_index: first row of `data` that may be used.
        max_index: upper bound (exclusive) for window positions; None means
            len(data) - delay - 1.
        shuffle: draw `batch_size` random positions instead of consecutive ones.
        batch_size: maximum number of windows in the batch.
        step: stride between the rows taken inside a window.

    Returns:
        (samples, targets): samples has shape
        (n_rows, lookback // step, n_columns - 1), targets has shape (n_rows,).
    """
    upper = len(data) - delay - 1 if max_index is None else max_index
    first_row = min_index + lookback

    if shuffle:
        rows = np.random.randint(first_row, upper, size=batch_size)
    else:
        # consecutive positions, starting at the first one with a full window
        rows = np.arange(first_row, min(first_row + batch_size, upper))

    n_features = data.shape[-1] - 1
    samples = np.zeros((len(rows), lookback // step, n_features))
    targets = np.zeros((len(rows),))
    for pos, row in enumerate(rows):
        window = range(row - lookback, row, step)
        samples[pos] = data[window, :-1]
        targets[pos] = data[row + delay][-1]
    return (samples, targets)
# Windowing parameters. Rows are 15-minute candles, i.e. 96 rows per day.
lookback=96*14 #each sample looks back 14 days
step=1 #every row inside the window is used (one data point per 15 minutes)
delay=4 #the target is read 4 rows (one hour) after the window position
batch_size=1024*3
#row that separates the training part (first 80%) from the validation part
split_index=int(0.8*length)
#getting train and val sets
#training set: batch_size randomly placed windows inside the training part
x_train,y_train = generator(data,
                    lookback=lookback,
                    delay=delay,
                    min_index=0,
                    max_index=split_index,
                    shuffle=True,
                    step=step,
                    batch_size=batch_size)
#validation set: up to batch_size consecutive windows taken after the training part
x_val,y_val = generator(data,
                    lookback=lookback,
                    delay=delay,
                    min_index=split_index+1,
                    max_index=None,
                    step=step,
                    batch_size=batch_size)
#(the former `val_steps` value was never used anywhere in the script and was removed)
#model creation: three stacked LSTM layers followed by a small dense head with a sigmoid output
keras_layers=tensorflow.keras.layers
n_features=data.shape[-1]-1
model=tensorflow.keras.models.Sequential([
    keras_layers.LSTM(512,return_sequences=True,input_shape=(None,n_features)),
    keras_layers.LSTM(256,return_sequences=True),
    keras_layers.LSTM(128,return_sequences=False),
    keras_layers.Dense(32,activation='relu',kernel_regularizer='l2'),
    keras_layers.Dense(1,activation='sigmoid'),
])

#compiling with adam and using AUC and accuracy for the metrics
adam=tensorflow.keras.optimizers.Adam()
model.compile(
    optimizer=adam,
    loss='binary_crossentropy',
    metrics=['accuracy','AUC'],
)
model.summary()
#training the model for one epoch and keeping the training history
history=model.fit(
    x=x_train,
    y=y_train,
    epochs=1,
    validation_data=(x_val,y_val),
    verbose=1,
)
#plotting and logging graphs of the accuracy, AUC and loss to azureml studio
#datetime is needed for the timestamp suffix of the image names and is not imported
#at the top of this script, so it is imported here
import datetime

loss=history.history['loss']
val_loss=history.history['val_loss']
acc=history.history['accuracy']
val_acc=history.history['val_accuracy']
val_auc=history.history['val_auc']
train_auc=history.history['auc']
epochs = range(1, len(acc) + 1)
#timestamp suffix appended to the image names
current=int(datetime.datetime.now().timestamp())


def _log_history_plot(image_name, train_values, val_values, train_label, val_label, title):
    """Draw one training/validation curve pair on a new figure and log it to the run."""
    plt.figure()
    plt.plot(epochs, train_values, label=train_label)
    plt.plot(epochs, val_values, label=val_label)
    plt.title(title)
    plt.legend()
    run.log_image(name=image_name, plot=plt)


_log_history_plot('accuracy'+str(current), acc, val_acc,
                  'Training acc', 'Validation acc', 'Training and validation accuracy')
print('accuracy plot was logged')

_log_history_plot('loss'+str(current), loss, val_loss,
                  'Training loss', 'Validation loss', 'Training and validation loss')
print('loss plot was logged')

_log_history_plot('AUC'+str(current), train_auc, val_auc,
                  'Training AUC', 'Validation AUC', 'Training and validation AUC')
print('AUC plot was logged')

#the values of the last epoch are the ones that are compared and stored
val_auc=float(val_auc[-1])
val_acc=float(val_acc[-1])
#logging auc and accuracy metrics for later use
run.log(name='AUC',value=val_auc)
run.log(name='Accuracy',value=val_acc)
#loading the currently registered model and the metrics of the run that produced it
workspace=run.experiment.workspace
old_model=Model(workspace=workspace,name='bitcoin-pred')
#the id of the producing run is stored in the 'ID' property of the registered model
run_id=old_model.properties['ID']
exp=Experiment(workspace=workspace,name='Bitcoin-Pipeline')

old_run_metrics=Run(experiment=exp,run_id=run_id).get_metrics()
old_auc=old_run_metrics['AUC']
old_acc=old_run_metrics['Accuracy']
#checking if new model is better than current model (higher AUC or higher accuracy);
#if so we will save and register the new one
if val_auc>float(old_auc) or val_acc>float(old_acc):
    print('Regestring model')
    print("Saving model...")
    model_file = location+'outputs/bitcoin-pred-model'
    model.save(model_file)
    #keeping the run id, auc and accuracy in the properties of the model.
    #The call was missing its closing parenthesis (a SyntaxError), and it passed
    #run_id, which is not a parameter of Model.register; the run id is kept in the
    #'ID' property, which is what the lookup above reads.
    Model.register(workspace=workspace,
                    model_path=model_file,
                    model_name='bitcoin-pred',
                    tags={'training-context':'pipline'},
                    properties={'ID':str(run.id),
                                'AUC':str(val_auc),
                                'Accuracy':str(val_acc)})
else:
    print('New model did not registered')
run.complete()


First, we run the pipeline once to check that everything is working properly.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

#creating the pipeline
#(switch to pipeline_steps=[train_step] to run the training step instead)
pipeline_steps=[ingest_step]#pipeline_steps=[train_step]
pipeline=Pipeline(workspace=ws,steps=pipeline_steps)

#create the experiment and run the pipeline
experiment=Experiment(workspace=ws,name='Bitcoin-Pipeline')
#the keyword is `regenerate_outputs`; the misspelled `regenrate_outputs` did not
#request regeneration, so cached step results could still be reused
pipeline_run=experiment.submit(pipeline,regenerate_outputs=True)
print('Pipeline is submitted and ready for use')
#show the run widget and block until the run has finished
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

After the pipeline has run successfully, we can publish it as an endpoint (in our example, a training endpoint).

In [None]:
#Publishing the pipeline of the finished run
#we give the published pipeline a name, some description and a version
published_pipeline=pipeline_run.publish_pipeline(
    name='hourly-bitcoin-data-ingest-and-predict',
    description='Hourly data ingest',
    version='1.0',
)
print(published_pipeline)

#REST endpoint of the published pipeline
rest_endpoint=published_pipeline.endpoint
print(rest_endpoint)

We can schedule the pipeline to run at different time intervals for our needs. Note that you must publish a pipeline first; only then can you schedule it.


In [None]:
from azureml.pipeline.core import ScheduleRecurrence,Schedule,TimeZone

#Running the published pipeline every hour (frequency='Hour', interval=1),
#with start_time 2022-03-08 11:10 in the Israel Standard Time zone
recurrence=ScheduleRecurrence(frequency='Hour',interval=1,start_time='2022-03-08T11:10:00',time_zone=TimeZone.IsraelStandardTime)
#NOTE: despite its name, `daily_schedule` holds the HOURLY schedule here
daily_schedule=Schedule.create(ws,'hourly-bitcoin-data-ingest-and-predict',pipeline_id=published_pipeline.id,experiment_name='Bitcoin-Pipeline',recurrence=recurrence)
print('Pipeline scheduled')

In [None]:
from azureml.pipeline.core import ScheduleRecurrence,Schedule,TimeZone

#Running the published pipeline once a day (frequency='Day', interval=1);
#start_time is 2022-03-08 11:10 in the Israel Standard Time zone, not midnight
recurrence=ScheduleRecurrence(frequency='Day',interval=1,start_time='2022-03-08T11:10:00',time_zone=TimeZone.IsraelStandardTime)
#NOTE(review): this schedule is named 'Daily training' but uses `published_pipeline.id`,
#the same variable the hourly ingest schedule uses -- confirm that a pipeline built from
#the training step was published into `published_pipeline` before running this cell
daily_schedule=Schedule.create(ws,'Daily training',pipeline_id=published_pipeline.id,experiment_name='Bitcoin-Pipeline',recurrence=recurrence)
print('Pipeline scheduled')

We can also disable the schedules if we no longer need them.

In [None]:
# Disable every schedule listed for the workspace, then display the list.
schedules = Schedule.list(ws)
for schedule in schedules:
    schedule.disable()
schedules