diff --git a/pipeline/engine/core/runtime.py b/pipeline/engine/core/runtime.py index c7ba2e5785..3c81c48a87 100644 --- a/pipeline/engine/core/runtime.py +++ b/pipeline/engine/core/runtime.py @@ -17,11 +17,12 @@ from pipeline.core.flow.activity import SubProcess from pipeline.engine import states -from pipeline.engine.models import Status, NodeRelationship, FunctionSwitch +from pipeline.engine.models import Status, NodeRelationship, FunctionSwitch, NAME_MAX_LENGTH from pipeline.engine.core.handlers import FLOW_NODE_HANDLERS from pipeline.conf import settings as pipeline_settings logger = logging.getLogger('celery') + RERUN_MAX_LIMIT = pipeline_settings.PIPELINE_RERUN_MAX_TIMES @@ -74,8 +75,8 @@ def run_loop(process): return # try to transit current node to running state - action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True, - name=str(current_node.__class__)) + name = (current_node.name or str(current_node.__class__))[:NAME_MAX_LENGTH] + action = Status.objects.transit(id=current_node.id, to_state=states.RUNNING, start=True, name=name) # check rerun limit if not isinstance(current_node, SubProcess) and RERUN_MAX_LIMIT != 0 and \ diff --git a/pipeline/engine/models/core.py b/pipeline/engine/models/core.py index 257dd7ba66..246ab14c4c 100644 --- a/pipeline/engine/models/core.py +++ b/pipeline/engine/models/core.py @@ -35,7 +35,9 @@ from pipeline.conf import settings as pipeline_settings logger = logging.getLogger('celery') + RERUN_MAX_LIMIT = pipeline_settings.PIPELINE_RERUN_MAX_TIMES +NAME_MAX_LENGTH = 64 class ProcessSnapshotManager(models.Manager): @@ -694,8 +696,8 @@ def states_for(self, id_list): def prepare_for_pipeline(self, pipeline): cls_str = str(pipeline.__class__) - cls_name = pipeline.__class__.__name__[:64] - self.create(id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= 64 else cls_name) + cls_name = pipeline.__class__.__name__[:NAME_MAX_LENGTH] + self.create(id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name) def fail(self, node, ex_data): action_res = self.transit(node.id, states.FAILED) @@ -807,7 +809,7 @@ def lock(self, id): class Status(models.Model): id = models.CharField(_(u"节点 ID"), unique=True, primary_key=True, max_length=32) state = models.CharField(_(u"状态"), max_length=10) - name = models.CharField(_(u"节点名称"), max_length=64, default='') + name = models.CharField(_(u"节点名称"), max_length=NAME_MAX_LENGTH, default='') retry = models.IntegerField(_(u"重试次数"), default=0) loop = models.IntegerField(_(u"循环次数"), default=1) skip = models.BooleanField(_(u"是否跳过"), default=False) @@ -946,7 +948,7 @@ def update_celery_info(self, id, lock, celery_id, schedule_date, is_scheduling=F class ScheduleService(models.Model): SCHEDULE_ID_SPLIT_DIVISION = 32 - id = models.CharField(_(u"ID 节点ID+version"), max_length=64, unique=True, primary_key=True) + id = models.CharField(_(u"ID 节点ID+version"), max_length=NAME_MAX_LENGTH, unique=True, primary_key=True) activity_id = models.CharField(_(u"节点 ID"), max_length=32, db_index=True) process_id = models.CharField(_(u"Pipeline 进程 ID"), max_length=32) schedule_times = models.IntegerField(_(u"被调度次数"), default=0) @@ -1073,7 +1075,7 @@ def start_task(self, schedule_id, start_func, kwargs): class ScheduleCeleryTask(models.Model): - schedule_id = models.CharField(_(u"schedule ID"), max_length=64, unique=True, db_index=True) + schedule_id = models.CharField(_(u"schedule ID"), max_length=NAME_MAX_LENGTH, unique=True, db_index=True) celery_task_id = models.CharField(_(u"celery 任务 ID"), max_length=40, default='') objects = ScheduleCeleryTaskManager() diff --git a/pipeline/tests/engine/core/test_runtime.py b/pipeline/tests/engine/core/test_runtime.py index 2f657b2d77..005f5a25f4 100644 --- a/pipeline/tests/engine/core/test_runtime.py +++ b/pipeline/tests/engine/core/test_runtime.py @@ -253,7 +253,7 @@ def test_run_loop(self): with patch('pipeline.engine.core.runtime.FLOW_NODE_HANDLERS', mock_handlers): # 6.1. test should return - current_node = IdentifyObject() + current_node = IdentifyObject(name='name') process = MockPipelineProcess(top_pipeline=PipelineObject(node=current_node), destination_id=uniqid(), current_node_id=current_node.id) @@ -275,7 +275,7 @@ def test_run_loop(self): Status.objects.transit.assert_called_with(id=current_node.id, to_state=states.RUNNING, start=True, - name=str(current_node.__class__)) + name=current_node.name) process.refresh_current_node.assert_called_once_with(current_node.id) diff --git a/pipeline/tests/mock.py b/pipeline/tests/mock.py index 55489eebda..4177508723 100644 --- a/pipeline/tests/mock.py +++ b/pipeline/tests/mock.py @@ -45,8 +45,9 @@ def get_outputs(self): class IdentifyObject(object): - def __init__(self, id=None): + def __init__(self, id=None, name=''): self.id = id or uniqid() + self.name = name class StartEventObject(IdentifyObject):