diff --git a/mutate/util/misc.py b/mutate/util/misc.py index 5aacc4a41..63e07f092 100644 --- a/mutate/util/misc.py +++ b/mutate/util/misc.py @@ -43,32 +43,38 @@ def get_config(module: str, group: str) -> Dict[str, Any]: def get_pipelines(): try: query_result = Postgres().fetchall(""" - SELECT - ulp.pipeline_id, - json_agg(json_build_object('input_plugin', uli.input_plugin, 'conf', confs)) - AS inputs, - json_agg(DISTINCT ulf.logstash_filter) AS filters - FROM - utm_logstash_pipeline AS ulp - LEFT JOIN - utm_logstash_input AS uli ON ulp.id = uli.pipeline_id - LEFT JOIN - (SELECT - ulic.input_id, - json_build_object(ulic.conf_key, ulic.conf_value) AS confs - FROM - utm_logstash_input_configuration AS ulic - GROUP BY - ulic.input_id, ulic.conf_key, ulic.conf_value - ) AS confs ON uli.id = confs.input_id - LEFT JOIN - utm_group_logstash_pipeline_filters AS ulg ON ulg.pipeline_id = ulp.id - LEFT JOIN - utm_logstash_filter AS ulf ON ulf.id = ulg.filter_id - GROUP BY - ulp.pipeline_id; + select + ulg.id, + ulp.pipeline_id, + json_agg(json_build_object('input_plugin', uli.input_plugin, 'conf', confs)) + as inputs, + json_agg(distinct ulg.logstash_filter) as filters + from + utm_logstash_pipeline as ulp left join utm_logstash_input as uli on ulp.id = uli.pipeline_id + left join ( select ulic.id, ulic.input_id, json_build_object(ulic.conf_key, ulic.conf_value) as confs + from utm_logstash_input_configuration as ulic group by ulic.id, ulic.input_id, ulic.conf_key, ulic.conf_value + order by ulic.id ) as confs on uli.id = confs.input_id + left join ( select ulg.*, ulf.logstash_filter from utm_group_logstash_pipeline_filters ulg left join utm_logstash_filter ulf + on ulg.filter_id = ulf.id order by id asc) as ulg + on ulg.pipeline_id = ulp.id + group by + ulp.pipeline_id, + ulg.id; """) - return {row['pipeline_id']: dict(row) for row in query_result} + + pipelines_dict = {} + for row in query_result: + pipeline_id = row['pipeline_id'] + if pipeline_id not in pipelines_dict: + pipelines_dict[pipeline_id]= { + 'pipeline_id': pipeline_id, + 'inputs': row['inputs'], + 'filters': row['filters'] + } + else: + pipelines_dict[pipeline_id]['filters'].extend(row['filters']) + + return pipelines_dict except Exception as e: raise