Skip to content

Commit

Permalink
- Add step monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
maycuatroi committed Jun 11, 2023
1 parent 266ed08 commit e85fd2a
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 32 deletions.
2 changes: 1 addition & 1 deletion evoflow/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.8.0
0.1.9.0
85 changes: 75 additions & 10 deletions evoflow/entities/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@
import json
import os
import sys

import typing
from time import sleep

from rich.columns import Columns
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress
from rich.spinner import Spinner
from rich.text import Text
from rich.tree import Tree
from tqdm import tqdm

from evoflow.controller.log_controller import logger, pretty_dict
from evoflow.entities.core.base_object import BaseObject
from evoflow.entities.core.step import Step
from evoflow.entities.core.step_list import StepList
from evoflow.entities.core.transaction import Transaction


Expand Down Expand Up @@ -43,23 +53,40 @@ def kill(self, **kwargs):
def finish(self, **kwargs):
logger.info(f"Finish job: {self.name}")

def __step_generator(self):
def __step_generator(self) -> typing.Generator:
self.stacks = [self.start_step]
while len(self.stacks) > 0:
step_i = self.stacks.pop()
self.current_step = step_i
yield step_i
for i, step in enumerate(self.stacks):
if step.is_ready():
self.current_step = step
self.stacks.pop(i)
yield step
sleep(0.1)

def run(self, **kwargs):
with Live(
Panel(Columns([]), title=f"Running {self.name}"),
refresh_per_second=20,
) as live:
# live.update(Panel(spinners, title="Panel 2"))

# add
while True:
# do self.__run by new thread
self.__run(live=live, **kwargs)

def __run(self, live=None, **kwargs):
self.compile()
logger.info(f"Running job: {self.name}")
self.params_pool = kwargs
step_generator = self.__step_generator()

for step in tqdm(step_generator, unit="step"):
log_string = f"Running step : {step.name}"
for step in step_generator:
log_string = f"Running step : {step}"
logger.info(log_string)

step.prepare(**self.params_pool)
if live is not None:
self.__update_live(live)
try:
action_params = inspect.getfullargspec(step.action).args
build_params = {}
Expand All @@ -68,8 +95,9 @@ def run(self, **kwargs):
continue
build_params[param] = step.__dict__.get(param)
last_result = step.action(**build_params)
except AttributeError:
except AttributeError as e:
logger.error(f"Current Job params: {pretty_dict(self.params_pool)}")
step.set_error(e)
raise
step.end(**kwargs)

Expand All @@ -84,8 +112,10 @@ def run(self, **kwargs):

def __init__(self, name=None, start_step: Step = None, **kwargs):
self.current_step = None
self.__start_step = start_step
self.__start_step: Step = start_step
self.params_pool = {}
self.__steps = []
self.__running_steps = []
if name is None:
name = os.getenv("JOB_NAME")
super().__init__(name=name, **kwargs)
Expand Down Expand Up @@ -198,3 +228,38 @@ def __enter__(self, *args, **kwargs):

def __exit__(self, *args, **kwargs):
self.finish()

def find_previous_steps(self, step, all_steps):
previous_steps = []
for step_i in all_steps:
if step in step_i.get_next_steps():
previous_steps.append(step_i)
return previous_steps

def compile(self):
self.__steps = self.get_all_steps()
for step in self.__steps:
step.job = self

def add_running_step(self, step):
self.__running_steps.append(step)

def remove_running_step(self, step):
self.__running_steps.remove(step)

