Skip to content

Commit

Permalink
feature: 1. 添加节点回调 API 2. 获取节点详情及创建任务 API 返回任务树及任务链接 (TencentBlueKing#75
Browse files Browse the repository at this point in the history
)
  • Loading branch information
homholueng authored and pagezz-canway committed Apr 11, 2019
1 parent 0d1b943 commit afa5bb5
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 14 deletions.
1 change: 1 addition & 0 deletions gcloud/apigw/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
views.modify_constants_for_periodic_task),
url(r'^get_task_detail/(?P<task_id>\d+)/(?P<bk_biz_id>\d+)/$', views.get_task_detail),
url(r'^get_task_node_detail/(?P<task_id>\d+)/(?P<bk_biz_id>\d+)/$', views.get_task_node_detail),
url(r'^node_callback/(?P<task_id>\d+)/(?P<bk_biz_id>\d+)/$', views.node_callback),
]
45 changes: 43 additions & 2 deletions gcloud/apigw/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,13 @@ def create_task(request, template_id, bk_biz_id):
flow_type=params.get('flow_type', 'common'),
current_flow='execute_task' if params.get('flow_type', 'common') == 'common' else 'func_claim',
)
return JsonResponse({'result': True, 'data': {'task_id': task.id}})
return JsonResponse({
'result': True,
'data': {
'task_id': task.id,
'task_url': task.url,
'pipeline_tree': task.pipeline_tree
}})


@login_exempt
Expand Down Expand Up @@ -598,6 +604,41 @@ def get_task_node_detail(request, task_id, bk_biz_id):

node_id = request.GET.get('node_id')
component_code = request.GET.get('component_code')
subprocess_stack = json.loads(request.GET.get('subprocess_stack', '[]'))
try:
subprocess_stack = json.loads(request.GET.get('subprocess_stack', '[]'))
except Exception:
return JsonResponse({
'result': False,
'message': 'subprocess_stack is not a valid array json'
})
result = task.get_node_detail(node_id, component_code, subprocess_stack)
return JsonResponse(result)


@login_exempt
@csrf_exempt
@require_POST
@apigw_required
@api_check_user_perm_of_business('manage_business')
def node_callback(request, task_id, bk_biz_id):
try:
params = json.loads(request.body)
except Exception:
return JsonResponse({
'result': False,
'message': 'invalid param format'
})

try:
task = TaskFlowInstance.objects.get(id=task_id, business__cc_id=bk_biz_id)
except TaskFlowInstance.DoesNotExist:
message = 'task[id={task_id}] of business[bk_biz_id={bk_biz_id}] does not exist'.format(
task_id=task_id,
bk_biz_id=bk_biz_id)
logger.exception(message)
return JsonResponse({'result': False, 'message': message})

node_id = params.get('node_id')
callback_data = params.get('callback_data')

return JsonResponse(task.callback(node_id, callback_data))
6 changes: 1 addition & 5 deletions gcloud/taskflow3/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,7 @@ def node_callback(request, token):
'message': 'invalid request body'
}, status=400)

result, message = TaskFlowInstance.objects.callback(node_id, callback_data)
return JsonResponse({
'result': result,
'message': message
})
return JsonResponse(TaskFlowInstance.objects.callback(node_id, callback_data))


def get_taskflow_root_context(request, taskflow_id):
Expand Down
23 changes: 20 additions & 3 deletions gcloud/taskflow3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,15 @@ def callback(self, act_id, data):
try:
result = pipeline_api.activity_callback(activity_id=act_id, callback_data=data)
except Exception as e:
return False, e.message
return {
'result': False,
'message': e.message
}

return result.result, result.message
return {
'result': result.result,
'message': result.message
}


