diff --git a/python/multicorn/sqlalchemyfdw.py b/python/multicorn/sqlalchemyfdw.py index 8a11b140..f70e2465 100644 --- a/python/multicorn/sqlalchemyfdw.py +++ b/python/multicorn/sqlalchemyfdw.py @@ -156,7 +156,7 @@ from contextlib import contextmanager from typing import Optional -from sqlalchemy import create_engine +from sqlalchemy import cast, create_engine, literal from sqlalchemy.engine.url import make_url, URL from sqlalchemy.exc import UnsupportedCompilationError from sqlalchemy.sql import select, operators as sqlops, func, and_ @@ -362,6 +362,8 @@ def __init__(self, fdw_options, fdw_columns): self.engine = _create_engine(url, self.connect_args) + self.cast_quals = fdw_options.get("cast_quals", "false") == "true" + schema = fdw_options["schema"] if "schema" in fdw_options else None tablename = fdw_options["tablename"] sqlacols = [] @@ -378,7 +380,7 @@ def __init__(self, fdw_options, fdw_columns): self._connection = None self._row_id_column = fdw_options.get("primary_key", None) - self.batch_size = int(fdw_options.get("batch_size", 10000)) + self.batch_size = int(fdw_options["batch_size"]) if "batch_size" in fdw_options else None self.envvars = json.loads(fdw_options.get("envvars", "{}")) def _need_explicit_null_ordering(self, key): @@ -450,7 +452,10 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No for qual in quals: operator = OPERATORS.get(qual.operator, None) if operator: - clauses.append(operator(self.table.c[qual.field_name], qual.value)) + value = qual.value if not self.cast_quals \ + else cast(literal(qual.value), self.table.c[qual.field_name].type.as_generic()) + + clauses.append(operator(self.table.c[qual.field_name], value)) else: log_to_postgres(f"Qual {qual} with operator {qual.operator} not pushed to foreign db", ERROR if is_aggregation else WARNING) if clauses: diff --git a/src/multicorn.c b/src/multicorn.c index 58558c72..ce16370c 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -1689,6 +1689,18 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, /* Get the fpinfo of the underlying scan relation. */ ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private; + /* + * If underlying scan relation has more quals than we are attempting to push + * down, this means that some are missing in qual_list, hence the aggregation + * wouldn't be correct. Examples includes using a float value in a WHERE clause + * against a column of integer type. + * NB: If we ever decide to add support for join-agg pushdown this won't work. + */ + if (list_length(ofpinfo->qual_list) < list_length(fpinfo->outerrel->baserestrictinfo)) + { + return false; + } + /* * If underlying scan relation has any quals with unsupported operators * we cannot pushdown the aggregation.