From 16d10894e35acb5583a58bc88e947e170b6459f2 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Mon, 4 May 2026 12:30:17 +1000 Subject: [PATCH 1/3] feat: add aggregate pushdown support for MySQL FDW --- CLAUDE.md | 80 ++++-- docs/catalog/mysql.md | 97 ++++++- wrappers/src/fdw/mysql_fdw/README.md | 1 + wrappers/src/fdw/mysql_fdw/mysql_fdw.rs | 133 +++++++++- wrappers/src/fdw/mysql_fdw/tests.rs | 331 ++++++++++++++++++++++++ 5 files changed, 616 insertions(+), 26 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 417c4c1fd..02ddda573 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -50,6 +50,11 @@ pub trait ForeignDataWrapper> { fn delete(&mut self, rowid: &Cell) -> Result<(), E>; fn end_modify(&mut self) -> Result<(), E>; + // Optional methods for aggregate pushdown + fn supported_aggregates(&self) -> Vec; + fn supports_group_by(&self) -> bool; + fn begin_aggregate_scan(&mut self, aggregates: &[Aggregate], group_by: &[Column], quals: &[Qual], options: &HashMap) -> Result<(), E>; + // Optional methods fn re_scan(&mut self) -> Result<(), E>; fn get_rel_size(...) -> Result<(i64, i32), E>; @@ -89,23 +94,24 @@ pub struct MyFdw { ... } ### Native FDWs (wrappers/src/fdw/) -| FDW | Feature Flag | Supports Write | -|-----|--------------|----------------| -| BigQuery | `bigquery_fdw` | Yes | -| ClickHouse | `clickhouse_fdw` | Yes | -| Stripe | `stripe_fdw` | Yes | -| S3 Vectors | `s3vectors_fdw` | Yes | -| S3 | `s3_fdw` | No | -| Firebase | `firebase_fdw` | No | -| Airtable | `airtable_fdw` | No | -| Auth0 | `auth0_fdw` | No | -| AWS Cognito | `cognito_fdw` | No | -| DuckDB | `duckdb_fdw` | No | -| Apache Iceberg | `iceberg_fdw` | No | -| Logflare | `logflare_fdw` | No | -| Redis | `redis_fdw` | No | -| SQL Server | `mssql_fdw` | No | -| HelloWorld | `helloworld_fdw` | No (demo) | +| FDW | Feature Flag | Supports Write | Aggregate Pushdown | +|-----|--------------|----------------|--------------------| +| BigQuery | `bigquery_fdw` | Yes | Yes | +| ClickHouse | `clickhouse_fdw` | Yes | Yes | +| MySQL | `mysql_fdw` | Yes | Yes | +| Stripe | `stripe_fdw` | Yes | No | +| S3 Vectors | `s3vectors_fdw` | Yes | No | +| S3 | `s3_fdw` | No | No | +| Firebase | `firebase_fdw` | No | No | +| Airtable | `airtable_fdw` | No | No | +| Auth0 | `auth0_fdw` | No | No | +| AWS Cognito | `cognito_fdw` | No | No | +| DuckDB | `duckdb_fdw` | No | No | +| Apache Iceberg | `iceberg_fdw` | No | No | +| Logflare | `logflare_fdw` | No | No | +| Redis | `redis_fdw` | No | No | +| SQL Server | `mssql_fdw` | No | Yes | +| HelloWorld | `helloworld_fdw` | No (demo) | No | ### Wasm FDWs (wasm-wrappers/fdw/) @@ -292,6 +298,46 @@ FDWs receive pushdown hints through `begin_scan`: Use `Qual::deparse()` to convert to SQL-like strings. +### Aggregate Pushdown + +FDWs can push `COUNT`, `SUM`, `AVG`, `MIN`, `MAX` (with optional `GROUP BY`) down to the remote source by implementing three optional trait methods: + +```rust +fn supported_aggregates(&self) -> Vec { + vec![ + AggregateKind::Count, + AggregateKind::CountColumn, + AggregateKind::Sum, + AggregateKind::Avg, + AggregateKind::Min, + AggregateKind::Max, + ] +} + +fn supports_group_by(&self) -> bool { true } + +fn begin_aggregate_scan( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + quals: &[Qual], + options: &HashMap, +) -> Result<(), E> { + // Build remote SQL, set self.tgt_cols (group-by columns first, + // then aggregate aliases), then initiate streaming. + Ok(()) +} +``` + +Key types from `supabase_wrappers::prelude`: +- `AggregateKind`: enum of `Count | CountColumn | Sum | Avg | Min | Max` +- `Aggregate`: holds `kind`, `column: Option`, `distinct: bool`, `alias: String`, `type_oid` +- `Aggregate::deparse_with_alias()` renders `FUNC(col) AS alias` (no remote quoting — apply your own if needed) + +`iter_scan` is reused unchanged; `tgt_cols` must match the SELECT order exactly so column-name lookup works. + +FDWs that support aggregate pushdown: BigQuery, ClickHouse, MySQL. + ## PostgreSQL Version Support Supported via feature flags: `pg13`, `pg14`, `pg15` (default), `pg16`, `pg17`, `pg18` diff --git a/docs/catalog/mysql.md b/docs/catalog/mysql.md index 705e1729c..88176190d 100644 --- a/docs/catalog/mysql.md +++ b/docs/catalog/mysql.md @@ -133,7 +133,7 @@ create foreign table mysql.my_table ( #### Notes -- Supports `where`, `order by` and `limit` clause pushdown +- Supports `where`, `order by`, `limit` and aggregate clause pushdown - Data is streamed row-by-row from MySQL, making it suitable for large result sets - When using `rowid_column`, it must be specified for data modification operations @@ -141,6 +141,37 @@ create foreign table mysql.my_table ( This FDW supports `where`, `order by` and `limit` clause pushdown. +### Aggregate Pushdown + +The FDW pushes common aggregate queries down to MySQL so the aggregation +runs remotely and only the final result rows are transferred to Postgres. This +is much faster than fetching every row and aggregating locally, especially +over large tables. + +**Supported aggregates** — `count(*)`, `count(col)`, `count(distinct col)`, +`sum(col)`, `avg(col)`, `min(col)`, `max(col)`. + +**Supported shapes** — scalar aggregates, `group by` over plain columns, with +or without a `where` clause. Pushdown also works when the foreign `table` +option is a sub-query. + +```sql +-- All of these run as a single aggregate query on MySQL: +select count(*) from mysql.my_table; +select status, sum(amount) from mysql.my_table group by status; +select count(distinct name) from mysql.my_table where id = 1; +``` + +**Cases that are not pushed down** — the query still returns the correct +result, but the aggregation happens in Postgres after fetching the rows: + +- The query has a `having` clause +- The aggregate has a `filter (where …)` clause +- A `distinct` modifier is used on anything other than `count` +- The aggregate's argument is not a plain column (for example `sum(a + 1)`) +- A `group by` item is not a plain column (for example `group by id + 1`) +- The aggregate function is not in the list above (for example `stddev`, `group_concat`) + ## Import Foreign Schema This FDW supports [`import foreign schema`](https://www.postgresql.org/docs/current/sql-importforeignschema.html) to automatically create foreign table definitions by reading the MySQL table structure from `information_schema`. @@ -282,6 +313,70 @@ delete from mysql.people where id = 2; ``` +### Aggregate Query Examples + +These examples assume an `orders` table in MySQL and a matching foreign +table on Postgres: + +```sql +-- Run on MySQL +create table orders ( + id bigint primary key auto_increment, + user_id bigint not null, + amount decimal(12,2) not null, + status varchar(50) not null +); + +insert into orders (user_id, amount, status) values + (1, 100.00, 'paid'), + (1, 50.00, 'paid'), + (2, 200.00, 'pending'), + (2, 75.00, 'paid'), + (3, 300.00, 'paid'); +``` + +```sql +-- Foreign table on Postgres +create foreign table mysql.orders ( + id bigint, + user_id bigint, + amount numeric, + status text +) + server mysql_server + options ( + table 'orders' + ); +``` + +Each query below runs a single aggregate query against MySQL and returns +just the result rows: + +```sql +-- Total order count +select count(*) from mysql.orders; + +-- Total revenue from paid orders +select sum(amount) from mysql.orders where status = 'paid'; + +-- Per-user order count and revenue +select user_id, count(*) as orders, sum(amount) as revenue +from mysql.orders +group by user_id +order by user_id; + +-- Smallest and largest order +select min(amount), max(amount) from mysql.orders; + +-- Number of distinct users who placed an order +select count(distinct user_id) from mysql.orders; + +-- Average order value per status +select status, avg(amount) as avg_amount +from mysql.orders +group by status; +``` + ### Import foreign schema example This example imports all tables from a MySQL database automatically: diff --git a/wrappers/src/fdw/mysql_fdw/README.md b/wrappers/src/fdw/mysql_fdw/README.md index 051d64dd3..08b92962d 100644 --- a/wrappers/src/fdw/mysql_fdw/README.md +++ b/wrappers/src/fdw/mysql_fdw/README.md @@ -10,4 +10,5 @@ This is a foreign data wrapper for [MySQL](https://www.mysql.com/). It is develo | Version | Date | Notes | | ------- | ---------- | ---------------------------------------------- | +| 0.1.1 | 2026-04-28 | Added aggregate pushdown support | | 0.1.0 | 2026-03-27 | Initial version | diff --git a/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs b/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs index 0992cad53..37794a6b2 100644 --- a/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs +++ b/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs @@ -138,6 +138,30 @@ fn mysql_type_to_pg( } } +// Like Aggregate::deparse() but wraps column names in MySQL backtick quotes. +fn deparse_agg_mysql(agg: &Aggregate) -> String { + let func_name = agg.kind.sql_name(); + match agg.kind { + AggregateKind::Count => format!("{func_name}(*)"), + _ => { + let col = agg + .column + .as_ref() + .map(|c| quote_ident(&c.name)) + .unwrap_or_else(|| "*".to_string()); + if agg.distinct { + format!("{func_name}(DISTINCT {col})") + } else { + format!("{func_name}({col})") + } + } + } +} + +fn deparse_agg_with_alias_mysql(agg: &Aggregate) -> String { + format!("{} AS {}", deparse_agg_mysql(agg), quote_ident(&agg.alias)) +} + fn quote_ident(name: &str) -> String { format!("`{}`", name.replace('`', "``")) } @@ -192,7 +216,7 @@ impl CellFormatter for MysqlCellFormatter { } #[wrappers_fdw( - version = "0.1.0", + version = "0.1.1", author = "Wener", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/mysql_fdw", error_type = "MysqlFdwError" @@ -205,7 +229,7 @@ pub(crate) struct MysqlFdw { tgt_cols: Vec, sql_query: String, stream: Option>, - scaned_row_cnt: usize, + scanned_row_cnt: usize, } impl MysqlFdw { @@ -228,7 +252,7 @@ impl MysqlFdw { .join(", ") }; - let table = if self.table.starts_with("(") && self.table.ends_with(")") { + let table = if self.table.starts_with('(') && self.table.ends_with(')') { self.table.clone() } else { quote_ident(&self.table) @@ -274,6 +298,57 @@ impl MysqlFdw { Ok(sql) } + fn deparse_aggregate( + &self, + aggregates: &[Aggregate], + group_by: &[Column], + quals: &[Qual], + ) -> MysqlFdwResult { + let table = if self.table.starts_with('(') && self.table.ends_with(')') { + self.table.clone() + } else { + quote_ident(&self.table) + }; + + // SELECT: group-by columns first (backtick-quoted), then aggregate expressions + let mut select_items: Vec = group_by.iter().map(|c| quote_ident(&c.name)).collect(); + for agg in aggregates { + select_items.push(deparse_agg_with_alias_mysql(agg)); + } + let mut sql = format!( + "select {} from {} as _wrappers_tbl", + select_items.join(", "), + table + ); + + // WHERE: same qual-to-SQL as deparse(), skip array-valued quals + if !quals.is_empty() { + let mut fmt = MysqlCellFormatter; + let cond = quals + .iter() + // Array quals (use_or=true) are skipped — PostgreSQL re-applies them locally. + .filter(|q| !matches!(&q.value, Value::Array(_))) + .map(|q| deparse_qual(q, &mut fmt)) + .collect::>() + .join(" and "); + if !cond.is_empty() { + sql.push_str(&format!(" where {cond}")); + } + } + + // GROUP BY: backtick-quoted column names + if !group_by.is_empty() { + let cols = group_by + .iter() + .map(|c| quote_ident(&c.name)) + .collect::>() + .join(", "); + sql.push_str(&format!(" group by {cols}")); + } + + Ok(sql) + } + fn create_conn(&self) -> MysqlFdwResult { if let Some(pool) = &self.pool { let conn = self.rt.block_on(async { @@ -344,7 +419,7 @@ impl ForeignDataWrapper for MysqlFdw { tgt_cols: Vec::new(), sql_query: String::default(), stream: None, - scaned_row_cnt: 0, + scanned_row_cnt: 0, }) } @@ -358,7 +433,7 @@ impl ForeignDataWrapper for MysqlFdw { ) -> MysqlFdwResult<()> { self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); - self.scaned_row_cnt = 0; + self.scanned_row_cnt = 0; self.sql_query = self.deparse(quals, columns, sorts, limit)?; self.setup_streaming() } @@ -380,7 +455,7 @@ impl ForeignDataWrapper for MysqlFdw { tgt_row.push(&tgt_col.name, cell); } row.replace_with(tgt_row); - self.scaned_row_cnt += 1; + self.scanned_row_cnt += 1; return Ok(Some(())); } } @@ -388,12 +463,12 @@ impl ForeignDataWrapper for MysqlFdw { stats::inc_stats( Self::FDW_NAME, stats::Metric::RowsIn, - self.scaned_row_cnt as i64, + self.scanned_row_cnt as i64, ); stats::inc_stats( Self::FDW_NAME, stats::Metric::RowsOut, - self.scaned_row_cnt as i64, + self.scanned_row_cnt as i64, ); Ok(None) @@ -482,6 +557,48 @@ impl ForeignDataWrapper for MysqlFdw { self.disconnect_pool() } + fn supported_aggregates(&self) -> Vec { + vec![ + AggregateKind::Count, + AggregateKind::CountColumn, + AggregateKind::Sum, + AggregateKind::Avg, + AggregateKind::Min, + AggregateKind::Max, + ] + } + + fn supports_group_by(&self) -> bool { + true + } + + fn begin_aggregate_scan( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + quals: &[Qual], + options: &HashMap, + ) -> MysqlFdwResult<()> { + self.table = require_option("table", options)?.to_string(); + self.scanned_row_cnt = 0; + self.sql_query = self.deparse_aggregate(aggregates, group_by, quals)?; + + // tgt_cols must match the SELECT order exactly: group-by columns first, + // then aggregate result columns named by their aliases. iter_scan uses + // tgt_col.name to look up each value in the MySQL result row. + let mut tgt_cols: Vec = group_by.to_vec(); + for (i, agg) in aggregates.iter().enumerate() { + tgt_cols.push(Column { + name: agg.alias.clone(), + num: group_by.len() + i + 1, + type_oid: agg.type_oid, + }); + } + self.tgt_cols = tgt_cols; + + self.setup_streaming() + } + fn import_foreign_schema( &mut self, stmt: ImportForeignSchemaStmt, diff --git a/wrappers/src/fdw/mysql_fdw/tests.rs b/wrappers/src/fdw/mysql_fdw/tests.rs index a1430ac88..b088d669f 100644 --- a/wrappers/src/fdw/mysql_fdw/tests.rs +++ b/wrappers/src/fdw/mysql_fdw/tests.rs @@ -52,6 +52,44 @@ mod tests { ); } + fn setup_mysql_agg_test_data() { + let rt = create_async_runtime().expect("failed to create runtime"); + let pool = Pool::new("mysql://root:password@localhost:3306/testdb"); + let mut conn = rt + .block_on(async { + let conn = pool.get_conn().await?; + Ok::<_, MySqlError>(conn) + }) + .expect("failed to connect to mysql"); + + let mut exec = |sql: &str| { + rt.block_on(async { + conn.query_drop(sql).await?; + Ok::<_, MySqlError>(()) + }) + .expect("failed to execute query"); + }; + + exec("CREATE DATABASE IF NOT EXISTS testdb"); + exec("DROP TABLE IF EXISTS testdb.orders"); + exec( + "CREATE TABLE testdb.orders ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(100) NOT NULL, + amount DECIMAL(12,2) NOT NULL, + status VARCHAR(50) NOT NULL + )", + ); + exec( + "INSERT INTO testdb.orders (name, amount, status) VALUES + ('Alice', 100.00, 'active'), + ('Bob', 50.00, 'active'), + ('Carol', 200.00, 'inactive'), + ('Dave', 75.00, 'active'), + ('Eve', 150.00, 'inactive')", + ); + } + #[pg_test] fn mysql_smoketest() { setup_mysql_test_data(); @@ -263,4 +301,297 @@ mod tests { assert_eq!(deleted_cnt, 0); }); } + + #[pg_test] + fn mysql_aggregate_pushdown_test() { + setup_mysql_agg_test_data(); + + Spi::connect_mut(|c| { + c.update( + "CREATE FOREIGN DATA WRAPPER mysql_agg_wrapper + HANDLER mysql_fdw_handler VALIDATOR mysql_fdw_validator", + None, + &[], + ) + .expect("failed to create FDW"); + c.update( + "CREATE SERVER mysql_agg_server FOREIGN DATA WRAPPER mysql_agg_wrapper + OPTIONS (conn_string 'mysql://root:password@localhost:3306/testdb')", + None, + &[], + ) + .expect("failed to create server"); + c.update("CREATE SCHEMA IF NOT EXISTS mysql_agg", None, &[]) + .expect("failed to create schema"); + c.update( + "CREATE FOREIGN TABLE mysql_agg.orders ( + id bigint, + name text, + amount numeric, + status text + ) + SERVER mysql_agg_server + OPTIONS (table 'orders')", + None, + &[], + ) + .expect("failed to create foreign table"); + + // Helper: collect EXPLAIN lines and assert no local Aggregate node. + // A pushed-down aggregate is a bare ForeignScan; a non-pushed aggregate + // has a HashAggregate/GroupAggregate node above ForeignScan. + macro_rules! assert_pushed_down { + ($c:expr, $sql:expr) => {{ + let explain = format!("EXPLAIN {}", $sql); + let plan: Vec = $c + .select(&explain, None, &[]) + .unwrap() + .filter_map(|r| r.get::<&str>(1).unwrap().map(|s| s.to_string())) + .collect(); + assert!( + !plan + .iter() + .any(|l| l.contains("Aggregate") && l.contains("(cost=")), + "Expected pushdown for [{}] but plan shows local aggregation:\n{:?}", + $sql, + plan + ); + }}; + } + + macro_rules! assert_not_pushed_down { + ($c:expr, $sql:expr) => {{ + let explain = format!("EXPLAIN {}", $sql); + let plan: Vec = $c + .select(&explain, None, &[]) + .unwrap() + .filter_map(|r| r.get::<&str>(1).unwrap().map(|s| s.to_string())) + .collect(); + assert!( + plan.iter() + .any(|l| l.contains("Aggregate") && l.contains("(cost=")), + "Expected NO pushdown (local Aggregate) for [{}], plan:\n{:?}", + $sql, + plan + ); + }}; + } + + // --- COUNT(*) whole-table --- + let cnt: i64 = c + .select("SELECT COUNT(*) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!(cnt, 5); + assert_pushed_down!(c, "SELECT COUNT(*) FROM mysql_agg.orders"); + + // --- COUNT(id) --- + let cnt: i64 = c + .select("SELECT COUNT(id) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!(cnt, 5); + assert_pushed_down!(c, "SELECT COUNT(id) FROM mysql_agg.orders"); + + // --- COUNT(DISTINCT status) — 'active' and 'inactive' = 2 --- + let cnt: i64 = c + .select( + "SELECT COUNT(DISTINCT status) FROM mysql_agg.orders", + None, + &[], + ) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!(cnt, 2); + assert_pushed_down!(c, "SELECT COUNT(DISTINCT status) FROM mysql_agg.orders"); + + // --- SUM whole-table --- + // amounts: 100, 50, 200, 75, 150 → sum=575 + let s: f64 = c + .select("SELECT SUM(amount) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!((s - 575.0).abs() < 0.01, "SUM expected 575.0, got {s}"); + assert_pushed_down!(c, "SELECT SUM(amount) FROM mysql_agg.orders"); + + // --- AVG whole-table → 575/5 = 115.0 --- + let avg: f64 = c + .select("SELECT AVG(amount) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!((avg - 115.0).abs() < 0.01, "AVG expected 115.0, got {avg}"); + assert_pushed_down!(c, "SELECT AVG(amount) FROM mysql_agg.orders"); + + // --- MIN whole-table → 50 --- + let mn: f64 = c + .select("SELECT MIN(amount) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!((mn - 50.0).abs() < 0.01, "MIN expected 50.0, got {mn}"); + assert_pushed_down!(c, "SELECT MIN(amount) FROM mysql_agg.orders"); + + // --- MAX whole-table → 200 --- + let mx: f64 = c + .select("SELECT MAX(amount) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!((mx - 200.0).abs() < 0.01, "MAX expected 200.0, got {mx}"); + assert_pushed_down!(c, "SELECT MAX(amount) FROM mysql_agg.orders"); + + // --- GROUP BY status, COUNT(*) --- + // active: Alice(100), Bob(50), Dave(75) → count=3 + // inactive: Carol(200), Eve(150) → count=2 + let mut rows: Vec<(String, i64)> = c + .select( + "SELECT status, COUNT(*) AS cnt FROM mysql_agg.orders + GROUP BY status ORDER BY status", + None, + &[], + ) + .unwrap() + .map(|r| { + let s = r + .get_by_name::<&str, _>("status") + .unwrap() + .unwrap() + .to_string(); + let n = r.get_by_name::("cnt").unwrap().unwrap(); + (s, n) + }) + .collect(); + rows.sort(); + assert_eq!( + rows, + vec![("active".to_string(), 3), ("inactive".to_string(), 2)] + ); + assert_pushed_down!( + c, + "SELECT status, COUNT(*) FROM mysql_agg.orders GROUP BY status" + ); + + // --- WHERE + GROUP BY --- + // amount > 75: Alice(100, active), Carol(200, inactive), Eve(150, inactive) + // active SUM = 100; inactive SUM = 350 + let mut rows: Vec<(String, f64)> = c + .select( + "SELECT status, SUM(amount) AS s FROM mysql_agg.orders + WHERE amount > 75 GROUP BY status ORDER BY status", + None, + &[], + ) + .unwrap() + .map(|r| { + let st = r + .get_by_name::<&str, _>("status") + .unwrap() + .unwrap() + .to_string(); + let s: f64 = r + .get_by_name::("s") + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + (st, s) + }) + .collect(); + rows.sort_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].0, "active"); + assert!( + (rows[0].1 - 100.0).abs() < 0.01, + "active SUM expected 100.0, got {}", + rows[0].1 + ); + assert_eq!(rows[1].0, "inactive"); + assert!( + (rows[1].1 - 350.0).abs() < 0.01, + "inactive SUM expected 350.0, got {}", + rows[1].1 + ); + assert_pushed_down!( + c, + "SELECT status, SUM(amount) FROM mysql_agg.orders WHERE amount > 75 GROUP BY status" + ); + + // --- Negative: HAVING — must NOT push down --- + // active count=3, inactive count=2 → HAVING COUNT(*) > 2 → only 'active' + let rows: Vec<(String, i64)> = c + .select( + "SELECT status, COUNT(*)::bigint AS cnt FROM mysql_agg.orders + GROUP BY status HAVING COUNT(*) > 2 ORDER BY status", + None, + &[], + ) + .unwrap() + .map(|r| { + let st = r + .get_by_name::<&str, _>("status") + .unwrap() + .unwrap() + .to_string(); + let n = r.get_by_name::("cnt").unwrap().unwrap(); + (st, n) + }) + .collect(); + assert_eq!(rows, vec![("active".to_string(), 3)]); + assert_not_pushed_down!( + c, + "SELECT status, COUNT(*) FROM mysql_agg.orders GROUP BY status HAVING COUNT(*) > 2" + ); + + // --- Negative: unsupported aggregate (STDDEV) --- + // amounts: 100, 50, 200, 75, 150 + // mean=115, sum-of-sq-deviations=14500, sample stddev=sqrt(14500/4)≈60.21 + let stddev: f64 = c + .select("SELECT STDDEV(amount) FROM mysql_agg.orders", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!( + (stddev - 60.21).abs() < 0.1, + "expected STDDEV ≈ 60.21, got {stddev}" + ); + assert_not_pushed_down!(c, "SELECT STDDEV(amount) FROM mysql_agg.orders"); + }); + } } From b1447326de8d1cf494a382aee644ff545c05a6a0 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Mon, 4 May 2026 13:01:42 +1000 Subject: [PATCH 2/3] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- wrappers/src/fdw/mysql_fdw/mysql_fdw.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs b/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs index 37794a6b2..b49aa58f2 100644 --- a/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs +++ b/wrappers/src/fdw/mysql_fdw/mysql_fdw.rs @@ -321,13 +321,12 @@ impl MysqlFdw { table ); - // WHERE: same qual-to-SQL as deparse(), skip array-valued quals + // WHERE: preserve all pushed-down quals, including array-valued ones, + // because aggregate pushdown has no lower ForeignScan left to reapply them. if !quals.is_empty() { let mut fmt = MysqlCellFormatter; let cond = quals .iter() - // Array quals (use_or=true) are skipped — PostgreSQL re-applies them locally. - .filter(|q| !matches!(&q.value, Value::Array(_))) .map(|q| deparse_qual(q, &mut fmt)) .collect::>() .join(" and "); From db6052a41e5f97fe3a58c20880b50778fec95667 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Mon, 4 May 2026 13:19:36 +1000 Subject: [PATCH 3/3] feat: add tests for aggregate pushdown with subquery foreign tables in MySQL --- wrappers/src/fdw/mysql_fdw/tests.rs | 114 ++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/wrappers/src/fdw/mysql_fdw/tests.rs b/wrappers/src/fdw/mysql_fdw/tests.rs index b088d669f..b11115654 100644 --- a/wrappers/src/fdw/mysql_fdw/tests.rs +++ b/wrappers/src/fdw/mysql_fdw/tests.rs @@ -336,6 +336,19 @@ mod tests { &[], ) .expect("failed to create foreign table"); + // Subquery foreign table — exercises the starts_with('(') branch in + // deparse_aggregate, which passes the subquery through unquoted. + c.update( + "CREATE FOREIGN TABLE mysql_agg.orders_sub ( + amount numeric, + status text + ) + SERVER mysql_agg_server + OPTIONS (table '(select amount, status from testdb.orders)')", + None, + &[], + ) + .expect("failed to create subquery foreign table"); // Helper: collect EXPLAIN lines and assert no local Aggregate node. // A pushed-down aggregate is a bare ForeignScan; a non-pushed aggregate @@ -592,6 +605,107 @@ mod tests { "expected STDDEV ≈ 60.21, got {stddev}" ); assert_not_pushed_down!(c, "SELECT STDDEV(amount) FROM mysql_agg.orders"); + + // --- IN clause: array-valued qual must be included in the pushed-down WHERE --- + // id IN (1,2,3) → Alice(100,active), Bob(50,active), Carol(200,inactive) + // deparse_aggregate renders use_or=true array quals as `id`=1 or `id`=2 or `id`=3 + let cnt: i64 = c + .select( + "SELECT COUNT(*) FROM mysql_agg.orders WHERE id IN (1, 2, 3)", + None, + &[], + ) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!(cnt, 3, "COUNT(*) WHERE id IN (1,2,3) expected 3, got {cnt}"); + assert_pushed_down!( + c, + "SELECT COUNT(*) FROM mysql_agg.orders WHERE id IN (1, 2, 3)" + ); + + // SUM over the same IN filter: 100+50+200 = 350 + let s: f64 = c + .select( + "SELECT SUM(amount) FROM mysql_agg.orders WHERE id IN (1, 2, 3)", + None, + &[], + ) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + assert!( + (s - 350.0).abs() < 0.01, + "SUM WHERE id IN (1,2,3) expected 350.0, got {s}" + ); + assert_pushed_down!( + c, + "SELECT SUM(amount) FROM mysql_agg.orders WHERE id IN (1, 2, 3)" + ); + + // --- Subquery table: verify aggregate pushdown through starts_with('(') branch --- + // COUNT(*) over the subquery — same 5 rows + let cnt: i64 = c + .select("SELECT COUNT(*) FROM mysql_agg.orders_sub", None, &[]) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!(cnt, 5); + assert_pushed_down!(c, "SELECT COUNT(*) FROM mysql_agg.orders_sub"); + + // GROUP BY SUM over the subquery + // active: 100+50+75=225, inactive: 200+150=350 + let mut rows: Vec<(String, f64)> = c + .select( + "SELECT status, SUM(amount) AS s FROM mysql_agg.orders_sub + GROUP BY status ORDER BY status", + None, + &[], + ) + .unwrap() + .map(|r| { + let st = r + .get_by_name::<&str, _>("status") + .unwrap() + .unwrap() + .to_string(); + let s: f64 = r + .get_by_name::("s") + .unwrap() + .unwrap() + .to_string() + .parse() + .unwrap(); + (st, s) + }) + .collect(); + rows.sort_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].0, "active"); + assert!( + (rows[0].1 - 225.0).abs() < 0.01, + "active SUM expected 225.0, got {}", + rows[0].1 + ); + assert_eq!(rows[1].0, "inactive"); + assert!( + (rows[1].1 - 350.0).abs() < 0.01, + "inactive SUM expected 350.0, got {}", + rows[1].1 + ); + assert_pushed_down!( + c, + "SELECT status, SUM(amount) FROM mysql_agg.orders_sub GROUP BY status" + ); }); } }