# Integrated Hybrid BLE-RSSI Pipeline Notebook

This notebook lets us train the model on UCS and deploy it to the cloud of our choice (AWS, GCP or Azure) from a single place, provided the required parameters and secrets for the chosen cloud are set.

## Clone Cisco Kubeflow Starter Pack repository

In [None]:
BRANCH_NAME="hybrid" #Provide git branch as "master"/"dev"/"hybrid"
! git clone -b $BRANCH_NAME https://github.com/CiscoAI/cisco-kubeflow-starter-pack.git

## Install common packages

In [None]:
pip install kfp pandas numpy --user

## Install specific packages

### Install Packages below if using aws cloud

In [None]:
pip install boto3 sagemaker mxnet --user

### Install Packages below if using azure cloud

In [None]:
pip install azureml-core --user

## Restart notebook kernel

In [None]:
from IPython.display import display_html

# Emit a raw <script> element that asks the notebook front end to call
# Jupyter.notebook.kernel.restart() (presumably so that the packages
# installed by the cells above become importable).
display_html(
    "<script>Jupyter.notebook.kernel.restart()</script>",
    raw=True,
)

## Set the name of the cloud to be used for model deployment (aws, gcp or azure)

In [None]:
# Target cloud for model deployment. Must be changed to 'aws', 'gcp' or
# 'azure'; the next cell raises ValueError for any other value, including
# this empty default.
CLOUD_NAME=''

In [None]:
# An empty CLOUD_NAME is just another unsupported value, so a single
# membership test rejects both "unset" and "unknown cloud".
if CLOUD_NAME not in ('aws', 'gcp', 'azure'):
    raise ValueError("Set name of the cloud on which you need to deploy your model: gcp/aws/azure")

## Import common libraries

In [None]:
import kfp
from kfp import components
import kfp.dsl as dsl
from kubernetes import client as k8s_client
import os
import logging
import time
import json
import numpy as np
from datetime import datetime
import sys

## Import Parameters file

In [None]:
# Directory inside the cloned starter-pack repository that is added to the
# module search path so that `import parameters` below can be resolved.
params_file_path='cisco-kubeflow-starter-pack/apps/networking/ble-localization/hybrid/integrated/'
# Must run before the import: the directory is inserted at index 1 of sys.path.
sys.path.insert(1, params_file_path)
import parameters

## Import specific libraries

In [None]:
# Import only the client libraries of the selected cloud, so the SDKs of the
# other two clouds do not have to be installed.
if CLOUD_NAME=='aws':
        
        # use_aws_secret attaches AWS credentials to pipeline steps; boto3,
        # sagemaker and mxnet are used by the status and prediction cells.
        from kfp.aws import use_aws_secret
        import boto3
        import pandas as pd
        import sagemaker
        import mxnet as mx
        from mxnet import nd
        from sagemaker.mxnet import MXNetModel
        from sagemaker.predictor import json_serializer, json_deserializer, RealTimePredictor
        
elif CLOUD_NAME=='gcp':

        # kfp.gcp provides use_gcp_secret; googleapiclient is used to build
        # the 'ml' v1 API client in the status cell.
        # NOTE(review): kfp.dsl is already imported in the common imports cell.
        import kfp.gcp as gcp
        import kfp.dsl as dsl
        import googleapiclient.discovery
        
elif CLOUD_NAME=='azure':

        # Azure ML SDK classes used by the status, prediction and clean-up cells.
        # NOTE(review): Model is imported twice (azureml.core.model and
        # azureml.core); the second import rebinds the same name.
        import base64
        from azureml.core.webservice import AciWebservice
        from azureml.core.webservice import Webservice
        from azureml.core.model import Model
        from azureml.core.authentication import ServicePrincipalAuthentication
        from azureml.core import Workspace
        from azureml.core.image import Image
        from azureml.core import Model

## Import parameters and set to local variables

In [None]:
if CLOUD_NAME=='aws':
    
        from parameters import Aws_params
        execution_mode, bucket_name, secret_name = Aws_params.execution_mode, Aws_params.bucket_name, Aws_params.secret_name 
        aws_cloud_region, model_name, instance_type, role_arn = Aws_params.aws_cloud_region, Aws_params.model_name, Aws_params.instance_type, Aws_params.role_arn 
        inference_image, endpoint_config_name, endpoint_name, model_path = Aws_params.inference_image, Aws_params.endpoint_config_name, Aws_params.endpoint_name, Aws_params.model_path
        
        %env AWS_DEFAULT_REGION={aws_cloud_region}
        
