# Orchestrator development notebook
Juan-Carlos Maureira<br>


## Deployment environment

In [1]:
import sys
import os
from pathlib import Path

from IPython.core.magic import register_cell_magic

# Root directory that %%deploy cells write their modules into; it is put first
# on sys.path so the deployed packages can be imported from this notebook.
deploy_path="../lib/python"
sys.path.insert(0, os.path.abspath(deploy_path))

# IPython autoreload extension: re-deployed modules are reloaded automatically.
%load_ext autoreload
%autoreload 2

@register_cell_magic
def deploy(target, cell):
    """Write the source of the current cell to ``<deploy_path>/<target>``.

    Usage: put ``%%deploy /orch/base/module.py`` on the first line of a cell.
    When the target directory does not exist yet it is created, and an empty
    ``__init__.py`` is added to it so it can be imported as a package.
    """
    # `target` is the rest of the magic line (e.g. "/orch/base/slurm.py"). It is
    # concatenated rather than os.path.join'ed so a leading "/" stays relative
    # to deploy_path.
    full_target = "%s/%s" % (deploy_path, target.strip())
    target_dir = os.path.dirname(full_target)

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        init_file = "%s/__init__.py" % target_dir
        if not os.path.exists(init_file):
            open(init_file, "w").close()

    with open(full_target, 'wt') as fd:
        fd.write(cell)

class StopExecution(Exception):
    """Exception whose traceback rendering hook produces nothing.

    NOTE(review): presumably raised to abort a notebook cell quietly, since
    IPython uses ``_render_traceback_`` (when present) to display exceptions.
    """

    def _render_traceback_(self):
        return None

In [2]:
#from orch.base import *
import requests

# IPython.display is the public home of display/HTML; importing them from
# IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

  from IPython.core.display import display, HTML


## Base Classes

In [3]:
%%deploy /orch/base/slurm.py
import sys
import subprocess
import time
import select
import os
import re
import dill
import pickle
import json
import io

from .AVROCodec import AVROCodec
from ._async import *
from .SerializedObject import *

from taskit.frontend import FrontEnd, BackendNotAvailableError
from taskit.log import FileLogger, DEBUG, INFO, ERROR

from threading import Thread, Event

from promise import Promise
import traceback
import tempfile

from .AbstractJob import AbstractJob

import requests
import json

import traceback
import pandas as pd

class SlurmController(object):
    """Query and cancel Slurm jobs through the ``squeue`` / ``scancel`` CLIs."""

    BATCH_BIN   = "/usr/bin/sbatch"
    # Same value under the name Job uses, so both classes can be configured alike.
    SBATCH_BIN  = BATCH_BIN
    SRUN_BIN    = "/usr/bin/srun"
    SCANCEL_BIN = "/usr/bin/scancel"
    SQUEUE_BIN  = "/usr/bin/squeue"

    def __init__(self, job):
        # Stored for callers; the methods below do not use it.
        self.job = job

    def squeue(self, steps=False):
        """Return the output of ``squeue -o %all`` (``-s`` for steps) as a DataFrame.

        The first output line is the ``|``-separated header. Lines whose field
        count does not match the header are skipped. When a header name is
        repeated, the first value wins and the duplicate column is dropped. If
        squeue prints nothing (e.g. the command failed), an empty DataFrame
        without columns is returned.
        """
        squeue_args = ['-o', '%all']
        if steps:
            squeue_args.append('-s')

        result = subprocess.run([self.SQUEUE_BIN] + squeue_args, stdout=subprocess.PIPE, shell=False, universal_newlines=True)

        # Ignore empty lines, such as the one left by the trailing newline.
        lines = [line for line in result.stdout.split('\n') if line]
        if not lines:
            return pd.DataFrame()

        header_cols = lines[0].split('|')
        entries = []
        error_lines = []  # malformed lines; not reported yet
        for line in lines[1:]:
            parts = line.split('|')
            if len(parts) != len(header_cols):
                error_lines.append((len(parts), line, parts))
                continue
            entry = {}
            for key, part in zip(header_cols, parts):
                entry.setdefault(key, part)
            if entry:
                entries.append(entry)

        df = pd.DataFrame(entries, columns=header_cols)
        return df.loc[:, ~df.columns.duplicated()]

    @staticmethod
    def _select(df, column, value):
        """Return the rows of ``df`` whose ``column`` equals ``value`` as text.

        squeue values are strings, so ``value`` is compared through ``str``
        (job ids may be given as int). A missing column or a ``None`` value
        selects no rows.
        """
        if value is None or column not in df.columns:
            return df.iloc[0:0]
        return df.loc[df[column] == str(value)]

    def cancel(self, job_id, steps=False):
        """``scancel`` ``job_id`` if squeue lists it.

        Returns True when scancel exits with status 0, False otherwise or when
        the job is not in the queue.
        """
        df = self.squeue(steps=steps)
        if self._select(df, "JOBID", job_id).empty:
            # job_id not in squeue
            return False
        result = subprocess.run([self.SCANCEL_BIN, str(job_id)], stdout=subprocess.PIPE, shell=False, universal_newlines=True)
        return result.returncode == 0

    def getJobInfo(self, job_id, steps=False):
        """Return the first squeue row for ``job_id`` as a dict, or None."""
        df_job = self._select(self.squeue(steps=steps), "JOBID", job_id)
        if df_job.empty:
            return None
        return json.loads(df_job.iloc[0].to_json())

    def getJobByName(self, job_name, last=False, steps=False):
        """Return every squeue row whose NAME is ``job_name`` as a list of dicts, or None.

        ``last`` is accepted for compatibility but not used.
        """
        df_job = self._select(self.squeue(steps=steps), "NAME", job_name)
        if df_job.empty:
            return None
        return json.loads(df_job.to_json(orient="records"))