def __update_live(self, live):
tree = Tree(self.name)
for step in self.__running_steps:
is_step_list = isinstance(step, StepList)
if is_step_list:
remaining_steps = len(step.get_remaining_step())
total_step = len(step.steps)
step_title = f"{step.name} {total_step-remaining_steps}/{total_step}"
else:
step_title = step.name
spinner = Spinner("material", text=Text(step_title, style="green"))
step_live = tree.add(spinner)
if isinstance(step, StepList):
for sub_step in step.steps:
step_live.add(Spinner("material", text=Text(sub_step.name, style="blue")))
live.update(Panel(tree, title=f"Running {self.name}"))
44 changes: 41 additions & 3 deletions evoflow/entities/core/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class Step(BaseObject):
Step is a cells of jobs
"""

STATUS_PENDING = "pending" # waiting to run
STATUS_RUNNING = "running" # running
STATUS_SUCCESS = "success" # success
STATUS_FAILED = "failed" # failed
STATUS_SKIPPED = "skipped" # skipped by condition
STATUS_READY = "ready" # ready to run

def __call__(self, func) -> "Step":
"""
Expand Down Expand Up @@ -44,8 +51,10 @@ def wrapper_action(*args: tuple, **kwargs: dict) -> "Step":
def __init__(self, name=None, transactions=None, **kwargs):
"""
@param name: Name of the step
@param kwargs:
Args:
name: Name of the step
transactions: List of transactions
**kwargs:
"""
self.is_start = kwargs.get("is_start")
self.is_end = kwargs.get("is_end")
Expand All @@ -58,6 +67,10 @@ def __init__(self, name=None, transactions=None, **kwargs):
name = self.__class__.__name__

self.params = {"name": name}
self.previous_steps = []
self.error = None
self.status = self.STATUS_PENDING
self.job = None
super().__init__(name=name, **kwargs)

@abc.abstractmethod
Expand All @@ -67,11 +80,24 @@ def action(self, **kwargs):
"""
pass

def do_action(self, **kwargs):
"""
Performs the function of step
"""

return self.action(**kwargs)

def set_error(self, error):
self.status = self.STATUS_FAILED
self.error = error

def end(self, **kwargs) -> dict:
"""
Kết thúc step, kill các object không cần thiết để giải phóng bộ nhớ
"""
# Global.caa.start_command('clear history')
self.status = self.STATUS_SUCCESS
self.job.remove_running_step(self)
return self.params

def prepare(self, **kwargs):
"""
Expand All @@ -80,6 +106,8 @@ def prepare(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
self.params = kwargs
self.job.add_running_step(self)
self.status = self.STATUS_RUNNING

def __str__(self):
return self.name
Expand Down Expand Up @@ -122,3 +150,13 @@ def __lshift__(self, other):
step_list = StepList(other)
return step_list.next(self)
raise Exception("Invalid step")

def is_ready(self):
"""
Check if step is ready to run
"""
for step in self.previous_steps:
if step.status != self.STATUS_SUCCESS:
return False
self.status = self.STATUS_READY
return True
14 changes: 4 additions & 10 deletions evoflow/entities/core/step_list.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import typing
from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm

from evoflow.entities.core.step import Step


Expand Down Expand Up @@ -49,10 +46,7 @@ def action(self, **kwargs):
"""
kwargs = {**self.params, **kwargs}
with ThreadPoolExecutor(max_workers=10) as executor:
list(
tqdm(
executor.map(lambda step: step.action(**kwargs), self.steps),
total=len(self.steps),
desc=self.name,
)
)
list(executor.map(lambda step: step.action(**kwargs), self.steps))

def get_remaining_step(self):
return [step for step in self.steps if step.status != Step.STATUS_SUCCESS]
5 changes: 2 additions & 3 deletions evoflow/entities/core/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ def __init__(self, step: Step, to: Step, condition="always"):
self.__from_step__ = step
self.condition = Condition(condition)
self.__from_step__.transactions.append(self)
self.to.previous_steps.append(self.__from_step__)

def validate(self, **kwargs):
if type(self.condition.condition_function) == str:
if self.condition.condition_function == "always":
return True
return self.build_condition_from_string(
self.condition.condition_function, self.__from_step__.params
)
return self.build_condition_from_string(self.condition.condition_function, self.__from_step__.params)
return bool(self.condition.condition_function(**kwargs))

def build_condition_from_string(self, condition_function_string, params):
Expand Down
14 changes: 9 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# This template is a low-dependency template.
# By default there is no requirements added here.
# Add the requirements you need to this file.
# or run `make init` to create this file automatically based on the template.
# You can also run `make switch-to-poetry` to use the poetry package manager.
tqdm
rich
python-pptx
json_tricks
xlrd
openpyxl
coloredlogs
pyunpack
patool

0 comments on commit e85fd2a

Please sign in to comment.