@@ -491,14 +491,14 @@ def _user_convert_msg_before_run(self, msg: typing.Union[str, dict]) -> dict:
491
491
return msg
492
492
493
493
def _submit_task (self , kw ):
494
-
495
494
kw ['body' ] = self ._convert_msg_before_run (kw ['body' ])
496
495
self ._print_message_get_from_broker (kw ['body' ])
497
496
if self ._judge_is_daylight ():
498
497
self ._requeue (kw )
499
498
time .sleep (self .time_interval_for_check_do_not_run_time )
500
499
return
501
500
function_only_params = delete_keys_and_return_new_dict (kw ['body' ], )
501
+ kw ['function_only_params' ] = function_only_params
502
502
if self ._get_priority_conf (kw , 'do_task_filtering' ) and self ._redis_filter .check_value_exists (
503
503
function_only_params ,self ._get_priority_conf (kw , 'filter_str' )): # 对函数的参数进行检查,过滤已经执行过并且成功的任务。
504
504
self .logger .warning (f'redis的 [{ self ._redis_filter_key_name } ] 键 中 过滤任务 { kw ["body" ]} ' )
@@ -688,7 +688,7 @@ def _run(self, kw: dict, ):
688
688
max_retry_times = self ._get_priority_conf (kw , 'max_retry_times' )
689
689
current_function_result_status = FunctionResultStatus (self .queue_name , self .consuming_function .__name__ , kw ['body' ], )
690
690
current_retry_times = 0
691
- function_only_params = delete_keys_and_return_new_dict ( kw ['body' ])
691
+ function_only_params = kw ['function_only_params' ]
692
692
for current_retry_times in range (max_retry_times + 1 ):
693
693
current_function_result_status .run_times = current_retry_times + 1
694
694
current_function_result_status .run_status = RunStatus .running
@@ -747,10 +747,10 @@ def _run(self, kw: dict, ):
747
747
# noinspection PyProtectedMember
748
748
def _run_consuming_function_with_confirm_and_retry (self , kw : dict , current_retry_times ,
749
749
function_result_status : FunctionResultStatus , ):
750
- function_only_params = delete_keys_and_return_new_dict ( kw ['body' ]) if self ._do_not_delete_extra_from_msg is False else kw ['body' ]
750
+ function_only_params = kw ['function_only_params' ] if self ._do_not_delete_extra_from_msg is False else kw ['body' ]
751
751
task_id = kw ['body' ]['extra' ]['task_id' ]
752
752
t_start = time .time ()
753
- # function_result_status.run_times = current_retry_times + 1
753
+
754
754
fct = funboost_current_task ()
755
755
fct_context = FctContext (function_params = function_only_params ,
756
756
full_msg = kw ['body' ],
@@ -872,7 +872,7 @@ def _unified_run(self, kw, is_async):
872
872
max_retry_times = self ._get_priority_conf (kw , 'max_retry_times' )
873
873
current_function_result_status = FunctionResultStatus (self .queue_name , self .consuming_function .__name__ , kw ['body' ], )
874
874
current_retry_times = 0
875
- function_only_params = delete_keys_and_return_new_dict ( kw ['body' ])
875
+ function_only_params = kw ['function_only_params' ]
876
876
for current_retry_times in range (max_retry_times + 1 ):
877
877
current_function_result_status .run_times = current_retry_times + 1
878
878
current_function_result_status .run_status = RunStatus .running
@@ -936,8 +936,8 @@ async def _async_run_consuming_function_with_confirm_and_retry(self, kw: dict, c
936
936
function_result_status : FunctionResultStatus , ):
937
937
"""虽然和上面有点大面积重复相似,这个是为了asyncio模式的,asyncio模式真的和普通同步模式的代码思维和形式区别太大,
938
938
框架实现兼容async的消费函数很麻烦复杂,连并发池都要单独写"""
939
- function_only_params = delete_keys_and_return_new_dict ( kw ['body' ]) if self ._do_not_delete_extra_from_msg is False else kw ['body' ]
940
- function_result_status . run_times = current_retry_times + 1
939
+ function_only_params = kw ['function_only_params' ] if self ._do_not_delete_extra_from_msg is False else kw ['body' ]
940
+
941
941
# noinspection PyBroadException
942
942
t_start = time .time ()
943
943
fct = funboost_current_task ()
0 commit comments