# Import and publish projects
This Jupyter Notebook uses the SAS Event Stream Processing Studio REST API and the SAS Event Stream Manager REST API to import, publish, and run projects in a Kubernetes cluster.


Pre-requisite: This example will assume that at same level as this Jupyter Notebook there is an *xml_projects* folder with XML files representing proejcts the user wants to import and publish to ESP.

This Jupyter Notebook uses the project XML files in the `xml_projects` folder and performs the following tasks:
1. Check whether the projects have already been imported to SAS Event Stream Processing Studio.
2. If a project has been previously imported, then import it using the next version number. Otherwise, import the project as version 1.
3. Make the projects public so that they are visible to all users.
4. Publish the projects from SAS Event Stream Processing Studio to SAS Event Stream Manager.
5. Synchronize the projects.
6. Create a SAS Event Stream Manager deployment whose type is "Cluster".
7. Run the projects in the Kubernetes cluster. This action creates and starts an ESP server for each project.

   
Note: Please make sure you run [Imports and Global Variables](#imports) before executing anything else in this notebook. Also ensure you are authenticated by running [Get Access Token](#authentication).


## Imports and Global Variables <a id='imports'></a>
Run this cell before any of the others as it imports packages and sets variables that will be used throughout the notebook.

In [1]:
import requests
import xml.etree.ElementTree as ET
import os
import json
import time
import sys
from urllib3.exceptions import InsecureRequestWarning


def bootstrap_server_and_credentials():
    global server, username, password, chosen_deployment_name
    server = "acme.kc4.ingress-nginx.espsc-kc5-m1.espstudio.sashq-d.openstack.sas.com"
    username = "fsduser"
    password = "Mercury7"
    chosen_deployment_name = "test"
    if len(sys.argv) > 3:
        server = sys.argv[1]
        username = sys.argv[2]
        password = sys.argv[3]

    print('Server: ' + server)
    print('Username: ' + username)
    print('Password: ' + password, flush=True)
    print('ESM Deployment Cluster Name: ' + chosen_deployment_name, flush=True)

# Suppress ssl warnings caused by verify=False
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
bootstrap_server_and_credentials()

Server: acme.kc4.ingress-nginx.espsc-kc5-m1.espstudio.sashq-d.openstack.sas.com
Username: fsduser
Password: Mercury7
ESM Deployment Cluster Name: test


# Get Access token <a id='authentication'></a>

In [2]:
def get_access_token():
    body = {'grant_type': 'password', 'username': username, 'password': password}
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': 'Basic c2FzLmVjOg=='}
    access_token_response = requests.post('http://' + server + '/SASLogon/oauth/token', data=body, headers=headers,
                                          verify=False)
    return access_token_response.json()["access_token"]

access_token = get_access_token()
print(access_token)
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}

eyJqa3UiOiJodHRwczovL2xvY2FsaG9zdC9TQVNMb2dvbi90b2tlbl9rZXlzIiwia2lkIjoibGVnYWN5LXRva2VuLWtleSIsInR5cCI6IkpXVCIsImFsZyI6IlJTMjU2In0.eyJzdWIiOiI1NDk3NWNmMi0wZDhiLTQ1ZmMtOWZhYy0wOTlhYjZjNmQ1MDciLCJ1c2VyX25hbWUiOiJmc2R1c2VyIiwib3JpZ2luIjoibGRhcCIsImlzcyI6Imh0dHA6Ly9sb2NhbGhvc3QvU0FTTG9nb24vb2F1dGgvdG9rZW4iLCJhdXRob3JpdGllcyI6WyJWaXlhU0FTQWRtaW5zIiwic3Zpdm11c2VycyIsInN2aWRldm9wcyIsImludGNhY29kZXNpZ25pbmciLCJ1bml4X3ImZCIsIlVTU0FTIiwicmNpLWNpcnJ1cy1kZXYiLCJpbnRjYXdlYmNlcnRyZXEiLCJvcGVuc3RhY2t1c2VycyIsInJjaS1jbHVzdGVyZGV2bW9kZSIsImludGNhdXNlcnMiXSwiY2xpZW50X2lkIjoic2FzLmVjIiwiYXVkIjpbInVhYSIsIm9wZW5pZCIsInNhcy5lYyJdLCJleHRfaWQiOiJjbj1STVMgRlNEIFByb2R1Y3RzIFVzZXIsb3U9R2VuZXJpYyBhbmQgU2hhcmVkIEFjY291bnRzLG91PUFkbWluLGRjPW5hLGRjPVNBUyxkYz1jb20iLCJ6aWQiOiJ1YWEiLCJncmFudF90eXBlIjoicGFzc3dvcmQiLCJ1c2VyX2lkIjoiNTQ5NzVjZjItMGQ4Yi00NWZjLTlmYWMtMDk5YWI2YzZkNTA3IiwiYXpwIjoic2FzLmVjIiwic2NvcGUiOlsib3BlbmlkIiwidWFhLnVzZXIiXSwiYXV0aF90aW1lIjoxNzE1MzQ5MTY4LCJleHAiOjE3MTUzNTI3NjgsImlhdCI6MTcxNTM0OTE2OCwianRp