class Job(AbstractJob):
    """Run a function on a worker started through Slurm.

    The worker is launched as a job step with ``srun`` (``as_job`` false) or as
    a batch job with ``sbatch --wait --wrap`` (``as_job`` true). Once the worker
    prints its ``JOBID: .. STEP: .. PORT: .. HOST: ..`` line, a taskit
    ``FrontEnd`` connects to it. The frontend sends the dill-serialized function
    together with its arguments.

    NOTE(review): ``params``, ``as_job``, ``exclusive``, ``verbose`` and
    ``job_log`` are read from ``self`` but initialised by ``AbstractJob`` (not
    visible here). Confirm their defaults there.
    """

    SBATCH_BIN  = "/usr/bin/sbatch"
    SRUN_BIN    = "/usr/bin/srun"
    SCANCEL_BIN = "/usr/bin/scancel"
    SQUEUE_BIN  = "/usr/bin/squeue"

    # Directory for the temporary stdout files of batch (as_job) workers.
    tmp_stdout  = "./.orch_jobs"

    def __init__(self, params=None):
        # A fresh dict per instance instead of a shared mutable default.
        super().__init__({} if params is None else params)

        self.job_id        = None   # job id reported by the worker
        self.step_id       = None   # step id reported by the worker (srun mode only)
        self.job           = None   # {"batch_host": h} or {"node_list": h} once the worker is up
        self.worker_port   = None   # port the worker reported
        self.host          = None   # host the worker reported

        self._exec_hdl     = None   # handle returned by execute()
        self.result        = None   # last result (or error) of execute()

        self.log_handler   = None   # job log file, opened when job_log is set

        self.max_retry     = 180    # seconds to wait for the batch stdout file
        self.exception     = None   # deployment error handed back to execute()

    def __deployBackend(self, function, lock):
        """Launch the worker, wait for its address and relay its output.

        Runs in a helper thread started by ``execute``. When the worker reports
        itself, this fills ``job_id``, ``step_id``, ``worker_port``, ``host`` and
        ``job``, then sets ``lock``. It keeps reading the worker output until the
        launcher process exits. Errors are stored in ``self.exception``.
        ``lock`` is set on every exit path, so ``execute`` never waits forever.
        ``function`` is not used here; ``execute`` sends it later.
        """
        print("starting deploy of backend")

        # Fresh state in case this Job object is executed again.
        self.job = None
        self.exception = None

        stdout_handler = None  # batch-mode output file, once it is opened

        def abort(message):
            # Record the failure, close the files opened so far, release execute().
            self.exception = RuntimeError(message)
            if self.log_handler is not None:
                self.log_handler.close()
            if stdout_handler is not None:
                stdout_handler.close()
            lock.set()

        # srun runs the worker as a job step; sbatch submits it as a batch job.
        steps = not self.as_job
        if steps:
            cmd = [self.SRUN_BIN]
            # --exclusive is only added inside an existing allocation.
            if "SLURM_JOB_ID" in os.environ and self.exclusive:
                cmd.append("--exclusive")
            cmd += ["--export=ALL", "--unbuffered"]
        else:
            cmd = [self.SBATCH_BIN, "--export=ALL"]

        # Job parameters become Slurm long options.
        option_names = {"cores": "cpus-per-task", "name": "job-name"}
        for key, value in self.params.items():
            if key in ("verbose", "output"):
                continue
            cmd.append("--%s=%s" % (option_names.get(key, key), value))

        orch_py = "'from bupacl.orch.base import Worker; Worker().run()'"
        cmd_py = " ".join(['python3.9', '-u', '-c', orch_py])

        std_out = ""
        if self.as_job:
            os.makedirs(self.tmp_stdout, exist_ok=True)
            # Only reserves a unique file name; sbatch -o writes the worker output there.
            with tempfile.NamedTemporaryFile(dir=self.tmp_stdout, delete=False) as tmp:
                std_out = tmp.name
            worker_cmd = "--wait -o %s --wrap=\"%s\"" % (std_out, cmd_py)
        else:
            worker_cmd = cmd_py
        # Appended rather than %-formatted, so a "%" in a parameter value is harmless.
        cmd_str = " ".join(cmd + [worker_cmd])

        proc = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        r = proc.poll()
        if r is not None and r != 0:
            abort("%s command failed with exit code %d" % (os.path.basename(cmd[0]), r))
            return

        slurm_ctrl = SlurmController(self)
        # Either spelling may be used in params; both become --job-name above.
        job_name = self.params.get("job-name", self.params.get("name"))

        if self.as_job:
            print("running as job")

        # Wait (10 tries, 10 s apart) for the job to show up in squeue.
        job_info_lst = None
        for _ in range(10):
            job_info_lst = slurm_ctrl.getJobByName(job_name, steps=steps)
            if job_info_lst is not None:
                break
            time.sleep(10)

        if job_info_lst is None:
            abort("Could not get jobinfo from slurm")
            return

        job_info = job_info_lst[-1]  # the last matching job in the list
        job_id = job_info.get("JOBID") if self.as_job else job_info.get("STEPID")

        if self.as_job:
            # Wait for the batch job to start; cancel it if it never does.
            wait_time = 10  # seconds between checks
            max_checks = 60
            is_running = False
            for _ in range(max_checks):
                print("waiting for job")
                job_info_lst = slurm_ctrl.getJobByName(job_name, steps=steps)
                # A job missing from squeue counts as "not running yet".
                if job_info_lst is not None and job_info_lst[-1].get("STATE") == "RUNNING":
                    is_running = True
                    break
                time.sleep(wait_time)

            if not is_running:
                timeout = max_checks * wait_time
                print("Worker job not running after %d segs" % timeout)
                slurm_ctrl.cancel(job_id)
                abort("Worker job not running after %d segs" % timeout)
                return

            # Wait for the file sbatch writes the worker output into.
            retry_count = 0
            while not os.path.exists(std_out):
                time.sleep(1)
                retry_count += 1
                if retry_count > self.max_retry:
                    abort("Submitted job was not executed in %d seconds" % self.max_retry)
                    return
            stdout_handler = open(std_out, "r")
            out_fd = stdout_handler.fileno()
        else:
            out_fd = proc.stdout.fileno()

        poll = select.poll()
        poll.register(out_fd, select.POLLIN | select.POLLPRI | select.POLLERR | select.POLLHUP)

        header = re.compile("JOBID: ([0-9]*) STEP: ([0-9]*) PORT: ([0-9]*) HOST: (.*)", re.MULTILINE | re.DOTALL)
        # "worker" means the header line has not been seen yet.
        self.host = "worker"

        while proc.poll() is None:
            events = poll.poll(100)  # timeout in milliseconds
            for fd, event in events:
                if event & select.POLLERR:
                    break
                if not (event & (select.POLLIN | select.POLLHUP)):
                    continue
                data = os.read(fd, 10240)
                if not data:
                    # Nothing new yet (e.g. the batch output file has not grown); don't spin.
                    time.sleep(0.1)
                    continue
                for raw_line in data.strip().splitlines():
                    line = raw_line.decode(errors="replace")

                    if self.verbose and self.host != "worker":
                        print("** %s %d ** %s" % (self.host, self.job_id, line), flush=True)

                    if self.job_log is not None and self.host != "worker":
                        if self.log_handler is None:
                            log_file = self.job_log.replace("%J", str(self.job_id))
                            if self.step_id is not None:
                                log_file = log_file.replace("%S", str(self.step_id))
                            # create the job logfile
                            self.log_handler = open(log_file, 'w')
                        self.log_handler.write("%s\n" % line)

                    # TODO: capture just the first occurrence and then discard further headers
                    m = header.match(line)
                    if m is not None:
                        # An empty numeric field reads as 0 and is rejected below.
                        job_id      = int(m.group(1) or 0)
                        step_id     = int(m.group(2) or 0)
                        worker_port = int(m.group(3) or 0)
                        self.host   = m.group(4)

                        if worker_port <= 0:
                            abort("Returned Worker Port unavailable")
                            return
                        self.worker_port = worker_port

                        if job_id <= 0:
                            abort("Returned JobID is not a number")
                            return
                        self.job_id = job_id
                        self.job = {}
                        if self.as_job:
                            self.job["batch_host"] = self.host
                        else:
                            self.step_id = step_id
                            self.job["node_list"] = self.host

                        lock.set()
                        print("backend ready ", job_id, self.host)
                        break

                    if len(raw_line) == 0:
                        break

        exit_code = proc.poll()

        if self.log_handler is not None:
            self.log_handler.close()

        if stdout_handler is not None:
            stdout_handler.close()
            # On failure the batch output file is kept for inspection.
            if exit_code == 0:
                os.remove(std_out)

        if self.job is None and self.exception is None:
            self.exception = RuntimeError("backend exited with code %s before reporting its address" % exit_code)
        # Always release execute(), even if the worker never reported its address.
        lock.set()

    def _cancelWorker(self):
        """Best-effort ``scancel`` of the worker; failures are only printed."""
        if self.job_id is None:
            return
        target = str(self.job_id)
        if not self.as_job and self.step_id is not None:
            # NOTE(review): in srun mode the reported JOBID is presumably the
            # enclosing allocation, so only the worker's step is cancelled.
            target = "%s.%s" % (self.job_id, self.step_id)
        try:
            subprocess.run([self.SCANCEL_BIN, target], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except OSError as cancel_error:
            print("could not cancel job %s: %s" % (target, cancel_error))

    def execute(self, function, *args):
        """Deploy a worker and run ``function(*args)`` on it asynchronously.

        Returns the handle created by the ``Async`` decorator (from ``._async``,
        not shown here). The wrapped call evaluates to one of:

        - the result returned by the backend;
        - the exception raised by ``frontend.work``;
        - the deployment error stored in ``self.exception``;
        - an error string with a traceback when talking to the backend failed.
        """
        @Async
        def _exec(function, *args):
            lock = Event()

            # The helper thread sets `lock` once the worker reported its address
            # or the deployment failed.
            t = Thread(target=self.__deployBackend, args=(function, lock))
            t.start()
            print("waiting for backend")
            lock.wait()
            if self.job is None:
                self.result = self.exception
                return self.result

            print("starting frontend")
            try:
                if self.as_job:
                    backend_addr = self.job["batch_host"]
                else:
                    backend_addr = self.job["node_list"]

                print("Worker running at %s : %d " % (backend_addr, self.worker_port))

                frontend = FrontEnd([backend_addr], default_port=self.worker_port, codec=AVROCodec)

                # Unwrap functions wrapped by the async helpers, then serialize
                # once; the payload is the same for every attempt.
                fn = function
                if isinstance(fn, (ThreadAsyncMethod, ProcessAsyncMethod)):
                    fn = fn.Callable
                func_handler = {"func": dill.dumps(fn), "name": fn.__name__, "args": args}

                max_attempts = 5
                retry_time = 1  # seconds between attempts
                for _ in range(max_attempts):
                    try:
                        self.result = frontend.work('function_caller', func_handler)
                        break
                    except BackendNotAvailableError:
                        time.sleep(retry_time)
                    except Exception as e:
                        print("Frontend received exception from backend: ", e)
                        self.result = e
                        break
                else:
                    self.result = RuntimeError("backend %s:%d not available after %d attempts" % (backend_addr, self.worker_port, max_attempts))

                print("sending stop signal to backend %s" % backend_addr)
                frontend.send_stop(backend_addr)
                t.join()
                return self.result
            except Exception as e:
                print("exception when sending function to backend: ", e)
                st_str = traceback.format_exc()
                self.result = "%s\n%s" % (e, st_str)
                print("error read from backend:", st_str)
                self._cancelWorker()
                return self.result

        self._exec_hdl = _exec(function, *args)

        return self._exec_hdl

In [4]:
%%deploy /orch/loggers/__init__.py
import logging
from logging import warning, info
from logging.config import fileConfig
import sys

class NullLogger:
    """Logger stand-in that accepts any logging call and discards it."""

    def __init__(self):
        pass

    def _discard(*args, **kwargs):
        # No named `self` on purpose: the bound instance (if any) lands in
        # *args, so these can also be called on the class without arguments.
        return None

    info = warning = error = critical = debug = _discard
    
class BasicLogger:
    """Named logger set up with ``logging.basicConfig`` (stdout, INFO level).

    NOTE: per the logging docs, ``basicConfig`` does nothing if the root logger
    already has handlers. The format/level below therefore only take effect
    when this is the first logging configuration in the process.
    """
    def __init__(self, name):
        self.name = name
        logging.basicConfig(format='%(asctime)s | %(levelname)s : %(message)s',level=logging.INFO, stream=sys.stdout)
        self.logger = logging.getLogger(self.name)

    @staticmethod
    def _caller_kwargs(kwargs):
        # stacklevel=2 makes funcName/lineno/pathname describe the code calling
        # these wrappers rather than the wrapper methods themselves.
        kwargs.setdefault("stacklevel", 2)
        return kwargs

    def info(self,*args,**kwargs):
        self.logger.info(*args, **self._caller_kwargs(kwargs))
    def critical(self,*args,**kwargs):
        self.logger.critical(*args, **self._caller_kwargs(kwargs))
    def warning(self,*args,**kwargs):
        self.logger.warning(*args, **self._caller_kwargs(kwargs))
    def error(self,*args,**kwargs):
        self.logger.error(*args, **self._caller_kwargs(kwargs))
    def debug(self,*args,**kwargs):
        self.logger.debug(*args, **self._caller_kwargs(kwargs))

class FileLogger:
    """Named logger configured from a ``logging.config.fileConfig`` ini file.

    ``logfile`` is handed to the ini file as the ``%(logfile)s`` default, so
    handler ``args`` can reference it, e.g. ``args=('%(logfile)s', 'a')``.
    """
    def __init__(self, name, config_file="etc/logging.ini", logfile="logs/orchestrator.log"):
        self.name = name
        # '%' is ConfigParser's interpolation character; escape it in the path.
        fileConfig(config_file, defaults={"logfile": str(logfile).replace("%", "%%")})
        self.logger = logging.getLogger(self.name)

    @staticmethod
    def _caller_kwargs(kwargs):
        # stacklevel=2 makes funcName/lineno/pathname describe the code calling
        # these wrappers rather than the wrapper methods themselves.
        kwargs.setdefault("stacklevel", 2)
        return kwargs

    def info(self,*args,**kwargs):
        self.logger.info(*args, **self._caller_kwargs(kwargs))
    def critical(self,*args,**kwargs):
        self.logger.critical(*args, **self._caller_kwargs(kwargs))
    def warning(self,*args,**kwargs):
        self.logger.warning(*args, **self._caller_kwargs(kwargs))
    def error(self,*args,**kwargs):
        self.logger.error(*args, **self._caller_kwargs(kwargs))
    def debug(self,*args,**kwargs):
        self.logger.debug(*args, **self._caller_kwargs(kwargs))

In [5]:
%%deploy /orch/base/PersistentDict.py
import shelve
import os
from multiprocessing import Lock

class PersistentDict():
    """Dictionary-like key/value store persisted on disk with ``shelve``.

    Every operation opens the shelf, works on it and closes it again while
    holding a ``multiprocessing.Lock``. Keys must be strings, as ``shelve``
    requires.
    """

    def _openDict(self):
        """Open the shelf read/write if the storage file exists, else create it."""
        mode = "w"
        if not os.path.exists(self.storage_file):
            mode = "c"

        return shelve.open(self.storage_file, flag=mode)

    def __init__(self, storage_file="./.persistentdict/storage.dbm"):
        self.storage_file = storage_file

        # A bare file name has no directory part to create.
        storage_dir = os.path.dirname(storage_file)
        if storage_dir:
            os.makedirs(storage_dir, exist_ok=True)

        self.lock = Lock()

    def get(self, key):
        """Return the value stored under ``key``; raises KeyError if absent."""
        # `with` closes the shelf even when the lookup raises.
        with self.lock, self._openDict() as db:
            return db[key]

    def mput(self, key, value, debug=False):
        """Store ``value`` under ``key``, collecting distinct values in a list.

        A new key stores ``value`` as is. If the key already holds a list,
        ``value`` is appended unless it is already in the list. If the key
        holds a single different value, both are stored as a two-element list.
        """
        with self.lock, self._openDict() as db:
            if key not in db:
                db[key] = value
                return

            tval = db[key]
            if isinstance(tval, list):
                if debug:
                    print("key %s has already multiple values %s. adding new value" % (key, tval))
                if value not in tval:
                    tval.append(value)
                    # The shelf returns a copy, so the updated list is written back.
                    db[key] = tval
            else:
                if debug:
                    print("key %s is transformed in multiple values %s" % (key, tval))

                if tval != value:
                    db[key] = [tval, value]

    def put(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        with self.lock, self._openDict() as db:
            db[key] = value

    def exists(self, key):
        """Return True if ``key`` is stored."""
        with self.lock, self._openDict() as db:
            return key in db

    def keys(self):
        """Return the stored keys as a list (materialised before the shelf closes)."""
        with self.lock, self._openDict() as db:
            return list(db.keys())

    def updateFromTable(self, table, key=None, value=None, allow_multiple_values = False, debug=False):
        """Store ``row[value]`` under ``row[key]`` for each row of ``table``.

        ``table`` is iterated with ``iterrows()`` (a pandas DataFrame or
        similar). Rows whose key is blank are skipped. With
        ``allow_multiple_values`` the values are accumulated via ``mput``.
        """
        for _, row in table.iterrows():
            t_key = row[key]
            if t_key.strip() == "":
                continue
            t_value = row[value]
            if allow_multiple_values:
                self.mput(t_key, t_value, debug=debug)
            else:
                self.put(t_key, t_value)

In [6]:
%%deploy /orch/base/AbstractApiService.py

from flask import Flask, render_template, request, jsonify
from threading import Thread, Condition
from werkzeug.serving import make_server

from ..loggers import NullLogger

class AbstractApiService(Thread):
    """Serve a Flask application from a background thread.

    Routes are added with ``addRule``. ``start()`` runs a werkzeug server on
    ``bind_addr:bind_port`` until ``stopService()`` is called.
    ``wait()``/``release()`` let other threads block until released.
    """
    def __init__(self,api_name, bind_addr="127.0.0.1", bind_port=8010, templates=None):
        Thread.__init__(self)
        self.api_name  = api_name
        self.bind_addr = bind_addr
        self.bind_port = bind_port

        self.api       = Flask(api_name, template_folder=templates)
        self.srv       = None
        self.logger    = NullLogger()
        self.running   = False

        # One condition shared by every caller of wait(), so release() wakes all of them.
        self.cv        = Condition()

    def __del__(self):
        # Only stop a service that is still marked as running; getattr guards
        # against a partially initialised object.
        if getattr(self, "running", False):
            self.stopService()

    def isRunning(self):
        return self.running

    def setLogger(self, logger):
        self.logger = logger

    def addRule(self, url, name, callback, **kwargs):
        """Register ``callback`` for ``url`` on the Flask app (``add_url_rule``)."""
        self.api.add_url_rule(url, name, callback,**kwargs)

    def wait(self):
        """Block until ``release()`` is called; returns True."""
        if self.cv is None:
            self.cv = Condition()
        with self.cv:
            self.cv.wait()
            return True

    def release(self):
        """Wake every thread blocked in ``wait()``."""
        if self.cv is not None:
            with self.cv:
                self.cv.notify_all()

    def stopService(self):
        """Shut the server down and wait for the service thread to finish."""
        if self.srv is not None:
            self.srv.shutdown()

        # join() raises on a thread that was never started.
        if self.ident is not None:
            self.join()

        self.logger.info("%s : stopped" % self.api_name)
        self.running = False

    def run(self):
        self.logger.info("%s : starting" % self.api_name)
        self.srv  = make_server(self.bind_addr, self.bind_port, self.api)
        self.ctx  = self.api.app_context()
        self.running = True
        try:
            self.srv.serve_forever()
        finally:
            # Release the listening socket so the port is freed immediately.
            self.srv.server_close()
        self.logger.info("%s : finishing" % self.api_name)

In [7]:
%%deploy /orch/base/db/DataBaseBackend.py
# DataBaseBackend 

import time
import sqlalchemy as sal
from sqlalchemy import create_engine, and_
from sqlalchemy.sql import func
from sqlalchemy.sql import text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import defer
from sqlalchemy.orm import undefer


class TableDoesNotExist(LookupError):
    """Raised by ``DataBaseBackend.getTable`` when the table is not in the database."""


class DataBaseBackend(object):
    """Convenience wrapper around a SQLAlchemy engine and a scoped ORM session.

    Most methods use the session from ``self.session`` and call
    ``self.session.remove()`` when done. ``getObjects`` skips the removal when
    columns are deferred, presumably so those columns can still be loaded
    lazily.
    """

    def __init__(self, conn_str):
        self.engine = sal.create_engine(conn_str)

        session_factory = sessionmaker(self.engine,expire_on_commit=False)
        self.session = scoped_session(session_factory)

    def getEngine(self):
        return self.engine

    def existTable(self,table_name):
        """Return True if ``table_name`` exists in the database."""
        return bool(sal.inspect(self.getEngine()).has_table(table_name))

    def getTable(self,table_name):
        """Reflect ``table_name`` into a ``Table``; raises TableDoesNotExist if absent."""
        if not self.existTable(table_name):
            raise TableDoesNotExist(table_name)
        # autoload_with alone triggers reflection; engine-bound MetaData and
        # autoload=True were removed in SQLAlchemy 2.0.
        return sal.Table(table_name, sal.MetaData(), autoload_with=self.getEngine())

    def initialize(self, p_object):
        """Create the tables of ``p_object.metadata`` if ``p_object``'s table is missing.

        Returns False (after printing the error) when creation fails.
        """
        if self.existTable(p_object.__tablename__):
            return True
        try:
            # create_all creates every table registered on this metadata.
            p_object.metadata.create_all(self.getEngine())
        except Exception as e:
            print(e)
            return False
        return True

    def query(self, query):
        """Execute raw SQL text on the scoped session and return the result.

        The text is executed as-is; never build it from untrusted input.
        """
        try:
            return self.session.execute(text(query))
        except Exception:
            print("error in query: %s" % query)
            raise

    def getObjects(self, p_obj, *args, defer_cols=(), **kwargs):
        """Return all ``p_obj`` rows matching the given filters.

        ``args`` are SQL expressions for ``Query.filter`` and ``kwargs`` are
        column equalities for ``Query.filter_by``; both are applied when both
        are given. ``defer_cols`` names columns to load lazily; in that case
        the session is not removed afterwards.
        """
        sess = self.session()
        try:
            query = sess.query(p_obj)
            if args:
                query = query.filter(*args)
            if kwargs:
                query = query.filter_by(**kwargs)
            if defer_cols:
                query = query.options(*[defer(col) for col in defer_cols])
            return query.all()
        finally:
            if not defer_cols:
                self.session.remove()

    def refreshObject(self, p_obj):
        """Expire and reload ``p_obj`` from the database."""
        sess = self.session()
        try:
            sess.expire(p_obj)
            sess.refresh(p_obj)
        finally:
            self.session.remove()

        return True

    def commit(self):
        """Commit the current session."""
        sess = self.session()
        try:
            sess.commit()
        finally:
            self.session.remove()
        return True

    def updateObjects(self, p_obj, *args, **kw_args):
        """Bulk-update ``p_obj`` rows matching ``args`` with ``kw_args``; returns the row count."""
        sess = self.session()
        try:
            rs = sess.query(p_obj).filter(*args).update(kw_args)
            sess.commit()
        finally:
            self.session.remove()

        return rs

    def saveObject(self, p_obj, retries=10, retry_delay=5):
        """Add ``p_obj`` to the session and commit, retrying failed commits.

        A failed commit is rolled back. A new (pending) object is re-added and
        retried up to ``retries`` times, ``retry_delay`` seconds apart. Objects
        that were not new are not retried: the rollback expires their pending
        changes, so a retry would silently commit nothing.

        Raises RuntimeError, chained to the last database error, when the
        object could not be saved.
        """
        sess = self.session()
        last_error = None
        try:
            for _ in range(retries):
                sess.add(p_obj)
                # Objects added for the first time are "pending" until flushed.
                is_new = sal.inspect(p_obj).pending
                try:
                    sess.commit()
                    return True
                except Exception as e:
                    last_error = e
                    # The session is unusable after a failed flush/commit until rolled back;
                    # the rollback also expunges a pending object, hence add() on each attempt.
                    sess.rollback()
                    if not is_new:
                        break
                    print("retry commit")
                    time.sleep(retry_delay)
            raise RuntimeError("could not save object to database:", last_error) from last_error
        finally:
            self.session.remove()

    def destroyObject(self, p_obj):
        """Delete ``p_obj`` and commit."""
        sess = self.session()
        try:
            sess.delete(p_obj)
            sess.commit()
        finally:
            self.session.remove()

        return True

In [8]:
%%deploy /orch/base/ActionListener.py

class ActionListener(object):
    """Base class for objects that Observable notifies of events."""

    def actionPerformed(self, evt):
        """Handle ``evt``; this default implementation ignores it."""
        return None


In [9]:
%%deploy /orch/base/ActionEvent.py

class ActionEvent(object):
    """Event object that records which object emitted it (its source)."""

    def __init__(self, *args, **kw_args):
        # Extra arguments are accepted and ignored; the source is set later.
        self.source = None

    def setSource(self, src):
        self.source = src

    def getSource(self):
        return self.source

    def __repr__(self):
        event_name = type(self).__name__
        source_name = type(self.source).__name__
        return "{}[source={}]".format(event_name, source_name)


In [10]:
%%deploy /orch/base/Observable.py

from . import ActionListener

class Observable(object):
    """Keeps a list of ActionListener objects and forwards events to them."""

    def __init__(self,*args, **kw_args):
        self.listeners = []

    @staticmethod
    def _listenerClass():
        # `from . import ActionListener` yields the ActionListener *module*
        # unless the package re-exports the class; accept either form.
        return getattr(ActionListener, "ActionListener", ActionListener)

    def addActionListener(self,al):
        """Register ``al`` and return self; raises RuntimeError if it is not an ActionListener."""
        if not isinstance(al, self._listenerClass()):
            raise RuntimeError("%s must inherit from ActionListener" % type(al).__name__)
        # Subclasses that skip __init__ may not have the list yet.
        if not hasattr(self,"listeners"):
            self.listeners = []
        self.listeners.append(al)
        return self

    def actionPerformed(self, evt):
        """Set this object as the event source and pass ``evt`` to every listener."""
        evt.setSource(self)
        if not hasattr(self,"listeners"):
            self.listeners = []

        for al in self.listeners:
            al.actionPerformed(evt)
        

In [11]:
%%deploy /orch/base/SerializedObject.py
class SerializedObject(object):
    """Thin wrapper around a dict; AVROCodec uses it to tag encoded values."""
    _data =  {}
    def __init__(self, data=None):
        # A fresh dict per instance: a `{}` default would be shared by every
        # instance created without arguments. A given dict is kept as is.
        self._data = {} if data is None else data

    def __del__(self):
        # Tolerates instances whose __init__ never ran (no instance `_data`).
        self.__dict__.pop("_data", None)

    def get(self,key):
        """Return the stored value for ``key``; raises KeyError if absent."""
        return self._data[key]


In [12]:
%%deploy /orch/base/AVROCodec.py
import select 
import os 
import re 
import jsonpickle as jp
from .SerializedObject import SerializedObject

# CLEAN THIS CLASS FROM SCHEMA RELATED TO ASTRONOMY

class AVROCodec(object):
    """
    AVRO codec using jsonpickle for encoding.

    Values are encoded as jsonpickle JSON strings:

    - bools, ints and strs are wrapped in a SerializedObject tagged with their type;
    - lists/tuples are encoded element by element;
    - anything else goes straight to ``jsonpickle.dumps``.

    ``decode`` reverses this; tuples come back as lists. ``hdu_schema`` is not
    used by this class.
    """

    hdu_schema = '{"namespace": "example.avro",\
        "type": "record",\
        "name": "FITS-HDU",\
        "fields": [\
            {"name": "image"       ,"type": "bytes"},\
            {"name": "header"      ,"type": "bytes"}\
        ]\
    }'

    @staticmethod
    def encode(obj):
        """Encode ``obj`` as a jsonpickle string (see the class docstring)."""
        # bool is a subclass of int, so this covers both; the value is kept as text.
        if isinstance(obj, int):
            return jp.dumps(SerializedObject({"type": type(obj), "value": str(obj)}))

        if isinstance(obj, str):
            return jp.dumps(SerializedObject({"type": type(obj), "value": obj}))

        if isinstance(obj, (list, tuple)):
            return jp.dumps([AVROCodec.encode(o) for o in obj])

        return jp.dumps(obj)

    @staticmethod
    def decode(enc):
        """Decode a string produced by ``encode``.

        Returns None (after printing the error) when ``enc`` cannot be parsed.
        NOTE: jsonpickle can instantiate arbitrary classes while decoding;
        only decode data from trusted peers.
        """
        obj = None
        try:
            obj = jp.loads(enc)
        except Exception as e:
            print(e)

        if isinstance(obj, (list, tuple)):
            return [AVROCodec.decode(o) for o in obj]

        if isinstance(obj, SerializedObject):
            value_type = obj.get("type")
            if isinstance(value_type, type):
                # bool first: it is a subclass of int. Its value is "True"/"False".
                if issubclass(value_type, bool):
                    return obj.get("value") == "True"
                if issubclass(value_type, int):
                    return int(obj.get("value"))
                if issubclass(value_type, str):
                    return str(obj.get("value"))

        return obj



In [13]:
%%deploy /orch/base/LocalJob.py
from .AbstractJob import AbstractJob

from threading import Thread, Event
from promise import Promise
import traceback
import tempfile
import dill

from ._async import *

class LocalJob(AbstractJob):
    """Job that runs the function locally through the ``ProcessAsync`` decorator."""

    def __init__(self, params=None):
        # A fresh dict per instance instead of a shared mutable default.
        super(LocalJob,self).__init__({} if params is None else params)

    def execute(self, function, *args):
        """Run ``function(*args)`` via ``ProcessAsync`` and return its handle.

        Functions wrapped by ``ThreadAsyncMethod``/``ProcessAsyncMethod`` are
        unwrapped to their underlying callable first.
        """
        @ProcessAsync
        def _exec(f, *args):
            # NOTE(review): if ProcessAsync runs this in a child process, this
            # assignment only updates the child's copy of `self`; read the
            # result from the returned handle instead.
            self.result = f(*args)
            return self.result

        fn = function
        # unwrap the function if it is wrapped in an AsyncCall
        if isinstance(fn, (ThreadAsyncMethod, ProcessAsyncMethod)):
            fn = fn.Callable

        self._exec_hdl = _exec(fn, *args)
        return self._exec_hdl
    


In [14]:
%%deploy /orch/base/_async.py
# Async decorator
# with threads and processes
#
import multiprocessing as mp
import jsonpickle
import ctypes
import random 
import string 
from .argument import Argument
import psutil
import signal
import threading
import os

import sys, traceback

from promise import Promise

class TimeoutError(RuntimeError):
    """Raised by an async handle's ``wait()`` when the work outlives the timeout.

    NOTE: this shadows the builtin ``TimeoutError`` inside this module.
    """

class ProcessAsyncCall(object):
    """Run a callable in a child process and collect its result.

    Created by ``ProcessAsyncMethod`` once per invocation.

    * ``pool_size is None`` -> "single" mode: the call gets its own process;
      its Manager is shut down in ``get()``.
    * integer ``pool_size`` -> pooled mode: calls whose callables share the
      same ``__name__`` are throttled so that at most ``pool_size`` of them
      run at once. Excess calls block inside ``__call__`` until a running call
      finishes and releases their lock.

    The child stores the jsonpickle-encoded return value in a Manager-backed
    string (``self.Result``), which ``get()`` decodes in the parent.
    """

    # Pool bookkeeping keyed by callable name, shared by every instance in this
    # process: {"queue": [locks of waiting calls], "running": [p_ids]}. The lists
    # are Manager proxies so that child processes can update them too.
    pool_cnt = {}

    def __init__(self, fnc, pool_size, callback = None):
        # random identifier used to track this call in pool["running"]
        self.p_id      = ''.join([random.choice(string.ascii_letters + string.digits) for n in range(32)])

        self.single    = False

        if pool_size is None:
            self.single    = True
            self.pool_size = 1
        else:
            self.pool_size = pool_size

        self.Callable  = fnc
        self.Callback  = callback
        self.Result    = None
        # every call starts its own Manager server process
        self.Manager = mp.Manager()
        self.pool      = None

        # pid of the process that created this call (used by cancel())
        self.parent_pid = os.getpid()

    def cancel(self, sig=signal.SIGTERM):
        """Send ``sig`` to all descendants of the process that created this call.

        NOTE(review): this signals *every* child (recursively) of
        ``parent_pid``, not only ``self.proc``. That includes Manager server
        processes and any other concurrently running async calls. Confirm this
        broad scope is intended.
        """
        try:
            parent = psutil.Process(self.parent_pid)
        except psutil.NoSuchProcess:
            return
        for process in parent.children(recursive=True):
            try:
                process.send_signal(sig)
            except psutil.NoSuchProcess:
                # the child exited between listing and signalling it
                pass

    def getPool(self):
        """Return (creating on first use) the pool record for this callable's name."""
        if self.Callable.__name__ not in self.pool_cnt:
            if self.Manager is None:
                self.Manager = mp.Manager()
            self.pool_cnt[self.Callable.__name__] = {"queue": self.Manager.list()  ,"running" : self.Manager.list() }
        return self.pool_cnt[self.Callable.__name__]

    def __call__(self, *args, **kwargs):
        """Start (or, in pooled mode, queue and then start) the child process; return self."""
        # shared string slot the child writes its jsonpickled result into
        self.Result  = self.Manager.Value(ctypes.c_wchar,'')

        if self.single :
            self.proc = mp.Process(target = self.run, args = args, kwargs = kwargs)
            self.proc.start()
        else:
            # creates the pool record for this callable name on first use
            pool = self.getPool()

            self.proc = mp.Process(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)

            self.lock = self.Manager.Lock()

            if len(pool["running"]) < self.pool_size:
                pool["running"].append(self.p_id)
                self.proc.start()

            else:
                # Pool is full: hold our own lock and enqueue it. A finishing
                # call pops it from the queue and releases it (see run()),
                # which unblocks start_proc below.
                self.lock.acquire()
                pool["queue"].append(self.lock)

                def start_proc(call):
                    try:
                        call.lock.acquire()
                        call.lock.release()
                    except Exception as e:
                        print("error trying to acquire lock.",e)

                    pool = call.getPool()
                    pool["running"].append(call.p_id)
                    call.proc.start()

                # The thread is joined immediately, so __call__ blocks here
                # until a pool slot frees up.
                t = threading.Thread(target=start_proc, args=(self,))
                t.start()
                t.join()

        return self

    def then(self, fn):
        """Wait for the result and return ``fn(*result)`` (the result is unpacked)."""
        response = self.get()
        return fn(*response)

    def wait(self, timeout = None):
        """Join the child; raise this module's TimeoutError if it outlives ``timeout``."""
        if self.proc.is_alive():
            self.proc.join(timeout)
            if self.proc.is_alive():
                raise TimeoutError()
        return True

    def get(self, default = None):
        """Wait for the child, decode its result and release its resources.

        Returns ``default`` if the stored result cannot be decoded. Closes the
        process object and clears ``Result``, so it can only be called once.
        """
        if self.Result.value == '':
            self.wait()

        if self.proc.is_alive():
            self.proc.join()

        try:
            self.proc.terminate()
        except Exception:
            pass

        self.proc.close()

        ret = default
        try:
            ret = jsonpickle.decode(self.Result.value)
        except Exception as e:
            print("error decoding result value",e)

        self.Result = None
        if self.single:
            self.Manager.shutdown()
            self.Manager = None
        else:
            pool = self.getPool()
            if len(pool["running"])==0 and self.Manager is not None and len(pool["queue"])==0:
                # kill the manager when all running processes are done
                self.Manager.shutdown()
                self.Manager = None
                del self.pool_cnt[self.Callable.__name__]

        return ret

    def run(self, *args, **kwargs):
        """Child-process target: call the callable and store its encoded result.

        In pooled mode it then removes this call from pool["running"] and
        releases as many queued calls as there are free slots.
        """
        try:
            result = self.Callable(*args, **kwargs)
            try:
                packed_result = jsonpickle.encode(result)
                self.Result.value = "%s" % packed_result
            except Exception as e:
                print("error calling function:",e)
                try:
                    self.lock.release()
                except Exception:
                    pass
                raise

            if self.Callback:
                self.Callback(self.Result)
        except Exception as e:
            print(e, args, kwargs)
            try:
                self.lock.release()
            except Exception:
                pass
            raise

        # Best effort: in single mode there is no lock, and in pooled mode it
        # is normally already unlocked, so errors here are expected.
        try:
            self.lock.release()
            self.lock = None
        except Exception:
            pass

        if not self.single:
            pool = self.getPool()
            pool["running"].remove(self.p_id)

            pool_len = len(pool["running"])
            if pool_len < self.pool_size:
                if len(pool["queue"])>0:
                    proc_avai = self.pool_size - pool_len
                    for i in range(0,proc_avai):
                        try:
                            l = pool["queue"].pop(0)
                            l.release()
                        except Exception as e:
                            print("error releasing the lock in dequeue",e)

        return
    
class ThreadAsyncCall(object):
    """Run a callable in a background thread and collect its result.

    Created by ``ThreadAsyncMethod`` once per invocation. Exceptions raised in
    the worker thread (by the callable or the callback) are printed with a
    traceback and are not re-raised by ``get()``. If the callable itself
    raised, ``Result`` stays None.
    """
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback
        self.Result   = None
        self.Thread   = None

    def __call__(self, *args, **kwargs):
        """Start the callable in a new thread (named after it) and return self."""
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        """Block until the thread finishes or ``timeout`` seconds pass.

        Returns the raw ``Result``. Raises this module's TimeoutError if the
        thread is still alive after ``timeout``.
        """
        if self.Thread is not None:
            self.Thread.join(timeout)
            if self.Thread.is_alive():
                raise TimeoutError()
        return self.Result

    def get(self):
        """Wait for completion and return the fully resolved result.

        Nested async handles and ``Argument`` objects returned by the callable
        are resolved through their own ``get()``. A returned ``RuntimeError``
        instance is raised instead of returned. Safe to call more than once.
        """
        if self.Thread is not None:
            self.Thread.join()
            # drop the finished thread; later get()/wait() calls skip joining
            self.Thread = None

        while isinstance(self.Result, (ThreadAsyncCall, ProcessAsyncCall, Argument)):
            self.Result = self.Result.get()

        if isinstance(self.Result, RuntimeError):
            raise self.Result

        return self.Result

    def run(self, *args, **kwargs):
        """Thread target: call the callable, store its result, then run the callback."""
        try:
            self.Result = self.Callable(*args, **kwargs)
            if self.Callback:
                self.Callback(self.Result)
        except Exception as e:
            print(e, args, kwargs)
            traceback.print_exc(file=sys.stdout)

class ThreadAsyncMethod(object):
    """Callable produced by ``Async``: every call starts a fresh ThreadAsyncCall."""

    def __init__(self, fnc, callback=None):
        self.Callable, self.Callback = fnc, callback

    def __call__(self, *args, **kwargs):
        call = ThreadAsyncCall(self.Callable, self.Callback)
        return call(*args, **kwargs)

class ProcessAsyncMethod(object):
    """Callable produced by ``ProcessAsync``: every call starts a new ProcessAsyncCall.

    ``pool_size`` is forwarded to ProcessAsyncCall. None runs each call in
    its own process without throttling; an int caps how many calls of the
    same function name run concurrently.
    """
    def __init__(self, fnc, callback=None, pool_size = 1):
        self.pool_size = pool_size
        self.Callable  = fnc
        self.Callback  = callback

    def __call__(self, *args, **kwargs):
        return ProcessAsyncCall(self.Callable, self.pool_size, self.Callback)(*args, **kwargs)

def ProcessAsync(arg = None, callback = None):
    """Decorator that runs the decorated function in a separate process.

    Forms:
      @ProcessAsync                  one process per call, no pool limit
      @ProcessAsync()                same as above
      @ProcessAsync(callback=cb)     same, with a callback
      @ProcessAsync(4)               at most 4 concurrent calls per function name
      @ProcessAsync(4, callback=cb)  pool of 4 with a callback
    """
    if isinstance(arg, int):
        # The outer ``callback`` is the default here; previously the inner
        # ``callback=None`` shadowed it and ProcessAsync(n, callback=cb) lost cb.
        def ProcessAsyncWrapper(fnc = None,  callback = callback):
            if fnc is None:
                def AddAsyncCallback(fnc):
                    return ProcessAsyncMethod(fnc, callback, arg)
                return AddAsyncCallback
            return ProcessAsyncMethod(fnc, callback, arg)
        return ProcessAsyncWrapper

    if arg is None:
        # @ProcessAsync() / @ProcessAsync(callback=cb): return a decorator
        # instead of wrapping None.
        def AddAsyncCallback(fnc):
            return ProcessAsyncMethod(fnc, callback, None)
        return AddAsyncCallback

    return ProcessAsyncMethod(arg, callback, None)

def Async(fnc = None, callback = None):
    """Decorator that runs the decorated function in a background thread.

    Usable bare (``@Async``) or with a callback (``@Async(callback=cb)``).
    The decorated name becomes a ThreadAsyncMethod.
    """
    def AddAsyncCallback(fnc):
        return ThreadAsyncMethod(fnc, callback)

    if fnc is None:
        return AddAsyncCallback
    return AddAsyncCallback(fnc)

class AsyncDummyMethod(object):
    """Synchronous stand-in for an async method: the function runs at call time.

    Calling it returns a handle exposing ``wait()``/``get()`` like the real
    async handles. Each call gets its own handle, so earlier results are not
    overwritten by later calls.
    """
    def __init__(self, function, result=None, callback=None):
        self.function = function
        self.result = result
        self.callback = callback

    def __call__(self, *args, **kwargs):
        value = self.function(*args, **kwargs)
        if self.callback:
            # same contract as ThreadAsyncCall: callback receives the result
            self.callback(value)
        # still record the latest result here for callers reading it directly
        self.result = value
        return AsyncDummyMethod(self.function, value, self.callback)

    def wait(self, timeout=None):
        """Return the result (already computed; ``timeout`` is accepted for API parity)."""
        return self.result

    def get(self):
        """Return the result."""
        return self.result

def AsyncDummy(fnc = None, callback = None):
    """Drop-in replacement for ``Async`` that runs the function synchronously."""
    if fnc is None:
        def AddAsyncDummyCallback(fnc):
            return AsyncDummyMethod(fnc, callback=callback)
        return AddAsyncDummyCallback
    return AsyncDummyMethod(fnc, callback=callback)

In [15]:
%%deploy /orch/base/argument.py
import pickle
import os
import tempfile
import jsonpickle as jp
import dill

class Argument:
    """Hold a value by serialising it with dill to a temporary file on disk.

    The value is written to a new file under ``pkl_path`` at construction
    time, and the in-memory copy is dropped. ``get()`` reloads it from that
    file.

    The file is deleted when the object is garbage-collected, but only if it
    is not ``persistent`` and ``get()`` has been called on this instance.
    """
    def __init__(self, value, persistent = False, pkl_path = "./tmp"):
        self.value = value
        self.persistent = persistent
        self.got   = 0
        self.pkl_path = pkl_path
        # defined up-front so __del__ is safe even if a step below fails
        self.pkl_file = None

        os.makedirs(self.pkl_path, exist_ok=True)

        # mkstemp hands back the open descriptor, so no file object is leaked
        fd, self.pkl_file = tempfile.mkstemp(dir=self.pkl_path)
        try:
            with os.fdopen(fd, 'wb') as out:
                dill.dump(self.value, out)
        except Exception:
            # do not leave a half-written file behind
            os.remove(self.pkl_file)
            raise

        self.value = None

    def __del__(self):
        pkl_file = getattr(self, "pkl_file", None)
        if pkl_file is None or self.persistent or not self.got:
            return
        try:
            os.remove(pkl_file)
        except FileNotFoundError:
            pass

    def type(self):
        """Type of the loaded value, or None until ``get()`` has loaded it."""
        if self.value is not None:
            return type(self.value)
        return None

    def destroy(self):
        """Delete the backing file now, regardless of ``persistent``."""
        try:
            os.remove(self.pkl_file)
        except FileNotFoundError:
            pass

    def saveAs(self, outfile):
        """Copy the serialised file to ``outfile``."""
        from shutil import copyfile
        copyfile(self.pkl_file, outfile)

    def get(self):
        """Load, cache and return the value from the backing file."""
        with open(self.pkl_file,'rb') as fd:
            self.value = dill.load(fd)

        self.got = 1
        return self.value

In [16]:
%%deploy /orch/base/worker.py
from . import AVROCodec
import subprocess
import time
import os
import sys
import socket
import importlib
import codecs

import select
import os
import re
import jsonpickle as jp

from taskit.backend import BackEnd, ADMIN_TASKS
from taskit.log import FileLogger, DEBUG, INFO, ERROR

from promise import Promise
from contextlib import closing

def function_caller(arg):
    """Deserialise and invoke a function on behalf of a remote caller.

    ``arg["func"]`` is a dill-serialised callable and ``arg["args"]`` its
    positional arguments. On failure the exception is printed with its
    traceback and *returned* (not raised), so it travels back to the caller
    as the task result.
    """
    import dill, traceback, sys

    try:
        func = dill.loads(arg["func"])
        return func(*arg["args"])
    except Exception as e:
        print("exception raised when calling function by the worker",e)
        traceback.print_exc(file=sys.stdout)
        return e

def error_maker():
    """Task that always fails; registered as 'get-error' to exercise error paths.

    Raises AssertionError explicitly: an ``assert`` statement would be
    stripped under ``python -O`` and the task would silently return None.
    """
    raise AssertionError('Why ever did you call this!?')

class Worker():
    """Serve remote function calls through a taskit BackEnd.

    ``run()`` exposes ``function_caller`` (run a dill-serialised function),
    ``get-error`` and taskit's admin tasks, then serves until the backend's
    main loop returns.
    """
    call = None

    def __init__(self):
        pass

    def __del__(self):
        pass

    def get_free_port(self):
        """Return a TCP port that was free at the time of the call.

        The OS picks an ephemeral port for a bind to port 0. The socket is
        closed again, so another process could take the port before the
        backend binds it (a small, inherent race).
        """
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    def run(self):
        """Start the backend on a free port and serve until it exits.

        Prints a flushed ``JOBID: ... STEP: ... PORT: ... HOST: ...`` line
        before serving. The format is kept unchanged, since the launching side
        presumably parses it to connect (TODO confirm).
        """
        # Resolve the codec class here. The previous reference,
        # ``bupacl.orch.base.AVROCodec``, named a package never imported in
        # this module, so run() raised NameError. The module-level
        # ``from . import AVROCodec`` may bind the submodule rather than the class.
        from .AVROCodec import AVROCodec as codec

        my_host = os.uname()[1]
        tasks = {'function_caller': function_caller, 'get-error': error_maker }
        tasks.update(ADMIN_TASKS)

        port = self.get_free_port()
        job_id = os.getenv("SLURM_JOB_ID")
        step = os.getenv("SLURM_STEP_ID")
        if step is None:
            step = 0

        backend = BackEnd(tasks, host=my_host, port=port, codec=codec, tracebacks=True)

        print("JOBID: %s STEP: %s PORT: %d HOST: %s" % (job_id,step,port,os.getenv("SLURMD_NODENAME")),flush=True)

        backend.main()

In [17]:
%%deploy /orch/base/__init__.py
# Package metadata for the orch package.
name="orch"
version="1.2"
author="Juan Carlos Maureira"

from .ActionEvent import ActionEvent
from .ActionListener import ActionListener
from .Observable import Observable
from .SerializedObject import *
from ._async import *
from .worker import Worker
from .AbstractJob import AbstractJob
from .argument import Argument
from .AbstractApiService import AbstractApiService
from .AbstractApiClient import AbstractApiClient
from .PersistentDict import PersistentDict

from .slurm import SlurmController
from .slurm import *
from .LocalJob import *

# Decorators

def asJob(*f, **arg):
    """Decorator: submit each call of the decorated function through ``Job``.

    ``Job`` comes from ``.slurm`` (presumably a Slurm batch job).

    Usable bare (``@asJob``), which applies a fixed resource request (1 task,
    1 CPU, 4000M, 1 node). Also usable with keyword job parameters
    (``@asJob(mem="8G")``), which start from ``{"ntasks": "1"}`` plus the
    given keywords. The special keys "verbose" and "output" control verbosity
    and the job log.
    """
    def submitJob(job_params, function, *args, **kwargs):
        # default the job name to the function name unless one was given
        if not ("name" in job_params or "job-name" in job_params):
            job_params["job-name"] = function.__name__

        job = Job(params = job_params)
        job.setVerbose(job_params.get('verbose', False))

        if "output" in job_params:
            job.setJobLog(job_params['output'])

        job.asJob(True)
        return job.run(function, *args, **kwargs)

    if f and callable(*f):
        # bare usage: the only positional argument is the decorated function
        params = {"ntasks":"1","cpus-per-task":1, "mem": "4000M","nodes":1}
        target = f[0]

        def wrapper(*args, **kwargs):
            return submitJob(params, target, *args, **kwargs)
        return wrapper

    def jobFunction(func):
        params = {"ntasks":"1"}
        params.update(arg)

        def wrapper(*args, **kwargs):
            return submitJob(params, func, *args, **kwargs)
        return wrapper
    return jobFunction

def asLocalJob(*f, **arg):
    """Decorator: like ``asJob``, but each call runs as a ``LocalJob``.

    A LocalJob runs in a local child process. Usable bare (``@asLocalJob``)
    or with keyword job parameters. Bare usage applies a fixed resource
    request; keyword usage starts from ``{"ntasks": "1"}`` plus the keywords.
    The keys "verbose" and "output" control verbosity and the job log.
    """
    def submitJob(job_params, function, *args, **kwargs):
        # default the job name to the function name unless one was given
        named = "name" in job_params or "job-name" in job_params
        if not named:
            job_params["job-name"] = function.__name__

        local_job = LocalJob(params = job_params)
        local_job.setVerbose(job_params.get('verbose', False))

        if "output" in job_params:
            local_job.setJobLog(job_params['output'])

        local_job.asJob(True)
        return local_job.run(function, *args, **kwargs)

    if f and callable(*f):
        # bare usage: the only positional argument is the decorated function
        bare_params = {"ntasks":"1","cpus-per-task":1, "mem": "4000M","nodes":1}
        decorated = f[0]

        def wrapper(*args, **kwargs):
            return submitJob(bare_params, decorated, *args, **kwargs)
        return wrapper

    def jobFunction(func):
        job_params = {"ntasks":"1"}
        job_params.update(arg)

        def wrapper(*args, **kwargs):
            return submitJob(job_params, func, *args, **kwargs)
        return wrapper
    return jobFunction

def asStep(*f, **arg):
    """Decorator: run each call through ``Job`` with exclusivity disabled.

    The job name is always set to the function name and verbose output is
    on. Usable bare (``@asStep``) or with keyword parameters that override
    the defaults (1 task, 1 CPU, 4000M, 1 node).
    """
    def default_params():
        return {"ntasks":"1","cpus-per-task":1, "mem": "4000M","nodes":1}

    def submitJob(job_params, function, *args, **kwargs):
        job_params["job-name"] = function.__name__
        step_job = Job(params = job_params)
        step_job.setVerbose(True)
        step_job.setExclusive(False)
        return step_job.run(function, *args, **kwargs)

    def bind(params, target):
        def wrapper(*args, **kwargs):
            return submitJob(params, target, *args, **kwargs)
        return wrapper

    if f and callable(*f):
        # bare usage: the only positional argument is the decorated function
        return bind(default_params(), f[0])

    def jobFunction(func):
        params = default_params()
        params.update(arg)
        return bind(params, func)
    return jobFunction
      
def asExclusiveStep(*f, **arg):
    """Decorator: run each call through ``Job`` with exclusivity enabled.

    The job name is always set to the function name. The "verbose" key
    (default False) controls verbosity. Usable bare or with keyword
    parameters that override the defaults (1 task, 1 CPU, 4000M, 1 node).
    """
    def default_params():
        return {"ntasks":"1","cpus-per-task":1, "mem": "4000M","nodes":1}

    def submitJob(job_params, function, *args, **kwargs):
        job_params["job-name"] = function.__name__
        exclusive_job = Job(params = job_params)
        exclusive_job.setVerbose(job_params.get('verbose', False))
        exclusive_job.setExclusive(True)
        return exclusive_job.run(function, *args, **kwargs)

    def bind(params, target):
        def wrapper(*args, **kwargs):
            return submitJob(params, target, *args, **kwargs)
        return wrapper

    if f and callable(*f):
        # bare usage: the only positional argument is the decorated function
        return bind(default_params(), f[0])

    def jobFunction(func):
        params = default_params()
        params.update(arg)
        return bind(params, func)
    return jobFunction


In [18]:
%%deploy /orch/base/db/__init__.py

from .DataBaseBackend import *

In [19]:
%%deploy /orch/exceptions/__init__.py

# exceptions

class InitializeError(Exception):
    """A backend table could not be initialized; the message is the table name."""
    def __init__(self, table_name):
        super().__init__(table_name)

class TableDoesNotExist(Exception):
    """Carries the missing table name as its message."""
    def __init__(self, table_name):
        super().__init__(table_name)

class PipelineAlreadyRegistered(Exception):
    """Carries the pipeline label as its message."""
    def __init__(self, label):
        super().__init__(label)

class PipelineNotRegistered(Exception):
    """Carries the pipeline label as its message."""
    def __init__(self, label):
        super().__init__(label)

class PipelineNotFound(Exception):
    """Carries the failed lookup (label or query) as its message."""
    def __init__(self, label):
        super().__init__(label)

class MultiplePipelineFound(Exception):
    """Carries the ambiguous pipeline label as its message."""
    def __init__(self, label):
        super().__init__(label)

class MultipleActivePipelineRegistered(Exception):
    """Carries the pipeline label as its message."""
    def __init__(self, label):
        super().__init__(label)

class NoActivePipelineRegistered(Exception):
    """Carries the pipeline label as its message."""
    def __init__(self, label):
        super().__init__(label)

class ImplementationIsNotAFunction(Exception):
    """Carries the offending implementation name as its message."""
    def __init__(self, name):
        super().__init__(name)

class MultipleExecutionIDFound(Exception):
    """Carries the duplicated execution uuid as its message."""
    def __init__(self, uuid):
        super().__init__(uuid)

class ExecutionIdNotFound(Exception):
    """Carries the unknown execution uuid as its message."""
    def __init__(self, uuid):
        super().__init__(uuid)

class APIResponseError(Exception):
    """Carries the API response code as its message."""
    def __init__(self, code):
        super().__init__(code)

class PipelineExecutionError(Exception):
    """Wraps the underlying error ``e``."""
    def __init__(self, e):
        super().__init__(e)

class PipelineSchedulingError(Exception):
    """Wraps the underlying error ``e``."""
    def __init__(self, e):
        super().__init__(e)

class PipelineNotSavedInCatalog(Exception):
    """Carries the pipeline name as its message."""
    def __init__(self, name):
        super().__init__(name)

class EventInThePast(Exception):
    """Message has the form "<name> @ <time>"."""
    def __init__(self, name, time):
        super().__init__("%s @ %s" % (name, time))

class EventWithNoRecurrence(Exception):
    """Carries the event name as its message."""
    def __init__(self, name):
        super().__init__(name)

class MultipleScheduledEventFound(Exception):
    """Carries the duplicated event uuid as its message."""
    def __init__(self, uuid):
        super().__init__(uuid)

class NotificationNotRegistered(Exception):
    """Carries the notification label as its message."""
    def __init__(self, label):
        super().__init__(label)

class SubscriberNotRegistered(Exception):
    """Carries the subscriber label as its message."""
    def __init__(self, label):
        super().__init__(label)

class NoSuchKeyInDictionary(Exception):
    """Carries the missing key as its message."""
    def __init__(self, label):
        super().__init__(label)

class NoSuchDictionary(Exception):
    """Carries the missing dictionary name as its message."""
    def __init__(self, label):
        super().__init__(label)


## PipelineManager Classes (server side)

In [20]:
%%deploy /orch/pipelinemanager/AbstractPipeline.py
import base64
import dill

# AbstractPipeline

class AbstractPipeline:
    """Behaviour shared by pipeline records (e.g. the ORM ``Pipeline`` subclass).

    ``impl_fn`` holds the implementation as base64 of a dill payload, either
    as bytes or possibly as str when read back from a text column. ``args``,
    ``kw_args`` and ``vars`` are volatile, per-execution attributes and are
    not part of the JSON form.
    """
    def __repr__(self):
        # %s rather than %d so a not-yet-versioned (None) pipeline still prints
        return "<Pipeline(id=%s, name='%s', owner='%s', version=%s, active='%s')>" % (
            str(self.id), self.name, self.owner_id, self.version, self.active
        )

    def getFunction(self):
        """Return the deserialised implementation callable."""
        # include decryption
        return dill.loads(base64.b64decode(self.impl_fn))

    def setActive(self, state):
        self.active = state
        return True

    def isActive(self):
        return self.active

    def asJson(self):
        """Return a dict of the persistent fields; ``impl_fn`` is base64'd once more.

        ``impl_fn`` is "" when unset.
        """
        impl_fn_b64 = ""
        if self.impl_fn is not None:
            raw = self.impl_fn
            if isinstance(raw, str):
                # b64encode needs bytes; a text column may hand back str
                raw = raw.encode("utf8")
            impl_fn_b64 = base64.b64encode(raw).decode("utf8")

        return {
            "id"          : self.id,
            "name"        : self.name,
            "owner_id"    : self.owner_id,
            "creation"    : self.creation,
            "version"     : self.version,
            "tags"        : self.tags,
            "changed"     : self.changed,
            "active"      : self.active,
            "impl_fn"     : impl_fn_b64
        }

    # arguments are volatile. only used to set arguments for execution
    def setArguments(self, args):
        self.args = args

    # keywords arguments are volatile. only used to set arguments for execution
    def setKeywordArguments(self, kw_args):
        self.kw_args = kw_args

    def getArguments(self):
        return getattr(self, "args", [])

    def getKeywordArguments(self):
        return getattr(self, "kw_args", {})

    def addVariable(self, key, value):
        if not hasattr(self,"vars"):
            self.vars = {}
        self.vars[key]=value

    def getVariable(self, key):
        """Return the variable ``key``, or None if it was never added."""
        return self.getVariables().get(key)

    def getVariables(self):
        return getattr(self, "vars", {})

    @classmethod
    def fromJson(cls, json):
        """Inverse of ``asJson``: rebuild an instance from its dict form."""
        pipeline = cls()
        pipeline.id          = json["id"]
        pipeline.name        = json["name"]
        pipeline.owner_id    = json["owner_id"]
        pipeline.creation    = json["creation"]
        pipeline.version     = json["version"]
        pipeline.tags        = json["tags"]
        pipeline.changed     = json["changed"]
        pipeline.active      = json["active"]
        impl_fn_b64          = json["impl_fn"]
        # asJson() emits "" for a missing implementation; map it back to None
        pipeline.impl_fn     = base64.b64decode(impl_fn_b64.encode("utf8")) if impl_fn_b64 else None

        return pipeline


In [21]:
%%deploy /orch/pipelinemanager/AbstractExecutor.py
import base64
import dill
import inspect

# AbstractExecutor

class AbstractExecutor(object):
    """Behaviour shared by pipeline-execution records.

    Attributes (set by the ORM subclass or by ``fromJson``) include id, name,
    version, owner_id, uuid, creation, start_ts, end_ts, exec_time, state, and
    the base64-encoded payloads pipeline_fn, pipeline_args, pipeline_ret,
    output and error.

    ``state`` codes as tested by the predicates below: 1-2 preparing,
    3 running, 4 succeeded, 5 failed.
    """

    @classmethod
    def fromJson(cls, json):
        """Build an instance by copying every key of ``json`` onto it as an attribute.

        The non-persistent ``handler`` and ``job_handler`` are reset to None.
        """
        obj = cls()

        for k, v in json.items():
            setattr(obj, k, v)

        obj.handler     = None
        obj.job_handler = None

        return obj

    @staticmethod
    def _formatTimestamp(ts):
        """Render ``ts`` as "%m/%d/%Y %H:%M:%S"; "None" if unset; str passed through."""
        if ts is None:
            return "None"
        if isinstance(ts, str):
            # already rendered, e.g. an object rebuilt with fromJson(asJson())
            return ts
        return ts.strftime("%m/%d/%Y %H:%M:%S")

    @staticmethod
    def _b64ToBytes(data):
        """Base64-decode ``data``; str is utf8-encoded first, bytes used as-is."""
        if isinstance(data, str):
            data = data.encode("utf8")
        return base64.b64decode(data)

    def asJson(self):
        """Return a JSON-friendly summary of this execution.

        Timestamps are formatted strings ("None" when unset); exec_time is in
        seconds (None when unset).
        """
        exec_time = self.exec_time
        if exec_time is not None and hasattr(exec_time, "total_seconds"):
            exec_time = exec_time.total_seconds()

        json_obj = {
            "id"            : self.id,
            "name"          : self.name,
            "version"       : self.version,
            "owner_id"      : self.owner_id,
            "uuid"          : self.uuid,
            "creation"      : self._formatTimestamp(self.creation),
            "start_ts"      : self._formatTimestamp(self.start_ts),
            "end_ts"        : self._formatTimestamp(self.end_ts),
            "exec_time"     : exec_time,
            "state"         : self.state
        }
        return json_obj

    def getOutput(self):
        """Captured output decoded from base64 as utf8 text, or None."""
        if self.output is not None:
            return base64.b64decode(self.output).decode("utf8")
        return None

    def getReturnValue(self):
        """The pipeline's return value (dill payload), or None."""
        if self.pipeline_ret is not None:
            return dill.loads(base64.b64decode(self.pipeline_ret))
        return None

    def getErrors(self):
        """Captured error text decoded from base64 as utf8, or None."""
        if self.error is not None:
            return base64.b64decode(self.error).decode("utf8")
        return None

    def getArguments(self):
        """Deserialised pipeline arguments (accepts str or bytes storage)."""
        return dill.loads(self._b64ToBytes(self.pipeline_args))

    def getFunction(self):
        """Deserialised pipeline function handler (accepts str or bytes storage)."""
        return dill.loads(self._b64ToBytes(self.pipeline_fn))

    def isPreparing(self):
        return self.state == 1 or self.state==2

    def isRunning(self):
        return self.state ==3

    def isDone(self):
        return self.state == 4 or self.state == 5

    def isSuccessful(self):
        return self.state == 4

    def isFailed(self):
        return self.state == 5

    def getExecutionId(self):
        # TODO: assert for empty uuid
        return self.uuid

    def __repr__(self):
        # %s rather than %d so records with unset (None) fields still print
        return "<Execution[id=%s, when=%s, exec_time=%s, uuid=%s, state=%s, pipeline_name=%s, version=%s]>" % (self.id, self.creation, self.exec_time, self.uuid, self.state, self.name, self.version)

In [22]:
%%deploy /orch/pipelinemanager/Pipeline.py
import sqlalchemy as sal
from sqlalchemy import create_engine, and_
from sqlalchemy.sql import func
import base64
import dill

# Pipeline
from . import AbstractPipeline
from sqlalchemy.ext.declarative import declarative_base
# Declarative base for the ``Pipeline`` ORM mapping in this module.
Base = declarative_base()

class Pipeline(AbstractPipeline,Base):
    """ORM mapping of a versioned pipeline stored in the ``pipelines`` table.

    ``catalog`` and ``manager`` are non-persistent back-references that
    PipelineCatalog.get() attaches to every loaded instance.
    """
    __tablename__ = 'pipelines'

    id         = sal.Column('id', sal.Integer, primary_key=True, nullable=False)
    name       = sal.Column('name', sal.String)
    owner_id   = sal.Column('owner_id', sal.String)
    creation   = sal.Column('creation', sal.DateTime(timezone=True),server_default=func.now())
    version    = sal.Column('version', sal.Integer)
    tags       = sal.Column('tags', sal.String)
    changed    = sal.Column('changed', sal.DateTime(timezone=True),onupdate=func.now())
    active     = sal.Column('active', sal.Boolean)
    impl_fn    = sal.Column('impl_fn', sal.TEXT)

    catalog    = None
    manager    = None

    def setActive(self, state):
        """Set this version's active flag and persist it through the catalog.

        Activating first deactivates every version with the same name, so at
        most one is active. Deactivating touches only this version; previously
        setActive(False) also switched off whichever other version was active.

        Returns False if the catalog fails to save, True otherwise.
        """
        if self.catalog is not None and state:
            self.catalog.deactivateAll(self.name)

        # Assign after the bulk update, in case that update synchronises or
        # expires this instance's state (depends on the DataBaseBackend).
        self.active = state

        if self.catalog is not None and not self.catalog.saveObject(self):
            return False

        return True

    def getExecutions(self,**kw_args):
        """Executions of this pipeline's name from the manager, or [] if detached."""
        if self.manager is not None:
            return self.manager.get_execution_list(self.name, **kw_args)
        return []


In [23]:
%%deploy /orch/pipelinemanager/PipelineCatalog.py

# PipelineCatalog    
import base64
import dill
import pandas as pd

from sqlalchemy.sql import func

from ..base.db import DataBaseBackend
from ..exceptions import InitializeError, PipelineAlreadyRegistered, PipelineNotFound, PipelineNotSavedInCatalog
from . import Pipeline

class PipelineCatalog(DataBaseBackend):
    """Database-backed registry of versioned pipelines (the ``pipelines`` table)."""

    def __init__(self, manager, db_conn_str = "sqlite:///orchestrator.sqlite"):

        super().__init__(db_conn_str)

        self.manager = manager

        if not self.initialize(Pipeline):
            raise InitializeError(Pipeline.__tablename__)

    def isRegistered(self,name,**kw_args):
        """True if at least one stored pipeline matches ``name`` (and ``kw_args``)."""
        return len(self.getObjects(Pipeline, name=name, **kw_args)) > 0

    def _latestVersion(self, name):
        """Return the highest stored version number for ``name``."""
        from sqlalchemy import text

        # bound parameter instead of string formatting: ``name`` may come from
        # an API caller, and '==' was SQLite-only syntax
        df = pd.read_sql(
            text("SELECT max(version) as version FROM pipelines WHERE name = :name"),
            con=self.getEngine(),
            params={"name": name},
        )
        return int(df.version.values[0])

    def _saveNewVersion(self, name, owner_id, pipeline_fn, tags, version):
        """Serialise ``pipeline_fn`` and store it as an inactive ``version``.

        Returns the list of matching rows read back through getObjects().
        Raises PipelineNotSavedInCatalog if saveObject() fails.
        """
        #TODO: encrypt serialization with owner_key
        pipeline_serialized = base64.b64encode(dill.dumps(pipeline_fn))

        tags_str = ",".join(tags)

        new_pipeline = Pipeline(
            name          = name,
            owner_id      = owner_id,
            version       = version,
            tags          = tags_str,
            active        = False,
            impl_fn       = pipeline_serialized
        )
        if not self.saveObject(new_pipeline):
            raise PipelineNotSavedInCatalog(name)

        return self.getObjects(Pipeline,
            name      = name,
            owner_id  = owner_id,
            version   = version,
            tags      = tags_str,
            active    = False
        )

    def register(self, name, owner_id, pipeline_fn, tags=None, new_version = False ):
        """Store ``pipeline_fn`` under ``name``.

        With ``new_version=False`` the name must be new; it is stored as
        version 1. With ``new_version=True`` the name must already exist; it
        is stored as max(version)+1. New entries are always inactive.

        Raises PipelineAlreadyRegistered (in both modes) or
        PipelineNotSavedInCatalog.
        """
        tags = [] if tags is None else tags

        if not new_version:
            if self.isRegistered(name):
                raise PipelineAlreadyRegistered(name)
            # NOTE(review): this branch returns the *list* from getObjects,
            # while the new_version branch returns a single Pipeline. Kept
            # as-is for existing callers; confirm which is intended.
            return self._saveNewVersion(name, owner_id, pipeline_fn, tags, 1)

        if not self.isRegistered(name):
            # NOTE(review): PipelineNotFound would describe this case better;
            # kept for callers that catch PipelineAlreadyRegistered.
            raise PipelineAlreadyRegistered(name)

        version = self._latestVersion(name) + 1
        return self._saveNewVersion(name, owner_id, pipeline_fn, tags, version)[0]

    def get(self, *args, **kwargs):
        """Return matching pipelines with ``catalog``/``manager`` attached.

        Raises PipelineNotFound if nothing matches.
        """
        results = self.getObjects(Pipeline,*args,**kwargs)

        # assign for each object the manager where they come from
        for pipeline in results:
            pipeline.catalog = self
            pipeline.manager = self.manager

        if not results:
            raise PipelineNotFound("%s" % kwargs)
        return results

    def deactivateAll(self, name):
        """Set active=False on every version of ``name``; True if any row changed."""
        return self.updateObjects(Pipeline, Pipeline.name == name, active = False )>0


In [24]:
%%deploy /orch/pipelinemanager/Executor.py
# Executor
import sys
import io
from io import StringIO
from uuid import uuid4
import base64
import dill
import jsonpickle
import inspect
import time
from datetime import date, datetime, timezone
from tzlocal import get_localzone 
import traceback
import re

import sqlalchemy as sal
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func

from ..exceptions import ImplementationIsNotAFunction, PipelineExecutionError
from ..base import Argument, Async, asJob
from ..base import Observable
from ..orchestrator import OrchestratorAccess

from . import AbstractExecutor
from .Events import *
from .executePipelineAsJob import *

# declarative base owning the ``executions`` table mapping below
Base = declarative_base()

class Executor(AbstractExecutor, Base, Observable):
    """Runs one pipeline execution and persists its lifecycle.

    Instances are rows of the ``executions`` table and are saved through the
    executor manager ``em`` (``em.saveObject``). ``run`` launches the pipeline
    from an ``@Async`` helper, either as a local process
    (``execute_pipeline_as_local``) or as a job (``execute_pipeline_as_job``),
    and records state, timestamps, return value, output and errors.

    ``state`` codes set by this class:
        1 created, 2 initialized, 3 running, 4 finished, 5 error, 6 cancelled.
    """

    __tablename__ = 'executions'

    id             = sal.Column('id', sal.Integer, primary_key=True, nullable=False)
    name           = sal.Column('name', sal.String)
    version        = sal.Column('version', sal.Integer)
    owner_id       = sal.Column('owner_id', sal.String)
    uuid           = sal.Column('uuid', sal.String)
    sch_evt_uuid   = sal.Column('sch_evt_uuid', sal.String)
    creation       = sal.Column('creation', sal.DateTime(timezone=True),server_default=func.now())
    start_ts       = sal.Column('start_ts', sal.DateTime(timezone=True))
    end_ts         = sal.Column('end_ts', sal.DateTime(timezone=True))
    exec_time      = sal.Column('exec_time', sal.Interval(second_precision=3))
    state          = sal.Column('state', sal.Integer)
    pipeline_fn    = sal.Column('pipeline_fn', sal.TEXT)
    pipeline_args  = sal.Column('pipeline_args', sal.TEXT)
    pipeline_ret   = sal.Column('pipeline_ret', sal.TEXT)
    output         = sal.Column('output', sal.TEXT)
    error          = sal.Column('error', sal.TEXT)

    def __init__(self, em, pipeline, local=True, cores=1, partition=None, memory=None):
        """Create and persist a new execution record for ``pipeline``.

        Args:
            em: executor manager used for persistence and the active registry.
            pipeline: pipeline to execute; its name, version, owner_id and
                impl_fn are copied into the row.
            local: run as a local process (True) or as a job (False).
            cores: number of cores requested for the run.
            partition: stored on the instance; currently not forwarded by ``run``.
            memory: stored on the instance; currently not forwarded by ``run``.
        """
        Observable.__init__(self)
        # non persistent attributes
        self.em          = em
        self.pipeline    = pipeline
        self.handler     = None
        self.job_handler = None
        self.cores       = cores
        self.local_job   = local
        self.partition   = partition
        self.memory      = memory

        # persistent attributes
        self.name        = pipeline.name
        self.version     = pipeline.version
        self.owner_id    = pipeline.owner_id
        self.creation    = datetime.now(timezone.utc).astimezone(get_localzone())
        self.pipeline_fn = pipeline.impl_fn
        self.uuid        = str(uuid4())
        self.state       = 1  # created

        if hasattr(pipeline,"scheduled_event_uuid"):
            self.sch_evt_uuid = pipeline.scheduled_event_uuid

        if not self.em.saveObject(self):
            print("error saving executor")

    def __not_persistent_init__(self,em):
        """Restore the non-persistent attributes of an instance loaded from
        the database (``__init__`` is not run for loaded rows)."""
        Observable.__init__(self)

        self.em          = em
        self.handler     = None
        self.job_handler = None

    def setLocalJob(self, b):
        """Choose local-process (True) or job (False) execution."""
        self.local_job = b

    def setCores(self, cores):
        """Set the number of cores requested for the run."""
        self.cores = cores

    def refresh(self):
        """Reload the persistent attributes through the executor manager."""
        self.em.refreshObject(self)

    @staticmethod
    def _mask_credentials(text):
        """Hide ``user:password`` in URL-like strings (``scheme://user:pw@host``)."""
        return re.sub(r"(.*)://(.*):(.*)@(.*)", "\\1://*****:******@\\4", text)

    @staticmethod
    def _as_text(value):
        """Return ``value`` as ``str``.

        Bytes-like values are utf-8 decoded. ``str`` and ``None`` are returned
        unchanged (error paths store plain strings, unset columns are None).
        """
        if value is None or isinstance(value, str):
            return value
        return bytes(value).decode("utf8")

    def run(self, *args, **kwargs):
        """Start the pipeline asynchronously with ``args`` / ``kwargs``.

        The arguments are stored (dill + base64) in ``pipeline_args``. The
        executor is registered in ``em.active``, and an ``ExecutionStarted``
        event is fired before the asynchronous helper is launched. Once the
        job result has been collected, an ``ExecutionFinished`` event is fired
        and output, errors, timestamps and state are saved.

        Returns:
            self, with ``handler`` set to the asynchronous handle.

        Raises:
            ImplementationIsNotAFunction: the pipeline implementation is not a
                plain function.
        """
        name = self.pipeline.name

        self.pipeline.setArguments(args)
        self.pipeline.setKeywordArguments(kwargs)
        self.pipeline.report = None

        orch_access = OrchestratorAccess(self.em.owner.owner, self.pipeline)

        pipeline_fn = self.pipeline.getFunction()

        pipeline_args = {
            'args': args,
            'kwargs': kwargs
        }

        self.pipeline_args = base64.b64encode(dill.dumps(pipeline_args))
        self.state         = 2  # initialized

        if not self.em.saveObject(self):
            print("error saving executor")

        # add this executor to the executor manager active list
        self.em.active[self.uuid] = self

        if not inspect.isfunction(pipeline_fn):
            raise ImplementationIsNotAFunction(name)

        output = StringIO()
        error  = StringIO()

        def oprint(*args):
            print(*args, file=output)

        def eprint(*args):
            print(*args, file=error)

        def notify(exec_info):
            # best effort: a missing exec_info or a failing notification must
            # not turn the execution into an error (nor notify twice)
            if exec_info is None or not exec_info.notify_execution:
                return
            try:
                self.em.sendExecutionNotification(self.pipeline, self.uuid,
                                                  exec_info.notification_target,
                                                  show=exec_info.notification_show)
            except Exception as notify_error:
                print("error sending execution notification:", notify_error)

        # trigger execution start event
        self.actionPerformed(ExecutionStarted(self.pipeline))

        start_ts = datetime.now(timezone.utc).astimezone(get_localzone())
        oprint("Pipeline Execution")
        oprint("pipeline          : %s" % name)
        oprint("pipeline version  : %d" % self.pipeline.version)
        oprint("cores             : %d" % self.cores)
        oprint("start time        : ",start_ts)
        oprint("arguments         : %s %s" % (str(args), str(kwargs)))

        # execute the pipeline as a process (or job) in order to wait for the
        # result or eventually cancel the execution
        @Async
        def execute_pipeline():
            exec_info = orch_access.exec_info

            try:
                returned_arg = None

                if self.local_job:
                    oprint("executing pipeline as process")
                    self.job_handler = execute_pipeline_as_local(self.cores, pipeline_fn, orch_access, args, kwargs)
                else:
                    oprint("executing pipeline as job")
                    # TODO: until providing a better orch_access to be used from compute nodes
                    # the pipeline and manager references are not sent to the job
                    orch_access.pipeline = None
                    orch_access.manager = None

                    self.job_handler = execute_pipeline_as_job(self.cores, pipeline_fn, orch_access, args, kwargs)

                oprint("waiting for pipeline to finish")

                self.start_ts = start_ts
                self.state    = 3  # running

                if not self.em.saveObject(self):
                    eprint("error saving executor")

                try:
                    # wait for the result; for a local job the handle's get()
                    # returns an object whose own get() is the returned value
                    if self.local_job:
                        h = self.job_handler.get()
                        returned_arg = h.get() if h is not None else None
                    else:
                        returned_arg = self.job_handler.get()
                except Exception as e:
                    print("Exception raised when waiting for job:",e)

                success    = False
                result     = None
                b64_output = None
                b64_error  = None

                if returned_arg is not None:
                    success, result, b64_output, b64_error, exec_info = returned_arg

                if success:
                    oprint("pipeline execution successfully finished")
                    self.state = 4  # finished
                else:
                    oprint("pipeline execution failed")
                    self.state = 5  # error

                if exec_info is not None:
                    # update variables and report from orch_access exec_info
                    self.pipeline.vars   = exec_info.vars
                    self.pipeline.report = exec_info.report

                self.pipeline.state = self.state

                # store the pickled return value
                self.pipeline_ret    = base64.b64encode(dill.dumps(result))
                self.pipeline.result = self.pipeline_ret

                self.actionPerformed(ExecutionFinished(self.pipeline))

                end_ts = datetime.now(timezone.utc).astimezone(get_localzone())
                self.end_ts    = end_ts
                self.exec_time = end_ts - start_ts

                exec_output = "No captured output"
                if b64_output is not None:
                    exec_output = base64.b64decode(b64_output).decode('utf8')

                exec_error = "No captured output"
                if b64_error is not None:
                    exec_error = base64.b64decode(b64_error).decode('utf8')

                # never persist credentials embedded in URLs
                exec_output = self._mask_credentials(exec_output)
                exec_error  = self._mask_credentials(exec_error)

                if self.pipeline.report is not None:
                    oprint("=================================================")
                    oprint("execution report start")
                    oprint(self.pipeline.report)
                    oprint("execution report end")
                    oprint("=================================================")

                oprint("=================================================")
                oprint("execution output start")
                oprint(exec_output)
                eprint(exec_error)
                oprint("execution output end")
                oprint("=================================================")

                oprint("pipeline return value")
                oprint(result)

                oprint("pipeline execution complete")

                self.output = base64.b64encode(output.getvalue().encode('utf8'))
                self.error  = base64.b64encode(error.getvalue().encode('utf8'))

                if not self.em.saveObject(self):
                    eprint("error saving executor")

                notify(exec_info)

                # remove the executor from active list in the executor manager
                self.em.active.pop(self.uuid, None)

                return result

            except Exception as e:
                self.state = 5  # error
                self.error = "%s" % e

                trace_text = traceback.format_exc()
                print("exception when running the pipeline %s" % name)
                print("Exception reported:",e)
                print(trace_text)

                eprint(e)
                eprint(trace_text)

                self.em.active.pop(self.uuid, None)

                if exec_info is None:
                    exec_info = orch_access.exec_info

                self.output = base64.b64encode(output.getvalue().encode('utf8'))
                self.error  = base64.b64encode(error.getvalue().encode('utf8'))

                if not self.em.saveObject(self):
                    print("error saving executor")

                notify(exec_info)

                return e

        try:
            self.handler = execute_pipeline()
        except Exception as e:
            self.output = None
            self.error  = "%s" % e
            print(e)

        return self

    def cancel(self):
        """Cancel the execution and mark it as cancelled (state 6).

        If the execution was started (``handler`` set), its job handle is
        cancelled; when no job handle is available nothing is changed and
        False is returned. An executor that was never started is simply
        marked as cancelled.

        Returns:
            True when the cancelled state was saved, False otherwise.
        """
        if self.handler is not None:
            if self.job_handler is None:
                # started, but no job handle available (yet)
                return False
            self.job_handler.cancel()

        self.state = 6  # cancelled
        self.em.active.pop(self.uuid, None)

        if not self.em.saveObject(self):
            print("error saving executor when cancelling executing")
            return False
        return True

    def serialize(self):
        """Return ``{"uuid": ..., "sobj": ...}``.

        ``sobj`` is the base64 text of a dill-pickled dict holding the
        execution's persistent fields. Byte fields are decoded to ``str``;
        fields that are already ``str`` or ``None`` are kept as they are.
        """
        obj = {
            "id"            : self.id,
            "name"          : self.name,
            "version"       : self.version,
            "owner_id"      : self.owner_id,
            "uuid"          : self.uuid,
            "creation"      : self.creation,
            "start_ts"      : self.start_ts,
            "end_ts"        : self.end_ts,
            "exec_time"     : self.exec_time,
            "state"         : self.state,
            "pipeline_fn"   : self._as_text(self.pipeline_fn),
            "pipeline_args" : self._as_text(self.pipeline_args),
            "output"        : self._as_text(self.output),
            "error"         : self._as_text(self.error)
        }
        # dict is serialized as b64 dill
        sobj = { "uuid": self.uuid, "sobj": base64.b64encode(dill.dumps(obj)).decode("utf8") }

        return sobj

In [25]:
%%deploy /orch/pipelinemanager/ExecutorManager.py
# ExecutorManager

from ..base import ActionListener, Observable
from ..base.db import DataBaseBackend
from . import Executor
from ..exceptions import InitializeError,MultipleExecutionIDFound,ExecutionIdNotFound

from email.message import EmailMessage
from email.utils import make_msgid
import pandas as pd
import smtplib
import traceback
import re
import time
import ssl

class ExecutorManager(DataBaseBackend,ActionListener, Observable):
    """Persistence backend and registry for pipeline executors.

    Stores ``Executor`` rows through ``DataBaseBackend``, tracks executors in
    progress in ``active``, e-mails execution notifications and forwards
    executor events to its own listeners.
    """

    # uuid -> Executor for executions in progress.
    # NOTE: class attribute, so this dict is shared by every ExecutorManager.
    active = {}

    @staticmethod
    def _mask_credentials(text):
        """Hide ``user:password`` in URL-like strings; ``None`` is returned as is."""
        if text is None:
            return None
        return re.sub(r"(.*)://(.*):(.*)@(.*)", "\\1://*****:******@\\4", text)

    def sendMail(self,mail_dst,subject=None, mail_cc=None,bounce_dest=None, mail_content=None, mail_content_html=None, attachments=None):
        """Send an e-mail through the SMTP-over-SSL server in ``self.smtp_crd``.

        Args:
            mail_dst: recipient address or list of addresses.
            subject: mail subject.
            mail_cc: carbon-copy address or list of addresses.
            bounce_dest: envelope sender; defaults to ``smtp_crd["sender"]``.
            mail_content: plain-text body.
            mail_content_html: optional HTML alternative body.
            attachments: optional list of file paths to attach. The MIME type
                is guessed from the file name (``application/octet-stream``
                when unknown).

        Returns:
            False when no SMTP is configured, True once the mail was handed
            to the server. SMTP/IO errors propagate.
        """
        import mimetypes
        import os

        if self.smtp_crd is None:
            print("No SMTP configured. unable to send mails")
            return False

        smtp_crd = self.smtp_crd
        sender   = smtp_crd["sender"]

        # accept a single address as well as a list of addresses
        to_list = [mail_dst] if isinstance(mail_dst, str) else list(mail_dst)
        if mail_cc is None:
            cc_list = []
        elif isinstance(mail_cc, str):
            cc_list = [mail_cc]
        else:
            cc_list = list(mail_cc)
        rcpt = to_list + cc_list

        if bounce_dest is None:
            bounce_dest = sender

        # compose the email
        msg = EmailMessage()
        msg.set_content(mail_content if mail_content is not None else "")
        if mail_content_html is not None:
            msg.add_alternative(mail_content_html, subtype='html')

        for path in attachments or []:
            mime_type, _ = mimetypes.guess_type(path)
            if mime_type is None:
                mime_type = "application/octet-stream"
            main_type, sub_type = mime_type.split("/", 1)

            with open(path, 'rb') as content_file:
                msg.add_attachment(content_file.read(), maintype=main_type, subtype=sub_type, filename=os.path.basename(path))

        if subject is not None:
            msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = ", ".join(to_list)
        if cc_list:
            msg['Cc'] = ", ".join(cc_list)

        print("sending mail to %s" % rcpt)

        # SMTP server; the context manager sends QUIT and closes the socket
        context = ssl.create_default_context()
        with smtplib.SMTP_SSL(smtp_crd["smtp_host"], context=context) as s:
            s.set_debuglevel(0)
            s.login(smtp_crd["username"],smtp_crd["password"])
            s.sendmail(bounce_dest, rcpt, msg.as_string())

        return True

    def __init__(self,owner,db_conn_str="sqlite:///orchestrator.sqlite"):
        """Open the backend and ensure the ``Executor`` table is initialized.

        Raises:
            InitializeError: the executions table could not be initialized.
        """
        super().__init__(db_conn_str)
        self.owner = owner
        self.smtp_crd = owner.smtp_crd
        if not self.initialize(Executor):
            raise InitializeError(Executor.__tablename__)

    def create(self,pipeline, *args, **kw_args):
        """Create (and persist) an Executor for ``pipeline`` listened by this manager."""
        executor = Executor(self, pipeline, *args, **kw_args)
        executor.addActionListener(self)
        return executor

    def getExecutorByID(self, executor_id):
        """Return the executor with uuid ``executor_id``.

        Running executors are returned from ``active``. Otherwise the row is
        loaded from the backend and its non-persistent state is restored.

        Raises:
            MultipleExecutionIDFound: more than one row has this uuid.
            ExecutionIdNotFound: no row has this uuid.
        """
        if executor_id in self.active:
            return self.active[executor_id]

        # executor not active. trying to get it from persistency backend
        executors = self.getObjects(Executor, uuid=executor_id)
        if len(executors) > 1:
            raise MultipleExecutionIDFound(executor_id)
        if len(executors) == 0:
            raise ExecutionIdNotFound(executor_id)

        executor = executors[0]
        executor.__not_persistent_init__(self)
        executor.addActionListener(self)
        return executor

    def getExecutionList(self, pipeline_name, **kw_args):
        """Return the persisted executions of ``pipeline_name``.

        The ``output`` and ``error`` columns are deferred; extra keyword
        arguments are forwarded to ``getObjects`` as filters. Executors are
        saved on creation, so running ones are part of the result too.
        """
        return self.getObjects(Executor, defer_cols=["output","error"],name=pipeline_name, **kw_args)

    def getExecutionsBy(self, where, **kw_args):
        """Return the executions matching the raw SQL condition ``where``.

        SECURITY: ``where`` is interpolated verbatim into SQL; it must never
        come from untrusted input.
        """
        rs = self.query("select id from executions where %s" % where)
        id_lst = [r_id[0] for r_id in rs]

        if not id_lst:
            return []

        return self.getObjects(Executor, Executor.id.in_(id_lst), defer_cols=["output","error"])

    def getRunningExecutions(self):
        """Return a list of the executors currently in ``active``."""
        return list(self.active.values())

    def _poll_executor(self, exec_id, getter_name, not_ready_msg, retries=5, delay=10):
        """Reload executor ``exec_id`` until ``getter_name()`` is not None.

        Returns the last value read (None if it never became available).
        """
        value = None
        for _ in range(retries):
            value = getattr(self.getExecutorByID(exec_id), getter_name)()
            if value is not None:
                break
            print(not_ready_msg)
            time.sleep(delay)
        return value

    def sendExecutionNotification(self, pipeline, exec_id, target, show="all"):
        """E-mail a report of execution ``exec_id`` of ``pipeline`` to ``target``.

        Args:
            pipeline: the executed pipeline (``name`` and, when present,
                ``report`` are used).
            exec_id: executor uuid.
            target: recipient address or list of addresses.
            show: comma-separated subset of ``all``, ``output``, ``error``
                and ``report`` selecting the mail sections.

        Raises:
            RuntimeError: ``show`` contains an unknown option.

        Errors while building or sending the report are printed, and a
        fallback mail carrying the traceback is attempted.
        """
        import html

        show_arr = [item.strip() for item in show.split(",")]
        for show_item in show_arr:
            if show_item not in ("all", "output", "error", "report"):
                raise RuntimeError("sendExecutionNotification: show must be a string list (separated by ,) of the following options: all, output,error or report")

        print("notifying exceution to %s" % target)
        print("pipeline:",pipeline)

        last_exec = self.getExecutorByID(exec_id)
        print("last execution:",last_exec)

        show_all    = "all" in show_arr
        show_output = show_all or "output" in show_arr
        show_error  = show_all or "error" in show_arr
        show_report = show_all or "report" in show_arr

        try:
            exec_output = None
            exec_error  = None
            report      = None

            # output may not be persisted yet when the notification is sent
            if show_output:
                exec_output = self._poll_executor(exec_id, "getOutput", "output not yet ready")
            if show_error:
                exec_error = self._poll_executor(exec_id, "getErrors", "error output not yet ready")

            # remove credentials from url like strings
            exec_output = self._mask_credentials(exec_output)
            exec_error  = self._mask_credentials(exec_error)

            if show_report and hasattr(pipeline,"report"):
                report = pipeline.report

            subject="Pipeline Execution Report: %s" % pipeline.name

            mail_cnt_output  = ""
            mail_cnt_error   = ""
            mail_cnt_report  = ""
            mail_cnt_report_html = ""
            mail_cnt_output_html = ""
            mail_cnt_error_html  = ""

            # HTML parts escape captured text so it cannot break the markup
            if show_output:
                mail_cnt_output = f"""Registro de Ejecución:\n{exec_output}\n\n"""
                mail_cnt_output_html = f"""<h2>Registro de Ejecucion</h2><pre>{html.escape(str(exec_output))}</pre>"""
            if show_error:
                mail_cnt_error  = f"""Registro de Errores:\n{exec_error}\n\n"""
                mail_cnt_error_html  = f"""<h2>Registro de Errores</h2><pre>{html.escape(str(exec_error))}</pre>"""
            if show_report:
                mail_cnt_report = f"""Reporte de Ejecución:\n{report}\n\n"""
                if isinstance(report, pd.DataFrame):
                    report_html = report.to_html()
                else:
                    report_html = html.escape(str(report))
                mail_cnt_report_html = f"""<h2>Reporte de Ejecución</h2><pre>{report_html}</pre>"""

            mail_content=f"""{mail_cnt_report}{mail_cnt_output}{mail_cnt_error}\n------------------------------"""
            mail_content_html=f"""{mail_cnt_report_html}{mail_cnt_output_html}{mail_cnt_error_html}\n------------------------------"""

            self.sendMail(target,subject=subject,mail_content=str(mail_content), mail_content_html=str(mail_content_html))

        except Exception as e:
            print("Error notifying execution to %s" % target)
            print(e)

            trc_bk = traceback.format_exc()

            print(trc_bk)

            subject="Pipeline Execution Report: %s" % pipeline.name

            mail_content=f"""Registro de Ejecución:\n{e}\nRegistro de Errores:\n{trc_bk}\n------------------------------"""
            mail_content_html=f"""<h2>Registro de Ejecucion:</h2><pre>{html.escape(str(e))}</pre><h2>Registro de Errores:</h2><pre>{html.escape(trc_bk)}</pre>------------------------------"""
            self.sendMail(target,subject=subject,mail_content=str(mail_content), mail_content_html=str(mail_content_html))

        print("notification sent to %s" % target)

    def actionPerformed(self, evt):
        # only forward the event to all listeners
        Observable.actionPerformed(self,evt)

In [26]:
%%deploy /orch/pipelinemanager/PipelineManager.py
from flask import request, jsonify
from multiprocessing import Process, Queue, Manager

from datetime import date, datetime, timezone
from tzlocal import get_localzone 
import jsonpickle
import dill
import inspect
import time

from ..base import Observable, ActionListener
from ..orchestrator import OrchestratorManager
from . import PipelineCatalog
from . import ExecutorManager
from ..exceptions import MultipleActivePipelineRegistered, NoActivePipelineRegistered, PipelineExecutionError, PipelineNotRegistered
from .Events import *

class PipelineManager(ActionListener, Observable):
    """Entry point for registering, activating and executing pipelines.

    Pipelines are stored in a ``PipelineCatalog`` and executions are handled
    by an ``ExecutorManager``, both created on ``db_conn_str``. Events raised
    by the executor manager are forwarded to this manager's listeners.
    """

    def __init__(self, owner,db_conn_str = "sqlite:///orchestrator.sqlite"):
        """Create the catalog and executor manager for ``owner``.

        Args:
            owner: object providing ``smtp_crd`` (SMTP settings).
            db_conn_str: database connection string used by both backends.
        """
        super().__init__()

        self.owner            = owner
        self.smtp_crd         = owner.smtp_crd
        self.catalog          = PipelineCatalog(self,db_conn_str=db_conn_str)
        self.executor_manager = ExecutorManager(self, db_conn_str=db_conn_str)
        self.db_conn_str      = db_conn_str

        self.executor_manager.addActionListener(self)

    def register(self, name, pipeline_fn, new_version = False):
        """Register ``pipeline_fn`` under ``name``.

        Without ``new_version``, a new name is registered and the new
        pipeline is returned; an already registered name returns the
        catalog's pipelines for it. With ``new_version``, a new version of an
        existing pipeline is registered and returned.

        Raises:
            PipelineNotRegistered: ``new_version`` was requested for an
                unknown name.
        """
        # TODO: handle the owner in the api (owner id is hard-coded to "jcm")
        registered = self.catalog.isRegistered(name)

        if new_version:
            if not registered:
                raise PipelineNotRegistered(name)
            return self.catalog.register(name,"jcm",pipeline_fn, new_version = True)

        if not registered:
            return self.catalog.register(name,"jcm",pipeline_fn)

        # already registered
        return self.catalog.get(name=name)

    def isRegistered(self,name, **kw_args):
        """Tell whether ``name`` is registered in the catalog."""
        return self.catalog.isRegistered(name,**kw_args)

    def get(self, name, **kw_args):
        """Return the pipelines called ``name`` (filtered by ``kw_args``),
        or False when the name is not registered."""
        if not self.catalog.isRegistered(name):
            # pipeline not registered
            return False
        return self.catalog.get(name=name, **kw_args)

    def activate(self, pipeline):
        """Make ``pipeline`` the only active pipeline with its name.

        Returns True if it was already active or was activated successfully.
        """
        # TODO: check if pipeline is instance of pipeline
        if pipeline.isActive():
            return True

        # deactivate all the pipelines with given name, then activate this one
        self.catalog.deactivateAll(pipeline.name)
        return bool(pipeline.setActive(True))

    def deactivate(self, pipeline):
        """Deactivate ``pipeline``; True if it ends up inactive."""
        if not pipeline.isActive():
            return True
        return bool(pipeline.setActive(False))

    def deactivateAll(self, pipeline):
        """Deactivate every pipeline sharing ``pipeline``'s name."""
        return self.catalog.deactivateAll(pipeline.name)

    def execution_status(self, exec_id):
        """Return the ``state`` code of execution ``exec_id`` (None if no executor)."""
        executor = self.executor_manager.getExecutorByID(exec_id)
        return executor.state if executor is not None else None

    def get_execution(self, exec_id):
        """Return the executor of execution ``exec_id``."""
        return self.executor_manager.getExecutorByID(exec_id)

    def cancel_execution(self, exec_id):
        """Cancel execution ``exec_id``; returns the executor's cancel result."""
        executor = self.executor_manager.getExecutorByID(exec_id)
        if executor is None:
            return False
        return executor.cancel()

    def execute(self, pipeline, *pipeline_args, **pipeline_kwargs):
        """Create an executor for ``pipeline``, start it and return it."""
        executor = self.executor_manager.create(pipeline)
        executor.run(*pipeline_args,**pipeline_kwargs)
        return executor

    def createExecutor(self, pipeline, *args, **kw_args):
        """Create (without starting) an executor for ``pipeline``."""
        return self.executor_manager.create(pipeline,*args, **kw_args)

    def get_execution_list(self,pipeline_name, **kw_args):
        """Return the executions of ``pipeline_name`` (empty list if none).

        Raises:
            PipelineNotRegistered: the pipeline name is unknown.
        """
        if not self.catalog.isRegistered(pipeline_name):
            raise PipelineNotRegistered(pipeline_name)

        executions = self.executor_manager.getExecutionList(pipeline_name, **kw_args)
        return executions if len(executions) > 0 else []

    def get_executions_by(self, where):
        """Return the executions matching the raw SQL condition ``where``."""
        executions = self.executor_manager.getExecutionsBy(where)
        return executions if len(executions) > 0 else []

    def get_running_executions(self):
        """Return the executors currently running."""
        executions = self.executor_manager.getRunningExecutions()
        return executions if len(executions) > 0 else []

    def actionPerformed(self, evt):
        # only forward the event to all listeners
        Observable.actionPerformed(self, evt)

In [27]:
%%deploy /orch/pipelinemanager/Events.py
from ..base import ActionEvent

class ExecutionStarted(ActionEvent):
    """Event fired when a pipeline execution starts.

    Exposes the pipeline object and its name. Extra positional and keyword
    arguments are accepted and ignored.
    """

    def __init__(self, pipeline, *args, **kw_args):
        super().__init__()
        started = pipeline
        self.pipeline      = started
        self.pipeline_name = started.name
        
class ExecutionFinished(ActionEvent):
    """Event fired once a pipeline execution has finished.

    Exposes the pipeline object and its name. Extra positional and keyword
    arguments are accepted and ignored.
    """

    def __init__(self, pipeline, *args, **kw_args):
        super().__init__()
        finished = pipeline
        self.pipeline      = finished
        self.pipeline_name = finished.name


In [28]:
%%deploy /orch/pipelinemanager/executePipelineAsJob.py
def execute_pipeline_as_local(cores, pipeline_fn, orch_access, args, kwargs):
    """Run ``pipeline_fn(*args, **kwargs)`` as a local job using ``cores`` cores.

    Inside the job, stdout/stderr are captured, ``orch_access`` is injected
    as a global of the pipeline function, and the resident memory is stored
    in ``orch_access.exec_info.memory``. The job returns an ``Argument``
    holding ``(success, result, b64_output, b64_error, exec_info)``; on
    failure, ``result`` is the raised exception.

    Returns:
        the handle returned by the ``asLocalJob``-decorated call.
    """
    from ..base import Argument, asLocalJob

    @asLocalJob(cores=cores, verbose=True)
    def run_pipeline_as_local(pipeline_fn, args, kwargs):

        import base64
        from io import StringIO
        import traceback
        import platform
        from orch.base import Argument
        from contextlib import redirect_stdout, redirect_stderr

        import os, psutil

        def record_memory():
            # best effort: a psutil failure must not mark the pipeline as failed
            try:
                rss = psutil.Process(os.getpid()).memory_info().rss
                print("Memory Consumption :", rss, "bytes")
                orch_access.exec_info.memory = rss
            except Exception as mem_ex:
                print("Unable to record memory consumption:", mem_ex)

        p_output = StringIO()
        p_error  = StringIO()

        result  = None
        success = False

        with redirect_stderr(p_error), redirect_stdout(p_output):
            try:
                print("Execution at        : ", platform.node())
                print("executing function  :",pipeline_fn)
                print("Orchestrator Access :",orch_access)

                # make available the orch_access instance to the pipeline
                pipeline_fn.__globals__["orch_access"] = orch_access

                result = pipeline_fn(*args,**kwargs)
                success = True
            except Exception as ex:
                print("Exeption when running process as local job:",ex)
                traceback.print_exc()
                result = ex

            # memory consumption of current process
            record_memory()

        output = p_output.getvalue()
        error  = p_error.getvalue()

        try:
            b64_output = base64.b64encode(output.encode('utf8'))
            b64_error  = base64.b64encode(error.encode('utf8'))
        except Exception as ex:
            # e.g. lone surrogates in the captured text: keep the output
            # (lossy) and report the encoding failure as the error stream
            print("Error encoding output:",ex)
            err_str = "%s" % ex
            b64_output = base64.b64encode(output.encode('utf8', errors='replace'))
            b64_error  = base64.b64encode(err_str.encode('utf8'))

        return Argument((success, result, b64_output ,b64_error, orch_access.exec_info ))

    return run_pipeline_as_local(pipeline_fn, args, kwargs )


# execute the pipeline as job
def execute_pipeline_as_job(cores, pipeline_fn, orch_access, args, kwargs):
    """Run ``pipeline_fn(*args, **kwargs)`` through the ``asJob`` decorator.

    The wrapped job captures stdout/stderr of the pipeline call and returns an
    ``Argument`` holding ``(success, result, b64_output, b64_error, orch_access)``.
    ``result`` is the pipeline's return value, or the raised exception when it fails.
    ``orch_access`` is injected into the pipeline function's globals under the
    name ``orch_access`` before the call.

    :param cores: number of cores requested from ``asJob``.
    :param pipeline_fn: the pipeline callable to execute.
    :param orch_access: OrchestratorAccess instance made available to the pipeline.
    :param args: positional arguments for ``pipeline_fn``.
    :param kwargs: keyword arguments for ``pipeline_fn``.
    :returns: whatever the ``asJob``-decorated function returns for this call.
    """
    from ..base import Argument, asJob

    #TODO: handle partition and memory in decorator with defaults 
    @asJob(cores=cores, verbose=True)
    def run_pipeline_as_job(args):
        # The imports are kept inside the job body on purpose: the body appears
        # to be executed elsewhere (it reports platform.node()) — TODO confirm.
        import base64
        from io import StringIO
        import sys
        import traceback
        import platform
        from orch.base import Argument
        from contextlib import redirect_stdout, redirect_stderr
 
        pipeline_fn, args, kwargs = args.get()
       
        p_output = StringIO()
        p_error  = StringIO()
        
        result     = None 
        success    = False
        b64_output = ""
        b64_error  = ""
       
        with redirect_stderr(p_error) as e:
            with redirect_stdout(p_output) as o:
                try:

                    print("Execution at: ", platform.node())
                    print("executing function:",pipeline_fn)

                    # expose the orchestrator access object to the pipeline code
                    pipeline_fn.__globals__["orch_access"] = orch_access
                    
                    result = pipeline_fn(*args,**kwargs)
                    success = True
                except Exception as ex:
                    print("Exeption when running function:",ex)
                    exc_info = sys.exc_info()
                    traceback.print_exception(*exc_info)

                    result = ex

                    # break the frame <-> traceback reference cycle
                    del exc_info

        try:
            output = o.getvalue()
            error  = e.getvalue()

            # backslashreplace: characters UTF-8 cannot encode (e.g. lone
            # surrogates) are escaped instead of raising, so the captured
            # output is no longer discarded in that case.
            b64_output = base64.b64encode(output.encode('utf8', errors='backslashreplace'))
            b64_error  = base64.b64encode(error.encode('utf8', errors='backslashreplace'))
        except Exception as ex:
            print("Error encoding output:",ex)
            err_str = "%s" % ex
            b64_error  = base64.b64encode(err_str.encode('utf8'))
            
        return Argument( (success, result, b64_output ,b64_error, orch_access) )

    return run_pipeline_as_job(Argument( (pipeline_fn, args, kwargs) ))

In [29]:
%%deploy /orch/pipelinemanager/__init__.py

from ..base.db import *
from .Events import *
from .AbstractExecutor import *
from .AbstractPipeline import *
from .Pipeline import *
from .PipelineCatalog import *
from .Executor import *
from .ExecutorManager import *
from .PipelineManager import *

## Scheduler Classes (Server side)

In [30]:
%%deploy /orch/scheduler/AbstractScheduledEvent.py
import base64
import dill
import inspect

# AbstractScheduledEvent

class AbstractScheduledEvent(object):
    """Behaviour shared by scheduled-event records.

    Pipeline arguments are stored in ``args`` / ``kw_args`` as base64-encoded
    dill dumps. An unset (None) value decodes to ``()`` / ``{}`` respectively.
    """

    @classmethod
    def fromJson(cls, json):
        """Create an instance with ``cls()`` and copy every item of *json* onto it as an attribute."""
        instance = cls()
        for attr_name, attr_value in json.items():
            setattr(instance, attr_name, attr_value)
        return instance

    def setArguments(self,pipeline_args):
        """Store the positional pipeline arguments as a base64-encoded dill dump."""
        pickled = dill.dumps(pipeline_args)
        self.args = base64.b64encode(pickled)

    def setKeywordArguments(self,pipeline_kw_args):
        """Store the keyword pipeline arguments as a base64-encoded dill dump."""
        pickled = dill.dumps(pipeline_kw_args)
        self.kw_args = base64.b64encode(pickled)

    def getArguments(self):
        """Return the stored positional arguments, or ``()`` when none are stored."""
        if self.args is None:
            return ()
        return dill.loads(base64.b64decode(self.args))

    def getKeywordArguments(self):
        """Return the stored keyword arguments, or ``{}`` when none are stored."""
        if self.kw_args is None:
            return {}
        return dill.loads(base64.b64decode(self.kw_args))

    def __repr__(self):
        template = "<ScheduledEvent[name=%s, owner=%s, uuid=%s, trigger_time=%s, recurrency=%s, active=%s, pipeline=%s]>"
        values = (
            self.name,
            self.owner_id,
            self.uuid,
            self.trigger_time,
            self.recurrency,
            self.active,
            self.pipeline,
        )
        return template % values

In [31]:
%%deploy /orch/scheduler/TimeEvent.py
import time
from datetime import timedelta
from datetime import datetime
from dateutil import parser
from pytimeparse.timeparse import timeparse
from dateutil.relativedelta import relativedelta

from ctparse import ctparse
from ctparse.types import Time as ctTime
from ctparse.types import Duration as ctDuration
from ctparse.types import DurationUnit

from ..base import ActionEvent 
from ..exceptions import EventInThePast

class TimeEvent(ActionEvent):
    """Action event that fires at a time computed from a natural-language expression.

    ``when`` (e.g. "10:30") and the optional ``recurrency`` (e.g. "1 days") are
    joined and parsed with ``ctparse``. ``evt_time`` holds the next firing time.
    """

    def getTime2Trigger(self,when, recurrency=None, reference=None):
        """Return the next event datetime for ``when`` (+ ``recurrency``).

        :param when: time expression; it is also parsed with ``dateutil`` for
            month/day durations and with ``time.strptime("%H:%M")`` for date-only
            ctparse results.
        :param recurrency: optional recurrency expression appended to ``when``.
        :param reference: the "now" used for parsing and for hour/minute
            durations. Defaults to the current time *at call time*. The former
            ``reference=datetime.now()`` default was evaluated once at import, so
            calls without a reference computed times relative to start-up.
        :returns: the next datetime. Month/day durations are anchored on ``when``;
            hour/minute durations are added to ``reference``. Returns None when
            ctparse resolves to something that is neither a Duration nor a Time.
        :raises ValueError: if ctparse finds no parse or the duration unit is
            not handled.
        """
        if reference is None:
            reference = datetime.now()

        evt_time = when
        if recurrency is not None:
            evt_time = "%s %s" % (when, recurrency)

        parsed = ctparse(evt_time,reference)
        if parsed is None:
            raise ValueError("could not parse event time '%s'" % evt_time)
        ctr = parsed.resolution

        next_evt_date = None
        if isinstance(ctr,ctDuration):
            # anchored durations (months/days) are computed from `when`,
            # the others (hours/minutes) from the reference time
            anchored = False
            if ctr.unit == DurationUnit.MONTHS:
                td = relativedelta(months=ctr.value)
                anchored = True
            elif ctr.unit == DurationUnit.DAYS:
                td = relativedelta(days=ctr.value)
                anchored = True
            elif ctr.unit == DurationUnit.HOURS:
                td = relativedelta(hours=ctr.value)
            elif ctr.unit == DurationUnit.MINUTES:
                td = relativedelta(minutes=ctr.value)
            else:
                print("DurationUnit not handled",ctr.unit)
                raise ValueError("DurationUnit not handled: %s" % ctr.unit)

            if anchored:
                ref = parser.parse(when)
                if ref > reference:
                    next_evt_date = ref
                else:
                    next_evt_date = ref + td
            else:
                next_evt_date = reference + td

        elif isinstance(ctr,ctTime):
            if ctr.hasTime:
                next_evt_date = datetime(year=ctr.year, month=ctr.month,day=ctr.day, hour=ctr.hour, minute=ctr.minute)
            else:
                # ctparse resolved only a date: take the hour/minute from `when`
                when_tm = time.strptime(when,"%H:%M")
                next_evt_date = datetime(year=ctr.year, month=ctr.month,day=ctr.day, hour=when_tm.tm_hour, minute=when_tm.tm_min)

        return next_evt_date

    def __init__(self, label, when=None, recurrency_str=None, resolution=1):
        """Create the event and compute its first trigger time.

        :param label: identifier of the event.
        :param when: time expression. None means "now" evaluated per instance
            (a ``datetime.now()`` default would be frozen at import time).
        :param recurrency_str: recurrency expression; None or "" means one-shot.
        :param resolution: stored as ``self.resolution``.
        """
        super().__init__()

        if when is None:
            when = datetime.now()

        self.label             = label
        self.when              = when

        if recurrency_str is not None and recurrency_str!="":
            self.recurrency    = recurrency_str
        else:
            self.recurrency    = None

        self.evt_time          = None
        self.resolution        = resolution

        try:
            self.evt_time          = self.getTime2Trigger(self.when, self.recurrency)
            print("Event scheduled at",self.evt_time)
        except Exception as e:
            # evt_time stays None: trigger() will never fire this event
            print("error computing next event time", e)

    def updateEventTime(self):
        """Recompute ``evt_time`` relative to now; return the computed time or None on error.

        ``evt_time`` only moves forward when the computed time is in the future.
        """
        print("updating event time")
        tm = datetime.now()
        try:
            next_evt_ts = self.getTime2Trigger(self.when, self.recurrency ,tm)

            print("evt_time      ",self.evt_time)
            print("next_evt_time ",next_evt_ts)

            if self.evt_time is None:
                self.evt_time = parser.parse(self.when)

            if next_evt_ts > tm:
                self.evt_time = next_evt_ts
                print("Time Event updated:",self.evt_time)
            else:
                print("next event time in the past",next_evt_ts, tm)

            return next_evt_ts
        except Exception as e:
            print("error computing next event time", e)

        return None

    def isRecurrent(self):
        """Return True when the event has a recurrency expression."""
        return self.recurrency is not None

    def getRecurrency(self):
        """Return the ctparse resolution of ``when`` + recurrency.

        Falls back to the raw recurrency string if parsing fails; returns None
        for one-shot events.
        """
        if self.isRecurrent():
            try:
                evt_time = "%s %s" % (self.when, self.recurrency)
                ctr = ctparse(evt_time,datetime.now()).resolution
                return ctr
            except Exception:
                return self.recurrency
        return None

    def trigger(self, tm):
        """Return True when the scheduled time is strictly before ``tm``."""
        if self.evt_time is None:
            # no computable event time (already reported in __init__); never fire
            return False
        try:
            if self.evt_time < tm:
                return True
        except Exception as e:
            print("error determining triggering condition for",self, e)

        return False

    def __repr__(self):
        return "TimeEvent[label=%s,next_evt_time=%s,recurrency=%s]" % (self.label, self.evt_time, self.getRecurrency())

In [32]:
%%deploy /orch/scheduler/TimeManager.py
from threading import Thread
import time
from datetime import datetime, timedelta
from pytimeparse.timeparse import timeparse
import re

from ..base import Observable
from . import TimeEvent
from ..exceptions import EventWithNoRecurrence

class TimeManager(Thread, Observable):
    """Thread that polls registered TimeEvents and notifies listeners when they fire.

    Events are kept in ``event_list`` (label -> TimeEvent). Every ``resolution``
    seconds each event is checked. A fired recurrent event gets its next time
    computed; a fired one-shot event is removed. Either way, the event is then
    passed to ``actionPerformed`` for the Observable listeners.
    """

    # short recurrency form "<count><unit>" (unit: m, h, d or M), matched at the
    # start of the string; the count must be present
    _SHORT_DURATION = re.compile(r"([0-9]+)(m|h|d|M)")
    _SHORT_UNITS    = {"m": "minutes", "h": "hours", "d": "days", "M": "months"}

    def __init__(self, resolution="1s"):
        """Parse *resolution* with ``timeparse`` and start the polling thread.

        :raises ValueError: if *resolution* cannot be parsed. Previously the
            thread started anyway and died on ``time.sleep(None)``.
        """
        Thread.__init__(self)
        Observable.__init__(self)

        self.resolution = timeparse(resolution)
        if self.resolution is None:
            raise ValueError("invalid time resolution: %r" % (resolution,))

        self.event_list = {}
        self.localtime  = datetime.now()

        # set before start() so that a stop() issued before run() begins is honoured
        self.running = True
        self.start()

    def addTimeEvent(self, label, trigger_time_str, recurrency_str=None):
        """Register (or replace) the TimeEvent *label*.

        A short recurrency such as "5m", "2h", "1d" or "3M" is expanded to
        "5 minutes", "2 hours", "1 days" or "3 months". Other strings are passed
        through unchanged.
        """
        if recurrency_str is not None:
            match = self._SHORT_DURATION.match(recurrency_str)
            if match:
                count, unit = match.groups()
                recurrency_str = "%d %s" % (int(count), self._SHORT_UNITS[unit])

        self.event_list[label] = TimeEvent(label, trigger_time_str, recurrency_str, self.resolution)

    def removeTimeEvent(self, evt_label):
        """Remove the event *evt_label*; return True if it was registered."""
        if evt_label in self.event_list:
            del self.event_list[evt_label] 
            return True

        return False

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.running = False

    def run(self):
        """Polling loop; runs until stop() is called."""
        print("TimeManager starting time:",datetime.now())
        while self.running:
            time.sleep(self.resolution)
            self.localtime = datetime.now()
            # iterate over a snapshot: events may be added or removed concurrently
            for evt_label, evt in list(self.event_list.items()):
                try:
                    if not evt.trigger(self.localtime):
                        continue

                    print("triggerred",evt)
                    if evt.isRecurrent():
                        # update next event time of the event that actually fired
                        evt.updateEventTime()
                    else:
                        print("one time event list. removing it")
                        # one time event: it may already have been removed concurrently
                        self.event_list.pop(evt_label, None)

                    # trigger the event to the listeners
                    self.actionPerformed(evt)

                except Exception as e:
                    print("TimeManager Exception:",e)
                    print("Event:",evt)

        print("TimeManager ending time:",datetime.now())

In [33]:
%%deploy /orch/scheduler/ScheduledEvent.py
import sqlalchemy as sal
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, and_
from sqlalchemy.sql import func
from uuid import uuid4
import base64
import dill

from ..base import ActionEvent
from . import AbstractScheduledEvent

# ScheduledEvent
#
# Registers the scheduled execution of a pipeline, identified by name.
# The execution time is given by trigger_time and recurrency.
# Every scheduled event has a name so it can be identified easily.

Base = declarative_base()

class ScheduledEvent(AbstractScheduledEvent, Base, ActionEvent):
    """SQLAlchemy record of a scheduled pipeline execution (table ``scheduled_events``).

    ``args`` / ``kw_args`` hold the base64-encoded dill dumps written by
    ``AbstractScheduledEvent.setArguments`` / ``setKeywordArguments``.
    """
    __tablename__ = 'scheduled_events'
    
    id           = sal.Column('id', sal.Integer, primary_key=True, nullable=False)
    name         = sal.Column('name', sal.String)
    owner_id     = sal.Column('owner_id', sal.String)
    uuid         = sal.Column('uuid', sal.String)
    creation     = sal.Column('creation', sal.DateTime(timezone=True),server_default=func.now())
    tags         = sal.Column('tags', sal.String)
    changed      = sal.Column('changed', sal.DateTime(timezone=True),onupdate=func.now())
    active       = sal.Column('active', sal.Boolean)
    trigger_time = sal.Column('trigger_time', sal.DateTime(timezone=True))
    # NOTE(review): declared Integer, but TimeManager.addTimeEvent matches the
    # recurrency with a regex (i.e. expects a string such as "1d") — confirm
    # the intended column type.
    recurrency   = sal.Column('recurrency', sal.Integer)
    pipeline     = sal.Column('pipeline', sal.String)
    args         = sal.Column('pipeline_args', sal.TEXT)
    kw_args      = sal.Column('pipeline_kw_args', sal.TEXT)
    
    def __init__(self, schm=None, label=None):
        """Create a new event named *label* with a fresh uuid.

        Both arguments default to None so that ``AbstractScheduledEvent.fromJson``,
        which calls ``cls()`` without arguments, can build instances.
        """
        self.schm = schm
        self.name = label
        self.uuid = str(uuid4())

    @staticmethod
    def _asText(value):
        """Decode bytes as UTF-8; return anything else (including None) unchanged."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf8")
        return value

    def serialize(self):
        """Return ``{"uuid": ..., "sobj": <base64 dill dump of the record fields>}``.

        The stored arguments may be bytes (freshly set) or text (depending on
        what the database driver returns); both are emitted as text.
        """
        obj = {
            "id"            : self.id,
            "name"          : self.name,
            "owner_id"      : self.owner_id,
            "uuid"          : self.uuid,
            "creation"      : self.creation,
            "tags"          : self.tags,
            "changed"       : self.changed,
            "active"        : self.active,
            "trigger_time"  : self.trigger_time,
            "recurrency"    : self.recurrency,            
            "pipeline"      : self.pipeline,
            "args"          : self._asText(self.args),
            "kw_args"       : self._asText(self.kw_args)
        }
        # dict is serialized as b64 dill
        sobj = { "uuid": self.uuid, "sobj": base64.b64encode(dill.dumps(obj)).decode("utf8") }

        return sobj

In [34]:
%%deploy /orch/scheduler/Events.py
from ..base import ActionEvent

class ExecutePipeline(ActionEvent):
    """Action event requesting the execution of *pipeline* for a scheduled event.

    Carries the scheduled event uuid plus the positional and keyword arguments
    to run the pipeline with.
    """

    def __init__(self,sch_evt_uuid, pipeline, args, kw_args):
        super().__init__()
        self.sch_evt_uuid, self.pipeline = sch_evt_uuid, pipeline
        self.args, self.kw_args = args, kw_args

    def __repr__(self):
        fields = (self.sch_evt_uuid, self.pipeline, self.args, self.kw_args)
        return "<ExecutePipeline[sch_evt_uuid=%s, pipeline=%s, args=%s, kw_args=%s]>" % fields


In [35]:
%%deploy /orch/scheduler/SchedulerManager.py
from datetime import datetime
from dateutil import parser

from ..base import ActionListener, Observable
from ..base.db import DataBaseBackend
from ..exceptions import MultipleScheduledEventFound
from .TimeManager import TimeManager
from .TimeEvent import TimeEvent
from .ScheduledEvent import ScheduledEvent
from .Events import *

class SchedulerManager(DataBaseBackend, ActionListener, Observable):
    """Persist scheduled pipeline executions and fire them through a TimeManager.

    Each active ScheduledEvent row is mirrored by a TimeEvent labelled with the
    row's uuid in ``self.tm``. When that time event fires, an ExecutePipeline
    event is forwarded to this manager's Observable listeners.
    """
    def __init__(self,owner, time_resolution="1s", db_conn_str="sqlite:///orchestrator.sqlite"):
        # InitializeError is raised below but is not imported at module level here
        from ..exceptions import InitializeError

        Observable.__init__(self)
        self.owner = owner
        self.tm    = TimeManager(time_resolution)

        self.tm.addActionListener(self)
        super().__init__(db_conn_str)
        
        if not self.initialize(ScheduledEvent):
            # the TimeManager thread was started by its constructor; stop it so a
            # failed construction does not leave it polling in the background
            self.tm.stop()
            raise InitializeError(ScheduledEvent.__tablename__)
            
        # activate current scheduled pipelines
        self.activateScheduledPipelines()
            
    def activateScheduledPipelines(self):
        """Register a TimeEvent for every active ScheduledEvent stored in the database."""
        print("activating current scheduled events")
        active_scheduled_events = self.getObjects(ScheduledEvent, active=True)
        for sch_evt in active_scheduled_events:
            print("scheduling %s" % sch_evt)
            try:
                # inside the try so one bad row (e.g. a NULL trigger_time) does
                # not abort the activation of the remaining events
                trigger_time_str = sch_evt.trigger_time.strftime("%H:%M:%S")
                self.tm.addTimeEvent(sch_evt.uuid,trigger_time_str,sch_evt.recurrency)
            except Exception as e:
                print(e)

    def scheduleAt(self, pipeline, label = None, trigger_time_str=None, recurrency=None, tags=None):
        """Persist and activate a scheduled execution of *pipeline*.

        :param pipeline: object exposing ``name`` and optionally ``args`` / ``kw_args``.
        :param label: event name; defaults to ``pipeline.name``.
        :param trigger_time_str: time expression; defaults to the current
            "%H:%M:%S" evaluated at call time. (A default in the signature
            would be frozen at import time.)
        :param recurrency: recurrency expression passed to the TimeManager.
        :param tags: iterable of tag strings, stored comma-joined.
        :returns: True on success, False if the event could not be saved.
        :raises: re-raises any error from saving or registering the event.
        """
        if label is None:
            label = pipeline.name

        if trigger_time_str is None:
            trigger_time_str = datetime.now().strftime("%H:%M:%S")

        if tags is None:
            tags = []
        
        sch_evt      = ScheduledEvent(self,label)
        trigger_time = parser.parse(trigger_time_str)
        
        sch_evt.owner_id      = self.owner.getOwner()
        sch_evt.tags          = ",".join(tags)
        sch_evt.trigger_time  = trigger_time
        sch_evt.recurrency    = recurrency
        sch_evt.pipeline      = pipeline.name
        
        if hasattr(pipeline,"args"):
            print("execution args:",pipeline.args)
            sch_evt.setArguments(pipeline.args)
        else:
            sch_evt.setArguments(tuple())
            
        if hasattr(pipeline,"kw_args"):
            print("execution kw_args:",pipeline.kw_args)
            sch_evt.setKeywordArguments(pipeline.kw_args)
        else:
            sch_evt.setKeywordArguments({})
            
        sch_evt.active        = True
        try:
            if self.saveObject(sch_evt):
                self.tm.addTimeEvent(sch_evt.uuid,trigger_time_str,recurrency)
                return True
        except Exception:
            print("could not schedule the pipeline")
            raise

        return False

    def cancelEvent(self, uuid):
        """Deactivate the scheduled event *uuid* and drop its TimeEvent; return True on success."""
        sch_evt_list = self.getObjects(ScheduledEvent, uuid=uuid)
            
        if len(sch_evt_list)==1:
            sch_evt = sch_evt_list[0]
            sch_evt.active = False
            
            if self.saveObject(sch_evt):
                self.tm.removeTimeEvent(uuid)
                return True
            
            return False
        elif len(sch_evt_list)>1:
            print("FATAL: multiple scheduled pipelines under the same uuid",uuid)
        else:
            print("no ScheduledEvent found for uuid",uuid)
        return False
            
    def actionPerformed(self, evt):
        """Handle a fired TimeEvent: emit ExecutePipeline for its active ScheduledEvent.

        One-shot events are deactivated in the database before being emitted.
        """
        print("SchedulerManager: actionEvent arrived", evt)
        sch_evt_list = self.getObjects(ScheduledEvent, uuid=evt.label)
        if len(sch_evt_list)==1:
            sch_evt = sch_evt_list[0]
            if sch_evt.active:
            
                args = sch_evt.getArguments()
                kw_args = sch_evt.getKeywordArguments()

                if not evt.isRecurrent():
                    print("scheduled event not recurrent.")
                    sch_evt.active = False
                    self.saveObject(sch_evt)

                Observable.actionPerformed(self,ExecutePipeline(sch_evt.uuid, sch_evt.pipeline, args, kw_args))
            else:
                print("SchedulerManager: scheduled event not active. ignoring time event")
                
        elif len(sch_evt_list)>1:
            print("multiple scheduled pipelines under the same uuid.")
        else:
            print("no ScheduledEvent found for ",evt)
            
    def getScheduledEvents(self, pipeline, **kw_args):
        """Return the ScheduledEvents of *pipeline* (filtered by the extra column values)."""
        sch_evt_list = self.getObjects(ScheduledEvent, pipeline=pipeline.name, **kw_args)
        return sch_evt_list
    
    def getScheduledEventById(self, scheduled_event_id):
        """Return the ScheduledEvent with uuid *scheduled_event_id*, or None.

        :raises MultipleScheduledEventFound: if the uuid is not unique.
        """
        sch_evt_list = self.getObjects(ScheduledEvent, uuid=scheduled_event_id)
        if len(sch_evt_list)==1:
            return sch_evt_list[0]
        elif len(sch_evt_list)==0:
            return None
        else:
            raise MultipleScheduledEventFound(scheduled_event_id)
    
    def stop(self):
        """Stop the underlying TimeManager thread."""
        self.tm.stop()

In [36]:
%%deploy /orch/scheduler/__init__.py

from .TimeEvent import *
from .TimeManager import *
from .Events import *
from .AbstractScheduledEvent import *
from .ScheduledEvent import *
from .SchedulerManager import *

## Orchestrator Classes (Manager, service and API)

In [37]:
%%deploy /orch/orchestrator/AbstractRemoteProcedureNotification.py
import base64
import dill
import json

# AbstractRemoteProcedureNotification

class AbstractRemoteProcedureNotification: 
    """Behaviour shared by remote procedure notification records.

    ``data`` holds the notification payload. It may be JSON text, a dict (for
    example after ``fromJson`` of a dict payload), or None when unset.
    ``addTrigger`` always stores it back as JSON text.
    """

    @staticmethod
    def _decodeData(data):
        """Return *data* as a Python object: None/empty text -> {}, JSON text -> parsed, else unchanged."""
        if data is None:
            return {}
        if isinstance(data, (str, bytes, bytearray)):
            return json.loads(data) if data.strip() else {}
        return data

    def addTrigger(self, pipeline_name):
        """Append *pipeline_name* to ``data["triggers"]`` and store ``data`` as JSON text."""
        data = self._decodeData(getattr(self, 'data', None))
        data.setdefault("triggers", []).append(pipeline_name)
        self.data = json.dumps(data)

    def getData(self):
        """Return the payload as a Python object (``{}`` when no payload is set)."""
        return self._decodeData(getattr(self, 'data', None))
    
    def __repr__(self):
        return "<RemoteProcedureNotification(id=%s, label='%s', owner='%s', uuid='%s', creation=%s, data='%s'>" % (
            str(self.id), self.label, self.owner_id, self.uuid, self.creation, self.data
        )

    def __gt__(self, other):
        # used by max() to pick the most recent notification
        return self.creation > other.creation
    
    def asJson(self):
        """Return the record as a dict; ``data`` is JSON-encoded so ``fromJson`` restores it as-is."""
        return {
            "id"            : self.id,
            "label"         : self.label,
            "owner_id"      : self.owner_id,
            "uuid"          : self.uuid,
            "creation"      : self.creation,
            "data"          : json.dumps(self.data)
        }
        
    @classmethod
    def fromJson(cls, json_obj):
        """Build an instance from a dict produced by ``asJson``."""
        obj = cls()
        obj.id          = json_obj["id"]
        obj.label       = json_obj["label"]
        obj.owner_id    = json_obj["owner_id"]
        obj.creation    = json_obj["creation"]
        obj.uuid        = json_obj["uuid"]
        obj.data        = json.loads(json_obj["data"])
        
        return obj

In [38]:
%%deploy /orch/orchestrator/RemoteProcedureNotification.py
import sqlalchemy as sal
from sqlalchemy import create_engine, and_
from sqlalchemy.sql import func
from uuid import uuid4
import base64
import dill

# Remote Procedure Notification
from sqlalchemy.ext.declarative import declarative_base
from .AbstractRemoteProcedureNotification import *
Base = declarative_base()

class RemoteProcedureNotification(AbstractRemoteProcedureNotification,Base):
    """SQLAlchemy record of a remote procedure notification (table ``remoteprocedurenotification``)."""
    __tablename__ = 'remoteprocedurenotification'
    
    id         = sal.Column('id', sal.Integer, primary_key=True, nullable=False)
    label      = sal.Column('label', sal.String)
    owner_id   = sal.Column('owner_id', sal.String)
    uuid       = sal.Column('uuid', sal.String)
    creation   = sal.Column('creation', sal.DateTime(timezone=True),server_default=func.now())
    data       = sal.Column('data', sal.TEXT)
    
    def serialize(self):
        """Return ``{"uuid": ..., "sobj": <base64 dill dump of asJson()>}``."""
        payload = self.asJson()
        encoded = base64.b64encode(dill.dumps(payload)).decode("utf8")
        return {"uuid": self.uuid, "sobj": encoded}

In [39]:
%%deploy /orch/orchestrator/RemoteProcedureNotificationManager.py
# RemoteProcedureNotificationManager    
import base64
import pandas as pd
import getpass
from uuid import uuid4
import json

from sqlalchemy.sql import func

from ..base import Observable
from ..base.db import DataBaseBackend
from ..exceptions import InitializeError, NoActivePipelineRegistered, SubscriberNotRegistered, NotificationNotRegistered
from .RemoteProcedureNotification import RemoteProcedureNotification

from .RemoteProcedureNotificationSubscriber import *

class RemoteProcedureNotificationManager(DataBaseBackend,Observable):
    """Persist remote procedure notifications (RPNs) and their subscribers.

    Creating a notification executes, through ``manager``, every active pipeline
    subscribed to the notification's label.
    """

    def __init__(self, manager, db_conn_str = "sqlite:///orchestrator.sqlite"):
        
        super().__init__(db_conn_str)
        
        self.manager = manager
        
        if not self.initialize(RemoteProcedureNotification):
            raise InitializeError(RemoteProcedureNotification.__tablename__)

        if not self.initialize(RemoteProcedureNotificationSubscriber):
            raise InitializeError(RemoteProcedureNotificationSubscriber.__tablename__)

    def subscribePipeline(self, label, pipeline_name, owner_id=None):
        """Subscribe the active pipeline *pipeline_name* to notifications labelled *label*.

        :returns: the saved RemoteProcedureNotificationSubscriber.
        :raises NoActivePipelineRegistered: if the manager has no active pipeline
            with that name (lookup raised or returned None).
        :raises SubscriberNotRegistered: if the subscriber cannot be saved or read back.
        """
        uuid = str(uuid4())
        
        if owner_id is None:
            owner_id = getpass.getuser()

        # verify whether the pipeline exists
        try:
            pipeline = self.manager.getActivePipeline(pipeline_name)
        except Exception as e:
            raise NoActivePipelineRegistered(pipeline_name) from e
        # createNotification treats None as "no active pipeline"; apply the same rule here
        if pipeline is None:
            raise NoActivePipelineRegistered(pipeline_name)

        new_subscriber = RemoteProcedureNotificationSubscriber(
            label          = label,
            owner_id       = owner_id,
            uuid           = uuid,
            pipeline_name  = pipeline_name
        )

        if not self.saveObject(new_subscriber):
            raise SubscriberNotRegistered(label)

        saved_subscriber = self.getObjects(RemoteProcedureNotificationSubscriber,
            label     = label, 
            owner_id  = owner_id,
            uuid      = uuid
        )
        if len(saved_subscriber) != 1:
            raise SubscriberNotRegistered(label)
        return saved_subscriber[0]

    def getSubscribedPipelines(self, label, **kw_args):
        """Return the subscribers of *label* (possibly an empty list)."""
        result = self.getObjects(RemoteProcedureNotificationSubscriber, label=label,**kw_args)
        if len(result)>0:
            return result
        return []

    def getSubscriptionsByPipeline(self, pipeline_name, **kw_args):
        """Return the subscriptions of pipeline *pipeline_name* (possibly an empty list)."""
        result = self.getObjects(RemoteProcedureNotificationSubscriber, pipeline_name=pipeline_name,**kw_args)
        if len(result)>0:
            return result
        return []

    def unsubscribrePipeline(self, uuid):
        """Delete every subscriber with *uuid*; return True if any was found."""
        result = self.getObjects(RemoteProcedureNotificationSubscriber, uuid=uuid)
        if len(result)>0:
            for subscriber in result:
                self.destroyObject(subscriber)
            return True

        return False

    # correctly spelled alias; the misspelled name is kept for existing callers
    unsubscribePipeline = unsubscribrePipeline

    def getNotifications(self, label, **kw_args):
        """Return the notifications labelled *label* (possibly an empty list)."""
        result = self.getObjects(RemoteProcedureNotification, label=label,**kw_args)
        if len(result)>0:
            return result
        return []

    def getLastNotification(self,label,**kw_args):
        """Return the most recent notification labelled *label*, or None."""
        lst = self.getNotifications(label,**kw_args)
        if len(lst)>0:
            return max(lst)
        return None

    def _triggerSubscriber(self, notification, subscriber, data):
        """Execute the pipeline bound to *subscriber* and record the trigger on *notification*."""
        # dill decodes chained execution results; imported here instead of
        # relying on the star import of RemoteProcedureNotificationSubscriber
        import dill

        pipeline = self.manager.getActivePipeline(subscriber.pipeline_name)
        if pipeline is None:
            print("no pipeline associated to notification")
            return

        kw_args = {"rpn_data": data}

        if "event" not in data:
            # normal rpn. rpn_data comes into the kw_args
            executor = self.manager.execute(pipeline, **kw_args)
        elif data["event"]=="finished":
            print("chained trigger for finished execution")
            # NOTE(review): dill.loads executes arbitrary code if the payload comes
            # from an untrusted client — confirm only trusted callers reach this.
            result = dill.loads(base64.b64decode(data["result"]))
            if isinstance(result,dict):
                kw_args.update(result)
                executor = self.manager.execute(pipeline, **kw_args)
            else:
                # lists/tuples and scalars alike are passed as ONE positional argument
                executor = self.manager.execute(pipeline, result, **kw_args)
        elif data["event"]=="failed":
            print("chained trigger for failed execution")
            ex = dill.loads(base64.b64decode(data["result"]))
            executor = self.manager.execute(pipeline, ex, **kw_args)
        else:
            # different event: the former code passed an undefined name here
            # (NameError); forward the notification data only
            executor = self.manager.execute(pipeline, **kw_args)

        print("rpn triggering %s" % pipeline)
        if executor is not None:
            print("execution_id: %s" % executor.getExecutionId())
        notification.addTrigger(subscriber.getPipeline())
        self.saveObject(notification)

    def createNotification(self, label, owner_id=None, data=None):
        """Save a notification and trigger every pipeline subscribed to *label*.

        :param label: notification label subscribers listen to.
        :param owner_id: owner; defaults to the current OS user.
        :param data: JSON-serializable payload (default ``{}``). An ``"event"``
            key of "finished"/"failed" marks a chained trigger whose ``"result"``
            is a base64 dill dump.
        :returns: the saved RemoteProcedureNotification.
        :raises NotificationNotRegistered: if it cannot be saved or read back.
        """
        if data is None:
            data = {}

        uuid = str(uuid4())
        
        if owner_id is None:
            owner_id = getpass.getuser()
            
        new_notification = RemoteProcedureNotification(
            label    = label,
            owner_id = owner_id,
            uuid     = uuid,
            data     = json.dumps(data)
        )

        if not self.saveObject(new_notification):
            raise NotificationNotRegistered(label)

        saved_notification = self.getObjects(RemoteProcedureNotification,
            label     = label, 
            owner_id  = owner_id,
            uuid      = uuid
        )
        if len(saved_notification) != 1:
            raise NotificationNotRegistered(label)

        notification = saved_notification[0]
        for subscriber in self.getSubscribedPipelines(label):
            self._triggerSubscriber(notification, subscriber, data)

        return notification

In [40]:
%%deploy /orch/orchestrator/AbstractRemoteProcedureNotificationSubscriber.py
import base64
import dill
import json

# AbstractRemoteProcedureNotificationSubscriber

class AbstractRemoteProcedureNotificationSubscriber: 
    """Behaviour shared by RPN subscriber records (a label bound to a pipeline name)."""

    def getPipeline(self):
        """Return the name of the subscribed pipeline."""
        #TODO: get the pipeline from manager
        return self.pipeline_name

    def __repr__(self):
        fmt = "<RemoteProcedureNotificationSubscriber(id=%s, label='%s', owner='%s', uuid='%s', creation=%s, pipeline='%s'>"
        return fmt % (str(self.id), self.label, self.owner_id, self.uuid, self.creation, self.pipeline_name)

    def asJson(self):
        """Return the record's fields as a plain dict."""
        keys = ("id", "label", "owner_id", "uuid", "creation", "pipeline_name")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def fromJson(cls, json_obj):
        """Build an instance from a dict produced by ``asJson``."""
        instance = cls()
        for key in ("id", "label", "owner_id", "creation", "uuid", "pipeline_name"):
            setattr(instance, key, json_obj[key])
        return instance


In [41]:
%%deploy /orch/orchestrator/RemoteProcedureNotificationSubscriber.py
import sqlalchemy as sal
from sqlalchemy import create_engine, and_
from sqlalchemy.sql import func
from uuid import uuid4
import base64
import dill

# Remote Procedure Notification Subscriber
from sqlalchemy.ext.declarative import declarative_base
from .AbstractRemoteProcedureNotificationSubscriber import *
Base = declarative_base()

class RemoteProcedureNotificationSubscriber(AbstractRemoteProcedureNotificationSubscriber, Base):
    """SQLAlchemy record binding an RPN label to a pipeline name (table ``rpn_subscriber``)."""
    __tablename__ = 'rpn_subscriber'
    
    id            = sal.Column('id', sal.Integer, primary_key=True, nullable=False)
    label         = sal.Column('label', sal.String)
    owner_id      = sal.Column('owner_id', sal.String)
    uuid          = sal.Column('uuid', sal.String)
    creation      = sal.Column('creation', sal.DateTime(timezone=True),server_default=func.now())
    pipeline_name = sal.Column('pipeline_name', sal.String)
    
    def serialize(self):
        """Return ``{"uuid": ..., "sobj": <base64 dill dump of asJson()>}``."""
        payload = self.asJson()
        encoded = base64.b64encode(dill.dumps(payload)).decode("utf8")
        return {"uuid": self.uuid, "sobj": encoded}


In [42]:
%%deploy /orch/orchestrator/OrchestratorAccess.py
class OrchestratorAccess(object):
    """Per-pipeline access object to an orchestrator manager.

    Almost every method forwards to ``self.manager``; the credential/token
    helpers fill in ``self.pipeline.name`` as the token owner (``whom``).
    Settings made from pipeline code (execution notification, variables,
    report) are collected in ``self.exec_info``.
    """

    class ExecutionInfo(object):
        """Settings gathered while a pipeline runs.

        :param notify_execution: flag set by ``OrchestratorAccess.notifyExecution``
        :param notification_target: notification destination (opaque here)
        :param notification_show: what to include; defaults to "all"
        :param vars: dict of user variables; a new empty dict when omitted
        :param report: arbitrary report object set via ``setReport``
        """

        def __init__(self, notify_execution=False, notification_target=None,
                     notification_show="all", vars=None, report=None):
            self.notify_execution = notify_execution
            self.notification_target = notification_target
            self.notification_show = notification_show
            # A ``vars={}`` default is evaluated once and shared by every
            # instance, so variables added through one OrchestratorAccess leaked
            # into all others. Build a fresh dict per instance instead. The
            # builtin-shadowing name ``vars`` is kept for keyword compatibility.
            self.vars = {} if vars is None else vars
            self.report = report

        def __reduce_ex__(self, protocol):
            # Pickle as "call the constructor again with the current fields".
            return OrchestratorAccess.ExecutionInfo, (
                self.notify_execution,
                self.notification_target,
                self.notification_show,
                self.vars,
                self.report,
            )

    def __init__(self, manager, pipeline):
        """
        :param manager: object all calls are delegated to
        :param pipeline: pipeline being executed (may be None); its ``name`` is
            used as the token owner
        """
        self.manager          = manager
        self.pipeline         = pipeline
        self.exec_info        = OrchestratorAccess.ExecutionInfo()

    def getException(self):
        """Return the Exception passed as the pipeline's first argument, if any.

        Returns None when there is no pipeline, the pipeline has no arguments,
        or the first argument is not an Exception instance.
        """
        if self.pipeline is None:
            print("pipeline none in orch access")
            return None

        args = self.pipeline.getArguments()
        print("args:", args)
        if len(args) > 0 and isinstance(args[0], Exception):
            return args[0]
        return None

    def getCredential(self, label):
        """Return the registered credential ``label`` using this pipeline's token."""
        whom = self.pipeline.name
        token = self.manager.getToken(label=label, whom=whom)
        return self.manager.getRegisterCredential(label, token, whom=whom)

    def actionPerformed(self, evt):
        """Forward an event to the manager."""
        return self.manager.actionPerformed(evt)

    def getOwner(self):
        """Return the manager's owner id."""
        return self.manager.getOwner()

    def getPipelines(self, name, **kw_args):
        """Return registered pipelines named ``name`` (filters in ``kw_args``)."""
        return self.manager.getPipelines(name, **kw_args)

    def isPipelineRegistered(self, name, **kw_args):
        """Return whether a pipeline named ``name`` is registered."""
        return self.manager.isPipelineRegistered(name, **kw_args)

    def getActivePipeline(self, name):
        """Return the active pipeline named ``name``."""
        return self.manager.getActivePipeline(name)

    def activatePipeline(self, pipeline):
        """Activate ``pipeline``."""
        return self.manager.activatePipeline(pipeline)

    def deactivatePipeline(self, pipeline):
        """Deactivate ``pipeline``."""
        return self.manager.deactivatePipeline(pipeline)

    def deactivateAll(self, pipeline):
        """Deactivate all versions of ``pipeline``."""
        return self.manager.deactivateAll(pipeline)

    def execute(self, pipeline, *args, **kw_args):
        """Execute ``pipeline`` with the given arguments."""
        return self.manager.execute(pipeline, *args, **kw_args)

    def createExecutor(self, pipeline, local=True, cores=1):
        """Create an executor for ``pipeline``."""
        return self.manager.createExecutor(pipeline, local, cores)

    def getExecutionList(self, name, **kw_args):
        """Return the executions of pipeline ``name``."""
        return self.manager.getExecutionList(name, **kw_args)

    def getExecution(self, exec_id):
        """Return the execution with id ``exec_id``."""
        return self.manager.getExecution(exec_id)

    def getLastExecution(self, name):
        """Return the latest execution of pipeline ``name``."""
        return self.manager.getLastExecution(name)

    def getScheduledExecutions(self, pipeline, **kw_args):
        """Return the scheduled executions of ``pipeline``."""
        return self.manager.getScheduledExecutions(pipeline, **kw_args)

    def getScheduledExecutionById(self, scheduled_event_id):
        """Return the scheduled execution ``scheduled_event_id``."""
        return self.manager.getScheduledExecutionById(scheduled_event_id)

    def getLastNotification(self, label):
        """Return the last notification for ``label``."""
        return self.manager.getLastNotification(label)

    def getNotificationList(self, label):
        """Return the notifications for ``label``."""
        return self.manager.getNotificationList(label)

    def getSubscribedPipelines(self, label):
        """Return the pipelines subscribed to ``label``."""
        return self.manager.getSubscribedPipelines(label)

    def getSubscriptionsByPipeline(self, pipeline_name):
        """Return the subscriptions of pipeline ``pipeline_name``."""
        return self.manager.getSubscriptionsByPipeline(pipeline_name)

    def credentialExpiration(self, label):
        """Return the manager's expiration check for credential ``label``."""
        return self.manager.credentialExpiration(label)

    def getCredentialExpirationDate(self, label):
        """Return the expiration date of credential ``label``."""
        return self.manager.getCredentialExpirationDate(label)

    def getTokenExpirationDate(self, label):
        """Return the expiration date of this pipeline's token for ``label``."""
        return self.manager.getTokenExpirationDate(label, self.pipeline.name)

    def tokenExpiration(self, label):
        """Return the expiration check of this pipeline's token for ``label``."""
        return self.manager.tokenExpiration(label, self.pipeline.name)

    def getPublicKey(self):
        """Return the orchestrator's public key."""
        return self.manager.getPublicKey()

    def getAssignedToken(self, label, whom):
        """Return the token for ``label`` assigned to ``whom``."""
        return self.manager.getAssignedToken(label, whom)

    def checkProcessExpiration(self, process_name):
        """Return the manager's expiration check for process ``process_name``."""
        return self.manager.checkProcessExpiration(process_name)

    def putInPersistentDict(self, dict_name, key, value):
        """Store ``value`` under ``key`` in persistent dict ``dict_name``."""
        return self.manager.putInPersistentDict(dict_name, key, value)

    def getFromPersistentDict(self, dict_name, key):
        """Return the value under ``key`` in persistent dict ``dict_name``."""
        return self.manager.getFromPersistentDict(dict_name, key)

    def notifyExecution(self, target, show="all"):
        """Request a notification of this execution to ``target``."""
        self.exec_info.notify_execution = True
        self.exec_info.notification_target = target
        self.exec_info.notification_show = show

    def createNotification(self, label, data=None):
        """Create a notification ``label`` carrying ``data`` (an empty dict by default)."""
        return self.manager.createNotification(label, data={} if data is None else data)

    def createNotifycation(self, label, data=None):
        """Misspelled name kept for existing pipeline code; see ``createNotification``."""
        return self.createNotification(label, data=data)

    def addVariable(self, key, value):
        """Store ``value`` under ``key`` in this execution's variables."""
        self.exec_info.vars[key] = value

    def getVariable(self, key):
        """Return the variable stored under ``key``, or None if it is not set."""
        return self.exec_info.vars.get(key)

    def setReport(self, report):
        """Attach ``report`` to this execution."""
        self.exec_info.report = report

In [43]:
%%deploy /orch/orchestrator/OrchCredentialManager.py


from credentialmanager.CredentialManager import CredentialManager
from credentialmanager.KeyChain import KeyChain
from credentialmanager.EncryptionKey import *
from credentialmanager.Credential import Credential
from ..loggers import BasicLogger
import jsonpickle


class OrchCredentialManager:
    """Orchestrator-side wrapper around the ``credentialmanager`` package.

    Holds a ``KeyChain`` (``key_manager``), the orchestrator master
    ``EncryptionKey`` labelled "orchestrator" (``orchkey``) and a
    ``CredentialManager`` built from that key (``credential_manager``).
    Most methods delegate to one of these.
    """

    def __init__(self, db_conn_str="sqlite:///orchestrator.sqlite"):
        """Open the key chain, load or create the master key, start the credential manager.

        Failing to open the key chain or the credential manager, or to create
        a new master key, is printed and leaves the corresponding attribute
        None (best effort).

        :raises RuntimeError: on an unexpected error while loading the master key
        """
        self.orchkey = None
        self.db_conn_str = db_conn_str
        self.key_manager = None
        self.credential_manager = None
        self.logger = BasicLogger("OrchCredentialManager")

        # starting the keychain
        try:
            print("checking keychain...")
            self.key_manager = KeyChain(self.db_conn_str)
        except Exception as e:
            print("%s, instance could not be generated..." % (e,))

        # orchestrator validation
        status_key = self._loadOrCreateMasterKey()

        # start the credential manager (needs a usable master key)
        if status_key == "ok":
            try:
                print("checking credential manager...")
                self.credential_manager = CredentialManager(self.orchkey, self.db_conn_str)
            except Exception as e:
                print("%s, instance could not be generated..." % (e,))

    def _loadOrCreateMasterKey(self):
        """Set ``self.orchkey`` to the "orchestrator" key, generating and saving it if absent.

        Returns "ok" when a key is available, "error" when generating a new
        key failed (the error is printed).

        :raises RuntimeError: chained, for any other error while loading
        """
        try:
            self.orchkey = EncryptionKey.load("orchestrator")
            print("recovered master key for the orchestrator")
            return "ok"
        except LocalKeyNotFound:
            pass  # no local key yet: generate one below
        except Exception as e:
            raise RuntimeError("instance could not be generated...") from e

        try:
            print("Master key don't found, generating keys...")
            self.orchkey = EncryptionKey("orchestrator")
            self.orchkey.save()
            print("successfully created master key")
            return "ok"
        except Exception as e:
            print(e)
            return "error"

    def putKey(self, key, passphrase=None):
        """Store ``key`` in the key chain, optionally protected by ``passphrase``."""
        self.key_manager.store(key, passphrase)

    def getKey(self, label, passphrase=None):
        """Retrieve key ``label`` from the key chain as an ``EncryptionKey``."""
        key = self.key_manager.retrieve(label, passphrase)
        # Re-class the retrieved object so EncryptionKey methods are available
        # (presumably the key chain returns a base/deserialized type).
        key.__class__ = EncryptionKey
        return key

    def getKeyList(self, active=True):
        """Return the keys stored in the key chain (only active ones by default)."""
        return self.key_manager.getKeyList(active)

    def keyExpiration(self, key):
        """Return the key chain's expiration check for ``key``; False if the check fails."""
        try:
            return self.key_manager.checkKeyExpiration(key)
        except Exception:
            # Best effort: any failure is treated as "expired / not valid".
            return False

    def setKeyExpiration(self, key, date):
        """Set the expiration date of ``key``."""
        # NOTE(review): the other key operations go through key_manager; confirm
        # CredentialManager really provides setKeyExpiration.
        self.credential_manager.setKeyExpiration(key, date)

    def getKeyExpirationDate(self, key):
        """Return the expiration date of ``key``."""
        return self.key_manager.getKeyExpirationDate(key)

    def getCredentialList(self, active=True):
        """Return the stored credentials (only active ones by default)."""
        return self.credential_manager.getCredentialList(active)

    def credentialExpiration(self, label):
        """Return the expiration check for credential ``label``; False if the check fails."""
        try:
            return self.credential_manager.checkCredentialExpiration(label)
        except Exception:
            # Best effort: any failure is treated as "expired / not valid".
            return False

    def setCredentialExpiration(self, label, date):
        """Set the expiration date of credential ``label``."""
        self.credential_manager.setCredentialExpiration(label, date)

    def getCredentialExpirationDate(self, label):
        """Return the expiration date of credential ``label``."""
        return self.credential_manager.getCredentialExpirationDate(label)

    def signCredential(self, credential, key=None):
        """Sign ``credential`` with ``key`` (the orchestrator master key by default)."""
        if key is None:
            key = self.orchkey
        self.credential_manager.signCredential(credential, key)
        print("Credential has been signed")

    def putCredential(self, credential, n_unlock=2, shared_users=4):
        """Store ``credential``; an already existing label is reported, not raised."""
        try:
            self.credential_manager.store(credential, n_unlock, shared_users)
        except CredentialAlreadyExists:
            print("%s already exists" % (credential.getLabel(),))

    def getCredential(self, label, decrypt=True, token=None, whom=None):
        """Retrieve credential ``label`` as a ``Credential``, decrypting it by default."""
        credential = self.credential_manager.retrieve(label, decrypt, token, whom)
        credential.__class__ = Credential
        print("Credential has been retrieve")
        return credential

    def verifyCredential(self, credential):
        """Verify the signature of ``credential`` against the master key."""
        return self.credential_manager.verifyCredential(credential, self.orchkey)

    def encryptCredential(self, credential, recipient_key):
        """Return ``credential`` encrypted for ``recipient_key`` as a ``Credential``."""
        credential_encrypted = self.credential_manager.encryptCredential(credential, recipient_key)
        credential_encrypted.__class__ = Credential
        return credential_encrypted

    def createToken(self, credential, n_unlock=2, shared_users=4):
        """Create unlock tokens for ``credential``.

        Presumably Shamir secret sharing: ``shared_users`` shares, any
        ``n_unlock`` of which unlock the credential (confirm in credentialmanager).
        """
        return self.credential_manager.createToken(credential, n_unlock, shared_users)

    def putToken(self, credential, token_list):
        """Store ``token_list`` generated for ``credential``."""
        return self.credential_manager.storeToken(credential, token_list)

    def registerToken(self, token_list):
        """Register tokens received as base64 strings.

        Each string is base64-decoded, ASCII-decoded and loaded with
        jsonpickle; the label of the first loaded object selects the stored
        credential (looked up without decryption) to which the original string
        is attached.
        """
        # base64 is not explicitly imported at the top of this module.
        import base64

        for token in token_list:
            token_json = base64.b64decode(token).decode('ascii')
            # NOTE(security): jsonpickle.loads can instantiate arbitrary objects;
            # only pass tokens obtained from a trusted source.
            token_object = jsonpickle.loads(token_json)[0]
            credential = self.getCredential(token_object.getLabel(), decrypt=False, token=None, whom=None)
            self.credential_manager.storeReceivedToken(credential, token)

    def getToken(self, label=None, whom=None, active=True):
        """Retrieve the stored tokens matching ``label`` / ``whom``."""
        return self.credential_manager.retrieveTokens(label, whom, active)

    def assignToken(self, whom, label, exp_date, comment=""):
        """Assign an inactive token for ``label`` to process ``whom`` until ``exp_date``."""
        self.credential_manager.assignToken(whom, label, exp_date, comment)

    def getAssignedToken(self, label=None, whom=None):
        """Return the tokens assigned to ``whom`` for ``label`` (for sharing as strings)."""
        return self.credential_manager.getAssignedToken(label, whom)

    def signToken(self, label, whom):
        """Sign the token for ``label`` assigned to ``whom``."""
        return self.credential_manager.signToken(label, whom)

    def verifyToken(self, token):
        """Check whether the signed ``token`` has been modified."""
        return self.credential_manager.verifyToken(token)

    def getTokenList(self, active=False):
        """Return the tokens stored in the vault."""
        return self.credential_manager.getTokenList(active)

    def getTokenExpirationDate(self, label, whom):
        """Return the expiration date of ``whom``'s token for ``label``."""
        return self.credential_manager.getTokenExpirationDate(label, whom)

    def tokenExpiration(self, label, whom):
        """Return the expiration check of ``whom``'s token for ``label``; False if the check fails."""
        try:
            return self.credential_manager.checkTokenExpiration(label, whom)
        except Exception:
            # Best effort: any failure is treated as "expired / not valid".
            return False

    def getPublicKey(self):
        """Return the public part of the orchestrator master key."""
        return EncryptionKey.importPublicKey(self.orchkey.exportPublicKey())

    def getRegisterCredential(self, label, token=None, whom=None):
        """Return the decrypted credential ``label`` registered for ``whom``.

        NOTE(review): the ``token`` argument is currently ignored; the active
        token for (label, whom) is always looked up again.
        """
        token = self.getToken(label=label, whom=whom, active=True)
        return self.getCredential(label=label, decrypt=True, token=token, whom=whom)

    def checkProcessExpiration(self, process_name):
        """Check the credentials and tokens reachable from ``process_name``'s active tokens.

        Returns True when none has expired, False when any has (the expired
        labels are logged), or the string "TokenNotFound" when the process
        has no registered token.
        """
        whom = process_name
        try:
            token_list = self.getToken(label=None, whom=whom, active=True)
        except TokenNotFound:
            self.logger.info("there is no registered token for this pipeline")
            return "TokenNotFound"

        expired_credential = []
        expired_token = []
        for token in token_list:
            label = token.getLabel()
            credential_ok = self.credentialExpiration(label)
            token_ok = self.tokenExpiration(label, whom)
            if credential_ok is False:
                expired_credential.append(label)
            if token_ok is False:
                expired_token.append(label)

        if expired_credential and expired_token:
            self.logger.info("credentials with label:%s and Tokens with label:%s for process:%s has been expired"
                             % (expired_credential, expired_token, whom))
        elif expired_credential:
            self.logger.info("credentials with label:%s has been expired" % (expired_credential,))
        elif expired_token:
            self.logger.info("Tokens with label:%s for process: %s has been expired" % (expired_token, whom))
        else:
            return True
        return False


In [44]:
%%deploy /orch/orchestrator/OrchestratorManager.py
from datetime import datetime
import getpass
import json
import os
import dill
import base64

from ..base import ActionListener
from ..scheduler import SchedulerManager, ScheduledEvent
from ..scheduler.Events import *
from ..pipelinemanager.Events import *
from ..pipelinemanager import PipelineManager
from ..exceptions import MultipleActivePipelineRegistered, NoActivePipelineRegistered
from ..exceptions import NoSuchKeyInDictionary, NoSuchDictionary
from .OrchCredentialManager import OrchCredentialManager
from .RemoteProcedureNotificationManager import RemoteProcedureNotificationManager
from ..base import PersistentDict

class OrchestratorManager(ActionListener):
    """Central orchestrator object.

    Wires together the scheduler (``schm``), the pipeline manager (``pm``), the
    credential manager (``ocm``) and the remote-procedure notification manager
    (``rpnm``), listens to their events, and exposes delegating methods for
    each of them plus named persistent dictionaries stored under
    ``./.persistentdict/``.
    """

    # Storage file of persistent dictionary ``<name>``.
    _PERSISTENT_DICT_PATH = "./.persistentdict/%s.dbm"

    def __init__(self,db_conn_str="sqlite:///orchestrator.sqlite", smtp_crd=None):
        """
        :param db_conn_str: database URL used by the scheduler, pipeline and
            notification managers
        :param smtp_crd: SMTP credential, stored as-is
        """
        self.owner_id = getpass.getuser()
        self.smtp_crd = smtp_crd
        self.schm = SchedulerManager(self,db_conn_str=db_conn_str)
        self.pm   = PipelineManager(self,db_conn_str=db_conn_str)

        # The credential manager does not use db_conn_str: it always uses the
        # default credential vault.
        self.ocm  = OrchCredentialManager(db_conn_str = "sqlite:///./.credentials/credentialVault.sqlite")

        self.rpnm = RemoteProcedureNotificationManager(self,db_conn_str=db_conn_str)

        self.schm.addActionListener(self)
        self.pm.addActionListener(self)
        self.rpnm.addActionListener(self)
        self.db_conn_str = db_conn_str

        # dict name -> opened PersistentDict (opened lazily)
        self.persistent_dict_list = {}

    def actionPerformed(self, evt):
        """Handle events from the scheduler, pipeline and notification managers.

        - ExecutePipeline: run the active version of ``evt.pipeline``; errors
          are printed, not raised.
        - ExecutionStarted: print it.
        - ExecutionFinished: classify the outcome and publish an ``EP_<name>``
          notification so other pipelines can be chained on it.
        - anything else: print it.
        """
        if isinstance(evt, ExecutePipeline):
            # trigger pipeline from SchedulerManager
            print("OrchestratorManager: trigger", evt)
            try:
                pipeline = self.getActivePipeline(evt.pipeline)

                print("executing %s with args: %s kw_args: %s" % (pipeline, evt.args, evt.kw_args))
                # Tag the pipeline with the scheduled event's uuid so the executor
                # can associate this execution with the event that triggered it.
                pipeline.scheduled_event_uuid = evt.sch_evt_uuid
                self.execute(pipeline, *evt.args, **evt.kw_args)

            except Exception as e:
                print("Error:",e)
        elif isinstance(evt, ExecutionStarted):
            print("execution of pipeline %s started" % evt.pipeline_name)
        elif isinstance(evt, ExecutionFinished):
            print("execution of pipeline %s finished" % evt.pipeline_name)

            pipeline_vars   = evt.pipeline.getVariables()
            execution_state = evt.pipeline.state
            # the result is base64 text (as bytes) of a dill pickle
            pipeline_result = evt.pipeline.result.decode("utf-8")
            result_value    = dill.loads(base64.b64decode(pipeline_result))

            # NOTE(review): states 5 / 6 appear to mean failed / cancelled —
            # confirm against the pipeline manager's state constants.
            event_type = "finished"
            if isinstance(result_value, Exception) or execution_state == 5:
                event_type = "failed"
            elif execution_state == 6:
                event_type = "cancelled"

            pipe_notif_label = "EP_%s" % evt.pipeline_name
            pipe_exec_data   = {"event":event_type, "pipeline":evt.pipeline_name, "variables":pipeline_vars,"state": execution_state, "result":pipeline_result}
            # trigger RPN for allowing chaining
            self.createNotification(pipe_notif_label,data=pipe_exec_data)

        else:
            print("actionEvent",evt)

    def getOwner(self):
        """Return the OS user name this manager runs as."""
        return self.owner_id

    def register(self, label, pipeline_fn, token_list=None, new_version=False):
        """Register ``pipeline_fn`` as ``label``.

        ``token_list`` is registered with the credential manager only for a
        first registration (``new_version`` False or None).
        """
        if new_version is False or new_version is None:
            if token_list is not None:
                self.ocm.registerToken(token_list)
        return self.pm.register(label,pipeline_fn,new_version)

    def getPipelines(self,name, **kw_args):
        """Return registered pipelines named ``name`` (filters in ``kw_args``)."""
        return self.pm.get(name,**kw_args)

    def isPipelineRegistered(self,name,**kw_args):
        """Return whether a pipeline named ``name`` is registered."""
        return self.pm.isRegistered(name,**kw_args)

    def getActivePipeline(self, name):
        """Return the single active pipeline named ``name``.

        :raises MultipleActivePipelineRegistered: more than one is active
        :raises NoActivePipelineRegistered: none is active
        """
        pipelines = self.pm.get(name, active = True)
        if len(pipelines)==1:
            return pipelines[0]
        elif len(pipelines)>1:
            raise MultipleActivePipelineRegistered(name)
        else:
            raise NoActivePipelineRegistered(name)

    def activatePipeline(self, pipeline):
        """Activate ``pipeline``."""
        return self.pm.activate(pipeline)

    def deactivatePipeline(self, pipeline):
        """Deactivate ``pipeline``."""
        return self.pm.deactivate(pipeline)

    def deactivateAll(self, pipeline):
        """Deactivate all versions of ``pipeline``."""
        return self.pm.deactivateAll(pipeline)

    def execute(self, pipeline, *args, **kw_args):
        """Execute ``pipeline`` after checking its credentials and tokens.

        :raises Exception: if a credential or token of the pipeline has expired
        """
        status = self.ocm.checkProcessExpiration(pipeline.name)
        if status is False:
            raise Exception('Credentials or tokens for pipeline has been expired')
        return self.pm.execute(pipeline, *args, **kw_args)

    def createExecutor(self, pipeline, *args, **kw_args):
        """Create an executor for ``pipeline``."""
        return self.pm.createExecutor(pipeline, *args, **kw_args)

    def getExecutionList(self, name, **kw_args):
        """Return the executions of pipeline ``name``."""
        return self.pm.get_execution_list(name, **kw_args)

    def getExecutionsBy(self, where):
        """Return the executions matching ``where``."""
        return self.pm.get_executions_by(where)

    def getRunningExecutions(self):
        """Return the currently running executions."""
        return self.pm.get_running_executions()

    def getExecution(self, exec_id):
        """Return the execution with id ``exec_id``."""
        return self.pm.get_execution(exec_id)

    def cancelExecution(self, exec_id):
        """Cancel the execution with id ``exec_id``."""
        return self.pm.cancel_execution(exec_id)

    def getLastExecution(self, name):
        """Return the last entry of pipeline ``name``'s execution list, or None."""
        exec_list = self.getExecutionList(name)
        if len(exec_list) > 0:
            return self.getExecution(exec_list[-1].getExecutionId())
        return None

    def scheduleAt(self, pipeline, label = None, trigger_time=None, recurrency=None, tags=None):
        """Schedule ``pipeline`` at ``trigger_time``.

        :param trigger_time: trigger time; when omitted (or None), the time of
            this call formatted as "%H:%M:%S". (A default expression in the
            signature would be evaluated once, at class definition, and would
            then always be the module import time.)
        :param tags: list of tags; a new empty list when omitted
        """
        if trigger_time is None:
            trigger_time = datetime.now().strftime("%H:%M:%S")
        if tags is None:
            tags = []
        return self.schm.scheduleAt(pipeline, label, trigger_time, recurrency, tags)

    def cancelScheduledExecution(self, scheduled_event):
        """Cancel ``scheduled_event``; returns None if it is not a ScheduledEvent."""
        if isinstance(scheduled_event, ScheduledEvent):
            return self.schm.cancelEvent(scheduled_event.uuid)
        return None

    def getScheduledExecutions(self, pipeline, **kw_args):
        """Return the scheduled executions of ``pipeline``."""
        return self.schm.getScheduledEvents(pipeline,**kw_args)

    def getScheduledExecutionById(self, scheduled_event_id):
        """Return the scheduled execution ``scheduled_event_id``."""
        return self.schm.getScheduledEventById(scheduled_event_id)

    def createNotification(self,label, data=None):
        """Create a notification ``label`` carrying ``data`` (an empty dict by default)."""
        return self.rpnm.createNotification(label, data={} if data is None else data)

    def getLastNotification(self,label):
        """Return the last notification for ``label``."""
        return self.rpnm.getLastNotification(label)

    def getNotificationList(self,label):
        """Return the notifications for ``label``."""
        return self.rpnm.getNotifications(label)

    def subscribePipelineNotification(self, label, pipeline_name):
        """Subscribe the active pipeline ``pipeline_name`` to notification ``label``.

        :raises NoActivePipelineRegistered: no active pipeline with that name
        :raises MultipleActivePipelineRegistered: several active pipelines
        """
        pipeline = self.getActivePipeline(pipeline_name)
        if pipeline is None:
            raise NoActivePipelineRegistered(pipeline_name)
        return self.rpnm.subscribePipeline(label, pipeline.name)

    def unsubscribePipelineNotification(self, label,pipeline_name):
        """Remove the first subscription of ``pipeline_name`` to ``label``; False if none."""
        for subscription in self.rpnm.getSubscribedPipelines(label):
            if subscription.pipeline_name == pipeline_name:
                # NOTE(review): method name spelled "unsubscribre" — confirm it
                # matches RemoteProcedureNotificationManager.
                return self.rpnm.unsubscribrePipeline(subscription.uuid)
        return False

    def getSubscribedPipelines(self, label):
        """Return the pipelines subscribed to ``label``."""
        return self.rpnm.getSubscribedPipelines(label)

    def getSubscriptionsByPipeline(self, pipeline_name):
        """Return the subscriptions of pipeline ``pipeline_name``."""
        return self.rpnm.getSubscriptionsByPipeline(pipeline_name)

    def stop(self):
        """Stop the scheduler."""
        print("stopping OrchestratorManager")
        self.schm.stop()

    def putKey(self, key, passphrase=None):
        """Store ``key`` in the key chain."""
        self.ocm.putKey(key, passphrase)

    def getKey(self, label, passphrase=None):
        """Retrieve key ``label``."""
        return self.ocm.getKey(label, passphrase)

    def getKeyList(self, active=True):
        """Return the stored keys."""
        return self.ocm.getKeyList(active)

    def keyExpiration(self, key):
        """Return the expiration check for ``key``."""
        return self.ocm.keyExpiration(key)

    def setKeyExpiration(self, key, date):
        """Set the expiration date of ``key``."""
        self.ocm.setKeyExpiration(key, date)

    def getKeyExpirationDate(self, key):
        """Return the expiration date of ``key``."""
        return self.ocm.getKeyExpirationDate(key)

    def getCredentialList(self, active=True):
        """Return the stored credentials."""
        return self.ocm.getCredentialList(active)

    def credentialExpiration(self, label):
        """Return the expiration check for credential ``label``."""
        return self.ocm.credentialExpiration(label)

    def setCredentialExpiration(self, label, date):
        """Set the expiration date of credential ``label``."""
        self.ocm.setCredentialExpiration(label, date)

    def getCredentialExpirationDate(self, label):
        """Return the expiration date of credential ``label``."""
        return self.ocm.getCredentialExpirationDate(label)

    def signCredential(self, credential, key=None):
        """Sign ``credential`` (with the master key by default)."""
        self.ocm.signCredential(credential, key)

    def putCredential(self, credential, n_unlock=2, shared_users=4):
        """Store ``credential``."""
        self.ocm.putCredential(credential, n_unlock, shared_users)

    def getCredential(self, label, decrypt=True, token=None, whom=None):
        """Retrieve credential ``label``."""
        return self.ocm.getCredential(label, decrypt, token, whom)

    def verifyCredential(self, credential):
        """Verify the signature of ``credential``."""
        return self.ocm.verifyCredential(credential)

    def encryptCredential(self, credential, recipient_key):
        """Encrypt ``credential`` for ``recipient_key``."""
        return self.ocm.encryptCredential(credential, recipient_key)

    def createToken(self, credential, min_unlock=2, shared_users=4):
        """Create unlock tokens for ``credential``."""
        return self.ocm.createToken(credential, min_unlock, shared_users)

    def putToken(self, credential, token_list):
        """Store ``token_list`` for ``credential``."""
        self.ocm.putToken(credential, token_list)

    def registerToken(self, token_list):
        """Register tokens received as base64 strings."""
        self.ocm.registerToken(token_list)

    def getToken(self, label=None, whom=None, active=True):
        """Retrieve the stored tokens matching ``label`` / ``whom``."""
        return self.ocm.getToken(label, whom, active)

    def assignToken(self, whom, label, exp_date, comment=""):
        """Assign a token for ``label`` to ``whom``."""
        return self.ocm.assignToken(whom, label, exp_date, comment)

    def getAssignedToken(self, label, whom):
        """Return the tokens for ``label`` assigned to ``whom``."""
        return self.ocm.getAssignedToken(label, whom)

    def signToken(self, label, whom):
        """Sign ``whom``'s token for ``label``."""
        return self.ocm.signToken(label, whom)

    def verifyToken(self, token):
        """Check whether ``token`` has been modified."""
        return self.ocm.verifyToken(token)

    def getTokenList(self, active=False):
        """Return the stored tokens."""
        return self.ocm.getTokenList(active)

    def getTokenExpirationDate(self, label, whom):
        """Return the expiration date of ``whom``'s token for ``label``."""
        return self.ocm.getTokenExpirationDate(label, whom)

    def tokenExpiration(self, label, whom=None):
        """Return the expiration check of ``whom``'s token for ``label``."""
        return self.ocm.tokenExpiration(label, whom)

    def getPublicKey(self):
        """Return the orchestrator's public key."""
        return self.ocm.getPublicKey()

    def getRegisterCredential(self, label, token=None, whom=None):
        """Return the decrypted credential ``label`` registered for ``whom``."""
        return self.ocm.getRegisterCredential(label, token, whom)

    def checkProcessExpiration(self, process_name):
        """Check the credentials and tokens of process ``process_name``."""
        return self.ocm.checkProcessExpiration(process_name)

    def _persistentDictPath(self, dict_name):
        """Return the storage file path of persistent dict ``dict_name``."""
        return self._PERSISTENT_DICT_PATH % dict_name

    def _getPersistentDict(self, dict_name):
        """Return the cached PersistentDict ``dict_name``, opening it on first use."""
        if dict_name not in self.persistent_dict_list:
            self.persistent_dict_list[dict_name] = PersistentDict(storage_file=self._persistentDictPath(dict_name))
        return self.persistent_dict_list[dict_name]

    def putInPersistentDict(self, dict_name, key, value):
        """Store ``value`` under ``key`` in persistent dict ``dict_name``; returns True.

        Errors from opening or writing the dictionary propagate.
        """
        self._getPersistentDict(dict_name).put(key, value)
        return True

    def getFromPersistentDict(self, dict_name, key):
        """Return the value under ``key`` in persistent dict ``dict_name``.

        :raises NoSuchDictionary: the dictionary's storage file does not exist
        :raises NoSuchKeyInDictionary: ``key`` is not in the dictionary
        """
        if not os.path.exists(self._persistentDictPath(dict_name)):
            raise NoSuchDictionary(dict_name)

        pdict = self._getPersistentDict(dict_name)
        if not pdict.exists(key):
            raise NoSuchKeyInDictionary(key)
        return pdict.get(key)

    def notifyExecution(self, target, show="all"):
        """Not supported on the manager; only prints a warning.

        ``show`` is accepted for signature parity with OrchestratorAccess.
        """
        print("notifyExecution does not work when executing directly the pipeline function")


In [45]:
%%deploy /orch/orchestrator/OrchestratorService.py
from flask import request, jsonify
from multiprocessing import Process, Queue, Manager
from datetime import date, datetime, timezone
from tzlocal import get_localzone
import jsonpickle
import inspect
import time
import dill
import base64
import ast
import json
from datetime import datetime

from ..base import AbstractApiService
from ..loggers import BasicLogger
from ..scheduler import ScheduledEvent
from ..exceptions import MultipleActivePipelineRegistered, NoActivePipelineRegistered,PipelineExecutionError
from ..exceptions import NoSuchKeyInDictionary, NoSuchDictionary
from .OrchestratorManager import OrchestratorManager
from .OrchestratorAccess import OrchestratorAccess

from credentialmanager.exceptions import *

class OrchestratorService(AbstractApiService):
    def __init__(self,address="127.0.0.1", port=8020, db_conn_str="sqlite:///orchestrator.sqlite", smtp_crd=None):
        """Create the service, register its HTTP routes and build its manager.

        Args:
            address: bind address handed to ``AbstractApiService``.
            port: bind port handed to ``AbstractApiService``.
            db_conn_str: connection string handed to ``OrchestratorManager``.
            smtp_crd: SMTP credentials, kept on the instance and handed to
                ``OrchestratorManager``.
        """
        super().__init__("OrchestratorService", bind_addr=address, bind_port=port)
        self.smtp_crd = smtp_crd

        get_only  = ("GET",)
        post_only = ("POST",)
        # (rule, endpoint, handler, HTTP verbs); ``None`` means addRule is
        # called without a ``methods`` argument
        routes = (
            # service lifecycle and pipeline registry
            ("/", "status", self.status, None),
            ("/stop", "stop", self.stop, None),
            ("/register", "register", self.register, post_only),
            ("/get", "get", self.get, post_only),
            ("/activate", "activate", self.activate, post_only),
            ("/deactivate", "deactivate", self.deactivate, post_only),

            # execution
            ("/execute", "execute", self.execute, post_only),
            ("/execution/running", "running_executions", self.get_running_executions, get_only),
            ("/execution/getby", "get_executions_by", self.get_executions_by, post_only),
            ("/execution/<exec_id>/status", "execution_status", self.execution_status, get_only),
            ("/execution/<exec_id>/output", "execution_output", self.get_execution_output, get_only),
            ("/execution/<exec_id>/get", "get_execution", self.get_execution, get_only),
            ("/execution/<exec_id>/cancel", "cancel_execution", self.cancel_execution, get_only),
            ("/execution/<pipeline_name>/list", "get_execution_list", self.get_execution_list, get_only),
            ("/execution/<pipeline_name>/last", "get_last_execution", self.get_last_execution, get_only),
            ("/execution/<pipeline_name>/scheduled", "get_scheduled_executions", self.get_scheduled_executions, get_only),

            # scheduling
            ("/schedule/<scheduled_execution_id>/cancel", "cancel_scheduled_execution", self.cancel_scheduled_execution, get_only),
            ("/schedule/<scheduled_execution_id>/get", "get_scheduled_execution_by_id", self.get_scheduled_execution_by_id, get_only),
            ("/scheduleAt", "scheduleAt", self.scheduleAt, post_only),

            # remote procedure notifications
            ("/rpn/<label>/notify", "create_notification", self.create_notification, ("GET", "POST")),
            ("/rpn/<label>/last", "get_last_notification", self.get_last_notification, get_only),
            ("/rpn/<label>/list", "get_notification_list", self.get_notification_list, get_only),
            ("/rpn/<label>/subscribe", "subscribe_notification_pipeline", self.subscribe_notification_pipeline, post_only),
            ("/rpn/<label>/unsubscribe", "unsubscribe_notification_pipeline", self.unsubscribe_notification_pipeline, post_only),
            ("/rpn/<label>/subscribed", "get_subscribed_pipelines", self.get_subscribed_pipelines, get_only),

            # credential manager
            ("/putKey", "put_key", self.put_key, post_only),
            ("/putCredential", "put_credential", self.put_credential, post_only),
            ("/putToken", "put_token", self.put_token, post_only),
            ("/signCredential", "sign_credential", self.sign_credential, post_only),
            ("/verifyCredential", "verify_credential", self.verify_credential, post_only),
            ("/keyExpiration", "key_expiration", self.key_expiration, post_only),
            ("/keyExpirationDate", "get_key_expiration_date", self.get_key_expiration_date, post_only),
            ("/encryptCredential", "encrypt_credential", self.encrypt_credential, post_only),
            ("/createToken", "create_token", self.create_token, post_only),
            ("/key/<label>/<passphrase>/getKey", "get_key", self.get_key, get_only),
            ("/key/getPublicKey", "get_public_key", self.get_public_key, get_only),
            ("/KeyList/<active>", "get_key_list", self.get_key_list, get_only),
            ("/credentialList/<active>", "get_credential_list", self.get_credential_list, get_only),
            ("/getCredential", "get_credential", self.get_credential, post_only),
            ("/credential/<label>/<whom>/<active>/getToken", "get_token", self.get_token, get_only),
            ("/credential/<label>/credentialExpiration", "credential_expiration", self.credential_expiration, get_only),
            ("/credential/<label>/credentialExpirationDate", "get_credential_expiration_date", self.get_credential_expiration_date, get_only),
            ("/credential/<label>/<token>/<whom>/getRegisterCredential", "get_register_credential", self.get_register_credential, get_only),
            ("/assignToken", "assign_token", self.assign_token, post_only),
            ("/token/<label>/<whom>/getAssignedToken", "get_assigned_token", self.get_assigned_token, get_only),
            ("/signToken", "sign_token", self.sign_token, post_only),
            ("/verifyToken", "verify_token", self.verify_token, post_only),
            ("/tokenList/<active>", "get_token_list", self.get_token_list, get_only),
            ("/token/<label>/<whom>/tokenExpirationDate", "get_token_expiration_date", self.get_token_expiration_date, get_only),
            ("/token/<label>/<whom>/tokenExpiration", "token_expiration", self.token_expiration, get_only),

            # persistent dict
            ("/pd/<dict_name>/put", "pd_put", self.put_persistent_dict, post_only),
            ("/pd/<dict_name>/get/<key>", "pd_get", self.get_persistent_dict, get_only),
            ("/pd/<dict_name>/get/<key>/asJson", "pd_get_as_json", self.get_persistent_dict_as_json, get_only),
        )

        for rule, endpoint, handler, verbs in routes:
            if verbs is None:
                self.addRule(rule, endpoint, handler)
            else:
                # a fresh list per rule, as each original call built its own literal
                self.addRule(rule, endpoint, handler, methods=list(verbs))

        self.manager = OrchestratorManager(db_conn_str=db_conn_str, smtp_crd=self.smtp_crd)

        self.setLogger(BasicLogger("OrchestratorService"))
        
    def status(self):
        """Liveness probe.

        Returns:
            dict: ``code`` 200, ``status`` "running" and ``ts``, the current
            local time as an ISO-8601 string.
        """
        local_now = datetime.now(timezone.utc).astimezone(get_localzone())
        return dict(code=200, status="running", ts=local_now.isoformat())
    
    def register(self):
        """Register a pipeline function, either as a first version or a new version.

        Request JSON:
            name        -- pipeline name.
            pipeline    -- base64 of a dill-serialized pipeline function.
            new_version -- truthy to add a version to an existing pipeline,
                           falsy to register a pipeline for the first time.
            token_list  -- Python-literal string evaluated with ``ast.literal_eval``.

        Returns a dict with ``code``, ``status`` and ``ts``:
            200 registered, 301 already registered / no previous version,
            302 registration raised (adds ``error``).

        NOTE(security): ``dill.loads`` executes arbitrary code embedded in the
        payload; this endpoint must only be reachable by trusted clients.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        payload = request.json

        name        = payload["name"]
        pipeline_fn = dill.loads(base64.b64decode(payload["pipeline"].encode("utf8")))
        new_version = payload["new_version"]
        token_list  = ast.literal_eval(payload["token_list"])

        def registration_failed(exc):
            # shared 302 response for both registration paths
            self.logger.info("error registering %s: %s" % (name, exc))
            return {
                "code"   : 302,
                "status" : "Could not register pipeline",
                "error"  : "%s" % exc,
                "ts"     : now.isoformat()
            }

        known = self.manager.isPipelineRegistered(name)

        if new_version:
            if not known:
                self.logger.info("no previous versions for pipeline %s found" % name)
                return {
                    "code"   : 301,
                    "status" : "No previous version found for pipeline %s" % name,
                    "ts"     : now.isoformat()
                }

            self.logger.info("registering new version for %s" % name)
            try:
                #TODO: handle the owner in the api
                registered = self.manager.register(name, pipeline_fn, token_list, new_version=True)
                return {
                    "code"   : 200,
                    "status" : "new version registered: %f" % registered.version,
                    "ts"     : now.isoformat()
                }
            except Exception as e:
                return registration_failed(e)

        if known:
            self.logger.info("already registered %s" % name)
            return {
                "code"   : 301,
                "status" : "Already Registered",
                "ts"     : now.isoformat()
            }

        self.logger.info("registering first version for %s" % name)
        try:
            #TODO: handle the owner in the api
            self.manager.register(name, pipeline_fn, token_list)
            return {
                "code"   : 200,
                "status" : "registered",
                "ts"     : now.isoformat()
            }
        except Exception as e:
            return registration_failed(e)

    def get(self):
        """Return the pipelines matching the filter in the request body.

        Request JSON: ``name`` (required); the whole body is forwarded as
        keyword filters to ``OrchestratorManager.getPipelines``.

        Returns a dict: 200 with ``pipeline`` (list of ``asJson()`` results),
        301 when the name is not registered, 302 when the lookup raised.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        query = request.json
        name = query["name"]

        if not self.manager.isPipelineRegistered(name):
            self.logger.info("pipeline not registered %s" % name)
            return {
                "code"   : 301,
                "status" : "Pipeline Not Registered",
                "ts"     : now.isoformat()
            }

        self.logger.info("getting %s" % name)
        try:
            matches = self.manager.getPipelines(**query)
            return {
                "code"     : 200,
                "status"   : "ok",
                "pipeline" : [match.asJson() for match in matches],
                "ts"       : now.isoformat()
            }
        except Exception as err:
            self.logger.info("error getting %s: %s" % (name, err))
            return {
                "code"   : 302,
                "status" : "%s" % err,
                "ts"     : now.isoformat()
            }
    
    def activate(self):
        """Make exactly one version of a pipeline the active one.

        Request JSON: ``name`` (required); the whole body is forwarded as
        keyword filters to ``OrchestratorManager.getPipelines`` and must select
        exactly one pipeline. ``manager.deactivateAll`` is called on it before
        it is activated with ``setActive(True)``.

        Returns a dict with ``code``, ``status`` and ``ts``:
            202 activated, 203 already active, 301 pipeline not registered,
            305 several matches or an exception, 306 ``setActive`` returned a
            falsy value, 307 no match.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        json_input = request.json

        name = json_input["name"]

        if self.manager.isPipelineRegistered(name):
            self.logger.info("getting %s" % name)
            try:
                pipelines = self.manager.getPipelines(**json_input)

                if len(pipelines) == 1:
                    pipeline = pipelines[0]

                    if pipeline.isActive():
                        return {
                            "code"     : 203,
                            "status"   : "already active",
                            "ts"       : now.isoformat()
                        }

                    # deactivate all the pipelines with given name
                    self.manager.deactivateAll(pipeline)

                    # activate the pipeline
                    if pipeline.setActive(True):
                        return {
                            "code"     : 202,
                            "status"   : "active",
                            "ts"       : now.isoformat()
                        }
                    return {
                        "code"   : 306,
                        "status" : "could not activate pipeline %s" % name,
                        "ts"     : now.isoformat()
                    }

                if len(pipelines) > 1:
                    # should never happen: the filter matched several pipelines.
                    # Uses the imported exception type (``MultiplePipelineFound``
                    # is not defined in this module and raised NameError).
                    e = MultipleActivePipelineRegistered(name)
                    return {
                        "code"   : 305,
                        "status" : "%s:%s" % (type(e).__name__, e),
                        "ts"     : now.isoformat()
                    }

                return {
                    "code"   : 307,
                    # the placeholder was missing, so ``%`` raised TypeError
                    "status" : "pipeline not found or already active: %s" % name,
                    "ts"     : now.isoformat()
                }
            except Exception as e:
                self.logger.info("error getting %s: %s" % (name, e))
                return {
                    "code"   : 305,
                    "status" : "%s:%s" % (type(e).__name__, e),
                    "ts"     : now.isoformat()
                }

        # log the requested name (the undefined ``label`` raised NameError here)
        self.logger.info("pipeline not registered %s" % name)
        return {
            "code"   : 301,
            "status" : "Pipeline Not Registered",
            "ts"     : now.isoformat()
        }

    def deactivate(self):
        """Deactivate the single active pipeline matching the request filter.

        Request JSON: ``name`` (required); the body is forwarded as keyword
        filters to ``OrchestratorManager.getPipelines`` with ``active`` forced
        to True.

        Returns a dict with ``code``, ``status`` and ``ts``:
            202 deactivated, 203 already inactive, 301 pipeline not registered,
            305 several matches or an exception, 307 no match,
            308 ``setActive(False)`` returned a falsy value.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        json_input = request.json

        name = json_input["name"]

        if self.manager.isPipelineRegistered(name):
            self.logger.info("getting %s" % name)
            try:
                # merge instead of ``**json_input, active=True`` so a body that
                # already carries ``active`` does not raise
                # "got multiple values for keyword argument 'active'"
                filters = dict(json_input, active=True)
                pipelines = self.manager.getPipelines(**filters)

                if len(pipelines) == 1:
                    pipeline = pipelines[0]

                    if not pipeline.isActive():
                        return {
                            "code"     : 203,
                            "status"   : "already not active",
                            "ts"       : now.isoformat()
                        }
                    # deactivate the pipeline
                    if pipeline.setActive(False):
                        return {
                            "code"     : 202,
                            "status"   : "not active",
                            "ts"       : now.isoformat()
                        }
                    return {
                        "code"   : 308,
                        "status" : "could not deactivate pipeline %s" % name,
                        "ts"     : now.isoformat()
                    }

                if len(pipelines) > 1:
                    e = MultipleActivePipelineRegistered(name)
                    return {
                        "code"   : 305,
                        "status" : "%s:%s" % (type(e).__name__, e),
                        "ts"     : now.isoformat()
                    }

                return {
                    "code"   : 307,
                    # the placeholder was missing, so ``%`` raised TypeError
                    "status" : "pipeline not found or already deactivated: %s" % name,
                    "ts"     : now.isoformat()
                }
            except Exception as e:
                self.logger.info("error getting %s: %s" % (name, e))
                return {
                    "code"   : 305,
                    "status" : "%s:%s" % (type(e).__name__, e),
                    "ts"     : now.isoformat()
                }

        # log the requested name (the undefined ``label`` raised NameError here)
        self.logger.info("pipeline not registered %s" % name)
        return {
            "code"   : 301,
            "status" : "Pipeline Not Registered",
            "ts"     : now.isoformat()
        }
    
    def execution_status(self, exec_id):
        """Report the state of one execution.

        Returns a dict: 202 with the executor's state, timestamps, execution
        time, return value, version and creation fields; 310 with ``state``
        "unknown" when the manager returns no executor for ``exec_id``.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        print("%s : getting status for execution id %s" % (now, exec_id))

        executor = self.manager.getExecution(exec_id)

        if executor is None:
            return {
                "code"         : 310,
                "status"       : "Executor gone",
                "state"        : "unknown",
                "execution_id" : exec_id,
                "ts"           : now.isoformat()
            }

        # the literal used to list "state" twice; only one entry is needed
        return {
            "code"         : 202,
            "status"       : "executor active",
            "state"        : executor.state,
            "start_ts"     : executor.start_ts,
            "end_ts"       : executor.end_ts,
            "exec_time"    : executor.exec_time,
            "return_value" : executor.pipeline_ret,
            "version"      : executor.version,
            "creation"     : executor.creation,
            "execution_id" : exec_id,
            "ts"           : now.isoformat()
        }
    
    def get_execution(self, exec_id):
        """Return the serialized executor for ``exec_id``.

        Returns a dict: 202 with ``executor`` (``serialize()`` output), or 310
        with an empty ``executor`` when the manager returns no executor.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        print("%s : getting execution id %s" % (now, exec_id))

        found = self.manager.getExecution(exec_id)
        if found is None:
            return {
                "code"         : 310,
                "status"       : "Executor does not exists",
                "executor"     : "",
                "execution_id" : exec_id,
                "ts"           : now.isoformat()
            }

        return {
            "code"         : 202,
            "status"       : "ok",
            "executor"     : found.serialize(),
            "execution_id" : exec_id,
            "ts"           : now.isoformat()
        }
 
    def get_execution_output(self, exec_id):
        """Return the captured output and errors of an execution.

        Returns a dict: 202 with ``output`` (``getOutput()``) and ``error``
        (``getErrors()``), or 310 when the manager returns no executor.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        print("%s : getting output for execution id %s" % (now, exec_id))

        found = self.manager.getExecution(exec_id)
        if found is None:
            return {
                "code"         : 310,
                "status"       : "Executor does not exists",
                "executor"     : "",
                "execution_id" : exec_id,
                "ts"           : now.isoformat()
            }

        return {
            "code"         : 202,
            "status"       : "ok",
            "output"       : found.getOutput(),
            "error"        : found.getErrors(),
            "execution_id" : exec_id,
            "ts"           : now.isoformat()
        }
   
    def cancel_execution(self, exec_id):
        """Cancel an execution and return its serialized executor.

        Returns a dict: 202 when ``cancelExecution`` succeeded, 311 when it
        returned a falsy value (both include ``executor``), 310 when the
        manager returns no executor for ``exec_id``.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        print("%s : cancelling execution id %s" % (now, exec_id))

        executor = self.manager.getExecution(exec_id)
        if executor is None:
            return {
                "code"         : 310,
                "status"       : "No such Executor",
                "executor"     : "",
                "execution_id" : exec_id,
                "ts"           : now.isoformat()
            }

        cancelled = self.manager.cancelExecution(exec_id)

        # re-read the executor so the response reflects its post-cancel state;
        # keep the copy fetched above if the manager no longer returns one,
        # instead of failing with AttributeError on ``None.serialize()``
        refreshed = self.manager.getExecution(exec_id)
        if refreshed is not None:
            executor = refreshed

        if cancelled:
            return {
                "code"         : 202,
                "status"       : "ok",
                "executor"     : executor.serialize(),
                "execution_id" : exec_id,
                "ts"           : now.isoformat()
            }
        return {
            "code"         : 311,
            "status"       : "Executor could not be cancelled",
            "executor"     : executor.serialize(),
            "execution_id" : exec_id,
            "ts"           : now.isoformat()
        }
    
    def execute(self):
        """Run a specific pipeline version now.

        Request JSON:
            name, version -- select the pipeline (both required).
            asJob         -- optional; its value is stored as ``local_job``
                             (default True). A truthy value runs through
                             ``manager.execute``; a falsy one builds an executor
                             with ``createExecutor(local=False, ...)``.
            cores         -- optional int (default 1).
            partition     -- optional, passed through unchanged (default None).
            memory        -- optional int (default None).
            args, kwargs  -- optional jsonpickle-encoded positional / keyword
                             arguments for the pipeline function.

        Returns a dict: 201 with the executor ``state`` and ``execution_id``;
        303 when running raised; 305 on lookup error or several matches;
        306 when no pipeline matched.

        NOTE(security): ``jsonpickle.loads`` can instantiate arbitrary objects
        from the payload; only expose this endpoint to trusted clients.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        json_input = request.json

        pipeline_args   = []
        pipeline_kwargs = {}

        name            = json_input["name"]
        version         = json_input["version"]

        local_job       = True
        cores           = 1
        partition       = None
        memory          = None

        if "asJob" in json_input:
            # NOTE(review): the value is used as-is, so asJob=True selects the
            # *local* path below, which looks inverted w.r.t. the key name;
            # confirm against the client before changing it.
            local_job = json_input["asJob"]

        if "cores" in json_input:
            cores = int(json_input["cores"])

        if "partition" in json_input:
            # Slurm partitions are identified by name; int() rejected every
            # non-numeric partition with an unhandled ValueError
            partition = json_input["partition"]

        if "memory" in json_input:
            memory = int(json_input["memory"])

        if "args" in json_input:
            pipeline_args   = jsonpickle.loads(json_input["args"])

        if "kwargs" in json_input:
            pipeline_kwargs = jsonpickle.loads(json_input["kwargs"])

        try:
            pipelines = self.manager.getPipelines(name = name, version = version)
        except Exception as e:
            return {
                "code"        : 305,
                "status"      : "%s : %s" % (type(e).__name__, e),
                "ts"          : now.isoformat()
            }

        if len(pipelines) == 1:
            pipeline = pipelines[0]
            try:
                if local_job:
                    executor = self.manager.execute(pipeline, *pipeline_args, **pipeline_kwargs)
                else:
                    executor = self.manager.createExecutor(pipeline, local = local_job, cores=cores, partition=partition, memory=memory)
                    executor.run(*pipeline_args, **pipeline_kwargs)

                return {
                    "code"         : 201,
                    "status"       : executor.state,
                    "execution_id" : executor.getExecutionId(),
                    "ts"           : now.isoformat()
                }
            except Exception as e:
                # format the message like the other log calls in this class
                # instead of passing ``e`` as an extra positional argument
                self.logger.info("execution failed: %s" % e)
                return {
                    "code"        : 303,
                    "status"      : "Execution failed. %s" % e,
                    "ts"          : now.isoformat()
                }

        if len(pipelines) > 1:
            e = MultipleActivePipelineRegistered(name)
            return {
                "code"   : 305,
                "status" : "%s:%s" % (type(e).__name__, e),
                "ts"     : now.isoformat()
            }

        e = NoActivePipelineRegistered(name)
        return {
            "code"   : 306,
            "status" : "%s:%s" % (type(e).__name__, e),
            "ts"     : now.isoformat()
        }

    def scheduleAt(self):
        """Schedule the active version of a pipeline.

        Request JSON (all keys required): ``name``, ``label``, ``trigger_time``,
        ``recurrency``, ``tags``, and jsonpickle-encoded ``args`` / ``kwargs``,
        which are stored on the pipeline before it is handed to
        ``manager.scheduleAt``.

        Returns a dict: 201 when scheduling succeeded; 303 when it raised or
        ``manager.scheduleAt`` returned a falsy value; 305 on lookup error or
        several active pipelines; 306 when no active pipeline matched.

        NOTE(security): ``jsonpickle.loads`` can instantiate arbitrary objects
        from the payload; only expose this endpoint to trusted clients.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        json_input = request.json

        name            = json_input["name"]
        label           = json_input["label"]
        trigger_time    = json_input["trigger_time"]
        recurrency      = json_input["recurrency"]
        tags            = json_input["tags"]
        pipeline_args   = jsonpickle.loads(json_input["args"])
        pipeline_kwargs = jsonpickle.loads(json_input["kwargs"])

        try:
            pipelines = self.manager.getPipelines(name = name, active = True)
        except Exception as e:
            return {
                "code"        : 305,
                "status"      : "%s : %s" % (type(e).__name__, e),
                "ts"          : now.isoformat()
            }

        if len(pipelines) == 1:
            pipeline = pipelines[0]
            pipeline.setArguments(pipeline_args)
            pipeline.setKeywordArguments(pipeline_kwargs)
            try:
                scheduled = self.manager.scheduleAt(pipeline, trigger_time=trigger_time, label=label, recurrency=recurrency, tags=tags)
            except Exception as e:
                # format the message like the other log calls in this class
                self.logger.info("scheduling failed: %s" % e)
                return {
                    "code"        : 303,
                    "status"      : "Scheduling failed. %s" % e,
                    "ts"          : now.isoformat()
                }

            if scheduled:
                return {
                    "code"         : 201,
                    "status"       : True,
                    "ts"           : now.isoformat()
                }
            # a falsy result used to fall through and return None, which is
            # not a valid view response
            self.logger.info("scheduling failed for %s" % name)
            return {
                "code"        : 303,
                "status"      : "Scheduling failed. pipeline %s could not be scheduled" % name,
                "ts"          : now.isoformat()
            }

        if len(pipelines) > 1:
            e = MultipleActivePipelineRegistered(name)
            return {
                "code"   : 305,
                "status" : "%s:%s" % (type(e).__name__, e),
                "ts"     : now.isoformat()
            }

        e = NoActivePipelineRegistered(name)
        return {
            "code"   : 306,
            "status" : "%s:%s" % (type(e).__name__, e),
            "ts"     : now.isoformat()
        }
    
    def get_last_execution(self, pipeline_name):
        """Return the most recent execution of ``pipeline_name``.

        Pass ``?asJson`` in the query string to receive ``asJson()`` output
        instead of ``serialize()``.

        Returns a dict: 202 with ``executor``, 310 when the manager returns no
        executor, 501 with ``exception`` when fetching or serializing raised.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        print("%s : getting last execution for %s" % (now, pipeline_name))

        try:
            # the lookup itself is inside the try so a failure while recovering
            # the executor yields the 501 response instead of an HTTP 500
            executor = self.manager.getLastExecution(pipeline_name)

            if executor is None:
                return {
                    "code"         : 310,
                    "status"       : "Executor does not exists",
                    "executor"     : None,
                    "ts"           : now.isoformat()
                }

            if "asJson" in request.args:
                return {
                    "code"         : 202,
                    "status"       : "ok",
                    "executor"     : executor.asJson(),
                    "ts"           : now.isoformat()
                }

            return {
                "code"         : 202,
                "status"       : "ok",
                "executor"     : executor.serialize(),
                "ts"           : now.isoformat()
            }
        except Exception as e:
            return {
                "code"         : 501,
                "status"       : "Exception when recovering Executor",
                "exception"    : str(e),
                "ts"           : now.isoformat()
            }

    def get_execution_list(self,pipeline_name):
        """List the serialized executions of a registered pipeline.

        Returns a dict: 204 with ``executions`` (``serialize()`` output),
        311 with an empty list when there are none, 301 when the pipeline is
        not registered.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())

        if self.manager.isPipelineRegistered(pipeline_name):
            self.logger.info("getting execution list for %s" % pipeline_name)

            executions = self.manager.getExecutionList(pipeline_name)

            if len(executions) > 0:
                return {
                    "code"         : 204,
                    "status"       : "ok",
                    "pipeline_name": pipeline_name,
                    "executions"   : [ex.serialize() for ex in executions],
                    "ts"           : now.isoformat()
                }
            return {
                "code"         : 311,
                "status"       : "No executions found",
                "pipeline_name": pipeline_name,
                "executions"   : [],
                "ts"           : now.isoformat()
            }

        # log the requested name (the undefined ``label`` raised NameError here)
        self.logger.info("pipeline not registered %s" % pipeline_name)
        return {
            "code"         : 301,
            "status"       : "Pipeline Not Registered",
            "pipeline_name": pipeline_name,
            "ts"           : now.isoformat()
        }

    def get_running_executions(self):
        """List the executions the manager reports as running.

        Pass ``?asJson`` in the query string to receive ``asJson()`` output
        instead of ``serialize()``.

        Returns a dict: 204 with ``executions``, or 401 with an empty list when
        nothing is running.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())

        self.logger.info("getting running executions")

        running = self.manager.getRunningExecutions()

        if len(running) == 0:
            return {
                "code"         : 401,
                "status"       : "No executions running",
                "executions"   : [],
                "ts"           : now.isoformat()
            }

        want_json = "asJson" in request.args
        rendered = [ex.asJson() if want_json else ex.serialize() for ex in running]

        return {
            "code"         : 204,
            "status"       : "ok",
            "executions"   : rendered,
            "ts"           : now.isoformat()
        }

    def get_executions_by(self):
        """Return executions matching the ``where`` filter in the request body.

        Request JSON: ``where``, forwarded unchanged to
        ``OrchestratorManager.getExecutionsBy``. Pass ``?asJson`` in the query
        string to receive ``asJson()`` output instead of ``serialize()``.

        Returns a dict: 204 with ``executions``, 401 when nothing matched,
        501 when no ``where`` was given.
        """
        # bound before branching: the 501 response needs it too (it used to
        # raise UnboundLocalError because ``now`` was set only in the other branch)
        now = datetime.now(timezone.utc).astimezone(get_localzone())

        # a JSON ``null`` body yields None; treat it like an empty object
        args = request.json or {}
        if "where" not in args:
            return {
                "code"         : 501,
                "status"       : "no where string given",
                "executions"   : [],
                "ts"           : now.isoformat()
            }

        where = args["where"]
        self.logger.info("getting executions by %s" % where)

        executions = self.manager.getExecutionsBy(where)

        if len(executions) > 0:
            executors_list_json = []
            if "asJson" in request.args:
                try:
                    executors_list_json = [ex.asJson() for ex in executions]
                except Exception as e:
                    # best effort: the error is printed and an empty list is
                    # returned with status "ok"
                    print(e)
            else:
                executors_list_json = [ex.serialize() for ex in executions]

            return {
                "code"         : 204,
                "status"       : "ok",
                "where"        : where,
                "executions"   : executors_list_json,
                "ts"           : now.isoformat()
            }

        return {
            "code"         : 401,
            "status"       : "No executions found",
            "where"        : where,
            "executions"   : [],
            "ts"           : now.isoformat()
        }
    
    def get_scheduled_executions(self,pipeline_name):
        """List the scheduled executions of a pipeline's active version.

        Returns a dict: 204 with ``scheduled`` (``serialize()`` output), 311
        when there are none, 303 when several active pipelines match, 306 when
        no active pipeline matches, 305 when the lookup raised, 301 when the
        pipeline is not registered.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())

        if self.manager.isPipelineRegistered(pipeline_name):
            self.logger.info("getting scheduled executions for %s" % pipeline_name)

            try:
                pipelines = self.manager.getPipelines(name = pipeline_name, active = True)
            except Exception as e:
                return {
                    "code"        : 305,
                    "status"      : "%s : %s" % (type(e).__name__, e),
                    "ts"          : now.isoformat()
                }

            if len(pipelines) == 1:
                pipeline = pipelines[0]

                sch_execs = self.manager.getScheduledExecutions(pipeline)

                if len(sch_execs) > 0:
                    return {
                        "code"         : 204,
                        "status"       : "ok",
                        "pipeline_name": pipeline_name,
                        "scheduled"    : [x.serialize() for x in sch_execs],
                        "ts"           : now.isoformat()
                    }
                return {
                    "code"         : 311,
                    "status"       : "No scheduled executions found",
                    "pipeline_name": pipeline_name,
                    "scheduled"    : [],
                    "ts"           : now.isoformat()
                }

            if len(pipelines) > 1:
                self.logger.info("multiple active pipelines registered %s" % pipeline_name)
                return {
                    "code"         : 303,
                    "status"       : "Multiple active pipelines registered",
                    "pipeline_name": pipeline_name,
                    "ts"           : now.isoformat()
                }

            # zero active pipelines used to be misreported as "multiple";
            # report it the way execute/scheduleAt do
            e = NoActivePipelineRegistered(pipeline_name)
            self.logger.info("no active pipeline registered %s" % pipeline_name)
            return {
                "code"         : 306,
                "status"       : "%s:%s" % (type(e).__name__, e),
                "pipeline_name": pipeline_name,
                "ts"           : now.isoformat()
            }

        self.logger.info("pipeline not registered %s" % pipeline_name)
        return {
            "code"         : 301,
            "status"       : "Pipeline Not Registered",
            "pipeline_name": pipeline_name,
            "ts"           : now.isoformat()
        }
    
    def get_scheduled_execution_by_id(self,scheduled_execution_id):
        """Look up one scheduled execution by its identifier.

        Returns:
            dict: code 204 with the serialized event when the manager returns
            a ``ScheduledEvent``; code 311 with ``scheduled_evt`` set to None
            otherwise.

        Raises:
            RuntimeError: if ``scheduled_execution_id`` is None or empty.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())

        # Guard clause: reject a missing/empty identifier up front.
        if scheduled_execution_id is None or scheduled_execution_id == "":
            raise RuntimeError("you must provide a valid scheduled event id")

        event = self.manager.getScheduledExecutionById(scheduled_execution_id)

        if not isinstance(event, ScheduledEvent):
            return {
                "code"          : 311,
                "status"        : "No scheduled execution found",
                "scheduled_evt" : None,
                "ts"            : ts.isoformat()
            }

        return {
            "code"          : 204,
            "status"        : "ok",
            "scheduled_evt" : event.serialize(),
            "ts"            : ts.isoformat()
        }
    
    def isPipelineRegistered(self, pipeline_name):
        """Report whether ``pipeline_name`` is registered, delegating to the manager."""
        manager = self.manager
        registered = manager.isPipelineRegistered(pipeline_name)
        return registered

    def cancel_scheduled_execution(self, scheduled_execution_id):
        """Cancel the scheduled execution identified by ``scheduled_execution_id``.

        Returns:
            dict: code 204 when the manager cancels the event; code 311 when
            no ``ScheduledEvent`` exists for the id, or when the manager
            declines to cancel it.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())

        sch_evt = self.manager.getScheduledExecutionById(scheduled_execution_id)

        if not isinstance(sch_evt, ScheduledEvent):
            # Previously this path fell off the end of the method and returned
            # None, which is not a valid response. Report "not found" with the
            # same shape as get_scheduled_execution_by_id.
            return {
                "code"          : 311,
                "status"        : "No scheduled execution found",
                "scheduled_evt" : None,
                "ts"            : now.isoformat()
            }

        if self.manager.cancelScheduledExecution(sch_evt):
            return {
                "code"         : 204,
                "status"       : "ok",
                "ts"           : now.isoformat()
            }

        return {
            "code"          : 311,
            "status"        : "scheduled event not cancelled",
            "scheduled_evt" : None,
            "ts"            : now.isoformat()
        }

    def create_notification(self,label):
        """Create a notification under ``label`` from the incoming request.

        The payload is the JSON body for POST requests and the query-string
        arguments for any other method.

        Returns:
            dict: code 204 with the serialized notification on success; 311
            when the manager returns None; 502 when creating or serializing
            the notification raises. The 502 response carries the exception
            message as a string.
        """
        data = None
        if request.method == "POST":
            data = request.json
        else:
            data = request.args

        now = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            notification = self.manager.createNotification(label,data=data)

            if notification is not None:
                notif_json = notification.serialize()
                return {
                    "code"         : 204,
                    "status"       : "ok",
                    "notification" : notif_json,
                    "ts"           : now.isoformat()
                }
            else:
                return {
                    "code"          : 311,
                    "status"        : "notification not created",
                    "notification"  : None,
                    "ts"            : now.isoformat()
                }
        except Exception as e:
            # Log through the service logger instead of printing to stdout.
            self.logger.info("error creating notification with label %s: %s" % (label, e))
            return {
                "code"          : 502,
                "status"        : "Exception raised when creating notification",
                # A raw exception object is not JSON-serializable and would
                # break the response; send its text like the other endpoints do.
                "exception"     : "%s" % e,
                "ts"            : now.isoformat()
            }
      
    def get_last_notification(self,label):
        """Return the most recent notification recorded under ``label``.

        Returns:
            dict: code 204 with the serialized notification, or code 311 with
            ``notification`` set to None when the manager has none.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        latest = self.manager.getLastNotification(label)

        if latest is None:
            return {
                "code"          : 311,
                "status"        : "last notification not found",
                "notification"  : None,
                "ts"            : ts.isoformat()
            }

        serialized = latest.serialize()
        return {
            "code"         : 204,
            "status"       : "ok",
            "notification" : serialized,
            "ts"           : ts.isoformat()
        }
        
    def get_notification_list(self,label):
        """Return every notification recorded under ``label``.

        Returns:
            dict: code 204 with the list of serialized notifications, or code
            311 with an empty list when the manager returns no entries.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        notifications = self.manager.getNotificationList(label)

        if len(notifications) == 0:
            return {
                "code"              : 311,
                "status"            : "last notification not found",
                "notification_list" : [],
                "ts"                : ts.isoformat()
            }

        serialized = [item.serialize() for item in notifications]
        return {
            "code"              : 204,
            "status"            : "ok",
            "notification_list" : serialized,
            "ts"                : ts.isoformat()
        }
    
    def subscribe_notification_pipeline(self, label):
        """Subscribe the pipeline named in the JSON body to notification ``label``.

        Expects ``{"pipeline_name": ...}`` as the request JSON.

        Returns:
            dict: code 204 with the serialized subscription; 311 when the
            manager returns no subscriber; 501 when anything raises. The 501
            response carries the dill-pickled exception, base64-encoded as an
            ASCII string.
        """
        data = request.get_json()

        now = datetime.now(timezone.utc).astimezone(get_localzone())

        try:
            pipeline_name = data["pipeline_name"]
            subscriber = self.manager.subscribePipelineNotification(label,pipeline_name)

            if subscriber is not None:
                subscriber_json = subscriber.serialize()
                return {
                    "code"         : 204,
                    "status"       : "ok",
                    "subscription" : subscriber_json,
                    "ts"           : now.isoformat()
                }
            # Previously this path returned None implicitly; send an explicit response.
            return {
                "code"         : 311,
                "status"       : "pipeline not subscribed",
                "subscription" : None,
                "ts"           : now.isoformat()
            }
        except Exception as e:
            self.logger.info("error subscribing pipeline to %s: %s" % (label, e))
            return {
                "code"          : 501,
                "status"        : "error subscribing pipeline",
                # b64encode returns bytes, which are not JSON-serializable.
                # Decode to str; base64.b64decode accepts str on the client side.
                "exception"     : base64.b64encode(dill.dumps(e)).decode("ascii"),
                "ts"            : now.isoformat()
            }

    def unsubscribe_notification_pipeline(self, label):
        """Remove the subscription of the pipeline named in the JSON body from ``label``.

        Expects ``{"pipeline_name": ...}`` as the request JSON.

        Returns:
            dict: code 204 when the manager reports success; 311 when it
            reports failure; 501 when anything raises. The 501 response
            carries the dill-pickled exception, base64-encoded as an ASCII
            string.
        """
        data = request.get_json()

        now = datetime.now(timezone.utc).astimezone(get_localzone())

        try:
            pipeline_name = data["pipeline_name"]
            if self.manager.unsubscribePipelineNotification(label,pipeline_name):
                return {
                    "code"         : 204,
                    "status"       : "ok",
                    "ts"           : now.isoformat()
                }
            # Previously this path returned None implicitly; send an explicit response.
            return {
                "code"         : 311,
                "status"       : "pipeline not unsubscribed",
                "ts"           : now.isoformat()
            }
        except Exception as e:
            self.logger.info("error unsubscribing pipeline from %s: %s" % (label, e))
            return {
                "code"          : 501,
                "status"        : "error unsubscribing pipeline",
                # b64encode returns bytes, which are not JSON-serializable.
                # Decode to str; base64.b64decode accepts str on the client side.
                "exception"     : base64.b64encode(dill.dumps(e)).decode("ascii"),
                "ts"            : now.isoformat()
            }

    def get_subscribed_pipelines(self, label):
        """Placeholder for listing the pipelines subscribed to notification ``label``.

        NOTE(review): not implemented. The body is ``pass``, so this currently
        returns None for every label instead of a response dict.
        """
        pass
        
    def stop(self):
        """Stop the manager, then call the parent class's ``release``.

        Returns:
            dict: code 210 with status "stop" and the local timestamp.
        """
        stopped_at = datetime.now(timezone.utc).astimezone(get_localzone())
        self.manager.stop()
        super().release()
        response = {
            "code"   : 210,
            "status" : "stop",
        }
        response["ts"] = stopped_at.isoformat()
        return response
    
    def put_key(self):
        """Store a key supplied in the JSON request body.

        Expects ``label``, ``passphrase`` and ``key`` (a jsonpickle-encoded
        key object) in ``request.json``. Body parsing happens before the
        ``try``, so malformed input raises rather than producing a response.

        Returns:
            dict: code 205 on success, 312 when the manager raises
            ``AlreadyExists``, 313 on any other error.
        """
        payload = request.json
        label = payload["label"]
        passphrase = payload["passphrase"]
        # SECURITY(review): jsonpickle.loads can instantiate arbitrary classes
        # from the payload; only safe when callers of this endpoint are trusted.
        key = jsonpickle.loads(payload['key'])
        key.decodeKey()
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            self.logger.info("storing key with label:%s" % (label))
            self.manager.putKey(key, passphrase)
        except AlreadyExists as err:
            self.logger.info("%s" % (err))
            return {
                "code"   : 312,
                "status" : "Could not store key",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error storing key with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not store key",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 205,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }
        
    def get_key(self, label, passphrase):
        """Fetch the key stored under ``label`` using ``passphrase``.

        Returns:
            dict: code 205 with the serialized key; 312 when the manager
            raises ``KeyNotFound``; 313 for any other failure, including a
            failure during serialization.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            serialized = self.manager.getKey(label, passphrase).serialize()
        except KeyNotFound as err:
            self.logger.info("%s" % (err))
            return {
                "code"   : 312,
                "status" : "key not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error retrieving  key with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not retrieve key",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 205,
            "status"   : "ok",
            "key"    : serialized,
            "ts"     : ts.isoformat()
        }
        
    def get_public_key(self):
        """Return the manager's public key in serialized form.

        Returns:
            dict: code 205 with the serialized key, or code 313 with the error
            text if retrieval or serialization raises.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            public = self.manager.getPublicKey()
            serialized = public.serialize()
        except Exception as err:
            self.logger.info("error retrieving public key :%s" % (err))
            return {
                "code"   : 313,
                "status" : "Could not retrieve key",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 205,
            "status" : "ok",
            "key"    : serialized,
            "ts"     : ts.isoformat()
        }
            
    def put_credential(self):
        """Store a credential supplied in the JSON request body.

        Expects ``label``, ``credential`` (jsonpickle-encoded), ``n_unlock``
        and ``shared_users`` in ``request.json``.

        Returns:
            dict: code 205 on success, or code 312 with the error text if the
            manager raises.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        label = payload["label"]
        encoded_credential = payload['credential']
        n_unlock = payload["n_unlock"]
        shared_users = payload["shared_users"]
        # SECURITY(review): jsonpickle.loads on request data can instantiate
        # arbitrary classes; only safe for trusted callers.
        credential = jsonpickle.loads(encoded_credential)
        try:
            self.logger.info("storing credential with label:%s" % (label))
            self.manager.putCredential(credential, n_unlock, shared_users)
        except Exception as err:
            self.logger.info("error storing credential with label %s: %s" % (label,err))
            return {
                "code"   : 312,
                "status" : "Could not store credential",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 205,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }
        
    def get_credential(self):
        """Retrieve a credential using a token supplied in the JSON request body.

        Expects ``label``, ``decrypt``, ``token`` (jsonpickle-encoded) and
        ``whom`` in ``request.json``.

        Returns:
            dict: code 205 with the serialized credential; 312 on
            ``CredentialNotFound``; 313 on ``TokenNotFound``; 314 on any
            other failure.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        label = payload['label']
        decrypt = payload["decrypt"]
        token = jsonpickle.loads(payload["token"])
        whom = payload["whom"]
        try:
            found = self.manager.getCredential(label, decrypt, token, whom)
            serialized = found.serialize()
        except CredentialNotFound as err:
            return {
                "code"   : 312,
                "status" : "credential not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except TokenNotFound as err:
            return {
                "code"   : 313,
                "status" : "Token not Found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error retrieving  credential with label %s: %s" % (label,err))
            return {
                "code"   : 314,
                "status" : "Could not retrieve credential",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"       : 205,
            "status"     : "ok",
            "credential" : serialized,
            "ts"         : ts.isoformat()
        }
        
    def get_key_list(self, active):
        """List the vault's keys, filtered by the textual boolean ``active``.

        ``active`` is parsed with ``ast.literal_eval``, so callers pass the
        string form (for example ``"True"``).

        Returns:
            dict: code 206 with the jsonpickle-encoded list of serialized keys;
            312 when the list is empty; 313 if parsing or listing raises.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            is_active = ast.literal_eval(active)
            state = 'active' if is_active is True else 'inactive'
            found = self.manager.getKeyList(is_active)
            if len(found) == 0:
                self.logger.info("no keys %s registered in the vault"%(state))
                return {
                    "code"   : 312,
                    "status" : "there are no keys",
                    "ts"     : ts.isoformat()
                }
            serialized = [entry.serialize() for entry in found]
            return {
                "code"   : 206,
                "status" : "ok",
                "keys"   : jsonpickle.encode(serialized),
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error listing keys: %s" % (err))
            return {
                "code"   : 313,
                "status" : "Could not list keys",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
    
    def key_expiration(self):
        """Check the expiration status of the key supplied in the JSON body.

        Expects ``label`` and ``key`` (jsonpickle-encoded) in ``request.json``.

        Returns:
            dict: code 206 with the jsonpickle-encoded status; 312 on
            ``KeyNotFound``; 314 with status False on ``ExpiredKey``; 313 on
            any other failure.
        """
        payload = request.json
        label = payload["label"]
        key = jsonpickle.loads(payload['key'])
        key.decodeKey()
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            encoded_state = jsonpickle.encode(self.manager.keyExpiration(key))
        except KeyNotFound as err:
            return {
                "code"   : 312,
                "status" : "key not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except ExpiredKey:
            return {
                "code"   : 314,
                "status" : "key expired",
                'key_status': jsonpickle.encode(False),
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error verifying expiration of key with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not verify expiration of key",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"       : 206,
            "status"     : "ok",
            "key_status" : encoded_state,
            "ts"         : ts.isoformat()
        }
    
    def get_key_expiration_date(self):
        """Return the expiration date of the key supplied in the JSON body.

        Expects ``label`` and ``key`` (jsonpickle-encoded) in ``request.json``.

        Returns:
            dict: code 206 with the jsonpickle-encoded date; 312 on
            ``KeyNotFound``; 313 on any other failure.
        """
        payload = request.json
        label = payload["label"]
        key = jsonpickle.loads(payload['key'])
        key.decodeKey()
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            encoded_date = jsonpickle.encode(self.manager.getKeyExpirationDate(key))
        except KeyNotFound:
            return {
                "code"   : 312,
                "status" : "key not found",
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error geting expiration date of key with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not get key expiration date",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 206,
            "status" : "ok",
            "date"   : encoded_date,
            "ts"     : ts.isoformat()
        }
            
    def get_credential_list(self, active):
        """List the vault's credentials, filtered by the textual boolean ``active``.

        ``active`` is parsed with ``ast.literal_eval``, so callers pass the
        string form (for example ``"False"``).

        Returns:
            dict: code 206 with the jsonpickle-encoded credential list; 312
            when it is empty; 313 if parsing or listing raises.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            is_active = ast.literal_eval(active)
            state = 'active' if is_active is True else 'inactive'
            found = self.manager.getCredentialList(is_active)
            if len(found) == 0:
                self.logger.info("no credentials %s registered in the vault"%(state))
                return {
                    "code"        : 312,
                    "status"      : "there are no credentials",
                    "ts"          : ts.isoformat()
                }
            return {
                "code"        : 206,
                "status"      : "ok",
                "credentials" : jsonpickle.encode(found),
                "ts"          : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error listing credentials: %s" % (err))
            return {
                "code"   : 313,
                "status" : "Could not list credentials",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        
    def credential_expiration(self, label):
        """Check the expiration status of the credential stored under ``label``.

        Returns:
            dict: code 206 with the jsonpickle-encoded status; 312 on
            ``CredentialNotFound``; 314 with status False on
            ``ExpiredCredential``; 313 on any other failure.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            encoded_state = jsonpickle.encode(self.manager.credentialExpiration(label))
        except CredentialNotFound:
            return {
                "code"   : 312,
                "status" : "credential not found",
                "ts"     : ts.isoformat()
            }
        except ExpiredCredential:
            return {
                "code"   : 314,
                "status" : "credential expired",
                "credential_status" : jsonpickle.encode(False),
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error checking credential: %s" % (err))
            return {
                "code"   : 313,
                "status" : "Could not check credentials",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"              : 206,
            "status"            : "ok",
            "credential_status" : encoded_state,
            "ts"                : ts.isoformat()
        }
                
    def get_credential_expiration_date(self, label):
        """Return the expiration date of the credential stored under ``label``.

        Returns:
            dict: code 206 with the jsonpickle-encoded date; 312 on
            ``CredentialNotFound``; 313 on any other failure.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            date = self.manager.getCredentialExpirationDate(label)
            return {
                "code"   : 206,
                "status" : "ok",
                "date"   : jsonpickle.encode(date),
                "ts"     : now.isoformat()
            }
        except CredentialNotFound as e:
            return {
                "code"   : 312,
                "status" : "credential not found",
                "ts"     : now.isoformat()
            }
        except Exception as e:
            # The log and status text were copy-pasted from get_credential_list
            # ("listing credentials"). Describe the actual operation, matching
            # get_key_expiration_date.
            self.logger.info("error geting expiration date of credential with label %s: %s" % (label,e))
            return {
                "code"   : 313,
                "status" : "Could not get credential expiration date",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }
        
    def sign_credential(self):
        """Sign the credential supplied in the JSON body, optionally with a given key.

        Expects ``credential`` (jsonpickle-encoded) and ``key`` (jsonpickle
        string or null) in ``request.json``. When ``key`` is null, None is
        passed to the manager.

        Returns:
            dict: code 206 on success, or code 313 with the error text.
        """
        payload = request.json
        credential = jsonpickle.loads(payload['credential'])
        raw_key = payload['key']
        key = None
        if raw_key is not None:
            key = jsonpickle.loads(raw_key)
            key.decodeKey()
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        label= credential.getLabel()
        try:
            self.logger.info("signing credential with label:%s" % (label))
            self.manager.signCredential(credential, key)
        except Exception as err:
            self.logger.info("error signing credential with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not sign credential",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 206,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }
            
    def verify_credential(self):
        """Verify the signature of the credential supplied in the JSON body.

        Expects ``label`` and ``credential`` (jsonpickle-encoded) in
        ``request.json``.

        Returns:
            dict: code 206 if verification does not raise, otherwise code 313
            with the error text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        label = payload["label"]
        credential = jsonpickle.loads(payload['credential'])
        try:
            self.logger.info("checking sign of credential with label:%s" % (label))
            self.manager.verifyCredential(credential)
        except Exception as err:
            self.logger.info("error checking credential with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not check credential",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 206,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }
        
    def encrypt_credential(self):
        """Encrypt the credential in the JSON body with the supplied key.

        Expects ``key`` and ``credential`` (both jsonpickle-encoded) in
        ``request.json``.

        Returns:
            dict: code 206 with the jsonpickle-encoded encrypted credential, or
            code 313 with the error text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        key = jsonpickle.loads(payload["key"])
        key.decodeKey()
        credential = jsonpickle.loads(payload['credential'])
        label = credential.getLabel()
        try:
            self.logger.info("encrypt credential with label:%s" % (label))
            encrypted = self.manager.encryptCredential(credential, key)
            encoded = jsonpickle.encode(encrypted)
        except Exception as err:
            self.logger.info("error encrypting credential with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not encrypt credential",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 206,
            "status" : "ok",
            "credential_encrypted" : encoded,
            "ts"     : ts.isoformat()
        }
        
    def create_token(self):
        """Create an unlock token for the credential supplied in the JSON body.

        Expects ``credential`` (jsonpickle-encoded), ``minimum_unlock`` and
        ``shared_users`` in ``request.json``.

        Returns:
            dict: code 206 with the jsonpickle-encoded token and credential
            (as held after ``createToken`` runs), or code 313 with the error
            text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        credential = jsonpickle.loads(payload['credential'])
        label = credential.getLabel()
        min_unlock = payload['minimum_unlock']
        shared_users = payload['shared_users']
        try:
            self.logger.info("creating token for credential with label:%s" % (label))
            created = self.manager.createToken(credential, min_unlock, shared_users)
            encoded_token = jsonpickle.encode(created)
            encoded_credential = jsonpickle.encode(credential)
        except Exception as err:
            self.logger.info("error creating token for credential with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not create token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 206,
            "status" : "ok",
            "token"  : encoded_token,
            "credential_encrypted" : encoded_credential,
            "ts"     : ts.isoformat()
        }
        
    def put_token(self):
        """Store a list of tokens for the credential supplied in the JSON body.

        Expects ``credential`` and ``token_list`` (both jsonpickle-encoded) in
        ``request.json``.

        Returns:
            dict: code 205 on success, or code 312 with the error text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        credential = jsonpickle.loads(payload['credential'])
        label = credential.getLabel()
        tokens = jsonpickle.loads(payload['token_list'])
        try:
            self.logger.info("storing token for credential with label:%s" % (label))
            self.manager.putToken(credential, tokens)
        except Exception as err:
            self.logger.info("error storing token for credential with label %s: %s" % (label,err))
            return {
                "code"   : 312,
                "status" : "Could not store token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 205,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }
        
    def get_token(self, label, whom, active):
        """Fetch the token for credential ``label`` held by ``whom``.

        Args:
            label: credential label.
            whom: token holder.
            active: textual boolean (for example ``"True"``), parsed with
                ``ast.literal_eval``.

        Returns:
            dict: code 205 with the jsonpickle-encoded token; 312 on
            ``TokenNotFound``; 313 on any other failure, including an
            unparsable ``active`` value.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            # Parse inside the try, as get_key_list / get_credential_list do,
            # so malformed input yields a 313 response instead of an exception.
            status = ast.literal_eval(active)
            token = self.manager.getToken(label, whom, status)
            return {
                "code"   : 205,
                "status" : "ok",
                "token"  : jsonpickle.encode(token),
                "ts"     : now.isoformat()
            }
        except TokenNotFound as e:
            return {
                "code"   : 312,
                "status" : "token not found",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }
        except Exception as e:
            self.logger.info("error retrieving token for credential with label %s: %s" % (label,e))
            return {
                "code"   : 313,
                "status" : "Could not retrieve token",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }
        
    def assign_token(self):
        """Assign a token for credential ``label`` to ``whom``.

        Expects ``whom``, ``label``, ``exp_date`` (jsonpickle-encoded) and
        ``comment`` in ``request.json``.

        Returns:
            dict: code 205 with the jsonpickle-encoded assigned token; 312 on
            ``TokenNotFound``; 313 on any other failure.
        """
        json_input = request.json
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        whom = json_input['whom']
        label = json_input['label']
        exp_date_json = json_input['exp_date']
        exp_date = jsonpickle.loads(exp_date_json)
        comment = json_input['comment']
        try:
            assigned_token = self.manager.assignToken(whom, label, exp_date, comment)
            return {
                "code"   : 205,
                "status" : "ok",
                "token"  : jsonpickle.encode(assigned_token),
                "ts"     : now.isoformat()
            }
        except TokenNotFound as e:
            return {
                "code"   : 312,
                "status" : "token not found",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }
        except Exception as e:
            self.logger.info("error assigning token for label %s: %s" % (label,e))
            return {
                "code"   : 313,
                # Previously said "Could not retrieve token" (copy-pasted from
                # get_token); this endpoint assigns tokens.
                "status" : "Could not assign token",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }

    def get_assigned_token(self, label, whom):
        """Return the token for credential ``label`` that was assigned to ``whom``.

        Returns:
            dict: code 205 with the jsonpickle-encoded token; 312 on
            ``TokenNotFound``; 313 on any other failure.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            encoded = jsonpickle.encode(self.manager.getAssignedToken(label, whom))
        except TokenNotFound as err:
            return {
                "code"   : 312,
                "status" : "token not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error retrieving token for label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not retrieve token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 205,
            "status" : "ok",
            "token"  : encoded,
            "ts"     : ts.isoformat()
        }

    def sign_token(self):
        """Sign the token identified by ``label`` and ``whom`` from the JSON body.

        Returns:
            dict: code 206 on success, or code 313 with the error text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        label = payload['label']
        whom = payload['whom']
        try:
            self.logger.info("signing token with label:%s" % (label))
            self.manager.signToken(label, whom)
        except Exception as err:
            self.logger.info("error signing token with label %s: %s" % (label,err))
            return {
                "code"   : 313,
                "status" : "Could not sign token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 206,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }

    def verify_token(self):
        """Verify the signature of the jsonpickle-encoded ``token`` in the JSON body.

        Returns:
            dict: code 206 if verification does not raise, otherwise code 313
            with the error text.
        """
        payload = request.json
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        token = jsonpickle.loads(payload['token'])
        try:
            self.logger.info("checking sign of token")
            self.manager.verifyToken(token)
        except Exception as err:
            self.logger.info("error checking token: %s" % (err))
            return {
                "code"   : 313,
                "status" : "Could not check token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        else:
            return {
                "code"   : 206,
                "status" : "ok",
                "ts"     : ts.isoformat()
            }

    def get_token_list(self, active):
        """List the vault's tokens, filtered by the textual boolean ``active``.

        ``active`` is parsed with ``ast.literal_eval``, so callers pass the
        string form (for example ``"True"``).

        Returns:
            dict: code 206 with the jsonpickle-encoded token list; 312 when it
            is empty; 313 if parsing or listing raises.
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            # Parse inside the try, as get_key_list / get_credential_list do,
            # so malformed input yields a 313 response instead of an exception.
            active = ast.literal_eval(active)
            status = 'inactive'
            if active is True:
                status = 'active'
            token_list = self.manager.getTokenList(active)
            if len(token_list) != 0:
                return {
                    "code"   : 206,
                    "status" : "ok",
                    "token_list"   : jsonpickle.encode(token_list),
                    "ts"     : now.isoformat()
                }
            else:
                self.logger.info("there are no %s tokens registered in the vault" % (status))
                return {
                    "code"   : 312,
                    "status" : "there are no tokens",
                    "token_list"   : None,
                    "ts"     : now.isoformat()
                }
        except Exception as e:
            self.logger.info("error listing tokens: %s" % (e))
            return {
                "code"   : 313,
                "status" : "Could not list tokens",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }

    def get_token_expiration_date(self, label, whom=None):
        """Return the expiration date of the token for ``label`` (and ``whom``).

        Returns:
            dict: code 206 with the jsonpickle-encoded date; 312 on
            ``TokenNotFound``; 314 on any other failure.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            expires = self.manager.getTokenExpirationDate(label, whom)
            encoded_date = jsonpickle.encode(expires)
        except TokenNotFound as err:
            return {
                "code"   : 312,
                "status" : "token not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error geting expiration date of token with label %s: %s" % (label,err))
            return {
                "code"   : 314,
                "status" : "Could not get token expiration date",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"   : 206,
            "status" : "ok",
            "date"   : encoded_date,
            "ts"     : ts.isoformat()
        }

    def token_expiration(self, label, whom=None):
        """Check the expiration status of the token for ``label`` (and ``whom``).

        Returns:
            dict: code 206 with the jsonpickle-encoded status; 312 on
            ``TokenNotFound``; 313 on ``ExpiredToken``; 314 on any other
            failure.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            encoded_state = jsonpickle.encode(self.manager.tokenExpiration(label, whom))
        except TokenNotFound as err:
            return {
                "code"   : 312,
                "status" : "token not found",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except ExpiredToken as err:
            return {
                "code"   : 313,
                "status" : "token expired",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        except Exception as err:
            self.logger.info("error verifying expiration of token with label %s: %s" % (label,err))
            return {
                "code"   : 314,
                "status" : "Could not verify expiration of token",
                "error"  : "%s" % err,
                "ts"     : ts.isoformat()
            }
        return {
            "code"       : 206,
            "status"     : "ok",
            "token_status" : encoded_state,
            "ts"         : ts.isoformat()
        }
        
    def get_register_credential(self, label, token=None, whom=None):
        """Retrieve the credential registered under ``label``.

        ``token`` and ``whom`` are passed straight through to
        ``manager.getRegisterCredential``.

        Returns code 205 with the jsonpickle-encoded ``credential`` on success,
        or code 313 (and a log entry) when anything goes wrong.
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone()).isoformat()
        try:
            fetched = self.manager.getRegisterCredential(label, token, whom)
            encoded_credential = jsonpickle.encode(fetched)
        except Exception as err:
            self.logger.info("error retrieving  credential with label %s: %s" % (label, err))
            return {
                "code"   : 313,
                "status" : "Could not retrieve credential",
                "error"  : "%s" % err,
                "ts"     : ts
            }
        return {
            "code"       : 205,
            "status"     : "ok",
            "credential" : encoded_credential,
            "ts"         : ts
        }
        
    def put_persistent_dict(self, dict_name):
        """Store the ``key``/``value`` pair from the request JSON body in ``dict_name``.

        The JSON body of the current ``request`` must be an object with both
        ``key`` and ``value`` members.

        Returns a response dict carrying a ``ts`` timestamp:
          * 200 - ``manager.putInPersistentDict`` reported success;
          * 311 - the body is missing, is not a JSON object, or lacks key/value;
          * 312 - ``manager.putInPersistentDict`` returned a falsy result;
          * 313 - the manager raised (the error is also logged).
        """
        now = datetime.now(timezone.utc).astimezone(get_localzone())
        json_input = request.json

        # Check the body type first. Depending on the web framework, a missing
        # or non-JSON body can show up here as None, and a JSON list is also
        # possible. Either one would make the membership tests / indexing below
        # raise TypeError outside the try block, which becomes an unhandled
        # server error instead of the documented 311 reply.
        if (not isinstance(json_input, dict)
                or "key" not in json_input
                or "value" not in json_input):
            return {
                "code"   : 311,
                "status" : "No key value provided",
                "ts"     : now.isoformat()
            }

        key = json_input['key']
        value = json_input['value']
        try:
            if self.manager.putInPersistentDict(dict_name, key, value):
                return {
                    "code"       : 200,
                    "status"     : "ok",
                    "ts"         : now.isoformat()
                }
            return {
                "code"   : 312,
                "status" : "Error putting key in persistent dict",
                "ts"     : now.isoformat()
            }
        except Exception as e:
            self.logger.info("error putting key in dict %s:%s" % (dict_name, e))
            return {
                "code"   : 313,
                "status" : "Error putting key in persistent dict",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }
    
    def get_persistent_dict(self, dict_name, key):
        """Return the raw value stored under ``key`` in persistent dict ``dict_name``.

        Response codes: 200 with the value in ``response``; 313 when the
        dictionary does not exist, when the key does not exist, or on any other
        error (that last case is also logged).
        """
        ts = datetime.now(timezone.utc).astimezone(get_localzone()).isoformat()
        try:
            stored_value = self.manager.getFromPersistentDict(dict_name, key)
        except NoSuchDictionary:
            return {
                "code"   : 313,
                "status" : "Dictionary %s does not exists" % dict_name,
                "ts"     : ts
            }
        except NoSuchKeyInDictionary:
            return {
                "code"   : 313,
                "status" : "key %s does not exists" % key,
                "ts"     : ts
            }
        except Exception as err:
            self.logger.info("error getting key %s in dict %s:%s" % (key, dict_name, err))
            return {
                "code"   : 313,
                "status" : "Error getting key in persistent dict",
                "error"  : "%s" % err,
                "ts"     : ts
            }
        return {
            "code"     : 200,
            "status"   : "OK",
            "key"      : key,
            "response" : stored_value,
            "ts"       : ts
        }

    def get_persistent_dict_as_json(self, dict_name, key):
        """Return the value under ``key`` in ``dict_name``, parsed into a Python object.

        The stored value is expected to be the text of a Python literal (for
        example ``str()`` of a dict). It is parsed with ``ast.literal_eval``;
        bytes are decoded as UTF-8 first.

        Response codes: 200 with the parsed object in ``response``; 313 when
        the dictionary or key does not exist, or when the value cannot be read
        or parsed (that last case is also logged).
        """
        import ast

        now = datetime.now(timezone.utc).astimezone(get_localzone())
        try:
            value = self.manager.getFromPersistentDict(dict_name, key)
            if isinstance(value, (bytes, bytearray)):
                value = value.decode("utf-8")

            # SECURITY: HTTP clients can write values through
            # put_persistent_dict, so they must never reach eval(), which
            # executes arbitrary code. literal_eval accepts only Python
            # literals (str, numbers, tuples, lists, dicts, sets, booleans,
            # None). Anything else raises and is reported as 313 below.
            json_value = ast.literal_eval(value)

            return {
                "code"     : 200,
                "status"   : "OK",
                "key"      : key,
                "response" : json_value,
                "ts"       : now.isoformat()
            }

        except NoSuchDictionary:
            return {
                "code"   : 313,
                "status" : "Dictionary %s does not exists" % dict_name,
                "ts"     : now.isoformat()
            }

        except NoSuchKeyInDictionary:
            return {
                "code"   : 313,
                "status" : "key %s does not exists" % key,
                "ts"     : now.isoformat()
            }

        except Exception as e:
            self.logger.info("error getting key %s in dict %s:%s" % (key, dict_name, e))
            return {
                "code"   : 313,
                "status" : "Error getting key in persistent dict",
                "error"  : "%s" % e,
                "ts"     : now.isoformat()
            }

In [46]:
%%deploy /orch/orchestrator/__init__.py
from .RemoteProcedureNotification import *
from .RemoteProcedureNotificationManager import *
from .RemoteProcedureNotificationSubscriber import *
from .OrchestratorAccess import *
from .OrchestratorManager import *
from .OrchestratorService import *
from .OrchCredentialManager import *

## Orchestrator API


In [47]:
%%deploy /orch/base/AbstractApiClient.py
import json
import jsonpickle
import requests

from requests.exceptions import *

from ..exceptions import APIResponseError

class AbstractApiClient(object):
    """Minimal JSON-over-HTTP client that the orchestrator API wrappers build on.

    Each call targets ``api_url + uri`` and returns the decoded JSON body for
    any HTTP status from 200 to 499 inclusive, so 4xx payloads are handed back
    to the caller rather than raised. Any other status raises
    ``APIResponseError``.

    Transport failures (requests' ``ConnectionError``, ``Timeout`` and other
    ``RequestException`` errors) are printed and the call returns None, so
    callers must be prepared for a None result. ``KeyboardInterrupt`` is
    deliberately not caught, so an interactive user can still abort a request.
    """

    def __init__(self, api_url = "http://127.0.0.1:8020"):
        # Base URL; every ``uri`` passed to get()/post() is appended to it verbatim.
        self.api_url = api_url

    def _request(self, send, uri, timeout, **kwargs):
        """Issue one HTTP request and decode its JSON reply.

        Args:
            send: the requests function to call (``requests.get`` or
                ``requests.post``), invoked as ``send(url, timeout=..., **kwargs)``.
            uri: path appended verbatim to ``api_url``.
            timeout: seconds, passed through to requests.
            **kwargs: forwarded to ``send`` (e.g. ``json=``, ``params=``).

        Returns:
            The decoded JSON body, or None after a transport error.

        Raises:
            APIResponseError: the status code is outside 200-499.
            Exception: any other failure is printed and re-raised.
        """
        url = "%s%s" % (self.api_url, uri)
        try:
            response = send(url, timeout=timeout, **kwargs)
            if 200 <= response.status_code <= 499:
                return response.json()
            raise APIResponseError("%d:%s" % (response.status_code, response.text))
        # Order matters: requests' ConnectTimeout subclasses both
        # ConnectionError and Timeout and is reported as a connection error.
        except ConnectionError as e:
            print("Connection Error. Technical details given below.\n")
            print(str(e))
        except Timeout as e:
            print("Timeout Error")
            print(str(e))
        except RequestException as e:
            print("General Error")
            print(str(e))
        except Exception as e:
            print(str(e))
            raise
        return None

    def get(self, uri, timeout=180, **kwargs):
        """GET ``uri``; extra keyword arguments (e.g. ``params=``) go to ``requests.get``."""
        return self._request(requests.get, uri, timeout, **kwargs)

    def post(self, uri, timeout=180, **kwargs):
        """POST to ``uri``; keyword arguments (e.g. ``json=``) go to ``requests.post``."""
        return self._request(requests.post, uri, timeout, **kwargs)

In [48]:
%%deploy /orch/api/Pipeline.py

from ..pipelinemanager import AbstractPipeline

class Pipeline(AbstractPipeline):
    """Client-side name for ``AbstractPipeline``; adds no behavior of its own."""

In [49]:
%%deploy /orch/api/Execution.py
import base64
import dill
import inspect
from ..pipelinemanager import AbstractExecutor

class Execution(AbstractExecutor):
    """Client-side name for ``AbstractExecutor``; adds no behavior of its own."""

In [50]:
%%deploy /orch/api/Executor.py
# Executor skeleton
import sys
import io
from io import StringIO
from uuid import uuid4
import base64
import dill
import jsonpickle
import inspect
import time
from datetime import date, datetime, timezone
from tzlocal import get_localzone

from ..exceptions import *

class Executor():
    """Submit runs of one pipeline through an orchestrator API client.

    Holds the resource settings (cores, partition, memory, local flag) and
    posts them with the call arguments to ``/execute`` via ``orch_api.post``.
    After a successful ``run()`` the new id is kept in ``execution_id``.
    """
    def __init__(self, orch_api, pipeline, local=True, cores=1, partition=None, memory=None):
        self.orch_api    = orch_api
        self.pipeline    = pipeline
        self.cores       = cores
        self.local_job   = local
        self.memory      = memory
        self.partition   = partition
        # Id returned by the last successful run(); None until then, so the
        # attribute can be read before any submission.
        self.execution_id = None

    def setLocalJob(self, b):
        """Set the flag that is sent to the server as ``asJob``."""
        self.local_job = b

    def setCores(self, cores):
        """Set the number of cores requested for later runs."""
        self.cores = cores

    def run(self, *args, **kwargs):
        """Submit one execution of the pipeline and return its execution id.

        ``args``/``kwargs`` are jsonpickle-encoded and sent along with the
        pipeline name/version and resource settings. ``memory`` and
        ``partition`` are included only when set.

        Raises:
            PipelineExecutionError: the response is missing or its code is not
                201 (the response, possibly None, is attached).
        """
        post_data = {
            "name"     : self.pipeline.name,
            "version"  : self.pipeline.version,
            "cores"    : self.cores,
            # NOTE(review): the ``local`` flag is sent as ``asJob``; confirm
            # the server-side meaning of this key.
            "asJob"    : self.local_job,
            "args"     : jsonpickle.encode( args ),
            "kwargs"   : jsonpickle.encode( kwargs )
        }

        if self.memory is not None:
            post_data["memory"] = self.memory

        if self.partition is not None:
            post_data["partition"] = self.partition

        print(post_data)

        json_response = self.orch_api.post("/execute", json=post_data)
        # AbstractApiClient.post returns None on transport errors; report that
        # as an execution error instead of crashing with a TypeError.
        if not json_response or json_response.get("code") != 201:
            raise PipelineExecutionError(json_response)
        self.execution_id = json_response["execution_id"]
        return self.execution_id

    def cancel(self):
        """Not implemented: does nothing and returns None."""
        pass


In [51]:
%%deploy /orch/api/ScheduledEvent.py
import base64
import dill
import inspect
from ..scheduler import AbstractScheduledEvent

class ScheduledEvent(AbstractScheduledEvent):
    """Client-side name for ``AbstractScheduledEvent``; adds no behavior of its own."""

In [52]:
%%deploy /orch/api/RemoteProcedureNotification.py
import base64
import dill
import inspect
from ..orchestrator import AbstractRemoteProcedureNotification

class RemoteProcedureNotification(AbstractRemoteProcedureNotification):
    """Client-side name for ``AbstractRemoteProcedureNotification``; no extra behavior."""

In [53]:
%%deploy /orch/api/RemoteProcedureNotificationSubscriber.py
import base64
import dill
import inspect
from ..orchestrator import AbstractRemoteProcedureNotificationSubscriber

class RemoteProcedureNotificationSubscriber(AbstractRemoteProcedureNotificationSubscriber):
    """Client-side name for ``AbstractRemoteProcedureNotificationSubscriber``; no extra behavior."""

In [54]:
%%deploy /orch/api/Orchestrator.py
import jsonpickle
from inspect import isfunction
import dill
import base64
import urllib
from datetime import datetime

from .Execution import Execution
from .ScheduledEvent import ScheduledEvent
from .RemoteProcedureNotification import RemoteProcedureNotification
from .Pipeline import Pipeline
from .Executor import *

from ..base import AbstractApiClient
from ..exceptions import PipelineExecutionError, ImplementationIsNotAFunction, APIResponseError, PipelineSchedulingError
from ..orchestrator import RemoteProcedureNotificationSubscriber

from credentialmanager.Credential import Credential
from credentialmanager.EncryptionKey import EncryptionKey

class Orchestrator(AbstractApiClient):
    def __init__(self, api_url="http://127.0.0.1:8020"):
        """Create a client bound to the orchestrator REST API at ``api_url``."""
        super(Orchestrator, self).__init__(api_url)
        
    def status(self):
        """Return the parsed JSON served at the API root ``/``."""
        root_uri = "/"
        return self.get(root_uri)
    
    def stop(self):
        """Remote shutdown is disabled: no request is sent and False is returned.

        TODO: check the caller's permission before enabling a ``/stop`` call.
        """
        stopped = False
        return stopped
    
    def subscribePipelineNotification(self, label, pipeline, *args, **kw_args):
        """Subscribe ``pipeline`` to the remote procedure notification ``label``.

        Extra positional/keyword arguments are accepted but ignored.

        Returns:
            The ``RemoteProcedureNotificationSubscriber`` decoded from a 204 reply.

        Raises:
            RuntimeError: ``label`` is not a non-empty string or ``pipeline``
                is not a ``Pipeline``.
            Exception: the server-side exception shipped back in a 501 reply.
            APIResponseError: any other response code.
        """
        if not (isinstance(label, str) and len(label) > 0 and isinstance(pipeline, Pipeline)):
            raise RuntimeError("you must provide a valid notification label")

        subscription_data = {
            "pipeline_name" : pipeline.name
        }
        response = self.post("/rpn/%s/subscribe" % label, json=subscription_data)

        # NOTE(review): dill.loads can execute arbitrary code from the payload;
        # only talk to a trusted orchestrator.
        if response["code"] == 204:
            sobj = response["subscription"]
            return RemoteProcedureNotificationSubscriber.fromJson(
                dill.loads(base64.b64decode(sobj["sobj"].encode("utf8"))))
        if response["code"] == 501:
            # The server sends its exception dill-pickled; re-raise it locally.
            # Must be "utf8": a misspelled codec name raises LookupError and
            # hides the real server error.
            raise dill.loads(base64.b64decode(response["exception"].encode("utf8")))
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

    def unsubscribePipelineNotification(self, label, pipeline, *args, **kw_args):
        """Remove ``pipeline``'s subscription to the notification ``label``.

        Extra positional/keyword arguments are accepted but ignored.

        Returns:
            True when the server answers 204.

        Raises:
            RuntimeError: ``label`` is not a non-empty string or ``pipeline``
                is not a ``Pipeline``.
            Exception: the server-side exception shipped back in a 501 reply.
            APIResponseError: any other response code.
        """
        if not (isinstance(label, str) and len(label) > 0 and isinstance(pipeline, Pipeline)):
            raise RuntimeError("you must provide a valid notification label")

        subscription_data = {
            "pipeline_name" : pipeline.name
        }
        response = self.post("/rpn/%s/unsubscribe" % label, json=subscription_data)

        if response["code"] == 204:
            return True
        if response["code"] == 501:
            # The server sends its exception dill-pickled; re-raise it locally.
            # Must be "utf8": a misspelled codec name raises LookupError and
            # hides the real server error.
            # NOTE(review): dill.loads can execute arbitrary code from the payload.
            raise dill.loads(base64.b64decode(response["exception"].encode("utf8")))
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

    def createNotification(self, label, data=None):
        """Fire the remote procedure notification ``label``.

        Args:
            label: non-empty notification label.
            data: optional mapping sent as URL query parameters when non-empty.

        Returns:
            The ``RemoteProcedureNotification`` decoded from a 204 reply.

        Raises:
            RuntimeError: ``label`` is not a non-empty string.
            APIResponseError: the notification was not created (311) or the
                server answered with any other unexpected code.
        """
        # Import the submodule explicitly; a bare ``import urllib`` does not
        # guarantee that ``urllib.parse`` is loaded.
        import urllib.parse

        if not (isinstance(label, str) and len(label) > 0):
            raise RuntimeError("you must provide a valid notification label")

        uri = "/rpn/%s/notify" % label
        if data:
            uri = "%s?%s" % (uri, urllib.parse.urlencode(data))
        response = self.get(uri)

        if response["code"] == 204:
            sobj = response["notification"]
            return RemoteProcedureNotification.fromJson(
                dill.loads(base64.b64decode(sobj["sobj"].encode("utf8"))))
        # 311 means the notification was not created. Any other code is just as
        # unexpected, so raise instead of falling through and returning None.
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getLastNotification(self,label):
        """Return the most recent notification recorded under ``label``.

        Returns:
            The ``RemoteProcedureNotification`` decoded from a 204 reply.

        Raises:
            RuntimeError: ``label`` is not a non-empty string.
            APIResponseError: no notification (311) or any other unexpected code.
        """
        if not (isinstance(label, str) and len(label) > 0):
            raise RuntimeError("you must provide a valid notification label")

        response = self.get("/rpn/%s/last" % label)
        if response["code"] == 204:
            sobj = response["notification"]
            return RemoteProcedureNotification.fromJson(
                dill.loads(base64.b64decode(sobj["sobj"].encode("utf8"))))
        # Raise for 311 and any other code instead of silently returning None.
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getNotificationList(self,label):
        """Return every notification recorded under ``label``.

        Returns:
            A list of ``RemoteProcedureNotification``; empty when the server
            answers 311 (none found).

        Raises:
            RuntimeError: ``label`` is None or the empty string.
            APIResponseError: any other response code.
        """
        # Both conditions must hold. Joining them with ``or`` is always true,
        # which would send None or "" to the server.
        if label is None or label == "":
            raise RuntimeError("you must provide a valid notification label")

        response = self.get("/rpn/%s/list" % label)
        if response["code"] == 204:
            return [
                RemoteProcedureNotification.fromJson(
                    dill.loads(base64.b64decode(sobj["sobj"].encode("utf8"))))
                for sobj in response["notification_list"]
            ]
        if response["code"] == 311:
            # no rpn found
            return []
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

    def register(self, name, fn, token_list=None, new_version=False):
        """Upload the plain function ``fn`` to the orchestrator as pipeline ``name``.

        ``fn`` is dill-serialized and base64-encoded. ``token_list`` is sent in
        its ``str()`` form (so None becomes "None"), and ``new_version`` is
        passed through unchanged. Returns the parsed ``/register`` response.

        Raises:
            ImplementationIsNotAFunction: ``fn`` is not a function object.
        """
        if isfunction(fn):
            encoded_fn = base64.b64encode(dill.dumps(fn)).decode("utf8")
            return self.post("/register", json={
                "name"        : name,
                "pipeline"    : encoded_fn,
                "token_list"  : str(token_list),
                "new_version" : new_version,
            })
        raise ImplementationIsNotAFunction(fn)
    
    def getActivePipeline(self, name):
        """Return the single active pipeline registered under ``name``.

        Raises:
            MultipleActivePipelineRegistered: more than one active pipeline.
            NoActivePipelineRegistered: no active pipeline.
        """
        # NOTE(review): these two exception classes are not in this module's
        # explicit ``..exceptions`` import. They presumably arrive through
        # ``from .Executor import *``; confirm.
        # getPipelines() returns None when nothing matches; treat that as
        # "none registered" instead of failing on len(None).
        pipelines = self.getPipelines(name, active=True) or []
        if len(pipelines) == 1:
            return pipelines[0]
        if len(pipelines) > 1:
            raise MultipleActivePipelineRegistered(name)
        raise NoActivePipelineRegistered(name)

    def getPipelines(self, name, **kwargs):
        """Return the pipelines registered under ``name``.

        Extra keyword arguments are merged into the ``/get`` request as filters
        (e.g. ``active=True``).

        Returns:
            A list of ``Pipeline`` objects, or None when the reply carries no
            ``pipeline`` entry or the request failed at the transport level.
        """
        post_data = {"name": name, **kwargs}
        json_response = self.post("/get", json=post_data)

        # post() returns None after a transport error; treat that like an
        # empty answer instead of raising TypeError on the membership test.
        if not json_response or "pipeline" not in json_response:
            return None
        return [Pipeline.fromJson(pipe_json) for pipe_json in json_response["pipeline"]]
        
    def activatePipeline(self, pipeline):
        """Mark ``pipeline`` (identified by name and version) active; returns the parsed reply."""
        selector = dict(name=pipeline.name, version=pipeline.version)
        return self.post('/activate', json=selector)
    
    def deactivatePipeline(self, pipeline):
        """Mark ``pipeline`` (identified by name and version) inactive; returns the parsed reply."""
        selector = dict(name=pipeline.name, version=pipeline.version)
        return self.post('/deactivate', json=selector)
    
    def getExecutionStatus(self, exec_id):
        """Return the parsed status reply for execution ``exec_id``.

        Raises:
            RuntimeError: ``exec_id`` is None.
        """
        if exec_id is None:
            raise RuntimeError("you must provide a valid execution_id")
        return self.get("/execution/%s/status" % exec_id)

    def getExecution(self, exec_id):
        """Fetch execution ``exec_id``.

        Returns an ``Execution`` rebuilt from the dill payload when the server
        answers 202; otherwise the raw response dict is returned unchanged.

        Raises:
            RuntimeError: ``exec_id`` is None.
        """
        if exec_id is None:
            raise RuntimeError("you must provide a valid execution_id")

        json_response = self.get("/execution/%s/get" % exec_id)
        if json_response["code"] != 202:
            return json_response
        payload = json_response["executor"]["sobj"]
        return Execution.fromJson(dill.loads(base64.b64decode(payload)))
     
    def cancelExecution(self, exec_id):
        """Ask the orchestrator to cancel execution ``exec_id``.

        Always returns the response dict. For codes 202 and 311 its
        ``executor`` entry is replaced by the decoded ``Execution``.

        Raises:
            RuntimeError: ``exec_id`` is None.
        """
        if exec_id is None:
            raise RuntimeError("you must provide a valid execution_id")

        json_response = self.get("/execution/%s/cancel" % exec_id)
        if json_response["code"] in (202, 311):
            raw = dill.loads(base64.b64decode(json_response["executor"]["sobj"]))
            json_response["executor"] = Execution.fromJson(raw)
        return json_response
        
    def notifyExecution(self, target):
        """Client-side stub: prints a notice and returns None; ``target`` is ignored."""
        notice = "notifyExecution only works when executing within the orchestrator"
        print(notice)
        
    def getExecutionList(self, name):
        """Return all executions of the pipeline called ``name``.

        Returns:
            A list of ``Execution``; empty when the server answers 311.

        Raises:
            RuntimeError: ``name`` is None or the empty string.
            APIResponseError: any other response code.
        """
        # Both conditions must hold. Joining them with ``or`` is always true,
        # which would send None or "" to the server.
        if name is None or name == "":
            raise RuntimeError("you must provide a valid pipeline name")

        response = self.get("/execution/%s/list" % name)
        if response["code"] == 204:
            return [
                Execution.fromJson(dill.loads(base64.b64decode(sobj["sobj"].encode("utf8"))))
                for sobj in response["executions"]
            ]
        if response["code"] == 311:
            # no execution found
            return []
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getRunningExecutions(self):
        """Return the executions the orchestrator reports as running.

        Code 204 yields a list of ``Execution``; code 401 (no execution found)
        yields ``[]``; any other code raises ``APIResponseError``.
        """
        response = self.get("/execution/running")
        code = response["code"]
        if code == 401:
            # no execution found
            return []
        if code != 204:
            raise APIResponseError("%d:%s" % (code, response["status"]))
        return [
            Execution.fromJson(dill.loads(base64.b64decode(item["sobj"].encode("utf8"))))
            for item in response["executions"]
        ]
      
    def getExecutionsBy(self,where):
        """Return the executions matching the server-side filter ``where``.

        Code 204 yields a list of ``Execution``; code 401 (no execution found)
        yields ``[]``; any other code raises ``APIResponseError``.
        """
        response = self.post("/execution/getby", json={"where": where})
        code = response["code"]
        if code == 401:
            # no execution found
            return []
        if code != 204:
            raise APIResponseError("%d:%s" % (code, response["status"]))
        return [
            Execution.fromJson(dill.loads(base64.b64decode(item["sobj"].encode("utf8"))))
            for item in response["executions"]
        ]

    def getLastExecution(self, name):
        """Return the most recent execution of the pipeline called ``name``.

        Returns an ``Execution`` when the server answers 202; otherwise the
        raw response dict.

        Raises:
            RuntimeError: ``name`` is None.
        """
        # Reject a missing name explicitly, as the sibling lookups do, instead
        # of silently returning None.
        if name is None:
            raise RuntimeError("you must provide a valid pipeline name")

        json_response = self.get("/execution/%s/last" % name)
        if json_response["code"] == 202:
            s_executor = dill.loads(base64.b64decode(json_response["executor"]["sobj"]))
            return Execution.fromJson(s_executor)
        return json_response

    def createExecutor(self, pipeline, *args, **kwargs):
        """Build an ``Executor`` bound to this client; extra arguments go to its constructor."""
        return Executor(self, pipeline, *args, **kwargs)

    def execute(self, pipeline, *args, **kwargs):
        """Run ``pipeline`` (by name and version) with the given arguments.

        ``args`` and ``kwargs`` are jsonpickle-encoded. Returns the new
        execution id when the server answers 201.

        Raises:
            PipelineExecutionError: any other reply (the response is attached).
        """
        post_data = {
            "name"     : pipeline.name,
            "version"  : pipeline.version,
            "args"     : jsonpickle.encode(args),
            "kwargs"   : jsonpickle.encode(kwargs)
        }
        json_response = self.post("/execute", json=post_data)
        if json_response["code"] != 201:
            raise PipelineExecutionError(json_response)
        return json_response["execution_id"]
                
    def scheduleAt(self, pipeline, label = None, trigger_time=None, recurrency=None, tags=None):
        """Schedule ``pipeline`` to run at ``trigger_time``.

        Args:
            pipeline: pipeline to schedule. Its name and stored arguments
                (``getArguments()`` / ``getKeywordArguments()``) are sent
                jsonpickle-encoded.
            label: optional label for the scheduled event.
            trigger_time: time of day as an "%H:%M:%S" string. Defaults to the
                current local time at call time. A default written in the
                signature would be evaluated only once, when the class is
                defined, and every later call would reuse that stale time.
            recurrency: passed to the server unchanged.
            tags: list of tags; defaults to a fresh empty list per call.

        Returns:
            The response dict when the server answers 201.

        Raises:
            PipelineSchedulingError: any other reply.
        """
        if trigger_time is None:
            trigger_time = datetime.now().strftime("%H:%M:%S")
        if tags is None:
            tags = []

        post_data = {
            "name"         : pipeline.name,
            "label"        : label,
            "trigger_time" : trigger_time,
            "recurrency"   : recurrency,
            "tags"         : tags,
            "args"         : jsonpickle.encode( pipeline.getArguments() ),
            "kwargs"       : jsonpickle.encode( pipeline.getKeywordArguments() )
        }

        print(post_data)

        json_response = self.post("/scheduleAt", json=post_data)
        if json_response["code"] == 201:
            return json_response
        raise PipelineSchedulingError(json_response)

    def getScheduledExecutions(self, pipeline):
        """Return the scheduled events registered for ``pipeline``.

        Returns:
            A list of ``ScheduledEvent``; empty when the server answers 311.

        Raises:
            RuntimeError: ``pipeline`` is not a ``Pipeline`` or has no name.
            APIResponseError: any other response code.
        """
        if not isinstance(pipeline, Pipeline):
            raise RuntimeError("pipeline argument must be a Pipeline instance")

        name = pipeline.name
        # Both conditions must hold. Joining them with ``or`` is always true,
        # which would send None or "" to the server.
        if name is None or name == "":
            raise RuntimeError("pipeline argument must have a valid name")

        response = self.get("/execution/%s/scheduled" % name)
        if response["code"] == 204:
            return [
                ScheduledEvent.fromJson(dill.loads(base64.b64decode(sobj["sobj"].encode("ascii"))))
                for sobj in response["scheduled"]
            ]
        if response["code"] == 311:
            # no execution found
            return []
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getScheduledExecutionById(self, scheduled_event_id):
        """Fetch one scheduled event by its id.

        Returns the ``ScheduledEvent`` decoded from a 204 reply.

        Raises:
            RuntimeError: the id is None or the empty string.
            APIResponseError: any other response code.
        """
        if scheduled_event_id is None or scheduled_event_id == "":
            raise RuntimeError("you must provide a valid scheduled event id")

        response = self.get("/schedule/%s/get" % scheduled_event_id)
        if response["code"] != 204:
            raise APIResponseError("%d:%s" % (response["code"], response["status"]))
        encoded = response["scheduled_evt"]["sobj"].encode("utf8")
        return ScheduledEvent.fromJson(dill.loads(base64.b64decode(encoded)))

    def cancelScheduledExecution(self, scheduled_event):
        """Cancel ``scheduled_event`` (located by its ``uuid``); returns True on 204.

        Raises:
            RuntimeError: ``scheduled_event`` is not a ``ScheduledEvent``.
            APIResponseError: any other response code.
        """
        if not isinstance(scheduled_event, ScheduledEvent):
            raise RuntimeError("you must provide a valid scheduled event")

        response = self.get("/schedule/%s/cancel" % scheduled_event.uuid)
        if response["code"] != 204:
            raise APIResponseError("%d:%s" % (response["code"], response["status"]))
        return True

    def putKey(self, key, passphrase=None):
        """Store the ``EncryptionKey`` ``key`` (optionally with ``passphrase``) in the vault.

        Returns the parsed ``/putKey`` response.

        Raises:
            RuntimeError: ``key`` is not an ``EncryptionKey`` or ``passphrase``
                is neither a string nor None.
        """
        if not isinstance(key, EncryptionKey):
            raise RuntimeError("key argument must be a Key instance")
        if not isinstance(passphrase,(str,type(None))):
            raise RuntimeError("you must provide a valid passphrase")

        # encodeKey()/decodeKey() appear to toggle the key's representation in
        # place around serialization (TODO confirm in credentialmanager).
        # Always undo it, even if jsonpickle raises, so the caller's key object
        # is not left encoded.
        key.encodeKey()
        try:
            key_encode = jsonpickle.encode(key)
        finally:
            key.decodeKey()

        post_data = {
            "label"      : key.getLabel(),
            "key"        : key_encode,
            "passphrase" : passphrase,
        }
        return self.post("/putKey",json=post_data)
        
    def getKey(self, label, passphrase=None):
        """Fetch the encryption key stored under ``label``.

        NOTE(review): ``passphrase`` is placed in the URL path (as the text
        "None" when omitted), so it may appear in server or proxy access logs.
        Characters such as "/", "?" or "#" in it will also break the route.
        Consider moving it to a POST body.

        Returns the response dict. On code 205 its ``key`` entry is replaced by
        the deserialized ``EncryptionKey``; a 312 reply is returned as is.

        Raises:
            RuntimeError: invalid ``label`` or ``passphrase`` type.
            APIResponseError: any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(passphrase, (str, type(None))):
            raise RuntimeError("you must provide a valid passphrase")

        response = self.get("/key/%s/%s/getKey" % (label, passphrase))
        code = response["code"]
        if code == 205:
            response['key'] = EncryptionKey.deserialize(response['key'])
            return response
        if code == 312:
            return response
        raise APIResponseError("%d:%s" % (code, response["status"]))
            
    def getPublicKey(self):
        """Fetch the orchestrator's public key.

        Returns the response dict with ``key`` replaced by the deserialized
        ``EncryptionKey`` when the server answers 205.

        Raises:
            APIResponseError: any other response code.
        """
        response = self.get("/key/getPublicKey")
        if response["code"] != 205:
            raise APIResponseError("%d:%s" % (response["code"], response["status"]))
        response['key'] = EncryptionKey.deserialize(response['key'])
        return response
        
    def getKeyList(self, active=True):
        """List the stored encryption keys, filtered by the ``active`` flag.

        On code 206 the response's ``keys`` entry becomes a list of
        ``EncryptionKey``; a 312 reply (no keys) is returned as is.

        Raises:
            RuntimeError: ``active`` is not a bool.
            APIResponseError: any other response code.
        """
        if not isinstance(active, bool):
            raise RuntimeError("you must provide a valid argument")

        response = self.get("/KeyList/%s" % active)
        code = response["code"]
        if code == 206:
            response['keys'] = [
                EncryptionKey.deserialize(raw_key)
                for raw_key in jsonpickle.loads(response['keys'])
            ]
            return response
        if code == 312:
            # no keys registered
            return response
        raise APIResponseError("%d:%s" % (code, response["status"]))
    
    def keyExpiration(self, key):
        """Ask the vault whether ``key`` has expired.

        On codes 206 and 314 the response's ``key_status`` is jsonpickle-decoded;
        a 312 reply is returned as is.

        Raises:
            RuntimeError: ``key`` is not an ``EncryptionKey``.
            APIResponseError: any other response code.
        """
        if not isinstance(key, EncryptionKey):
            raise RuntimeError("key argument must be a Key instance")

        # Restore the caller's key even if serialization raises (encodeKey()
        # presumably mutates it in place; TODO confirm).
        key.encodeKey()
        try:
            key_encode = jsonpickle.encode(key)
        finally:
            key.decodeKey()

        post_data = {
            "label"  : key.getLabel(),
            "key"    : key_encode,
        }
        response = self.post("/keyExpiration",json=post_data)
        if response["code"] in (206, 314):
            response['key_status'] = jsonpickle.loads(response['key_status'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getKeyExpirationDate(self, key):
        """Return the vault's expiration date for ``key``.

        On code 206 the response's ``date`` is jsonpickle-decoded; a 312 reply
        is returned as is.

        Raises:
            RuntimeError: ``key`` is not an ``EncryptionKey``.
            APIResponseError: any other response code.
        """
        if not isinstance(key, EncryptionKey):
            raise RuntimeError("key argument must be a Key instance")

        # Restore the caller's key even if serialization raises (encodeKey()
        # presumably mutates it in place; TODO confirm).
        key.encodeKey()
        try:
            key_encode = jsonpickle.encode(key)
        finally:
            key.decodeKey()

        post_data = {
            "label"  : key.getLabel(),
            "key"    : key_encode,
        }
        response = self.post("/keyExpirationDate",json=post_data)
        if response["code"] == 206:
            response['date'] = jsonpickle.loads(response['date'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
     
    def getCredentialList(self, active=True):
        """List stored credentials, filtered by the ``active`` flag.

        On code 206 the response's ``credentials`` entry is jsonpickle-decoded;
        a 312 reply (no credentials) is returned as is.

        Raises:
            RuntimeError: ``active`` is not a bool.
            APIResponseError: any other response code.
        """
        if not isinstance(active, bool):
            raise RuntimeError("you must provide a valid argument")

        response = self.get("/credentialList/%s" % active)
        code = response["code"]
        if code == 312:
            # no credentials registered
            return response
        if code != 206:
            raise APIResponseError("%d:%s" % (code, response["status"]))
        response['credentials'] = jsonpickle.loads(response['credentials'])
        return response
     
    def credentialExpiration(self, label):
        """Ask the vault whether the credential ``label`` has expired.

        On codes 206 and 314 the response's ``credential_status`` is
        jsonpickle-decoded; 312 and 313 replies are returned as is.

        Raises:
            RuntimeError: ``label`` is not a string.
            APIResponseError: any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")

        response = self.get("/credential/%s/credentialExpiration" % label)
        code = response["code"]
        if code in (206, 314):
            response['credential_status'] = jsonpickle.loads(response['credential_status'])
            return response
        if code in (312, 313):
            return response
        raise APIResponseError("%d:%s" % (code, response["status"]))
            
    def getCredentialExpirationDate(self, label):
        """Return the vault's expiration date for the credential ``label``.

        On code 206 the response's ``date`` is jsonpickle-decoded; a 312 reply
        is returned as is.

        Raises:
            RuntimeError: ``label`` is not a string.
            APIResponseError: any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")

        response = self.get("/credential/%s/credentialExpirationDate" % label)
        code = response["code"]
        if code == 312:
            return response
        if code != 206:
            raise APIResponseError("%d:%s" % (code, response["status"]))
        response['date'] = jsonpickle.loads(response['date'])
        return response
    
    def signCredential(self, credential, key=None):
        """Ask the service to sign ``credential`` via ``POST /signCredential``.

        Args:
            credential: the Credential to sign; its label is sent alongside
                its jsonpickle encoding.
            key: optional EncryptionKey. It is switched with ``encodeKey()``
                only while it is serialized and is always restored with
                ``decodeKey()``, even if serialization fails.

        Returns:
            Whatever ``self.post`` returns for the request.

        Raises:
            RuntimeError: if ``credential`` or ``key`` has the wrong type.
        """
        if not isinstance(credential, Credential):
            raise RuntimeError("credential argument must be a Credential instance")
        key_encode = None
        if key is not None:
            if not isinstance(key, EncryptionKey):
                raise RuntimeError("key argument must be a Key instance")
            key.encodeKey()
            try:
                key_encode = jsonpickle.encode(key)
            finally:
                # never leave the caller's key in its encoded state
                key.decodeKey()
        post_data = {
            "label"       : credential.getLabel(),
            "credential"  : jsonpickle.encode(credential),
            "key"         : key_encode,
        }

        return self.post("/signCredential", json=post_data)
    
    def putCredential(self, credential, n_unlock=2, shared_users=4):
        """Store ``credential`` on the service via ``POST /putCredential``.

        Args:
            credential: the Credential to store.
            n_unlock: int forwarded as ``n_unlock``.
            shared_users: int forwarded as ``shared_users``.
                NOTE(review): these look like threshold-sharing parameters
                (n_unlock out of shared_users) -- confirm with the service.

        Returns:
            Whatever ``self.post`` returns for the request.

        Raises:
            RuntimeError: if an argument has the wrong type.
        """
        if not isinstance(credential, Credential):
            raise RuntimeError("credential argument must be a Credential instance")
        if not isinstance(n_unlock, int):
            raise RuntimeError("n_unlock argument must be an int")
        if not isinstance(shared_users, int):
            raise RuntimeError("shared_users argument must be an int")
        post_data = {
            "label"         : credential.getLabel(),
            "credential"    : jsonpickle.encode(credential),
            "n_unlock"      : n_unlock,
            "shared_users"  : shared_users,
        }
        return self.post("/putCredential", json=post_data)
        
    def getCredential(self, label, decrypt=True, token=None, whom=None):
        """Fetch credential ``label`` via ``POST /getCredential``.

        NOTE(review): this definition is shadowed. A second
        ``def getCredential(self, label, token=None, whom=None)`` appears later
        in the same class body and replaces it, so this version is never
        called. Rename or delete it once the intended API is confirmed.

        Returns:
            The response dict. On code 205 ``'credential'`` is rebuilt with
            ``Credential.deserialize``. Codes 312 (credential does not exist)
            and 313 (no token found to decrypt it) are returned unchanged.

        Raises:
            RuntimeError: if an argument has the wrong type.
            APIResponseError: for any other response code.
        """
        if not isinstance(label,str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(decrypt,bool):
            raise RuntimeError("you must provide a valid decrypt argument")
        if not isinstance(whom,(str,type(None))):
            raise RuntimeError("you must provide a valid whom argument")
        # token is jsonpickle-encoded even when None
        token_encode = jsonpickle.encode(token)
        try:
            post_data = {
                "label"   : label,
                "decrypt" : decrypt,
                "token"   : token_encode,
                "whom"    : whom,
            }
            response = self.post("/getCredential",json=post_data)
            if response["code"]==205:
                response['credential'] = Credential.deserialize(response['credential'])
                return  response
            elif response["code"]==312 or response["code"]==313:
                # credential does not exist (312) or no token found to decrypt it (313)
                return response
            else:
                raise APIResponseError("%d:%s" % (response["code"],response["status"]))
        except Exception as e:
            raise e
    
    def verifyCredential(self, credential):
        """Submit ``credential`` to ``POST /verifyCredential`` and return the reply.

        Raises:
            RuntimeError: if ``credential`` is not a Credential instance.
        """
        if isinstance(credential, Credential):
            payload = {
                "label": credential.getLabel(),
                "credential": jsonpickle.encode(credential),
            }
            return self.post("/verifyCredential", json=payload)
        raise RuntimeError("credential argument must be a Credential instance")
    
    def encryptCredential(self, credential, recipient_key):
        """Ask the service to encrypt ``credential`` for ``recipient_key``.

        Posts to ``/encryptCredential``. ``recipient_key`` is switched with
        ``encodeKey()`` only while it is serialized and is always restored
        with ``decodeKey()``, even if serialization fails.

        Returns:
            The response dict (code 206) with ``'credential_encrypted'``
            decoded by ``jsonpickle.loads``.

        Raises:
            RuntimeError: if an argument has the wrong type.
            APIResponseError: for any response code other than 206.
        """
        if not isinstance(credential, Credential):
            raise RuntimeError("credential argument must be a Credential instance")
        if not isinstance(recipient_key, EncryptionKey):
            raise RuntimeError("key argument must be a Key instance")
        recipient_key.encodeKey()
        try:
            key_encode = jsonpickle.encode(recipient_key)
        finally:
            # never leave the caller's key in its encoded state
            recipient_key.decodeKey()
        post_data = {
            "credential" : jsonpickle.encode(credential),
            "key"        : key_encode,
        }
        response = self.post("/encryptCredential", json=post_data)
        if response["code"] == 206:
            response['credential_encrypted'] = jsonpickle.loads(response['credential_encrypted'])
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
        
    def createToken(self, credential, min_unlock=2, shared_users=4):
        """Ask the service to create tokens for ``credential`` via ``POST /createToken``.

        Args:
            credential: the Credential to tokenize.
            min_unlock: int sent as ``minimum_unlock``.
            shared_users: int sent as ``shared_users``.

        Returns:
            The response dict (code 206) with ``'token'`` and
            ``'credential_encrypted'`` decoded by ``jsonpickle.loads``.

        Raises:
            RuntimeError: if an argument has the wrong type.
            APIResponseError: for any response code other than 206.
        """
        if not isinstance(credential, Credential):
            raise RuntimeError("credential argument must be a Credential instance")
        if not isinstance(min_unlock, int):
            raise RuntimeError("argument must be int")
        if not isinstance(shared_users, int):
            raise RuntimeError("argument must be int")
        post_data = {
            "credential"     : jsonpickle.encode(credential),
            "minimum_unlock" : min_unlock,
            "shared_users"   : shared_users,
        }
        response = self.post("/createToken", json=post_data)
        if response["code"] == 206:
            response["token"] = jsonpickle.loads(response["token"])
            response["credential_encrypted"] = jsonpickle.loads(response["credential_encrypted"])
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

    def putToken(self, credential, token_list):
        """Upload ``token_list`` for ``credential`` via ``POST /putToken``.

        Both values are jsonpickle-encoded before sending; the result of
        ``self.post`` is returned as-is.

        Raises:
            RuntimeError: if ``credential`` is not a Credential or
                ``token_list`` is not a list.
        """
        if not isinstance(credential, Credential):
            raise RuntimeError("credential argument must be a Credential instance")
        if not isinstance(token_list, list):
            raise RuntimeError("argument must be a list")
        encoded = {
            field: jsonpickle.encode(value)
            for field, value in (("credential", credential), ("token_list", token_list))
        }
        return self.post("/putToken", json=encoded)
    
    def getToken(self, label=None, whom=None, active=False):
        """Fetch a token via ``GET /credential/<label>/<whom>/<active>/getToken``.

        ``None`` values are interpolated literally (as ``None``) into the
        URL. ``active`` is not type-checked.

        Returns:
            The response dict. On code 205 its ``'token'`` entry is decoded
            with ``jsonpickle.loads``; on code 312 it is returned unchanged.

        Raises:
            RuntimeError: if ``label`` or ``whom`` is neither a str nor None.
            APIResponseError: for any other response code.
        """
        if not isinstance(label, (str, type(None))):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(whom, (str, type(None))):
            raise RuntimeError("you must provide a valid whom argument")
        response = self.get("/credential/%s/%s/%s/getToken" % (label, whom, active))
        if response["code"] == 205:
            response['token'] = jsonpickle.loads(response['token'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def assignToken(self, whom, label, exp_date, comment=""):
        """Assign the token of ``label`` to ``whom`` via ``POST /assignToken``.

        ``exp_date`` is jsonpickle-encoded before sending; the result of
        ``self.post`` is returned as-is.

        Raises:
            RuntimeError: if ``whom``, ``label`` or ``comment`` is not a str
                (checked in that order).
        """
        checks = (
            (whom, "you must provide a valid whom argument"),
            (label, "you must provide a valid label"),
            (comment, "you must provide a valid comment argument"),
        )
        for value, message in checks:
            if not isinstance(value, str):
                raise RuntimeError(message)
        payload = dict(
            whom=whom,
            label=label,
            exp_date=jsonpickle.encode(exp_date),
            comment=comment,
        )
        return self.post("/assignToken", json=payload)
    
    def getAssignedToken(self, label, whom):
        """Fetch the token of ``label`` assigned to ``whom``.

        Calls ``GET /token/<label>/<whom>/getAssignedToken``.

        Returns:
            The response dict. On code 205 its ``'token'`` entry is decoded
            with ``jsonpickle.loads``; on code 312 it is returned unchanged.

        Raises:
            RuntimeError: if ``label`` or ``whom`` is not a str.
            APIResponseError: for any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(whom, str):
            raise RuntimeError("you must provide a valid whom argument")
        response = self.get("/token/%s/%s/getAssignedToken" % (label, whom))
        if response["code"] == 205:
            response['token'] = jsonpickle.loads(response['token'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def signToken(self, label, whom):
        """Request a signature for the token of ``label`` held by ``whom``.

        Posts ``{"label": label, "whom": whom}`` to ``/signToken`` and returns
        whatever ``self.post`` returns.

        Raises:
            RuntimeError: if ``label`` or ``whom`` is not a str.
        """
        for value, message in ((label, "you must provide a valid label"),
                               (whom, "you must provide a valid whom argument")):
            if not isinstance(value, str):
                raise RuntimeError(message)
        return self.post("/signToken", json=dict(label=label, whom=whom))
        
    def verifyToken(self, token):
        """Submit ``token`` (jsonpickle-encoded) to ``POST /verifyToken`` and return the reply."""
        encoded_token = jsonpickle.encode(token)
        return self.post("/verifyToken", json={"token": encoded_token})
    
    def getTokenList(self, active=False):
        """Fetch the token list via ``GET /tokenList/<active>``.

        Args:
            active: bool interpolated into the URL as ``True``/``False``.

        Returns:
            The response dict (code 206) with ``'token_list'`` decoded by
            ``jsonpickle.loads``.

        Raises:
            RuntimeError: if ``active`` is not a bool.
            APIResponseError: for any response code other than 206.
        """
        if not isinstance(active, bool):
            raise RuntimeError("you must provide a valid argument")
        response = self.get("/tokenList/%s" % active)
        if response["code"] == 206:
            response['token_list'] = jsonpickle.loads(response['token_list'])
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def getTokenExpirationDate(self, label, whom):
        """Fetch the expiration date of the token of ``label`` held by ``whom``.

        Calls ``GET /token/<label>/<whom>/tokenExpirationDate``.

        Returns:
            The response dict. On code 206 its ``'date'`` entry is decoded
            with ``jsonpickle.loads``; on code 312 it is returned unchanged.

        Raises:
            RuntimeError: if ``label`` or ``whom`` is not a str.
            APIResponseError: for any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(whom, str):
            # previously reported "valid label" for a bad whom argument
            raise RuntimeError("you must provide a valid whom argument")
        response = self.get("/token/%s/%s/tokenExpirationDate" % (label, whom))
        if response["code"] == 206:
            response['date'] = jsonpickle.loads(response['date'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
    
    def tokenExpiration(self, label, whom=None):
        """Query the expiration status of the token of ``label`` held by ``whom``.

        Calls ``GET /token/<label>/<whom>/tokenExpiration``; a ``None``
        ``whom`` is interpolated literally as ``None``.

        Returns:
            The response dict. On code 206 its ``'token_status'`` entry is
            decoded with ``jsonpickle.loads``; on codes 312 and 313 it is
            returned unchanged.

        Raises:
            RuntimeError: if ``label`` is not a str or ``whom`` is neither a
                str nor None.
            APIResponseError: for any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(whom, (str, type(None))):
            # previously reported "valid label" for a bad whom argument
            raise RuntimeError("you must provide a valid whom argument")
        response = self.get("/token/%s/%s/tokenExpiration" % (label, whom))
        code = response["code"]
        if code == 206:
            response['token_status'] = jsonpickle.loads(response['token_status'])
            return response
        if code in (312, 313):
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
            
    def getCredential(self, label, token=None, whom=None):
        """Fetch registered credential ``label`` (newer API).

        This definition replaces the earlier ``getCredential`` in this class
        body, so it is the one callers actually get.

        Calls ``GET /credential/<label>/<token>/<whom>/getRegisterCredential``;
        ``None`` values are interpolated literally as ``None``.

        Returns:
            The response dict. On code 205 its ``'credential'`` entry is
            decoded with ``jsonpickle.loads``; on code 312 it is returned
            unchanged.

        Raises:
            RuntimeError: if ``label`` is not a str, or ``token``/``whom`` is
                neither a str nor None.
            APIResponseError: for any other response code.
        """
        if not isinstance(label, str):
            raise RuntimeError("you must provide a valid label")
        if not isinstance(token, (str, type(None))):
            raise RuntimeError("you must provide a valid token")
        if not isinstance(whom, (str, type(None))):
            raise RuntimeError("you must provide a valid argument")
        response = self.get("/credential/%s/%s/%s/getRegisterCredential" % (label, token, whom))
        if response["code"] == 205:
            response['credential'] = jsonpickle.loads(response['credential'])
            return response
        if response["code"] == 312:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))
            
    def putInPersistentDict(self, dict_name, key, value):
        """Store ``key -> value`` in persistent dict ``dict_name``.

        Posts ``{"key": key, "value": value}`` to ``/pd/<dict_name>/put``.

        Returns:
            The response dict when its code is 200.

        Raises:
            APIResponseError: for any other response code.
        """
        post_data = {
            "key"   : key,
            "value" : value
        }
        response = self.post("/pd/%s/put" % dict_name, json=post_data)
        if response["code"] == 200:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

    def getFromPersistentDict(self, dict_name, key):
        """Read ``key`` from persistent dict ``dict_name``.

        Calls ``GET /pd/<dict_name>/get/<key>``.

        Returns:
            The response dict when its code is 200.

        Raises:
            APIResponseError: for any other response code.
        """
        response = self.get("/pd/%s/get/%s" % (dict_name, key))
        if response["code"] == 200:
            return response
        raise APIResponseError("%d:%s" % (response["code"], response["status"]))

In [55]:
%%deploy /orch/api/__init__.py

from .Execution import *
from .Pipeline import *
from .ScheduledEvent import *
from .RemoteProcedureNotification import *
from .Orchestrator import *

In [56]:
# Halt "Run All" here: everything below is a manual test against a live
# orchestrator. StopExecution defines an empty _render_traceback_, so no
# traceback is shown.
raise StopExecution

# Testing

## Use case for OrchestratorManager (service side)

In [None]:
import time
from datetime import datetime, timedelta
from orch.orchestrator import OrchestratorManager

# SMTP settings handed to the manager (presumably for e-mail notifications);
# fill these in before running.
smtp_crd = dict(
    sender="",
    smtp_host="",
    username="",
    password="",
)

om = OrchestratorManager(smtp_crd=smtp_crd)
# orch_access = om

In [None]:
# Shut down the OrchestratorManager instance `om`.
om.stop()

### getExecutionsBy tests (filtering executions)

In [None]:
# Filter executions with a condition string; it looks like a SQL-style
# WHERE fragment -- confirm against getExecutionsBy.
exl = om.getExecutionsBy("start_ts>='2022-07-08 02:00:00'")
print(len(exl))
for ex in exl:
    print(ex)

In [None]:
# NOTE(review): the meaning of state 6 comes from the execution state
# enumeration, which is not visible here -- confirm.
exl = om.getExecutionsBy("state=6")
print(len(exl))
for ex in exl:
    print(ex)

In [None]:
# Print any cancellation error instead of failing the cell.
execution_id = "12b738c2-b79c-47ba-a5a4-b858954af430"
try:
    om.cancelExecution(execution_id)
except Exception as err:
    print(type(err), err)

In [None]:
# List the executions the manager reports as running.
exl = om.getRunningExecutions()
print(len(exl))
for ex in exl:
    print(ex)

### Notification tests

In [None]:
# test RemoteProcedureNotification methods:
# create a notification named "testssss" and show it

n = om.createNotification("testssss")
print(n)

In [None]:
# NOTE(review): the previous cell creates "testssss" but this queries
# "test" -- confirm which notification name is intended.
print(om.getLastNotification("test"))

In [None]:
# Show all notifications recorded under the name "testssss".
print(om.getNotificationList("testssss"))

### Test a function

In [None]:
# create a pipeline

def test_pipeline(*args, **kw_args):
    """Demo pipeline: store a variable, publish a report, notify, then idle.

    ``orch_access`` is not assigned in this notebook (its assignment is
    commented out); presumably the orchestrator provides it when the
    pipeline runs -- TODO confirm.
    """
    global orch_access

    orch_access.addVariable("my_var", "my value")
    print("this is the execution of a test pipeline")

    orch_access.setReport("this is a test report:<br>")
    orch_access.notifyExecution([""])

    # keep the execution alive long enough to observe or cancel it
    time.sleep(30)

    return {"return value": "blabla"}


In [None]:
# register the function in the orchestrator as a new version

om.register("test_pipeline",test_pipeline, new_version=True)

# get the registered pipeline: the last element of the returned list
# (presumably the newest version -- confirm ordering)
pipeline = om.getPipelines("test_pipeline")[-1]
print(pipeline)
om.activatePipeline(pipeline)

In [None]:
# get the registered pipeline (last element of the getPipelines list)
pipeline = om.getPipelines("test_pipeline")[-1]
print(pipeline)

In [None]:
# manual execution of the pipeline; the returned execution object is
# inspected in the following cell
exec_obj = om.execute(pipeline)

In [None]:
# check the results of the execution: state, captured stdout/stderr and
# the pipeline's return value
print(exec_obj)
print(exec_obj.getOutput())
print(exec_obj.getErrors())
result = exec_obj.getReturnValue()
display(result)


In [None]:
# trigger the manual execution and then cancel it
pipeline = om.getPipelines("test_pipeline")[-1]

exec_obj = om.execute(pipeline)

# Waits are kept in variables so the printed messages always match the
# actual sleeps (the old message announced 15s but slept 1s).
wait_before_cancel = 1
wait_after_cancel = 2

print("sleeping %ds before cancelling" % wait_before_cancel)
time.sleep(wait_before_cancel)
exec_obj.cancel()
print("sleeping %ds after cancelation" % wait_after_cancel)
time.sleep(wait_after_cancel)
print(exec_obj)
print(exec_obj.getOutput())

In [None]:
# re-inspect the cancelled execution, e.g. after more time has passed
print(exec_obj)
print(exec_obj.getOutput())

In [None]:
# list every recorded execution of test_pipeline
for execution in om.getExecutionList("test_pipeline"):
    print(execution)

In [None]:
# print the captured output of each execution of `pipeline`
for execution in pipeline.getExecutions():
    print(execution.getOutput())


In [None]:
# preview the trigger-time format used below: now + 10 s as "HH:MM:SS"
trigger_time = (datetime.now() + timedelta(seconds=10)).strftime("%H:%M:%S")
print(trigger_time)

In [None]:
# first trigger 10 s from now, formatted "HH:MM:SS"
trigger_time = (datetime.now() + timedelta(seconds=10)).strftime("%H:%M:%S")

recurrency = "1m" # presumably one minute (the "20s" form is used for seconds)

my_pipeline = om.getActivePipeline("test_pipeline")
print(my_pipeline)

# arguments stored on the pipeline for the scheduled runs
my_pipeline.setArguments( ("arg1","arg2") )

if om.scheduleAt(my_pipeline, trigger_time=trigger_time, recurrency=recurrency):
    print("pipeline scheduled at %s with recurrency at %s" % (trigger_time, recurrency))
else:
    print("pipeline not scheduled")

scheduled_executions = om.getScheduledExecutions(my_pipeline)
print("scheduled executions for my_pipeline")
for sch_exec in scheduled_executions:
    print(sch_exec)

In [None]:
sch_lst = om.getScheduledExecutions(my_pipeline)
print(sch_lst)

# Guard against an empty list: indexing [-1] would raise IndexError.
if sch_lst:
    # cancel the last scheduled event in the list
    sch_evt = sch_lst[-1]
    print(sch_evt)
    om.cancelScheduledExecution(sch_evt)
else:
    print("no scheduled executions to cancel")

In [None]:
# list the scheduled events again to confirm the cancellation
scheduled_executions = om.getScheduledExecutions(my_pipeline)
print("scheduled executions for my_pipeline")
for sch_exec in scheduled_executions:
    print(sch_exec)

In [None]:
# executions produced by the scheduler for my_pipeline
for execution in my_pipeline.getExecutions():
    print(execution)


In [None]:
# raw list of scheduled events for my_pipeline
sch_lst = om.getScheduledExecutions(my_pipeline)
print(sch_lst)

In [None]:
# shut down the manager
om.stop()

# halt "Run All" before the PipelineManager section
raise StopExecution

### PipelineManager usage via Orchestrator

In [None]:
# fetch the currently active version of test_pipeline
my_pipeline = om.getActivePipeline("test_pipeline")
print(my_pipeline)

In [None]:
# execute a pipeline correctly (three positional arguments)
executor = om.execute(my_pipeline,1,2,3)
# poll once per second while the execution is preparing or running
while executor.isPreparing() or executor.isRunning():
    print("execution state:",executor.state)
    time.sleep(1)
print("execution done. final state:",executor.state)
print(executor.getOutput())
print(executor.getErrors())

In [None]:
# execute a pipeline with fewer arguments (two instead of three); the
# resulting state and errors show how a missing argument is reported
executor = om.execute(my_pipeline,1,2)
while executor.isPreparing() or executor.isRunning():
    print("execution state:",executor.state)
    time.sleep(1)
print("execution done. final state:",executor.state)
print(executor.getOutput())
print(executor.getErrors())

In [None]:
# list every recorded execution of test_pipeline
execution_list = om.getExecutionList("test_pipeline")
for execution in execution_list:
    print(execution)

In [None]:
# execute a non-active pipeline.
# getPipelines returns a list, which is why we take element 0 to get the
# pipeline object directly.

my_pipeline_v1 = om.getPipelines("test_pipeline", version=1)[0]
print(my_pipeline_v1)

executor_v1 = om.execute(my_pipeline_v1,10,20)
# poll once per second while the execution is preparing or running
while executor_v1.isPreparing() or executor_v1.isRunning():
    print("execution state:",executor_v1.state)
    time.sleep(1)
print("execution done. final state:",executor_v1.state)
print(executor_v1.getOutput())
print(executor_v1.getErrors())

In [None]:
# halt "Run All" before the client-side section
raise StopExecution

In [None]:
# shut down the OrchestratorManager instance `om`
om.stop()

## Use case of Orchestrator API (client side)

In [None]:
from orch.orchestrator import *

# start an OrchestratorService backed by a local SQLite file
orch_srv = OrchestratorService(db_conn_str="sqlite:///orchestrator.sqlite")
orch_srv.start()

In [None]:
# stop the OrchestratorService
orch_srv.stop()

### status

In [None]:
from orch.api import Orchestrator
# timedelta is also used by the scheduling cells further down
from datetime import timedelta
# client-side API object used by all following cells
orch = Orchestrator()
print(orch.status())

### test getting running executions

In [None]:
# executions the service reports as running
exl = orch.getRunningExecutions()
print(exl)

### test getting executions by filter

In [None]:
# NOTE(review): meaning of state 6 is defined by the execution state
# enumeration, which is not visible here -- confirm.
exl = orch.getExecutionsBy("state=6")
print(exl)

### test stop

In [None]:
# NOTE(review): later cells still use `orch`; presumably this stops the
# remote service -- run it last or confirm its effect.
orch.stop()

### create notification

In [None]:
# create a notification carrying a data payload
rpn = orch.createNotification("my_rpn",data={"p1":10,"p2":"ddfdfdf"})

In [None]:
# show the created notification
print(rpn)

In [None]:
# data carries a "triggers" list naming test_pipeline; presumably the
# notification triggers that pipeline -- confirm with the service
rpn = orch.createNotification("test_notification",data={"triggers": ["test_pipeline"]})

In [None]:
# show the created notification
print(rpn)

### create a notification via a direct API call

In [None]:
import requests

# Trigger a notification through the REST endpoint directly. requests
# encodes `params` as the same "?p1=23&p2=sdfsdfsd" query string as before.
url = "http://localhost:8020/rpn/my_direct_rpn/notify"
params = {"p1": 23, "p2": "sdfsdfsd"}

# A timeout keeps the cell from hanging forever if the service is down.
r = requests.get(url, params=params, timeout=10)
print(r.json())


### get last notification

In [None]:
# fetch the notification created by the direct REST call and its data
last_rpn = orch.getLastNotification("my_direct_rpn")
print(last_rpn)
print(last_rpn.getData())

In [None]:
# fetch the latest "my_rpn" notification and its data payload
last_rpn = orch.getLastNotification("my_rpn")
print(last_rpn)
print(last_rpn.getData())

### get notification list

In [None]:
# all notifications recorded under "my_rpn"
rpn_list = orch.getNotificationList("my_rpn")
print(rpn_list)

### registration of a test pipeline

In [None]:
# NOTE(review): base64 is not used by any visible cell.
import base64

# set to False to skip the two registration cells below
register=True

In [None]:
if register:
    def test_pipeline(arg1, arg2):
        """Version 1 of the demo pipeline: echo two arguments, then count to 30 at 1 s per step."""
        # imported in the body, presumably so the function stays
        # self-contained when the orchestrator executes it
        import time
        print("this is a test pipeline")
        print("arg1:", arg1)
        print("arg2:", arg2)
        count = 0
        for _ in range(30):
            count += 1
            time.sleep(1)
        return count

    print(orch.register("test_pipeline", test_pipeline))

In [None]:
# register a second version of the test_pipeline
if register:
    def test_pipeline(arg1, arg2, arg3):
        """Version 2 of the demo pipeline: echo three arguments, then count to 30 at 1 s per step."""
        # imported in the body, presumably so the function stays
        # self-contained when the orchestrator executes it
        import time
        print("this is a test pipeline version 2")
        print("arg1:", arg1)
        print("arg2:", arg2)
        print("arg3:", arg3)
        count = 0
        for _ in range(30):
            count += 1
            time.sleep(1)
        return count

    print(orch.register("test_pipeline", test_pipeline, new_version=True))

### manual execution of a pipeline

In [None]:
# list all registered versions of test_pipeline, then pick version 1
# (getPipelines returns a list, hence [0])

test_pipelines = orch.getPipelines("test_pipeline")
for p in test_pipelines:
    print(p)

test_pipeline_v1 = orch.getPipelines("test_pipeline",version=1)[0]
print(test_pipeline_v1)

In [None]:
# run version 1 once with arguments 10 and 20; the result is stored as
# `last_exec_id` (presumably the execution identifier -- confirm)
last_exec_id = orch.execute(test_pipeline_v1, 10, 20)

In [None]:
# latest execution of test_pipeline and its captured output
last_exec = orch.getLastExecution("test_pipeline")
print(last_exec)

print(last_exec.getOutput())

### test cancel

In [None]:
# cancel the latest execution of test_pipeline by its uuid
last_exec = orch.getLastExecution("test_pipeline")
exec_id = last_exec.uuid

orch.cancelExecution(exec_id)

### test pipeline schedule

In [None]:
# timedelta is imported here too, so this cell no longer depends on an
# earlier cell having imported it
from datetime import datetime, timedelta
import time

# activate version 1 of test_pipeline and execute it once via the scheduler
test_pipeline_v1 = orch.getPipelines("test_pipeline", version=1)[0]
print(test_pipeline_v1)

if orch.activatePipeline(test_pipeline_v1):
    print("pipeline activated")
else:
    print("pipeline not active")

test_pipeline = orch.getActivePipeline("test_pipeline")
print(test_pipeline)

# set pipeline arguments to schedule execution
test_pipeline.setArguments((1, 2))

# trigger the pipeline in 10 seconds
trigger_time = (datetime.now() + timedelta(seconds=10)).strftime("%H:%M:%S")

if orch.scheduleAt(test_pipeline, trigger_time=trigger_time):
    print("pipeline scheduled at", trigger_time)
else:
    print("pipeline not scheduled")

print("waiting for pipeline to be triggered")
time.sleep(14)

print("execution list for my_pipeline")
last_exec = orch.getLastExecution("test_pipeline")

print(last_exec)

while not last_exec.isDone():
    print("waiting for pipeline execution to finish. state", last_exec.state)
    # Through the API last_exec is a snapshot, so re-fetch it to see the
    # state change; on the manager side the ORM refreshes it automatically.
    last_exec = orch.getLastExecution("test_pipeline")
    time.sleep(5)

print("pipeline execution done")

print(last_exec)

print(last_exec.getOutput())
print(last_exec.getErrors())

### test get last execution and output

In [None]:
# NOTE(review): my_pipeline_v1 is not used in this cell.
my_pipeline_v1 = orch.getPipelines("test_pipeline",version=1)[-1]
# latest execution of test_pipeline and its captured output
last_exec = orch.getLastExecution("test_pipeline")
print(last_exec)
print(last_exec.getOutput())

In [None]:
# activate my_pipeline version 2 and execute it via scheduler
# (relies on datetime, timedelta and time imported by earlier cells)
test_pipeline_v2 = orch.getPipelines("test_pipeline",version=2)[0]
print(test_pipeline_v2)

if orch.activatePipeline(test_pipeline_v2):
    print("pipeline activated")
else:
    print("pipeline not active")

# first trigger 10 s from now, formatted "HH:MM:SS"
trigger_time = (datetime.now() + timedelta(seconds=10)).strftime("%H:%M:%S")

test_pipeline = orch.getActivePipeline("test_pipeline")
print(test_pipeline)

# version 2 takes three arguments
test_pipeline.setArguments( (1,2,3) )

if orch.scheduleAt(test_pipeline, trigger_time=trigger_time):
    print("pipeline scheduled at",trigger_time)
else:
    print("pipeline not scheduled")

# wait for pipeline to begin its scheduled execution
time.sleep(11)

print("execution list for my_pipeline")
last_exec = orch.getLastExecution("test_pipeline")

# re-fetch every 5 s: through the API last_exec is a snapshot
while not last_exec.isDone():
    print("waiting for pipeline execution to finish. state",last_exec.state)
    last_exec = orch.getLastExecution("test_pipeline")
    time.sleep(5)

print("pipeline execution done")

print(last_exec)

print(last_exec.getOutput())
print(last_exec.getErrors())


In [None]:
# schedule a recurrent execution, execute the pipeline twice and then cancel the execution

# NOTE(review): the variable is named *_v2 but version=3 is requested; only
# two versions are registered in this section, so this may raise IndexError
# unless a third version exists -- confirm the intended version.
test_pipeline_v2 = orch.getPipelines("test_pipeline",version=3)[0]
print(test_pipeline_v2)

if orch.activatePipeline(test_pipeline_v2):
    print("pipeline activated")
else:
    print("pipeline not active")

# first trigger 10 s from now, formatted "HH:MM:SS"
trigger_time = (datetime.now() + timedelta(seconds=10)).strftime("%H:%M:%S")

recurrency = "20s" # seconds

test_pipeline = orch.getActivePipeline("test_pipeline")
print(test_pipeline)

test_pipeline.setArguments( (1,2,3) )

if orch.scheduleAt(test_pipeline, trigger_time=trigger_time, recurrency=recurrency):
    print("pipeline scheduled at %s with recurrency at %s" % (trigger_time, recurrency))
else:
    print("pipeline not scheduled")

scheduled_executions = orch.getScheduledExecutions(test_pipeline)
print("scheduled executions for my_pipeline")
for sch_exec in scheduled_executions:
    print(sch_exec)

In [None]:
# wait for pipeline to begin its scheduled execution
time.sleep(11)

execution_times = 2

# Observe `execution_times` consecutive recurrent runs.
for t in range(execution_times):
    last_exec = orch.getLastExecution("test_pipeline")
    print("last execution for my_pipeline at time %d" % t)
    print(last_exec)

    # poll every 5 s, re-fetching the snapshot, until this run finishes
    while not last_exec.isDone():
        print("waiting for pipeline execution to finish. state", last_exec.state)
        time.sleep(5)
        last_exec = orch.getLastExecution("test_pipeline")

    print("pipeline execution done")
    # pause before looking for the next recurrent run
    time.sleep(16)
    print("sleep ends")


In [None]:
# list the scheduled events of the active test_pipeline
test_pipeline = orch.getActivePipeline("test_pipeline")
print(test_pipeline)
scheduled_executions = orch.getScheduledExecutions(test_pipeline)
for scex in scheduled_executions:
    print(scex)

In [None]:
# cancel the execution: the last scheduled event in the list; on success
# re-fetch it by uuid to show its updated record
if orch.cancelScheduledExecution(scheduled_executions[-1]):
    sch_evt = orch.getScheduledExecutionById(scheduled_executions[-1].uuid)
    print("%s cancelled" % sch_evt)

scheduled_executions = orch.getScheduledExecutions(test_pipeline)
print("scheduled executions for my_pipeline")
for sch_exec in scheduled_executions:
    print(sch_exec)

In [None]:
# stop the OrchestratorService started at the beginning of this section
orch_srv.stop()

# END