Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions mutate/util/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,38 @@ def get_config(module: str, group: str) -> Dict[str, Any]:
def get_pipelines():
try:
query_result = Postgres().fetchall("""
SELECT
ulp.pipeline_id,
json_agg(json_build_object('input_plugin', uli.input_plugin, 'conf', confs))
AS inputs,
json_agg(DISTINCT ulf.logstash_filter) AS filters
FROM
utm_logstash_pipeline AS ulp
LEFT JOIN
utm_logstash_input AS uli ON ulp.id = uli.pipeline_id
LEFT JOIN
(SELECT
ulic.input_id,
json_build_object(ulic.conf_key, ulic.conf_value) AS confs
FROM
utm_logstash_input_configuration AS ulic
GROUP BY
ulic.input_id, ulic.conf_key, ulic.conf_value
) AS confs ON uli.id = confs.input_id
LEFT JOIN
utm_group_logstash_pipeline_filters AS ulg ON ulg.pipeline_id = ulp.id
LEFT JOIN
utm_logstash_filter AS ulf ON ulf.id = ulg.filter_id
GROUP BY
ulp.pipeline_id;
select
ulg.id,
ulp.pipeline_id,
json_agg(json_build_object('input_plugin', uli.input_plugin, 'conf', confs))
as inputs,
json_agg(distinct ulg.logstash_filter) as filters
from
utm_logstash_pipeline as ulp left join utm_logstash_input as uli on ulp.id = uli.pipeline_id
left join ( select ulic.id, ulic.input_id, json_build_object(ulic.conf_key, ulic.conf_value) as confs
from utm_logstash_input_configuration as ulic group by ulic.id, ulic.input_id, ulic.conf_key, ulic.conf_value
order by ulic.id ) as confs on uli.id = confs.input_id
left join ( select ulg.*, ulf.logstash_filter from utm_group_logstash_pipeline_filters ulg left join utm_logstash_filter ulf
on ulg.filter_id = ulf.id order by id asc) as ulg
on ulg.pipeline_id = ulp.id
group by
ulp.pipeline_id,
ulg.id;
""")
return {row['pipeline_id']: dict(row) for row in query_result}

pipelines_dict = {}
for row in query_result:
pipeline_id = row['pipeline_id']
if pipeline_id not in pipelines_dict:
pipelines_dict[pipeline_id]= {
'pipeline_id': pipeline_id,
'inputs': row['inputs'],
'filters': row['filters']
}
else:
pipelines_dict[pipeline_id]['filters'].extend(row['filters'])

return pipelines_dict

except Exception as e:
raise
Expand Down