class TaskFlowInstance(models.Model):
Expand Down Expand Up @@ -1231,7 +1237,9 @@ def get_task_detail(self):
'start_time': format_datetime(self.start_time),
'finish_time': format_datetime(self.finish_time),
'executor': self.executor,
'elapsed_time': self.elapsed_time
'elapsed_time': self.elapsed_time,
'pipeline_tree': self.pipeline_tree,
'task_url': self.url
}
exec_data = self.pipeline_instance.execution_data
# inputs data
Expand All @@ -1251,3 +1259,12 @@ def get_task_detail(self):
'ex_data': outputs.get('ex_data', '')
})
return data

def callback(self, act_id, data):
if not self.has_node(act_id):
return {
'result': False,
'message': 'task[{tid}] does not have node[{nid}]'.format(tid=self.id, nid=act_id)
}

return TaskFlowInstance.objects.callback(act_id, data)
114 changes: 110 additions & 4 deletions gcloud/tests/apigw/test_apigw.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,14 @@ def wrapper(*args, **kwargs):
TEST_APP_CODE = 'app_code'
TEST_TEMPLATE_ID = '1' # do not change this to non number
TEST_TASKFLOW_ID = '2' # do not change this to non number
TEST_TASKFLOW_URL = 'url'
TEST_TASKFLOW_PIPELINE_TREE = 'pipeline_tree'
TEST_PERIODIC_TASK_ID = '3' # do not change to this non number
TEST_DATA = 'data'
TEST_NODE_ID = 'node_id'
TEST_CALLBACK_DATA = 'callback_data'
TEST_COMPONENT_CODE = 'component_code'
TEST_SUBPROCESS_STACK = '[1, 2, 3]'


class APITest(TestCase):
Expand All @@ -77,6 +83,9 @@ def setUpClass(cls):
cls.SET_PERIODIC_TASK_ENABLED_URL = '/apigw/set_periodic_task_enabled/{task_id}/{bk_biz_id}/'
cls.MODIFY_PERIODIC_TASK_CRON_URL = '/apigw/modify_cron_for_periodic_task/{task_id}/{bk_biz_id}/'
cls.MODIFY_PERIODIC_TASK_CONSTANTS_URL = '/apigw/modify_constants_for_periodic_task/{task_id}/{bk_biz_id}/'
cls.GET_TASK_DETAIL = '/apigw/get_task_detail/{task_id}/{bk_biz_id}/'
cls.GET_TASK_NODE_DETAIL = '/apigw/get_task_node_detail/{task_id}/{bk_biz_id}/'
cls.NODE_CALLBACK = '/apigw/node_callback/{task_id}/{bk_biz_id}/'

super(APITest, cls).setUpClass()

Expand Down Expand Up @@ -295,15 +304,16 @@ def test_get_template_info__for_common_template_does_not_exists(self):
@mock.patch(TASKINSTANCE_CREATE, MagicMock(return_value=MockTaskFlowInstance(id=TEST_TASKFLOW_ID)))
@mock.patch(APIGW_VIEW_JSON_SCHEMA_VALIDATE, MagicMock())
def test_create_task__success(self):
pt1 = MockPipelineTemplate(id=1,
name='pt1')
pt1 = MockPipelineTemplate(id=1, name='pt1')

tmpl = MockTaskTemplate(id=1, pipeline_template=pt1)
biz = MockBusiness(cc_id=TEST_BIZ_CC_ID, cc_name=TEST_BIZ_CC_NAME)

