In [1]:
import time
import warnings
warnings.filterwarnings('ignore')

import hkube_notebook
from hkube_notebook import AlgorithmBuilder, PipelineBuilder, PipelineExecutor, TrackerType
print(hkube_notebook.__version__)

connect_options={'base_url':'https://cd.hkube.io','path':'/hkube/api-server/api/v1'}

0.3


# Build Pipeline

In [2]:
zazaBuilder = PipelineBuilder(name='zaza', options=connect_options)
zazaBuilder.add_node(node_name='green', alg_name='green-alg', input=["@flowInput.tata"])
zazaBuilder.add_node(node_name='yellow', alg_name='yellow-alg', input=["@green"])
zazaBuilder.add_node(node_name='black', alg_name='black-alg', input=["@yellow"])
# add unknown algorithm => error
#zazaBuilder.add_node(node_name='gold', alg_name='gold-alg', input=["@black"])
zazaBuilder.get_raw()

{'name': 'zaza',
 'nodes': [{'nodeName': 'green',
   'algorithmName': 'green-alg',
   'input': ['@flowInput.tata']},
  {'nodeName': 'yellow', 'algorithmName': 'yellow-alg', 'input': ['@green']},
  {'nodeName': 'black', 'algorithmName': 'black-alg', 'input': ['@yellow']}],
 'options': {'base_url': 'https://cd.hkube.io',
  'path': '/hkube/api-server/api/v1'},
 'flowInput': {}}

# Execute raw pipeline from builder (webhook tracker)

In [3]:
zazaRawExec = PipelineExecutor(raw=zazaBuilder.get_raw(), options=connect_options, tracker=TrackerType.POLLING)
results = zazaRawExec.exec(input={ 'tata': {} })

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: v14t0akt17y5
PollingTracker thread finished, jobId: v14t0akt17y5
getting results...
pipeline "zaza" status: completed
timeTook: 3.683 seconds
RESULT (1 of 1 items):
RESULT ITEM 1:
42
<<<<< finished


# Store pipeline, async execute 3 times (webhook listener)

In [4]:
zazaBuilder.store()

OK: pipeline "zaza" was stored successfully!


True

In [5]:
zazaStoredListenerExec = PipelineExecutor(name='zaza', options=connect_options, tracker=TrackerType.POLLING)
jobId1 = zazaStoredListenerExec.exec_async(input={'tata': {}})
jobId2 = zazaStoredListenerExec.exec_async(input={'tata': {}})
jobId3 = zazaStoredListenerExec.exec_async(input={'tata': {}})

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: tfjy0rvdg5ap


  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: 8ycsii5ewuwo


  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: 1zn4avz5adzf


In [6]:
# run it when above jobs are still running to see their status
status_list = zazaStoredListenerExec.get_all_status()

jobId tfjy0rvdg5ap: active - 0% completed, 1 storing, 1 creating, 1 preschedule
jobId 8ycsii5ewuwo: active - 0% completed, 1 active, 1 preschedule, 1 creating
jobId 1zn4avz5adzf: active - 0% completed, 1 active, 1 preschedule, 1 creating


# Async excute stored pipeline 3 times (status polling), stop one

In [7]:
zazaStoredPollingExec = PipelineExecutor(name='zaza', options=connect_options, tracker=TrackerType.POLLING)
# async execution
jobId1 = zazaStoredPollingExec.exec_async(input={'tata': {}})
jobId2 = zazaStoredPollingExec.exec_async(input={'tata': {}})
jobId3 = zazaStoredPollingExec.exec_async(input={'tata': {}})
time.sleep(4)
zazaStoredPollingExec.stop(jobId=jobId2)

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: td0k04z6nmw3


  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: q2whsmp5jqnv


  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: h8mp7dcrqpt4
PollingTracker thread finished, jobId: tfjy0rvdg5ap
PollingTracker thread finished, jobId: 8ycsii5ewuwo
PollingTracker thread finished, jobId: 1zn4avz5adzf
ERROR: delete pipeline "zaza" failed: unable to stop pipeline zaza because its in completed status (code: 400)


False

