From 1857e5ee8711b1cac648dbe5c486d04133a5c7ab Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 24 Jan 2022 14:18:07 +0000 Subject: [PATCH 1/7] Implement support for aggregations in sqlalchemy FDW --- python/multicorn/sqlalchemyfdw.py | 80 +++++++++++++++++++++++-------- src/python.c | 11 ++--- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 643d042a..1f26bf68 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -159,7 +159,7 @@ from sqlalchemy import create_engine from sqlalchemy.engine.url import make_url, URL from sqlalchemy.exc import UnsupportedCompilationError -from sqlalchemy.sql import select, operators as sqlops, and_ +from sqlalchemy.sql import select, operators as sqlops, func, and_ from sqlalchemy.sql.expression import nullsfirst, nullslast, text from . import ForeignDataWrapper, TableDefinition, ColumnDefinition @@ -225,6 +225,14 @@ def _parse_url_from_options(fdw_options): setattr(url, param, fdw_options[param]) return url +_PG_AGG_FUNC_MAPPING = { + "min": func.min, + "max": func.max, + "sum", func.sum, + "count": func.count, + "count.*": func.count +} + OPERATORS = { "=": operator.eq, @@ -401,13 +409,35 @@ def can_sort(self, sortkeys): return [] return sortkeys - def explain(self, quals, columns, sortkeys=None, verbose=False): + def can_pushdown_upperrel(self): + return { + "groupby_supported": True, + "agg_functions": _PG_AGG_FUNC_MAPPING.keys(), + "operators_supported": [op for op in OPERATORS if isinstance(op, str)], + } + + def explain(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None, verbose=False): sortkeys = sortkeys or [] - statement = self._build_statement(quals, columns, sortkeys) + statement = self._build_statement(quals, columns, sortkeys, aggs=aggs, group_clauses=group_clauses) return [str(statement)] - def _build_statement(self, quals, columns, sortkeys): - statement = select([self.table]) + def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=None): + is_aggregation = aggs or group_clauses + + if not is_aggregation: + statement = select([self.table]) + else: + target_list = [self.table.c[col] for col in group_clauses] + + for agg_name, agg_props in aggs.items(): + agg_func = _PG_AGG_FUNC_MAPPING[agg_props["function"]] + + target_list.append( + agg_func() if agg_props["column"] == "*" else agg_func(self.table.c[agg_props["column"]]) + ) + + statement = select(*target_list) + clauses = [] for qual in quals: operator = OPERATORS.get(qual.operator, None) @@ -417,16 +447,20 @@ def _build_statement(self, quals, columns, sortkeys): log_to_postgres("Qual not pushed to foreign db: %s" % qual, WARNING) if clauses: statement = statement.where(and_(*clauses)) - if columns: - columns = [self.table.c[col] for col in columns] - elif columns is None: - columns = [self.table] + + if not is_aggregation: + if columns: + columns = [self.table.c[col] for col in columns] + elif columns is None: + columns = [self.table] + else: + # This is the case where we're asked to output no columns (just a list of empty dicts) + # in a SELECT 1, but I can't get SQLAlchemy to emit `SELECT 1 FROM some_table`, so + # we just select a single column. + columns = [self.table.c[list(self.table.c)[0].name]] + statement = statement.with_only_columns(columns) else: - # This is the case where we're asked to output no columns (just a list of empty dicts) - # in a SELECT 1, but I can't get SQLAlchemy to emit `SELECT 1 FROM some_table`, so - # we just select a single column. - columns = [self.table.c[list(self.table.c)[0].name]] - statement = statement.with_only_columns(columns) + statement = statement.group_by(*[self.table.c[col] for col in group_clauses]) for sortkey in sortkeys: column = self.table.c[sortkey.attname] @@ -440,12 +474,13 @@ def _build_statement(self, quals, columns, sortkeys): statement = statement.order_by(column) return statement - def execute(self, quals, columns, sortkeys=None): + def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): """ The quals are turned into an and'ed where clause. """ sortkeys = sortkeys or [] - statement = self._build_statement(quals, columns, sortkeys) + is_aggregation = aggs or group_clauses + statement = self._build_statement(quals, columns, sortkeys, aggs=aggs, group_clauses=group_clauses) log_to_postgres(str(statement), DEBUG) # If a dialect doesn't support streaming using server-side cursors, @@ -453,7 +488,7 @@ def execute(self, quals, columns, sortkeys=None): offset = 0 with inject_envvars(self.envvars): while True: - if self.batch_size is not None: + if self.batch_size is not None and not is_aggregation: statement = statement.limit(self.batch_size).offset(offset) offset += self.batch_size @@ -467,9 +502,14 @@ def execute(self, quals, columns, sortkeys=None): rs = list(rs) returned = 0 for item in rs: - yield dict(item) - returned += 1 - if self.batch_size is None or returned < self.batch_size: + if not is_aggregation: + yield dict(item) + returned += 1 + else: + # TODO: parse the response according to Multicorn upperrel format + return None + + if self.batch_size is None or returned < self.batch_size or is_aggregation: return @property diff --git a/src/python.c b/src/python.c index 36963196..4474e05c 100644 --- a/src/python.c +++ b/src/python.c @@ -1771,14 +1771,9 @@ canPushdownUpperrel(MulticornPlanState * state) Py_XDECREF(p_object); /* Determine which aggregation functions are supported */ - p_object = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); - if (p_object != NULL && p_object != Py_None) - { - p_agg_funcs = PyMapping_Keys(p_object); - pythonUnicodeSequenceToList(p_agg_funcs, &state->agg_functions); - Py_DECREF(p_agg_funcs); - } - Py_XDECREF(p_object); + p_agg_funcs = PyMapping_GetItemString(p_upperrel_pushdown, "agg_functions"); + pythonUnicodeSequenceToList(p_agg_funcs, &state->agg_functions); + Py_XDECREF(p_agg_funcs); /* Construct supported qual operators list */ p_ops = PyMapping_GetItemString(p_upperrel_pushdown, "operators_supported"); From 7601933aeb9f785ced1e9b4c2d78538f236bf873 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 24 Jan 2022 16:00:49 +0000 Subject: [PATCH 2/7] Make proper list return for agg functions --- python/multicorn/__init__.py | 12 ++++-------- python/multicorn/sqlalchemyfdw.py | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 81ff3648..a54d6f98 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -224,17 +224,13 @@ def can_pushdown_upperrel(self): { "groupby_supported": , # can be ommited if false - "agg_functions": { - : , - ... - }, + "agg_functions": ["min", "max", "sum", ...], "supported_operators": [">", "<", "=", ...] } - Each entry in `agg_functions` dict corresponds to a maping between - the name of a aggregation function in PostgreSQL, and the equivalent - foreign function. If no mapping exists for an aggregate function any - queries containing it won't be pushed down. + Each entry in `agg_functions` list corresponds to the name of a + aggregation function in PostgreSQL, which the FDW can pushdown. + If a query has a function not in this list it won't be pushed down. The `supported_operators` entry lists all operators that can be used in qual (WHERE) clauses so that the aggregation pushdown will still diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 1f26bf68..c52febd6 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -412,7 +412,7 @@ def can_sort(self, sortkeys): def can_pushdown_upperrel(self): return { "groupby_supported": True, - "agg_functions": _PG_AGG_FUNC_MAPPING.keys(), + "agg_functions": list(_PG_AGG_FUNC_MAPPING), "operators_supported": [op for op in OPERATORS if isinstance(op, str)], } From d6204efd3f15afdc4514620a63c7c6e57ad309ea Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 25 Jan 2022 09:49:52 +0000 Subject: [PATCH 3/7] Add aliases to aggregated targets aligned with agg names from Multicorn --- python/multicorn/sqlalchemyfdw.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index c52febd6..3e75431f 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -228,7 +228,7 @@ def _parse_url_from_options(fdw_options): _PG_AGG_FUNC_MAPPING = { "min": func.min, "max": func.max, - "sum", func.sum, + "sum": func.sum, "count": func.count, "count.*": func.count } @@ -431,10 +431,9 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No for agg_name, agg_props in aggs.items(): agg_func = _PG_AGG_FUNC_MAPPING[agg_props["function"]] + agg_target = agg_func() if agg_props["column"] == "*" else agg_func(self.table.c[agg_props["column"]]) - target_list.append( - agg_func() if agg_props["column"] == "*" else agg_func(self.table.c[agg_props["column"]]) - ) + target_list.append(agg_target.label(agg_name)) statement = select(*target_list) @@ -496,18 +495,16 @@ def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): statement ) + logging.error(rs) + # Workaround pymssql "trash old results on new query" # behaviour (See issue #100) if self.engine.driver == "pymssql" and self.transaction is not None: rs = list(rs) returned = 0 for item in rs: - if not is_aggregation: - yield dict(item) - returned += 1 - else: - # TODO: parse the response according to Multicorn upperrel format - return None + yield dict(item) + returned += 1 if self.batch_size is None or returned < self.batch_size or is_aggregation: return From 0af7ba0ef038a68a829cc070817ab4c38e5820c6 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 25 Jan 2022 11:57:51 +0000 Subject: [PATCH 4/7] Fix missing group clause in aggregation --- python/multicorn/sqlalchemyfdw.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 3e75431f..d4daf5e1 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -427,7 +427,9 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No if not is_aggregation: statement = select([self.table]) else: - target_list = [self.table.c[col] for col in group_clauses] + target_list = [] + if group_clauses is not None: + target_list = [self.table.c[col] for col in group_clauses] for agg_name, agg_props in aggs.items(): agg_func = _PG_AGG_FUNC_MAPPING[agg_props["function"]] @@ -458,7 +460,7 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No # we just select a single column. columns = [self.table.c[list(self.table.c)[0].name]] statement = statement.with_only_columns(columns) - else: + elif group_clauses is not None: statement = statement.group_by(*[self.table.c[col] for col in group_clauses]) for sortkey in sortkeys: @@ -495,8 +497,6 @@ def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): statement ) - logging.error(rs) - # Workaround pymssql "trash old results on new query" # behaviour (See issue #100) if self.engine.driver == "pymssql" and self.transaction is not None: From a4f20b59f746efc8d136368bf7fef5116eab520b Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 26 Jan 2022 13:25:04 +0000 Subject: [PATCH 5/7] Fix a number of edge cases e.g. missing avg or bare grouping query --- python/multicorn/sqlalchemyfdw.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index d4daf5e1..1964bd71 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -226,6 +226,7 @@ def _parse_url_from_options(fdw_options): return url _PG_AGG_FUNC_MAPPING = { + "avg": func.avg, "min": func.min, "max": func.max, "sum": func.sum, @@ -419,7 +420,7 @@ def can_pushdown_upperrel(self): def explain(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None, verbose=False): sortkeys = sortkeys or [] statement = self._build_statement(quals, columns, sortkeys, aggs=aggs, group_clauses=group_clauses) - return [str(statement)] + return ["\n" + str(statement.compile(dialect=self.engine.dialect, compile_kwargs={"literal_binds": True})) + "\n"] def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=None): is_aggregation = aggs or group_clauses @@ -428,16 +429,18 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No statement = select([self.table]) else: target_list = [] + if group_clauses is not None: target_list = [self.table.c[col] for col in group_clauses] - for agg_name, agg_props in aggs.items(): - agg_func = _PG_AGG_FUNC_MAPPING[agg_props["function"]] - agg_target = agg_func() if agg_props["column"] == "*" else agg_func(self.table.c[agg_props["column"]]) + if aggs is not None: + for agg_name, agg_props in aggs.items(): + agg_func = _PG_AGG_FUNC_MAPPING[agg_props["function"]] + agg_target = agg_func() if agg_props["column"] == "*" else agg_func(self.table.c[agg_props["column"]]) - target_list.append(agg_target.label(agg_name)) + target_list.append(agg_target.label(agg_name)) - statement = select(*target_list) + statement = select(*target_list).select_from(self.table) clauses = [] for qual in quals: @@ -445,7 +448,7 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No if operator: clauses.append(operator(self.table.c[qual.field_name], qual.value)) else: - log_to_postgres("Qual not pushed to foreign db: %s" % qual, WARNING) + log_to_postgres(f"Qual {qual} with operator {qual.operator} not pushed to foreign db", ERROR if is_aggregation else WARNING) if clauses: statement = statement.where(and_(*clauses)) From 035d4b34f5f823d397001872aa1101c1bcf9d006 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 27 Jan 2022 11:36:01 +0000 Subject: [PATCH 6/7] Enable pagination for aggregation queries in SQLAlchemy FDW --- python/multicorn/sqlalchemyfdw.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 1964bd71..2b96577a 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -492,7 +492,7 @@ def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): offset = 0 with inject_envvars(self.envvars): while True: - if self.batch_size is not None and not is_aggregation: + if self.batch_size is not None: statement = statement.limit(self.batch_size).offset(offset) offset += self.batch_size @@ -509,7 +509,7 @@ def execute(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None): yield dict(item) returned += 1 - if self.batch_size is None or returned < self.batch_size or is_aggregation: + if self.batch_size is None or returned < self.batch_size: return @property From d7970ccd453343030e5271367ffd00c028f2b25f Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 27 Jan 2022 11:41:12 +0000 Subject: [PATCH 7/7] Add comment on explain statement transformation --- python/multicorn/sqlalchemyfdw.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 2b96577a..8a11b140 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -420,6 +420,10 @@ def can_pushdown_upperrel(self): def explain(self, quals, columns, sortkeys=None, aggs=None, group_clauses=None, verbose=False): sortkeys = sortkeys or [] statement = self._build_statement(quals, columns, sortkeys, aggs=aggs, group_clauses=group_clauses) + + # The literal_binds option below ensures that qualifiers are displayed as raw strings + # instead of being masked by placeholder bound parameters, thus providing more transparency + # during use (and testing). return ["\n" + str(statement.compile(dialect=self.engine.dialect, compile_kwargs={"literal_binds": True})) + "\n"] def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=None):