elif CLOUD_NAME=='gcp':
    
        from parameters import Gcp_params
        execution_mode, bucket_name, secret_name, gcp_cloud_region = Gcp_params.execution_mode, Gcp_params.bucket_name, Gcp_params.secret_name, Gcp_params.gcp_cloud_region 
        model_name, instance_type, google_application_credentials = Gcp_params.model_name, Gcp_params.instance_type, Gcp_params.google_application_credentials
        version_name, model_path, project_id = Gcp_params.version_name, Gcp_params.model_path, Gcp_params.project_id

        %env GOOGLE_APPLICATION_CREDENTIALS={google_application_credentials}
        
elif CLOUD_NAME=='azure':
    
       from parameters import Azure_params
       azure_model, azure_service = Azure_params.azure_model, Azure_params.azure_service       

## Validate host

In [None]:
# Only the AWS and GCP flows define an execution_mode setting.
if CLOUD_NAME in ('aws', 'gcp'):

    if execution_mode == "local":
        # `host` is not assigned by any cell of this notebook, so referencing
        # it directly raised NameError instead of the intended ValueError.
        # Look it up in the notebook namespace and treat "not defined" the
        # same as an empty string.
        if globals().get('host', '') == '':
            raise ValueError("Please set host to the appropriate URL")
    else:
        # Every value other than "local" is normalised to "in-cluster".
        execution_mode = "in-cluster"


## Validate parameters

In [None]:
# Every setting of the selected cloud must be truthy (non-empty); the first
# falsy entry aborts the notebook with a ValueError.
if CLOUD_NAME=='aws':

    awsParams = [
        execution_mode, bucket_name, secret_name, instance_type,
        aws_cloud_region, model_name, role_arn, inference_image,
        endpoint_config_name, endpoint_name, model_path,
    ]
    if not all(awsParams):
        raise ValueError("One of the parameters in the aws_params list is missing. Please check whether all parameters are entered values")

elif CLOUD_NAME=='gcp':

    gcpParams = [
        execution_mode, bucket_name, secret_name, instance_type,
        google_application_credentials, gcp_cloud_region, model_name,
        version_name, model_path, project_id,
    ]
    if not all(gcpParams):
        raise ValueError("One of the parameters in the gcp_params list is missing. Please check whether all parameters are entered values")

elif CLOUD_NAME=='azure':

    azureParams = [azure_model, azure_service]
    if not all(azureParams):
        raise ValueError("One of the parameters in the azure_params list is missing. Please check whether all parameters are entered values")

## Load components & declare environment variables

In [None]:
if CLOUD_NAME=='aws':
    # Component definitions for the SageMaker "model" and "deploy" steps,
    # fetched from the master branch of kubeflow/pipelines.
    model = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/model/component.yaml'
    deploy = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/sagemaker/deploy/component.yaml'

    sagemaker_model_op, sagemaker_deploy_op = (
        components.load_component_from_url(component_url)
        for component_url in (model, deploy)
    )

    def blerssi_aws_train_upload_op(step_name='aws-train'):
        """Return the 'aws-train-upload-s3' container step.

        The step runs `python /opt/mx-dnn.py train --bucket-name <bucket_name>`
        in the ciscoai/mxnet-blerssi-train-upload:v0.2 image, with the AWS
        credentials from the `secret_name` secret applied to it.

        `step_name` is accepted but not used by the body.
        """
        train_step = dsl.ContainerOp(
            name='aws-train-upload-s3',
            image='ciscoai/mxnet-blerssi-train-upload:v0.2',
            command=['python', '/opt/mx-dnn.py', 'train'],
            arguments=['--bucket-name', bucket_name]
        )
        aws_credentials = use_aws_secret(
            secret_name=secret_name,
            aws_access_key_id_name='AWS_ACCESS_KEY_ID',
            aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'
        )
        return train_step.apply(aws_credentials)

    