# Function to get ESP Projects

In [5]:
def get_projects():
    projects = []
    get_projects_response = requests.get(
        'http://' + server + '/SASEventStreamProcessingStudio/esp-project', headers=headers, verify=False)
    if get_projects_response.status_code == 200:
        projects = get_projects_response.json()["items"]
    return projects

# Function to import project to ESP Studio

In [6]:
def import_project_to_studio(project_body):
    project_id = ""
    import_project_response = requests.post('http://' + server + '/SASEventStreamProcessingStudio/esp-project',
                                            data=json.dumps(project_body), headers=headers, verify=False)
    if import_project_response.status_code == 200:
        project_id = import_project_response.json()["flowId"]
        print('success, project_id=' + project_id)
    return project_id

# Function to get next version number of a project

In [7]:
def get_next_project_version(project_id):
    version = 2
    get_next_version_response = requests.get(
        'http://' + server + '/SASEventStreamProcessingStudio/project-versions/projects/' + project_id + '/nextVersion',
        headers=headers, verify=False)
    if get_next_version_response.status_code == 200:
        version = get_next_version_response.json()["major"]
    else:
        print('Failed to get next version')
        print(get_next_version_response)
    return version

# Function to make project public - by default it is private and hidden from other users

In [8]:
def make_project_public(project_id):
    requests.patch('http://' + server + '/SASEventStreamProcessingStudio/esp-project/'
                   + project_id + '/authorization?private=false',
                   headers={'Authorization': 'Bearer ' + access_token}, verify=False)

# Function to create expected project model to be published

In [9]:
def create_publish_project_body(project_body, version):
    project_body["name"] = project_body["friendlyName"]
    project_body["description"] = ""
    project_body["friendlyName"] = None
    project_body["majorVersion"] = str(version)
    project_body["minorVersion"] = "0"
    project_body["version"] = str(version) + '.0'
    project_body["versionNotes"] = "notes"
    project_body["uploadedBy"] = username
    project_body["modifiedBy"] = username
    epoch_time = int(time.time())
    project_body["uploaded"] = epoch_time
    project_body["modified"] = epoch_time + 10
    project_body["isDeployable"] = False

# Function to publish project

In [10]:
def publish_project(project_id, project_body, version):
    folder_id = ""
    create_publish_project_body(project_body, version)
    publish_project_response = requests.post(
        'http://' + server + '/SASEventStreamProcessingStudio/project-versions/projects/' + project_id,
        data=json.dumps(project_body), headers=headers, verify=False)

    if publish_project_response.status_code == 200:
        folder_id = publish_project_response.json()["folderId"]
    else:
        print('PUBLISH FAILED')
        print(publish_project_response)
        print('Version:' + str(version))
    return folder_id

# Function to synchronize projects from Studio to ESM

In [11]:
def synchronize_project_for_ESM(folder_id):
    success = False
    synchronize_project_response = requests.post(
        'http://' + server + '/SASEventStreamProcessingStudio/project-versions/projects/synchronize/' + folder_id,
        data=folder_id, headers=headers, verify=False)

    if synchronize_project_response.status_code == 200:
        success = True
    return success

# Function to get deployment details needed to start K8s cluster on ESM

In [12]:
def get_deployment_details():
    success = True
    deployment_id = ''
    deployment_name = ''
    projects_running_on_deployment = []
    deployments_response = requests.get("http://" + server + "/SASEventStreamManager/deployment?noDetails=false",
                                        headers=headers, verify=False)

    if deployments_response.status_code != 200:
        print("Could not find any deployments", deployments_response.text)
        success = False

    # Here we try to start clusters against the hard-coded deployment name
    # if it does not exist, it will spin off cluster against 1st cluster
    deployment_items = deployments_response.json()["items"]
    if len(deployment_items) > 0:
        deployment_id = deployment_items[0]["uuid"]
        deployment_name = deployment_items[0]["label"]
        projects_running_on_deployment = get_project_names_from_deployment(deployment_items[0])
        
        for deployment in deployment_items:
            if deployment["type"] == "cluster" and deployment["label"] == chosen_deployment_name:
                deployment_id = deployment["uuid"]
                deployment_name = deployment["label"]
                projects_running_on_deployment = get_project_names_from_deployment(deployment)
    else:
        print("Could not find any deployments", deployments_response.text)
        success = False

    return success, deployment_id, deployment_name, projects_running_on_deployment