In [8]:
# get result of the first
results1 = zazaStoredPollingExec.get_results(jobId=jobId3, max_display=10)

getting results...
PollingTracker thread finished, jobId: td0k04z6nmw3
PollingTracker thread finished, jobId: q2whsmp5jqnv
PollingTracker thread finished, jobId: h8mp7dcrqpt4
pipeline "zaza" status: completed
timeTook: 4.097 seconds
RESULT (1 of 1 items):
RESULT ITEM 1:
42


# Execute pipeline with (too small) timeout

In [9]:
results = zazaStoredPollingExec.exec(input={ 'tata': {} }, timeout_sec=2)

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: lkhdjf30hvh7
getting results...
PollingTracker thread finished, jobId: lkhdjf30hvh7
ERROR: get results for jobId lkhdjf30hvh7 failed: unable to get results for pipeline zaza because its in active status (code: 400)
<<<<< finished


# Delete stored pipeline

In [10]:
zazaBuilder.delete()

OK: pipeline "zaza" was deleted successfully!


True

# Build & exec eval node pipeline, control dispaled size

In [11]:
splitBuilder = PipelineBuilder(name='multi_result', options=connect_options)
eval_split = {
    "code": [
    "function split(input) {",
    "return input[0].split(' ');",
    "}"]
}
splitBuilder.add_node(node_name='split', alg_name='eval-alg', input=["#@flowInput.text"],
                extra_data=eval_split)
splitBuilder.get_raw()

{'name': 'multi_result',
 'nodes': [{'nodeName': 'split',
   'algorithmName': 'eval-alg',
   'input': ['#@flowInput.text'],
   'extraData': {'code': ['function split(input) {',
     "return input[0].split(' ');",
     '}']}}],
 'options': {'base_url': 'https://cd.hkube.io',
  'path': '/hkube/api-server/api/v1'},
 'flowInput': {}}

In [12]:
splitExec = PipelineExecutor(raw=splitBuilder.get_raw(), options=connect_options, tracker=TrackerType.POLLING)
input = {
        "text": ["In mathematics and computer science, a directed acyclic graph ",
    "(DAG (About this sound listen)), is a finite directed graph with",
    " no directed cycles. That is, it consists of finitely many vertices and",
    "edges, with each edge directed from one vertex to another, ",
    "such that there is no way to start at any vertex v and follow",
    "a consistently-directed sequence of edges that eventually ",
    "loops back to v again. Equivalently, a DAG is a directed graph",
    "that has a topological ordering, a sequence of the vertices ",
    "such that every edge is directed from earlier to later in the sequence.",
    "The corresponding concept for undirected graphs is a forest, an undirected graph without ",
    "cycles. Choosing an orientation for a forest produces a special kind of directed acyclic graph",
    "called a polytree. However there are many other kinds of directed acyclic graph that are not"]
}
results = splitExec.exec(input=input, max_displayed_results=3)

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: 189hx7omb9o1
PollingTracker thread finished, jobId: 189hx7omb9o1
getting results...
pipeline "multi_result" status: completed
timeTook: 0.558 seconds
RESULT (3 of 12 items):
RESULT ITEM 1:
[
    "In",
    "mathematics",
    "and",
    "computer",
    "science,",
    "a",
    "directed",
    "acyclic",
    "graph",
    ""
]
RESULT ITEM 2:
[
    "(DAG",
    "(About",
    "this",
    "sound",
    "listen)),",
    "is",
    "a",
    "finite",
    "directed",
    "graph",
    "with"
]
RESULT ITEM 3:
[
    "",
    "no",
    "directed",
    "cycles.",
    "That",
    "is,",
    "it",
    "consists",
    "of",
    "finitely",
    "many",
    "vertices",
    "and"
]
<<<<< finished


In [13]:
# execute without results display
results = splitExec.exec(input=input, max_displayed_results=0)

  0%|          | 0/100 [00:00<?, ?it/s]

OK - pipeline is running, jobId: 63xvxx7id7hc
PollingTracker thread finished, jobId: 63xvxx7id7hc
getting results...
pipeline "multi_result" status: completed
timeTook: 0.508 seconds
RESULTS ITEMS: 12
<<<<< finished