with mock.patch(BUSINESS_GET, MagicMock(return_value=biz)):
with mock.patch(TASKTEMPLATE_SELECT_RELATE, MagicMock(return_value=MockQuerySet(get_result=tmpl))):
assert_data = {'task_id': TEST_TASKFLOW_ID}
assert_data = {'task_id': TEST_TASKFLOW_ID,
'task_url': TEST_TASKFLOW_URL,
'pipeline_tree': TEST_TASKFLOW_PIPELINE_TREE}
response = self.client.post(path=self.CREATE_TASK_URL.format(template_id=TEST_TEMPLATE_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data=json.dumps({'name': 'name',
Expand Down Expand Up @@ -344,7 +354,9 @@ def test_create_task__success(self):
tmpl = MockCommonTemplate(id=1, pipeline_template=pt1)

with mock.patch(COMMONTEMPLATE_SELECT_RELATE, MagicMock(return_value=MockQuerySet(get_result=tmpl))):
assert_data = {'task_id': TEST_TASKFLOW_ID}
assert_data = {'task_id': TEST_TASKFLOW_ID,
'task_url': TEST_TASKFLOW_URL,
'pipeline_tree': TEST_TASKFLOW_PIPELINE_TREE}
response = self.client.post(path=self.CREATE_TASK_URL.format(template_id=TEST_TEMPLATE_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data=json.dumps({'name': 'name',
Expand Down Expand Up @@ -940,3 +952,97 @@ def test_modify_constants_for_periodic_task__modify_constants_raise(self):

self.assertFalse(data['result'])
self.assertTrue('message' in data)

def test_get_task_detail__success(self):
mock_taskflow = MockTaskFlowInstance(get_task_detail_return=TEST_DATA)
with mock.patch(TASKINSTANCE_GET, MagicMock(return_value=mock_taskflow)):
assert_data = TEST_DATA
response = self.client.get(path=self.GET_TASK_DETAIL.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID))

data = json.loads(response.content)

self.assertTrue(data['result'])
self.assertEqual(data['data'], assert_data)

@mock.patch(TASKINSTANCE_GET, MagicMock(side_effect=TaskFlowInstance.DoesNotExist()))
def test_get_task_detail__success__taskflow_does_not_exists(self):
response = self.client.get(path=self.GET_TASK_DETAIL.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID))

data = json.loads(response.content)

self.assertFalse(data['result'])
self.assertTrue('message' in data)

def test_get_task_node_detail__success(self):
mock_taskflow = MockTaskFlowInstance(get_node_detail_return={'result': True, 'data': TEST_DATA})
with mock.patch(TASKINSTANCE_GET, MagicMock(return_value=mock_taskflow)):
assert_data = TEST_DATA
response = self.client.get(path=self.GET_TASK_NODE_DETAIL.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data={'node_id': TEST_NODE_ID,
'component_code': TEST_COMPONENT_CODE,
'subprocess_stack': TEST_SUBPROCESS_STACK})

data = json.loads(response.content)

self.assertTrue(data['result'])
self.assertEqual(data['data'], assert_data)
mock_taskflow.get_node_detail.assert_called_once_with(TEST_NODE_ID,
TEST_COMPONENT_CODE,
json.loads(TEST_SUBPROCESS_STACK))

@mock.patch(TASKINSTANCE_GET, MagicMock(side_effect=TaskFlowInstance.DoesNotExist()))
def test_get_task_node_detail__taskflow_doest_not_exist(self):
response = self.client.get(path=self.GET_TASK_NODE_DETAIL.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data={'node_id': TEST_NODE_ID,
'component_code': TEST_COMPONENT_CODE,
'subprocess_stack': TEST_SUBPROCESS_STACK})

data = json.loads(response.content)
self.assertFalse(data['result'])
self.assertTrue('message' in data)

def test_get_task_node_detail__with_invalid_subprocess_stack(self):
response = self.client.get(path=self.GET_TASK_NODE_DETAIL.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data={'node_id': TEST_NODE_ID,
'component_code': TEST_COMPONENT_CODE,
'subprocess_stack': 'abcdefg'})

data = json.loads(response.content)
self.assertFalse(data['result'])
self.assertTrue('message' in data)

def test_node_callback__success(self):
mock_instance = MockTaskFlowInstance()
with mock.patch(TASKINSTANCE_GET, MagicMock(return_value=mock_instance)):
response = self.client.post(path=self.NODE_CALLBACK.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data=json.dumps({
'node_id': TEST_NODE_ID,
'callback_data': TEST_CALLBACK_DATA
}),
content_type='application/json')

data = json.loads(response.content)

self.assertTrue(data['result'])
mock_instance.callback.assert_called_once_with(TEST_NODE_ID, TEST_CALLBACK_DATA)

@mock.patch(TASKINSTANCE_GET, MagicMock(side_effect=TaskFlowInstance.DoesNotExist()))
def test_node_callback__taskflow_does_not_exists(self):
response = self.client.post(path=self.NODE_CALLBACK.format(task_id=TEST_TASKFLOW_ID,
bk_biz_id=TEST_BIZ_CC_ID),
data=json.dumps({
'node_id': TEST_NODE_ID,
'callback_data': TEST_CALLBACK_DATA
}),
content_type='application/json')

data = json.loads(response.content)

self.assertFalse(data['result'])
self.assertTrue('message' in data)
7 changes: 7 additions & 0 deletions gcloud/tests/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def __init__(self, **kwargs):
'side_effect': kwargs.get('get_status_raise')})
self.format_pipeline_status = MagicMock(**{'return_value': kwargs.get('format_pipeline_status_return'),
'side_effect': kwargs.get('format_pipeline_status_raise')})
self.url = kwargs.get('url', 'url')
self.pipeline_tree = kwargs.get('pipeline_tree', 'pipeline_tree')
self.callback = MagicMock(return_value=kwargs.get('callback_return', {'result': True,
'message': 'success'}))
self.get_task_detail = MagicMock(return_value=kwargs.get('get_task_detail_return', 'task_detail'))
self.get_node_detail = MagicMock(return_value=kwargs.get('get_node_detail_return', {'result': True,
'data': 'data'}))


class MockPeriodicTask(object):
Expand Down
3 changes: 3 additions & 0 deletions gcloud/tests/mock_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
TASKINSTANCE_FORMAT_STATUS = 'gcloud.taskflow3.models.TaskFlowInstance.format_pipeline_status'
TASKINSTANCE_EXTEN_CLASSIFIED_COUNT = 'gcloud.taskflow3.models.TaskFlowInstance.objects.extend_classified_count'
TASKINSTANCE_PREVIEW_TREE = 'gcloud.taskflow3.models.TaskFlowInstance.objects.preview_pipeline_tree_exclude_task_nodes'
TASKINSTANCE_OBJECTS_CALLBACK = 'gcloud.taskflow3.models.TaskFlowInstance.objects.callback'
TASKINSTANCE_CALLBACK = 'gcloud.taskflow3.models.TaskFlowInstance.callback'
TASKINSTANCE_HAS_NODE = 'gcloud.taskflow3.models.TaskFlowInstance.has_node'

PERIODIC_TASK_FILTER = 'gcloud.periodictask.models.PeriodicTask.objects.filter'
PERIODIC_TASK_GET = 'gcloud.periodictask.models.PeriodicTask.objects.get'
Expand Down
39 changes: 39 additions & 0 deletions gcloud/tests/taskflow3/models/test_taskflow_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017-2019 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.test import TestCase

from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.tests.mock import * # noqa
from gcloud.tests.mock_settings import * # noqa


class TaskflowTestCase(TestCase):

def test_callback(self):
instance = TaskFlowInstance()

objects_callback_return = {'result': True, 'message': 'success'}

with mock.patch(TASKINSTANCE_OBJECTS_CALLBACK, MagicMock(return_value=objects_callback_return)):
with mock.patch(TASKINSTANCE_HAS_NODE, MagicMock(return_value=False)):
result = instance.callback('act_id', 'data')
self.assertFalse(result['result'])
self.assertTrue('message' in result)
TaskFlowInstance.objects.callback.assert_not_called()

with mock.patch(TASKINSTANCE_HAS_NODE, MagicMock(return_value=True)):
result = instance.callback('act_id', 'data')
self.assertTrue(result['result'])
self.assertTrue('message' in result)
TaskFlowInstance.objects.callback.assert_called_once_with('act_id', 'data')

0 comments on commit afa5bb5

Please sign in to comment.