def get_project_names_from_deployment(deployment):
    project_names = []
    for server in deployment["servers"]:
        project_names = project_names + list(map(lambda project: project["name"], server["projects"]))
    return project_names

# Function to start K8s cluster on ESM

In [13]:
def start_k8s_cluster(k8s_project_body):
    success, deployment_id, deployment_name, projects_running_on_deployment = get_deployment_details()

    if k8s_project_body["name"] in projects_running_on_deployment:
        print("Project " + k8s_project_body["name"] + " is already running on deployment " + deployment_name +".")
    elif success:
        # Here are the deployment settings hard-coded for all projects
        k8s_deployment_settings = {
            "persistentVolumeClaim": "sas-event-stream-processing-studio-app",
            "requestsMemory": "1Gi",
            "requestsCpu": 1,
            "requestsGpu": "0",
            "limitsMemory": "1Gi",
            "limitsCpu": 1,
            "limitsGpu": "0",
            "minReplicas": 1,
            "maxReplicas": 1,
            "useLoadBalancer": False,
            "loadBalancingPolicy": "none",
            "averageUtilization": "50",
            "loadBalancerTargetsList": []
        }
        k8s_cluster_body = {
            "distinguisher": deployment_name,
            "esmDeploymentId": deployment_id,
            "gpuReliant": False,
            "loadOnly": False,
            "project": k8s_project_body,
            "settings": k8s_deployment_settings
        }

        response = requests.post("http://" + server + "/SASEventStreamManager/server/cluster",
                                 data=json.dumps(k8s_cluster_body), headers=headers, verify=False)
        if response.status_code != 200:
            print("error creating cluster", response.text)
        else:
            print("Project " + k8s_project_body["name"], " successfully started K8s pod in deployment: " + deployment_name)

# Create ESM Deployment for projects to run against

In [3]:
def create_ESM_cluster_deployment():
    deployment_alread_exists_error = 'A deployment named "' + chosen_deployment_name + '" already exists'
    deployment_body = {
        "name": chosen_deployment_name,
        "type": "cluster"
    }
    response = requests.post('http://' + server + '/SASEventStreamManager/deployment',
                                            data=json.dumps(deployment_body), headers=headers, verify=False)
    if deployment_alread_exists_error in response.text:
        print ("Deployment " + chosen_deployment_name + " already exists, we can proceed to start the projects")
    elif response.status_code != 201:
        print("error creating ESM cluster deployment", response.text)

create_ESM_cluster_deployment()

# Loop through all XML projects in xml_projects folder and do the following
1. Check if projects have already been imported
2. If yes, import it using next version number. Otherwise import it normally with version 1 
3. Make project public so all users can see it
4. Publish project
5. Synchronize projects from Studio to ESM
6. Create ESM Deployment Cluster on which to run the projects
7. Start projets on K8s cluster on ESM

In [14]:
def import_and_publish_xml_files():
    current_dir = os.getcwd() + "/xml_projects"
    projects = get_projects()
    for file_name in os.listdir(current_dir):
        print()
        if not file_name.endswith('.xml'): continue
        version = 1
        project_id = None
        xml_file_path = os.path.join(current_dir, file_name)
        project_name = get_project_name_from_xml(xml_file_path)
        data = open(xml_file_path, "r").read()
        project_body = {'friendlyName': project_name, 'xml': data}

        if projects:
            project_that_matches_xml_name = None
            for project in projects:
                if project['friendlyName'] == project_name:
                    project_that_matches_xml_name = project
                    break

            if project_that_matches_xml_name:
                print(project_name + ' is already imported')
                project_id = project_that_matches_xml_name['flowId']
                version = get_next_project_version(project_id)
                print('Creating new project version: ' + str(version))

        if not bool(project_id):
            print('Importing ' + project_name)
            project_id = import_project_to_studio(project_body)
            make_project_public(project_id)

        folder_id = publish_project(project_id, project_body, version)

        synchronize_project_for_ESM(folder_id)

        if project_id:
            print(file_name + ' successfully published', flush=True)
        else:
            print(file_name + ' failed to publish', flush=True)

        k8s_project_body = {"id": project_id, "name": project_name, "version": version}
        start_k8s_cluster(k8s_project_body)

def get_project_name_from_xml(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    project_name = root.attrib['name']
    return project_name



import_and_publish_xml_files()



Importing lua_connector
success, project_id=3166f279-6073-4cf1-af93-e2d230335ff5
lua_connector.xml successfully published
Project lua_connector  successfully started K8s pod in deployment: test

Importing python_connector
success, project_id=b95dc84b-8c9e-4ade-b69a-42bcb5a9d40e
python_connector.xml successfully published
Project python_connector  successfully started K8s pod in deployment: test