# Iterate over results

In [14]:
i = 0
print("TOTAL: {} results".format(len(results)))
for item in results:
    print('ITEM {}: {}'.format(i, results[i]))
    i += 1

TOTAL: 12 results
ITEM 0: {'nodeName': 'split', 'batchIndex': 1, 'algorithmName': 'eval-alg', 'result': ['In', 'mathematics', 'and', 'computer', 'science,', 'a', 'directed', 'acyclic', 'graph', '']}
ITEM 1: {'nodeName': 'split', 'batchIndex': 2, 'algorithmName': 'eval-alg', 'result': ['(DAG', '(About', 'this', 'sound', 'listen)),', 'is', 'a', 'finite', 'directed', 'graph', 'with']}
ITEM 2: {'nodeName': 'split', 'batchIndex': 3, 'algorithmName': 'eval-alg', 'result': ['', 'no', 'directed', 'cycles.', 'That', 'is,', 'it', 'consists', 'of', 'finitely', 'many', 'vertices', 'and']}
ITEM 3: {'nodeName': 'split', 'batchIndex': 4, 'algorithmName': 'eval-alg', 'result': ['edges,', 'with', 'each', 'edge', 'directed', 'from', 'one', 'vertex', 'to', 'another,', '']}
ITEM 4: {'nodeName': 'split', 'batchIndex': 5, 'algorithmName': 'eval-alg', 'result': ['such', 'that', 'there', 'is', 'no', 'way', 'to', 'start', 'at', 'any', 'vertex', 'v', 'and', 'follow']}
ITEM 5: {'nodeName': 'split', 'batchIndex':

# Get pipelines

In [15]:
# Get all stored pipelines
stored = PipelineExecutor.get_all_stored(options=connect_options)
# ...now you may iterate on 'stored'...

# Get all running pipeline jobs
running = PipelineExecutor.get_running_jobs(options=connect_options)

Got 18 stored pipelines: ['batch_trigger', 'pausePipe', 'simple', 'batch', 'big-batch', 'big-wait-batch', 'wait-any-batch', 'batch-on-node', 'simple-wait-batch', 'trigger-1', 'trigger-2', 'trigger-3', 'trigger-4', 'streaming', 'video1', 'sleep', 'full-full-less', 'orit']
Got 0 running jobs:


# Get Algorithms

In [16]:
alg_mgr = AlgorithmBuilder(options=connect_options)
algs = alg_mgr.get_all()

Got 13 algorithms: ['eval-alg', 'green-alg', 'yellow-alg', 'black-alg', 'alg1-ttt', 'green-alg1', 'green-alg2', 'green-alg3', 'green-alg4', 'green-alg5', 'test2', 'sort-alg', 'test-alg']


# Create Algorithm

In [17]:
folder = '/home/yuvalso/anaconda3/Amir/test_algorithm'
tarfilename = alg_mgr.create_algfile_by_folder(folder)
worker_env = { "WORKER_ALGORITHM_PROTOCOL": "ws" }
config = alg_mgr.create_config('test3-alg', 'main.py', worker_env=worker_env)
alg_mgr.apply(compressed_alg_file=tarfilename, config=config)
algs = alg_mgr.get_all()


ERROR: failed to create tar file /home/jovyan/work/alg.tar.gz: [Errno 2] No such file or directory: '/home/yuvalso/anaconda3/Amir/test_algorithm'


TypeError: expected str, bytes or os.PathLike object, not NoneType

In [None]:
testBuilder = PipelineBuilder(name='AMIRYI', api_server_base_url=api_server)
testBuilder.add_node(node_name='sort', alg_name='test3-alg', input=["@flowInput.array", "@flowInput.dir"])
testBuilder.get_raw()

In [None]:
sortRawExec = PipelineExecutor(raw=testBuilder.get_raw(), api_server_base_url=api_server, tracker=TrackerType.POLLING)
results = sortRawExec.exec(input={ "array": [5, 10, 7, -7, 21, 16, 19, 0], "dir": "asc"})

In [None]:
# delete alg
alg_mgr.delete('test3-alg')