elif CLOUD_NAME=='gcp':
    from parameters import Timestamp
    deploy="https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/deploy/component.yaml"
    
    mlengine_deploy_op = components.load_component_from_url(deploy)

    def blerssi_train_upload_op(step_name='blerssi-train'):
        return dsl.ContainerOp(
            name='blerssi-train-upload-gcp',
            image='docker.io/samba07/blerssi-gcp-mlengine:0.2',
            command=['python', '/opt/blerssi-model.py'],
            arguments=['--bucket-name', bucket_name,
                       '--model-version', Timestamp.gcp_timestamp]
        ).apply(gcp.use_gcp_secret(secret_name))

elif CLOUD_NAME=='azure':
    
    path='cisco-kubeflow-starter-pack/apps/networking/ble-localization/hybrid/azure/pipelines/'
    component_root_train = path + 'components/train_model/'
    component_root_register = path + 'components/register_model/'
    component_root_deploy = path + 'components/deploy_model/'
    
    azure_train_op = kfp.components.load_component_from_file(os.path.join(component_root_train, 'component.yaml'))
    azure_register_op = kfp.components.load_component_from_file(os.path.join(component_root_register, 'component.yaml'))
    azure_deploy_op = kfp.components.load_component_from_file(os.path.join(component_root_deploy, 'component.yaml'))
    
    nfs_pvc = k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs')
    nfs_volume = k8s_client.V1Volume(name='nfs', persistent_volume_claim=nfs_pvc)
    nfs_volume_mount = k8s_client.V1VolumeMount(mount_path='/mnt/', name='nfs')
    
    workspace_name = os.getenv('WORKSPACE_NAME')
    subscription_id = os.getenv('SUBSCRIPTION_ID')
    resource_group = os.getenv('RESOURCE_GROUP')
    tenant_id = os.getenv('TENANT_ID')
    service_principal_id = os.getenv('SERVICE_PRINCIPAL_ID')
    service_principal_password = os.getenv('SERVICE_PRINCIPAL_PASSWORD')




## Define pipeline functions

In [None]:
if CLOUD_NAME=='aws':

    @dsl.pipeline(
        name='AWS Sagemaker Hybrid Pipeline',
        description='Pipeline to train BLERSSI model using mxnet and save in aws s3 bucket'
    )
    def aws_pipeline(
        region="",
        image="",
        model_name="",
        endpoint_config_name="",
        endpoint_name="",
        model_artifact_url="",
        instance_type_1="",
        role=""
    ):
        """Chain three steps: train/upload, create SageMaker model, deploy endpoint.

        Each step is forced to run after the previous one, and the two
        SageMaker steps have the AWS credentials from the `secret_name`
        secret applied to them.
        """

        def with_aws_credentials(op):
            # Apply the AWS credential secret to a SageMaker step.
            return op.apply(use_aws_secret(
                secret_name=secret_name,
                aws_access_key_id_name='AWS_ACCESS_KEY_ID',
                aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'
            ))

        upload_step = blerssi_aws_train_upload_op()

        model_step = with_aws_credentials(sagemaker_model_op(
            region=region,
            model_name=model_name,
            image=image,
            model_artifact_url=model_artifact_url,
            role=role
        ))
        model_step.after(upload_step)

        # The deploy step consumes the output of the model-creation step.
        deploy_step = with_aws_credentials(sagemaker_deploy_op(
            region=region,
            endpoint_config_name=endpoint_config_name,
            endpoint_name=endpoint_name,
            model_name_1=model_step.output,
            instance_type_1=instance_type_1
        ))
        deploy_step.after(model_step)

        
elif CLOUD_NAME=='gcp':
    
    @dsl.pipeline(
    name='CloudML deploy pipeline',
    description='CloudML deploy pipeline'
     )
    def gcp_pipeline(
          model_uri = '',
          project_id = '',
          model_id = '',
          version_id = '',
          runtime_version = '1.10',
          python_version = '',
          version = '',
          replace_existing_version = 'False',
          set_default = 'True',
          wait_interval = '30'):

        train_upload_model = blerssi_train_upload_op()

        task = mlengine_deploy_op(
            model_uri=model_uri, 
            project_id=project_id, 
            model_id=model_id, 
            version_id=version_id, 
            runtime_version=runtime_version, 
            python_version=python_version,
            version=version, 
            replace_existing_version=replace_existing_version, 
            set_default=set_default, 
            wait_interval=wait_interval).apply(gcp.use_gcp_secret(secret_name))
        task.after(train_upload_model)
        
