From f9f3b1f48d11bdf1a946abccde536f1f9e16de5b Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 13 Jan 2022 14:44:58 +0000 Subject: [PATCH 1/8] Elasticsearch implementation of upper rel + qual clause pushdown combo --- pg_es_fdw/_es_query.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index c98cbf6..1c6ee8b 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -82,6 +82,20 @@ def quals_to_es( """Convert a list of Multicorn quals to an ElasticSearch query""" ignore_columns = ignore_columns or [] + query = {} + if quals is not None and len(quals) > 0: + query = { + "query": { + "bool": { + "must": [ + _qual_to_es(q, column_map) + for q in quals + if q.field_name not in ignore_columns + ] + } + } + } + # Aggregation/grouping queries if aggs is not None: aggs_query = { @@ -94,7 +108,7 @@ def quals_to_es( } if group_clauses is None: - return {"aggs": aggs_query} + query["aggs"] = aggs_query if group_clauses is not None: group_query = { @@ -111,17 +125,7 @@ def quals_to_es( if aggs is not None: group_query["group_buckets"]["aggregations"] = aggs_query - return {"aggs": group_query} + query["aggs"] = group_query # Regular query - return { - "query": { - "bool": { - "must": [ - _qual_to_es(q, column_map) - for q in quals - if q.field_name not in ignore_columns - ] - } - } - } + return query From d36b6298b8fbcd70860ca34bb86e8a3390386d2b Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 14 Jan 2022 11:37:07 +0000 Subject: [PATCH 2/8] Make scan query the default one even when empty --- pg_es_fdw/_es_query.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index 1c6ee8b..1a60854 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -82,19 +82,17 @@ def quals_to_es( """Convert a list of Multicorn quals to an ElasticSearch query""" ignore_columns = ignore_columns or [] - query = {} - if quals is not None and len(quals) > 0: - query = { - "query": { - "bool": { - "must": [ - _qual_to_es(q, column_map) - for q in quals - if q.field_name not in ignore_columns - ] - } + query = { + "query": { + "bool": { + "must": [ + _qual_to_es(q, column_map) + for q in quals + if q.field_name not in ignore_columns + ] } } + } # Aggregation/grouping queries if aggs is not None: From 2f2ed9fada776b43183e458b1e40826491c4347c Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 17 Jan 2022 11:14:09 +0000 Subject: [PATCH 3/8] Enable pushdown of COUNT(*) Since ES lacks analogous operation primitive we need to handle it differently depending when - only aggregations present we must specify track_total_hits to true, so as to get an accurate count above the default 10000 - when grouping clauses are present, the doc_count field suffices, which is present by default. --- pg_es_fdw/__init__.py | 12 ++++++++++++ pg_es_fdw/_es_query.py | 10 +++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pg_es_fdw/__init__.py b/pg_es_fdw/__init__.py index b757f39..9aa467d 100644 --- a/pg_es_fdw/__init__.py +++ b/pg_es_fdw/__init__.py @@ -319,6 +319,12 @@ def _handle_aggregation_response(self, query, response, aggs, group_clauses): result = {} for agg_name in aggs: + if agg_name == "count.*": + # COUNT(*) is a special case, since it doesn't have a + # corresponding aggregation primitice in ES + result[agg_name] = response["hits"]["total"]["value"] + continue + result[agg_name] = response["aggregations"][agg_name]["value"] yield result else: @@ -331,6 +337,12 @@ def _handle_aggregation_response(self, query, response, aggs, group_clauses): if aggs is not None: for agg_name in aggs: + if agg_name == "count.*": + # In general case with GROUP BY clauses COUNT(*) + # is taken from the bucket's doc_count field + result[agg_name] = bucket["doc_count"] + continue + result[agg_name] = bucket[agg_name]["value"] yield result diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index 1a60854..d434056 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -18,6 +18,7 @@ "min": "min", "sum": "sum", "count": "value_count", + "count.*": None # not mapped to a particular function } @@ -103,9 +104,17 @@ def quals_to_es( } } for agg_name, agg_props in aggs.items() + if agg_name != "count.*" } if group_clauses is None: + if "count.*" in aggs: + # There is no particular COUNT(*) equivalent in ES, instead + # for plain aggregations (e.g. no grouping statements), we need + # to enable the track_total_hits option in order to get an + # accuate number of matched docs. + query["track_total_hits"] = True + query["aggs"] = aggs_query if group_clauses is not None: @@ -125,5 +134,4 @@ def quals_to_es( query["aggs"] = group_query - # Regular query return query From 249903034e348e472fa46881c8def7217d70f5fe Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 17 Jan 2022 12:31:01 +0000 Subject: [PATCH 4/8] Fix PG pattern matching operator handling --- pg_es_fdw/_es_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index d434056..07ec6e4 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -44,7 +44,7 @@ def _base_qual_to_es(col, op, value, column_map=None): return {"bool": {"must_not": {"term": {col: value}}}} if op == "~~": - return {"match": {col: value.replace("%", "*")}} + return {"wildcard": {col: value.replace("%", "*")}} # For unknown operators, get everything return {"match_all": {}} From ff95446620a4c13df28d2db33e3cd297e51dd54e Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 18 Jan 2022 14:28:45 +0000 Subject: [PATCH 5/8] Implement proper patern match translation between PG and ES --- pg_es_fdw/__init__.py | 2 +- pg_es_fdw/_es_query.py | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pg_es_fdw/__init__.py b/pg_es_fdw/__init__.py index 9aa467d..f2be3cc 100644 --- a/pg_es_fdw/__init__.py +++ b/pg_es_fdw/__init__.py @@ -321,7 +321,7 @@ def _handle_aggregation_response(self, query, response, aggs, group_clauses): for agg_name in aggs: if agg_name == "count.*": # COUNT(*) is a special case, since it doesn't have a - # corresponding aggregation primitice in ES + # corresponding aggregation primitive in ES result[agg_name] = response["hits"]["total"]["value"] continue diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index 07ec6e4..9066843 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -1,3 +1,4 @@ +import re try: from multicorn import ANY except ImportError: @@ -22,6 +23,20 @@ } +def _convert_pattern_match_to_es(expr): + def _pg_es_pattern_map(matchobj): + if matchobj.group(0) == "%": + return "*" + elif matchobj.group(0) == "_": + return "?" + elif matchobj.group(0) == "\%": + return "%" + elif matchobj.group(0) == "\_": + return "_" + + return re.sub(r'\\?%|\\?_', _pg_es_pattern_map, expr) + + def _base_qual_to_es(col, op, value, column_map=None): if column_map: col = column_map.get(col, col) @@ -44,7 +59,7 @@ def _base_qual_to_es(col, op, value, column_map=None): return {"bool": {"must_not": {"term": {col: value}}}} if op == "~~": - return {"wildcard": {col: value.replace("%", "*")}} + return {"wildcard": {col: _convert_pattern_match_to_es(value)}} # For unknown operators, get everything return {"match_all": {}} From 8ed1be10255d725f973bcda0fdcea3be0a486b63 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 19 Jan 2022 14:18:15 +0000 Subject: [PATCH 6/8] Add list of supported operators by ES --- pg_es_fdw/__init__.py | 3 ++- pg_es_fdw/_es_query.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pg_es_fdw/__init__.py b/pg_es_fdw/__init__.py index f2be3cc..a38323f 100644 --- a/pg_es_fdw/__init__.py +++ b/pg_es_fdw/__init__.py @@ -10,7 +10,7 @@ from multicorn import ForeignDataWrapper from multicorn.utils import log_to_postgres as log2pg -from ._es_query import _PG_TO_ES_AGG_FUNCS, quals_to_es +from ._es_query import _PG_TO_ES_AGG_FUNCS, _OPERATORS_SUPPORTED, quals_to_es class ElasticsearchFDW(ForeignDataWrapper): @@ -97,6 +97,7 @@ def can_pushdown_upperrel(self): return { "groupby_supported": True, "agg_functions": _PG_TO_ES_AGG_FUNCS, + "operators_supported": _OPERATORS_SUPPORTED, } def explain( diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index 9066843..e4b623d 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -22,6 +22,8 @@ "count.*": None # not mapped to a particular function } +_OPERATORS_SUPPORTED = [">", ">=", "<", "<=", "=", "<>", "!=", "~~"] + def _convert_pattern_match_to_es(expr): def _pg_es_pattern_map(matchobj): From f3dd1ea7ef619cd94d7488bb80913149c41b4286 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 19 Jan 2022 18:41:44 +0000 Subject: [PATCH 7/8] Fix backslash esacpe pattern match variants --- pg_es_fdw/_es_query.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index e4b623d..6fbfc7b 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -35,8 +35,10 @@ def _pg_es_pattern_map(matchobj): return "%" elif matchobj.group(0) == "\_": return "_" + elif matchobj.group(0) == "\\\\": + return "\\" - return re.sub(r'\\?%|\\?_', _pg_es_pattern_map, expr) + return re.sub(r'\\\\|\\?%|\\?_', _pg_es_pattern_map, fr"{expr}") def _base_qual_to_es(col, op, value, column_map=None): From 4dfffd72553afd17be40ba19f98e7e3a8e072b2d Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 21 Jan 2022 09:26:17 +0000 Subject: [PATCH 8/8] Escape special ES character when used in pattern matching --- pg_es_fdw/_es_query.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pg_es_fdw/_es_query.py b/pg_es_fdw/_es_query.py index 6fbfc7b..3e0e2d9 100644 --- a/pg_es_fdw/_es_query.py +++ b/pg_es_fdw/_es_query.py @@ -35,10 +35,14 @@ def _pg_es_pattern_map(matchobj): return "%" elif matchobj.group(0) == "\_": return "_" + elif matchobj.group(0) == "*": + return "\\*" + elif matchobj.group(0) == "?": + return "\\?" elif matchobj.group(0) == "\\\\": return "\\" - return re.sub(r'\\\\|\\?%|\\?_', _pg_es_pattern_map, fr"{expr}") + return re.sub(r'\\\\|\\?%|\\?_|\*|\?', _pg_es_pattern_map, fr"{expr}") def _base_qual_to_es(col, op, value, column_map=None):