Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions python/multicorn/sqlalchemyfdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
from contextlib import contextmanager
from typing import Optional

from sqlalchemy import create_engine
from sqlalchemy import cast, create_engine, literal
from sqlalchemy.engine.url import make_url, URL
from sqlalchemy.exc import UnsupportedCompilationError
from sqlalchemy.sql import select, operators as sqlops, func, and_
Expand Down Expand Up @@ -362,6 +362,8 @@ def __init__(self, fdw_options, fdw_columns):

self.engine = _create_engine(url, self.connect_args)

self.cast_quals = fdw_options.get("cast_quals", "false") == "true"

schema = fdw_options["schema"] if "schema" in fdw_options else None
tablename = fdw_options["tablename"]
sqlacols = []
Expand All @@ -378,7 +380,7 @@ def __init__(self, fdw_options, fdw_columns):
self._connection = None
self._row_id_column = fdw_options.get("primary_key", None)

self.batch_size = int(fdw_options.get("batch_size", 10000))
self.batch_size = int(fdw_options["batch_size"]) if "batch_size" in fdw_options else None
self.envvars = json.loads(fdw_options.get("envvars", "{}"))

def _need_explicit_null_ordering(self, key):
Expand Down Expand Up @@ -450,7 +452,10 @@ def _build_statement(self, quals, columns, sortkeys, aggs=None, group_clauses=No
for qual in quals:
operator = OPERATORS.get(qual.operator, None)
if operator:
clauses.append(operator(self.table.c[qual.field_name], qual.value))
value = qual.value if not self.cast_quals \
else cast(literal(qual.value), self.table.c[qual.field_name].type.as_generic())

clauses.append(operator(self.table.c[qual.field_name], value))
else:
log_to_postgres(f"Qual {qual} with operator {qual.operator} not pushed to foreign db", ERROR if is_aggregation else WARNING)
if clauses:
Expand Down
12 changes: 12 additions & 0 deletions src/multicorn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,18 @@ multicorn_foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
/* Get the fpinfo of the underlying scan relation. */
ofpinfo = (MulticornPlanState *) fpinfo->outerrel->fdw_private;

/*
* If underlying scan relation has more quals than we are attempting to push
* down, this means that some are missing in qual_list, hence the aggregation
* wouldn't be correct. Examples includes using a float value in a WHERE clause
* against a column of integer type.
* NB: If we ever decide to add support for join-agg pushdown this won't work.
*/
if (list_length(ofpinfo->qual_list) < list_length(fpinfo->outerrel->baserestrictinfo))
{
return false;
}

/*
* If underlying scan relation has any quals with unsupported operators
* we cannot pushdown the aggregation.
Expand Down