elif CLOUD_NAME=='azure':
    
    def azure_pipeline():
    
            #Define task for training BLERSSI data
            azure_train_task = azure_train_op()
            azure_train_task.add_volume(nfs_volume)
            azure_train_task.add_volume_mount(nfs_volume_mount)

            #Define task for registering BLERSSI model on Azure
            azure_register_task = azure_register_op(workspace_name=workspace_name,
                                                   subscription_id=subscription_id,
                                                   resource_group=resource_group,
                                                   model_name=azure_model,
                                                   tenant_id=tenant_id,
                                                   service_principal_id=service_principal_id,
                                                   service_principal_password=service_principal_password)

            azure_register_task.add_volume(nfs_volume)
            azure_register_task.add_volume_mount(nfs_volume_mount)
            azure_register_task.after(azure_train_task)

            #Define Task for deploying BLERSSI model on Azure 
            azure_deploy_task = azure_deploy_op(workspace_name=workspace_name,
                                                subscription_id=subscription_id,
                                                resource_group=resource_group,
                                                model_name=azure_model,
                                                service_name=azure_service,
                                                tenant_id=tenant_id,
                                                service_principal_id=service_principal_id,
                                                service_principal_password=service_principal_password)

            azure_deploy_task.after(azure_register_task)

## Run pipeline

In [None]:
if CLOUD_NAME=='aws':

    from parameters import Timestamp

    # Compile the pipeline function into a package file.
    aws_package = 'blerssi_aws_pipeline.tar.gz'
    try:
        import kfp.compiler as compiler
        compiler.Compiler().compile(aws_pipeline, aws_package)
    except RuntimeError as err:
        # A RuntimeError from the compiler is logged and otherwise ignored.
        logging.debug(err)
        logging.info("Argo workflow failed validation check but it can still be used to run experiments.")

    # "local" mode connects to the explicitly configured host; any other
    # mode creates the client with its defaults.
    client = kfp.Client(host=host) if execution_mode == "local" else kfp.Client()
    blerssi_hybrid_experiment = client.create_experiment(name='BLERSSI-Sagemaker')

    # Values for the parameters declared by aws_pipeline.
    aws_run_params = {
        'region': aws_cloud_region,
        'image': inference_image,
        'model_name': model_name,
        'endpoint_config_name': endpoint_config_name,
        'endpoint_name': endpoint_name,
        'model_artifact_url': model_path,
        'instance_type_1': instance_type,
        'role': role_arn
    }
    run = client.run_pipeline(
        blerssi_hybrid_experiment.id,
        'blerssi-sagemaker-pipeline-'+Timestamp.timestamp,
        pipeline_package_path=aws_package,
        params=aws_run_params
    )

elif CLOUD_NAME=='gcp':
    
    from parameters import Timestamp
    try:
        import kfp.compiler as compiler
        compiler.Compiler().compile(gcp_pipeline, 'blerssi_gcp_pipeline.tar.gz')
    except RuntimeError as err:
        logging.debug(err)
        logging.info("Argo workflow failed validation check but it can still be used to run experiments.")
        
    client = None
    if execution_mode == "local":
        client = kfp.Client(host=host)
    else:
        client = kfp.Client()
    blerssi_hybrid_experiment = client.create_experiment(name='BLERSSI-GCP')
        
    run = client.run_pipeline(blerssi_hybrid_experiment.id, 'blerssi-gcp-mlengine-pipeline-'+Timestamp.gcp_timestamp, pipeline_package_path='blerssi_gcp_pipeline.tar.gz',
                         params={'model_uri': model_path,
                                 'project_id': project_id,
                                 'model_id': model_name,
                                 'version_id': version_name})
    
elif CLOUD_NAME=='azure':
    
    #Create a pipeline run
    kfp.Client().create_run_from_pipeline_func(azure_pipeline, arguments={})
    
    


## Check service endpoint status

In [None]:
if CLOUD_NAME=='aws':

    # The session is reused by the prediction cell.
    sagemaker_session = sagemaker.Session()

    # Ask SageMaker for the endpoint description and report its status
    # both on stdout and through logging.
    sg_client = boto3.client('sagemaker', region_name=aws_cloud_region)
    resp = sg_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = resp['EndpointStatus']
    status_message = f"Endpoint status: {endpoint_status}"
    print(status_message)
    logging.info(status_message)

    
elif CLOUD_NAME=='gcp':
    
    service = googleapiclient.discovery.build('ml', 'v1')
    name = 'projects/{}/models/{}/versions/{}'.format(project_id, model_name, version_name)
    
