bugfix: fix the node name returned by the pipeline status API not matching the actual node name (TencentBlueKing#78)
* bugfix: fix the node name returned by the pipeline status API not matching the actual node name

* improvement: extract the name max length into a constant

* minor: change constructor default values
homholueng authored and pagezz-canway committed Apr 11, 2019
1 parent afa5bb5 commit dec6582
Showing 4 changed files with 15 additions and 11 deletions.
7 changes: 4 additions & 3 deletions pipeline/engine/core/runtime.py
@@ -17,11 +17,12 @@

from pipeline.core.flow.activity import SubProcess
from pipeline.engine import states
- from pipeline.engine.models import Status, NodeRelationship, FunctionSwitch
+ from pipeline.engine.models import Status, NodeRelationship, FunctionSwitch, NAME_MAX_LENGTH
from pipeline.engine.core.handlers import FLOW_NODE_HANDLERS
from pipeline.conf import settings as pipeline_settings

logger = logging.getLogger('celery')

RERUN_MAX_LIMIT = pipeline_settings.PIPELINE_RERUN_MAX_TIMES


@@ -74,8 +75,8 @@ def run_loop(process):
return

# try to transit current node to running state
- action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True,
-                                 name=str(current_node.__class__))
+ name = (current_node.name or str(current_node.__class__))[:NAME_MAX_LENGTH]
+ action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True, name=name)

# check rerun limit
if not isinstance(current_node, SubProcess) and RERUN_MAX_LIMIT != 0 and \
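The change above is the core of the bugfix: the name recorded in Status used to always be str(current_node.__class__), which is why the status API reported class strings instead of the node names users configured. The engine now prefers current_node.name, falls back to the class string only when the name is empty, and truncates either value to NAME_MAX_LENGTH so it fits the database column. A minimal standalone sketch of that fallback-and-truncate rule, with a hypothetical FakeNode class that is not part of the engine:

NAME_MAX_LENGTH = 64  # mirrors the constant introduced in pipeline/engine/models/core.py

class FakeNode(object):
    # Hypothetical node stand-in: only the `name` attribute matters here.
    def __init__(self, name=''):
        self.name = name

def status_name(node):
    # Prefer the human-readable node name; fall back to the class string,
    # then truncate so the value fits the Status.name column.
    return (node.name or str(node.__class__))[:NAME_MAX_LENGTH]

print(status_name(FakeNode(name='approve ticket')))  # -> 'approve ticket'
print(status_name(FakeNode()))  # empty name -> "<class '__main__.FakeNode'>"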
12 changes: 7 additions & 5 deletions pipeline/engine/models/core.py
@@ -35,7 +35,9 @@
from pipeline.conf import settings as pipeline_settings

logger = logging.getLogger('celery')

RERUN_MAX_LIMIT = pipeline_settings.PIPELINE_RERUN_MAX_TIMES
+ NAME_MAX_LENGTH = 64


class ProcessSnapshotManager(models.Manager):
@@ -694,8 +696,8 @@ def states_for(self, id_list):

def prepare_for_pipeline(self, pipeline):
cls_str = str(pipeline.__class__)
- cls_name = pipeline.__class__.__name__[:64]
- self.create(id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= 64 else cls_name)
+ cls_name = pipeline.__class__.__name__[:NAME_MAX_LENGTH]
+ self.create(id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name)

def fail(self, node, ex_data):
action_res = self.transit(node.id, states.FAILED)
@@ -807,7 +809,7 @@ def lock(self, id):
class Status(models.Model):
id = models.CharField(_(u"节点 ID"), unique=True, primary_key=True, max_length=32)
state = models.CharField(_(u"状态"), max_length=10)
- name = models.CharField(_(u"节点名称"), max_length=64, default='')
+ name = models.CharField(_(u"节点名称"), max_length=NAME_MAX_LENGTH, default='')
retry = models.IntegerField(_(u"重试次数"), default=0)
loop = models.IntegerField(_(u"循环次数"), default=1)
skip = models.BooleanField(_(u"是否跳过"), default=False)
@@ -946,7 +948,7 @@ def update_celery_info(self, id, lock, celery_id, schedule_date, is_scheduling=F
class ScheduleService(models.Model):
SCHEDULE_ID_SPLIT_DIVISION = 32

- id = models.CharField(_(u"ID 节点ID+version"), max_length=64, unique=True, primary_key=True)
+ id = models.CharField(_(u"ID 节点ID+version"), max_length=NAME_MAX_LENGTH, unique=True, primary_key=True)
activity_id = models.CharField(_(u"节点 ID"), max_length=32, db_index=True)
process_id = models.CharField(_(u"Pipeline 进程 ID"), max_length=32)
schedule_times = models.IntegerField(_(u"被调度次数"), default=0)
@@ -1073,7 +1075,7 @@ def start_task(self, schedule_id, start_func, kwargs):


class ScheduleCeleryTask(models.Model):
- schedule_id = models.CharField(_(u"schedule ID"), max_length=64, unique=True, db_index=True)
+ schedule_id = models.CharField(_(u"schedule ID"), max_length=NAME_MAX_LENGTH, unique=True, db_index=True)
celery_task_id = models.CharField(_(u"celery 任务 ID"), max_length=40, default='')

objects = ScheduleCeleryTaskManager()
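Alongside extracting NAME_MAX_LENGTH, prepare_for_pipeline keeps its existing rule for the pipeline-level Status record: use the full class string when it fits the column, otherwise fall back to the bare class name, with both bounds now expressed through the shared constant rather than a hard-coded 64. A self-contained sketch of that selection, with an illustrative Pipeline class:

NAME_MAX_LENGTH = 64

def pipeline_status_name(pipeline_cls):
    # Full "<class 'module.Name'>" string when it fits the column;
    # otherwise the bare class name, hard-truncated as a last resort.
    cls_str = str(pipeline_cls)
    cls_name = pipeline_cls.__name__[:NAME_MAX_LENGTH]
    return cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name

class Pipeline(object):
    pass

print(pipeline_status_name(Pipeline))  # e.g. "<class '__main__.Pipeline'>"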
4 changes: 2 additions & 2 deletions pipeline/tests/engine/core/test_runtime.py
@@ -253,7 +253,7 @@ def test_run_loop(self):

with patch('pipeline.engine.core.runtime.FLOW_NODE_HANDLERS', mock_handlers):
# 6.1. test should return
- current_node = IdentifyObject()
+ current_node = IdentifyObject(name='name')
process = MockPipelineProcess(top_pipeline=PipelineObject(node=current_node),
destination_id=uniqid(),
current_node_id=current_node.id)
@@ -275,7 +275,7 @@ def test_run_loop(self):
Status.objects.transit.assert_called_with(id=current_node.id,
to_state=states.RUNNING,
start=True,
- name=str(current_node.__class__))
+ name=current_node.name)

process.refresh_current_node.assert_called_once_with(current_node.id)

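The test update mirrors the runtime change: the mocked node now carries an explicit name, and the assertion expects transit to receive that name rather than the class string. A hypothetical, self-contained reproduction of the same assertion pattern, using mock directly with illustrative ids and states:

from mock import MagicMock

class IdentifyObject(object):
    # Same shape as the test double in pipeline/tests/mock.py after this commit.
    def __init__(self, id=None, name=''):
        self.id = id or 'node-1'
        self.name = name

transit = MagicMock()
current_node = IdentifyObject(name='name')

# Code under test would call transit like this ...
transit(id=current_node.id, to_state='RUNNING', start=True, name=current_node.name)

# ... and the test asserts that the node's own name was passed through.
transit.assert_called_with(id=current_node.id, to_state='RUNNING', start=True, name='name')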
3 changes: 2 additions & 1 deletion pipeline/tests/mock.py
@@ -45,8 +45,9 @@ def get_outputs(self):


class IdentifyObject(object):
- def __init__(self, id=None):
+ def __init__(self, id=None, name=''):
self.id = id or uniqid()
+ self.name = name


class StartEventObject(IdentifyObject):