elif CLOUD_NAME=='azure':
    
    svc_pr_password = os.environ.get("AZUREML_PASSWORD")
    svc_pr = ServicePrincipalAuthentication(
    tenant_id=tenant_id,
    service_principal_id=service_principal_id,
    service_principal_password=service_principal_password)

    ws = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
    auth=svc_pr
    )
    
    service = Webservice(workspace=ws, name=azure_service)
    print(service.scoring_uri)
    
    print(service.get_logs())   

## Predict using service endpoint

In [None]:
if CLOUD_NAME=='aws':

    # Predictor bound to the deployed endpoint: requests are sent as
    # 'application/x-npy' and 'application/json' is requested back.
    predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, content_type= 'application/x-npy', accept= 'application/json')

    def _npy_dumps(data):
        """
        Serialize a numpy array into a stream of npy-formatted bytes.

        data: array written with np.save.
        Returns the bytes of the in-memory buffer.
        """
        # The notebook is Python-3 only (it uses f-strings), so the
        # standard-library io.BytesIO replaces the six compatibility import.
        from io import BytesIO
        buffer = BytesIO()
        np.save(buffer, data)
        return buffer.getvalue()

    # Two identical sample rows of 13 values each.
    request_data = _npy_dumps(nd.array([[-200, -200, -200, -75, -200, -200, -200, -200, -200, -200, -200, -200, -200],[-200, -200, -200, -75, -200, -200, -200, -200, -200, -200, -200, -200, -200]]).asnumpy())
    result = predictor.predict(data=request_data)

    # NOTE(security): pickle.loads can execute arbitrary code from the
    # payload it decodes. It is applied here to a network response, so it is
    # only acceptable while the endpoint is one this notebook deployed and
    # trusts; do not point this cell at a third-party endpoint.
    import pickle
    depickled_result = pickle.loads(result)

    print("Outputs, predictions")
    print(depickled_result[0], depickled_result[1])

    
elif CLOUD_NAME=='gcp':
    
    instances=[{"b3001":[1.0] , "b3002":[1.0] , "b3003":[0.4], "b3004":[1.0] , "b3005":[0.385] , "b3006":[0.280] , "b3007":[0.405] , "b3008":[1.0] , "b3009":[1.0] , "b3010":[1.0] , "b3011":[1.0] , "b3012":[1.0] , "b3013":[1.0]}]
    response = service.projects().predict(
        name=name,
        body={'instances': instances}
    ).execute()

    if 'error' in response:
        raise RuntimeError(response['error'])
    else:
      print(response['predictions'])

elif CLOUD_NAME=='azure':
    
    X=[[-200,-200,-200,-200,-200,-63,-200,-200,-200,-200,-200,-200,-200]]
    test_samples = json.dumps({"data": X})
    test_samples = bytes(test_samples, encoding='utf8')
    
    print(service.run(input_data=test_samples))
    
    

## Clean up after prediction

For GCP, replace the `<<ACCOUNT>>` and `<<PROJECT ID>>` placeholders below with your service account name and project ID.

In [None]:
if CLOUD_NAME=='aws':
    
    # Remove the endpoint through the predictor object created in the
    # prediction cell, which must therefore have been run first.
    logging.info("Deleting endpoint...")
    predictor.delete_endpoint()
    
elif CLOUD_NAME=='gcp':
    
    # Authenticate gcloud with the key in auth.json. <<ACCOUNT>> and
    # <<PROJECT ID>> are placeholders that must be replaced before running.
    !gcloud auth activate-service-account <<ACCOUNT>> --key-file=auth.json --project=<<PROJECT ID>>

    # Delete version resource
    ! gcloud ai-platform versions delete $version_name --quiet --model $model_name 

    # Delete model resource
    ! gcloud ai-platform models delete $model_name --quiet

    # Delete Cloud Storage objects that were created
    # (recursive removal of everything under gs://$bucket_name/)
    ! gsutil -m rm -r gs://$bucket_name/
        
elif CLOUD_NAME=='azure':
    
    #Delete/cleanup created webservice
    service.delete()

    #Delete created image 
    image_obj = Image(ws, name=azure_service)
    image_obj.delete()

    #Delete created model
    model_obj = Model(ws, name=azure_model)
    model_obj.delete()