From 1d85102698f22f1c5fb59d8b3b81def04464943a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Oct 2025 15:34:49 +0200 Subject: [PATCH 1/4] Sync streams: Support aliases --- packages/sync-rules/src/BaseSqlDataQuery.ts | 18 ++-- .../sync-rules/src/SqlBucketDescriptor.ts | 4 +- packages/sync-rules/src/SqlDataQuery.ts | 12 +-- packages/sync-rules/src/SqlParameterQuery.ts | 16 ++-- .../sync-rules/src/StaticSqlParameterQuery.ts | 4 +- packages/sync-rules/src/TableQuerySchema.ts | 11 ++- .../TableValuedFunctionSqlParameterQuery.ts | 16 ++-- .../src/events/SqlEventSourceQuery.ts | 10 +- packages/sync-rules/src/sql_filters.ts | 95 +++++++++++++++---- packages/sync-rules/src/streams/from_sql.ts | 12 +-- packages/sync-rules/src/streams/stream.ts | 4 +- packages/sync-rules/src/types.ts | 8 ++ packages/sync-rules/test/src/streams.test.ts | 70 +++++++++++++- 13 files changed, 206 insertions(+), 74 deletions(-) diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index 342330e18..1dd569f9f 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -2,7 +2,7 @@ import { SelectedColumn } from 'pgsql-ast-parser'; import { SqlRuleError } from './errors.js'; import { ColumnDefinition } from './ExpressionType.js'; import { SourceTableInterface } from './SourceTableInterface.js'; -import { SqlTools } from './sql_filters.js'; +import { AvailableTable, SqlTools } from './sql_filters.js'; import { TablePattern } from './TablePattern.js'; import { BucketIdTransformer, @@ -31,7 +31,7 @@ export interface EvaluateRowOptions { export interface BaseSqlDataQueryOptions { sourceTable: TablePattern; - table: string; + table: AvailableTable; sql: string; columns: SelectedColumn[]; extractors: RowValueExtractor[]; @@ -52,7 +52,7 @@ export class BaseSqlDataQuery { * * This is used for the output table name. */ - readonly table: string; + readonly table: AvailableTable; /** * The source SQL query, for debugging purposes. @@ -121,12 +121,12 @@ export class BaseSqlDataQuery { // Wildcard without alias - use source return sourceTable; } else { - return this.table; + return this.table.sqlName; } } isUnaliasedWildcard() { - return this.sourceTable.isWildcard && this.table == this.sourceTable.tablePattern; + return this.sourceTable.isWildcard && !this.table.isAliased; } columnOutputNames(): string[] { @@ -157,7 +157,7 @@ export class BaseSqlDataQuery { this.getColumnOutputsFor(schemaTable, output); } result.push({ - name: this.table, + name: this.table.sqlName, columns: Object.values(output) }); } @@ -181,7 +181,7 @@ export class BaseSqlDataQuery { try { const { table, row, bucketIds } = options; - const tables = { [this.table]: this.addSpecialParameters(table, row) }; + const tables = { [this.table.schemaName]: this.addSpecialParameters(table, row) }; const resolvedBucketIds = bucketIds(tables); const data = this.transformRow(tables); @@ -221,7 +221,7 @@ export class BaseSqlDataQuery { protected getColumnOutputsFor(schemaTable: SourceSchemaTable, output: Record) { const querySchema: QuerySchema = { getColumn: (table, column) => { - if (table == this.table) { + if (table == this.table.schemaName) { return schemaTable.getColumn(column); } else { // TODO: bucket parameters? @@ -229,7 +229,7 @@ export class BaseSqlDataQuery { } }, getColumns: (table) => { - if (table == this.table) { + if (table == this.table.schemaName) { return schemaTable.getColumns(); } else { return []; diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index 353dbb21c..c121b9038 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -217,12 +217,12 @@ export class SqlBucketDescriptor implements BucketSource { debugWriteOutputTables(result: Record): void { for (let q of this.dataQueries) { - result[q.table!] ??= []; + result[q.table!.sqlName] ??= []; const r = { query: q.sql }; - result[q.table!].push(r); + result[q.table!.sqlName].push(r); } } diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index ac4843789..8a0922133 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -4,7 +4,7 @@ import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from './ import { SqlRuleError } from './errors.js'; import { ExpressionType } from './ExpressionType.js'; import { SourceTableInterface } from './SourceTableInterface.js'; -import { SqlTools } from './sql_filters.js'; +import { AvailableTable, SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError } from './sql_support.js'; import { SyncRulesOptions } from './SqlSyncRules.js'; import { TablePattern } from './TablePattern.js'; @@ -48,7 +48,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { if (tableRef?.name == null) { throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location); } - const alias: string = tableRef.alias ?? tableRef.name; + const alias = AvailableTable.fromAst(tableRef); const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name); let querySchema: QuerySchema | undefined = undefined; @@ -71,7 +71,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { const where = q.where; const tools = new SqlTools({ table: alias, - parameterTables: ['bucket'], + parameterTables: [new AvailableTable('bucket')], valueTables: [alias], compatibilityContext: compatibility, sql, @@ -123,7 +123,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias]; + const row = tables[alias.schemaName]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -132,7 +132,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias)) { + for (let column of schema.getColumns(alias.schemaName)) { into[column.name] ??= column; } } @@ -146,7 +146,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { // Not performing schema-based validation - assume there is an id hasId = true; } else { - const idType = querySchema.getColumn(alias, 'id')?.type ?? ExpressionType.NONE; + const idType = querySchema.getColumn(alias.schemaName, 'id')?.type ?? ExpressionType.NONE; if (!idType.isNone()) { hasId = true; } diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index b8102b7bf..c6fccb822 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -8,7 +8,7 @@ import { import { BucketParameterQuerier, ParameterLookup, ParameterLookupSource } from './BucketParameterQuerier.js'; import { SqlRuleError } from './errors.js'; import { SourceTableInterface } from './SourceTableInterface.js'; -import { SqlTools } from './sql_filters.js'; +import { AvailableTable, SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause } from './sql_support.js'; import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js'; import { TablePattern } from './TablePattern.js'; @@ -33,7 +33,7 @@ import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement, normalizePa export interface SqlParameterQueryOptions { sourceTable: TablePattern; - table: string; + table: AvailableTable; sql: string; lookupExtractors: Record; parameterExtractors: Record; @@ -95,8 +95,8 @@ export class SqlParameterQuery { if (tableRef?.name == null) { throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location); } - const alias: string = q.from?.[0].name.alias ?? tableRef.name; - if (tableRef.name != alias) { + const alias = new AvailableTable(tableRef.name, q.from?.[0].name.alias); + if (alias.isAliased) { errors.push(new SqlRuleError('Table aliases not supported in parameter queries', sql, q.from?.[0]._location)); } const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name); @@ -119,7 +119,7 @@ export class SqlParameterQuery { const tools = new SqlTools({ table: alias, - parameterTables: ['token_parameters', 'user_parameters'], + parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters')], sql, supportsExpandingParameters: true, supportsParameterExpressions: true, @@ -214,7 +214,7 @@ export class SqlParameterQuery { * * Currently, this always matches sourceTable.name. */ - readonly table: string; + readonly table: AvailableTable; /** * The source SQL query, for debugging purposes. @@ -308,7 +308,7 @@ export class SqlParameterQuery { */ evaluateParameterRow(row: SqliteRow): EvaluatedParametersResult[] { const tables = { - [this.table]: row + [this.table.sqlName]: row }; try { const filterParameters = this.filter.filterRow(tables); @@ -336,7 +336,7 @@ export class SqlParameterQuery { } private transformRows(row: SqliteRow): SqliteRow[] { - const tables = { [this.table]: row }; + const tables = { [this.table.sqlName]: row }; let result: SqliteRow = {}; for (let key in this.lookupExtractors) { const extractor = this.lookupExtractors[key]; diff --git a/packages/sync-rules/src/StaticSqlParameterQuery.ts b/packages/sync-rules/src/StaticSqlParameterQuery.ts index 33c711f16..35ea8c5b5 100644 --- a/packages/sync-rules/src/StaticSqlParameterQuery.ts +++ b/packages/sync-rules/src/StaticSqlParameterQuery.ts @@ -1,7 +1,7 @@ import { SelectedColumn, SelectFromStatement } from 'pgsql-ast-parser'; import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY } from './BucketDescription.js'; import { SqlRuleError } from './errors.js'; -import { SqlTools } from './sql_filters.js'; +import { AvailableTable, SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js'; import { BucketIdTransformer, @@ -43,7 +43,7 @@ export class StaticSqlParameterQuery { const tools = new SqlTools({ table: undefined, - parameterTables: ['token_parameters', 'user_parameters'], + parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters')], supportsParameterExpressions: true, compatibilityContext: options.compatibility, sql diff --git a/packages/sync-rules/src/TableQuerySchema.ts b/packages/sync-rules/src/TableQuerySchema.ts index 5c68ae1e9..910182298 100644 --- a/packages/sync-rules/src/TableQuerySchema.ts +++ b/packages/sync-rules/src/TableQuerySchema.ts @@ -1,14 +1,19 @@ import { ColumnDefinition } from './ExpressionType.js'; +import { AvailableTable } from './sql_filters.js'; import { QuerySchema, SourceSchemaTable } from './types.js'; +/** + * Exposes a list of {@link SourceSchemaTable}s as a {@link QuerySchema} by only exposing the subset of the schema + * referenced in a `FROM` clause. + */ export class TableQuerySchema implements QuerySchema { constructor( private tables: SourceSchemaTable[], - private alias: string + private alias: AvailableTable ) {} getColumn(table: string, column: string): ColumnDefinition | undefined { - if (table != this.alias) { + if (table != this.alias.schemaName) { return undefined; } for (let table of this.tables) { @@ -21,7 +26,7 @@ export class TableQuerySchema implements QuerySchema { } getColumns(table: string): ColumnDefinition[] { - if (table != this.alias) { + if (table != this.alias.schemaName) { return []; } let columns: Record = {}; diff --git a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts index 49597d814..2e49e336c 100644 --- a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts +++ b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts @@ -1,6 +1,6 @@ import { FromCall, SelectFromStatement } from 'pgsql-ast-parser'; import { SqlRuleError } from './errors.js'; -import { SqlTools } from './sql_filters.js'; +import { AvailableTable, SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js'; import { generateTableValuedFunctions, TableValuedFunction } from './TableValuedFunctions.js'; import { @@ -26,7 +26,7 @@ export interface TableValuedFunctionSqlParameterQueryOptions { filter: ParameterValueClause | undefined; callClause: ParameterValueClause | undefined; function: TableValuedFunction; - callTableName: string; + callTable: AvailableTable; errors: SqlRuleError[]; } @@ -59,12 +59,12 @@ export class TableValuedFunctionSqlParameterQuery { throw new SqlRuleError(`Table-valued function ${call.function.name} is not defined.`, sql, call); } - const callTable = call.alias?.name ?? call.function.name; + const callTable = AvailableTable.fromCall(call); const callExpression = call.args[0]; const tools = new SqlTools({ table: callTable, - parameterTables: ['token_parameters', 'user_parameters', callTable], + parameterTables: [new AvailableTable('token_parameters'), new AvailableTable('user_parameters'), callTable], supportsParameterExpressions: true, compatibilityContext: compatibility, sql @@ -108,7 +108,7 @@ export class TableValuedFunctionSqlParameterQuery { filter: isClauseError(filter) ? undefined : filter, callClause: isClauseError(callClause) ? undefined : callClause, function: functionImpl, - callTableName: callTable, + callTable, priority: priority ?? DEFAULT_BUCKET_PRIORITY, queryId, errors @@ -186,7 +186,7 @@ export class TableValuedFunctionSqlParameterQuery { * * Only used internally. */ - readonly callTableName: string; + readonly callTable: AvailableTable; readonly errors: SqlRuleError[]; @@ -201,7 +201,7 @@ export class TableValuedFunctionSqlParameterQuery { this.filter = options.filter; this.callClause = options.callClause; this.function = options.function; - this.callTableName = options.callTableName; + this.callTable = options.callTable; this.errors = options.errors; } @@ -232,7 +232,7 @@ export class TableValuedFunctionSqlParameterQuery { const mergedParams: ParameterValueSet = { ...parameters, lookup: (table, column) => { - if (table == this.callTableName) { + if (table == this.callTable.schemaName) { return row[column]!; } else { return parameters.lookup(table, column); diff --git a/packages/sync-rules/src/events/SqlEventSourceQuery.ts b/packages/sync-rules/src/events/SqlEventSourceQuery.ts index 629067f22..e2233b949 100644 --- a/packages/sync-rules/src/events/SqlEventSourceQuery.ts +++ b/packages/sync-rules/src/events/SqlEventSourceQuery.ts @@ -3,7 +3,7 @@ import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from '.. import { SqlRuleError } from '../errors.js'; import { ExpressionType } from '../ExpressionType.js'; import { SourceTableInterface } from '../SourceTableInterface.js'; -import { SqlTools } from '../sql_filters.js'; +import { AvailableTable, SqlTools } from '../sql_filters.js'; import { checkUnsupportedFeatures, isClauseError } from '../sql_support.js'; import { SyncRulesOptions } from '../SqlSyncRules.js'; import { TablePattern } from '../TablePattern.js'; @@ -49,7 +49,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { if (tableRef?.name == null) { throw new SqlRuleError('Must SELECT from a single table', sql, q.from?.[0]._location); } - const alias: string = tableRef.alias ?? tableRef.name; + const alias = AvailableTable.fromAst(tableRef); const sourceTable = new TablePattern(tableRef.schema ?? options.defaultSchema, tableRef.name); let querySchema: QuerySchema | undefined = undefined; @@ -99,7 +99,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias]; + const row = tables[alias.schemaName]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -108,7 +108,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias)) { + for (let column of schema.getColumns(alias.schemaName)) { into[column.name] ??= column; } } @@ -136,7 +136,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { evaluateRowWithErrors(table: SourceTableInterface, row: SqliteRow): EvaluatedEventRowWithErrors { try { - const tables = { [this.table!]: this.addSpecialParameters(table, row) }; + const tables = { [this.table!.sqlName]: this.addSpecialParameters(table, row) }; const data = this.transformRow(tables); return { diff --git a/packages/sync-rules/src/sql_filters.ts b/packages/sync-rules/src/sql_filters.ts index b9e8b9d90..6e6586101 100644 --- a/packages/sync-rules/src/sql_filters.ts +++ b/packages/sync-rules/src/sql_filters.ts @@ -1,5 +1,5 @@ import { JSONBig } from '@powersync/service-jsonbig'; -import { Expr, ExprRef, Name, NodeLocation, QName, QNameAliased, SelectedColumn } from 'pgsql-ast-parser'; +import { Expr, ExprRef, FromCall, Name, NodeLocation, QName, QNameAliased, SelectedColumn } from 'pgsql-ast-parser'; import { nil } from 'pgsql-ast-parser/src/utils.js'; import { BucketPriority, isValidPriority } from './BucketDescription.js'; import { ExpressionType } from './ExpressionType.js'; @@ -53,13 +53,64 @@ export const MATCH_CONST_TRUE: TrueIfParametersMatch = [{}]; Object.freeze(MATCH_CONST_TRUE); Object.freeze(MATCH_CONST_FALSE); +/** + * A table that has been made available to a result set by being included in a `FROM`. + */ +export class AvailableTable { + /** + * The name of the table in the schema. + */ + schemaName: string; + + /** + * The alias under which the {@link schemaName} is made available to the current query. + */ + alias?: string; + + /** + * The name a table has in an SQL expression context. + */ + public get sqlName(): string { + return this.alias ?? this.schemaName; + } + + get isAliased(): boolean { + return this.sqlName != this.schemaName; + } + + constructor(schemaName: string, alias?: string) { + this.schemaName = schemaName; + this.alias = alias; + } + + static fromAst(name: QNameAliased): AvailableTable { + return new AvailableTable(name.name, name.alias); + } + + static fromCall(name: FromCall): AvailableTable { + return new AvailableTable(name.function.name, name.alias?.name); + } + + /** + * Finds the first table matching the given SQL name. + */ + static search( + identifier: string | AvailableTable | undefined, + available: AvailableTable[] + ): AvailableTable | undefined { + const target = identifier instanceof AvailableTable ? identifier.sqlName : identifier; + + return available.find((tbl) => tbl.sqlName == target); + } +} + export interface SqlToolsOptions { /** * Default table name, if any. I.e. SELECT FROM . * * Used for to determine the table when using bare column names. */ - table?: string; + table?: AvailableTable; /** * Set of tables used for FilterParameters. @@ -68,14 +119,14 @@ export interface SqlToolsOptions { * "bucket" (bucket parameters for data query) * "token_parameters" (token parameters for parameter query) */ - parameterTables?: string[]; + parameterTables?: AvailableTable[]; /** * Set of tables used in QueryParameters. * - * If not specified, defaults to [table]. + * If not specified, defaults to {@link table}. */ - valueTables?: string[]; + valueTables?: AvailableTable[]; /** * For debugging / error messages. @@ -111,13 +162,15 @@ export interface SqlToolsOptions { } export class SqlTools { - readonly defaultTable?: string; - readonly valueTables: string[]; + readonly defaultTable?: AvailableTable; + readonly valueTables: AvailableTable[]; /** * ['bucket'] for data queries * ['token_parameters', 'user_parameters'] for parameter queries + * + * These are never aliased. */ - readonly parameterTables: string[]; + readonly parameterTables: AvailableTable[]; readonly sql: string; readonly errors: SqlRuleError[] = []; @@ -215,10 +268,10 @@ export class SqlTools { this.checkRef(table, expr); return { evaluate(tables: QueryParameters): SqliteValue { - return tables[table]?.[column]; + return tables[table.schemaName]?.[column]; }, getColumnDefinition(schema) { - return schema.getColumn(table, column); + return schema.getColumn(table.schemaName, column); } } satisfies RowValueClause; } else { @@ -630,12 +683,12 @@ export class SqlTools { /** * Check if an expression is a parameter_table reference. */ - isParameterRef(expr: Expr): expr is ExprRef { + private isParameterRef(expr: Expr): expr is ExprRef { if (expr.type != 'ref') { return false; } - const tableName = expr.table?.name ?? this.defaultTable; - return this.parameterTables.includes(tableName ?? ''); + const tableName = expr.table?.name ?? this.defaultTable ?? ''; + return AvailableTable.search(tableName, this.parameterTables) != null; } /** @@ -710,17 +763,17 @@ export class SqlTools { } } - private checkRef(table: string, ref: ExprRef) { + private checkRef(table: AvailableTable, ref: ExprRef) { if (this.schema) { - const type = this.schema.getColumn(table, ref.name); + const type = this.schema.getColumn(table.schemaName, ref.name); if (type == null) { this.warn(`Column not found: ${ref.name}`, ref); } } } - getParameterRefClause(expr: ExprRef): ParameterValueClause { - const table = (expr.table?.name ?? this.defaultTable)!; + private getParameterRefClause(expr: ExprRef): ParameterValueClause { + const table = AvailableTable.search(expr.table?.name ?? this.defaultTable!, this.parameterTables)!.schemaName; const column = expr.name; return { key: `${table}.${column}`, @@ -741,13 +794,15 @@ export class SqlTools { * * Only "value" tables are supported here, not parameter values. */ - getTableName(ref: ExprRef): string { + getTableName(ref: ExprRef): AvailableTable { if (this.refHasSchema(ref)) { throw new SqlRuleError(`Specifying schema in column references is not supported`, this.sql, ref); } const tableName = ref.table?.name ?? this.defaultTable; - if (this.valueTables.includes(tableName ?? '')) { - return tableName!; + const found = AvailableTable.search(tableName, this.valueTables); + + if (found != null) { + return found; } else if (ref.table?.name == null) { throw new SqlRuleError(`Table name required`, this.sql, ref); } else { diff --git a/packages/sync-rules/src/streams/from_sql.ts b/packages/sync-rules/src/streams/from_sql.ts index 476fb089c..fa01a8864 100644 --- a/packages/sync-rules/src/streams/from_sql.ts +++ b/packages/sync-rules/src/streams/from_sql.ts @@ -13,7 +13,7 @@ import { } from '../sql_support.js'; import { TablePattern } from '../TablePattern.js'; import { TableQuerySchema } from '../TableQuerySchema.js'; -import { SqlTools } from '../sql_filters.js'; +import { AvailableTable, SqlTools } from '../sql_filters.js'; import { BaseSqlDataQuery, BaseSqlDataQueryOptions, RowValueExtractor } from '../BaseSqlDataQuery.js'; import { ExpressionType } from '../ExpressionType.js'; import { SyncStream } from './stream.js'; @@ -114,7 +114,7 @@ class SyncStreamCompiler { private compileDataQuery( tools: SqlTools, query: SelectFromStatement, - alias: string, + alias: AvailableTable, sourceTable: TablePattern ): BaseSqlDataQueryOptions { let hasId = false; @@ -143,7 +143,7 @@ class SyncStreamCompiler { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias]; + const row = tables[alias.schemaName]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -152,7 +152,7 @@ class SyncStreamCompiler { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias)) { + for (let column of schema.getColumns(alias.schemaName)) { into[column.name] ??= column; } } @@ -166,7 +166,7 @@ class SyncStreamCompiler { // Not performing schema-based validation - assume there is an id hasId = true; } else { - const idType = querySchema.getColumn(alias, 'id')?.type ?? ExpressionType.NONE; + const idType = querySchema.getColumn(alias.schemaName, 'id')?.type ?? ExpressionType.NONE; if (!idType.isNone()) { hasId = true; } @@ -417,7 +417,7 @@ class SyncStreamCompiler { if (tableRef?.name == null) { throw new SqlRuleError('Must SELECT from a single table', this.sql, stmt.from?.[0]._location); } - const alias: string = tableRef.alias ?? tableRef.name; + const alias = AvailableTable.fromAst(tableRef); const sourceTable = new TablePattern(tableRef.schema ?? this.options.defaultSchema, tableRef.name); let querySchema: QuerySchema | undefined = undefined; diff --git a/packages/sync-rules/src/streams/stream.ts b/packages/sync-rules/src/streams/stream.ts index 0197b83d2..01070b74d 100644 --- a/packages/sync-rules/src/streams/stream.ts +++ b/packages/sync-rules/src/streams/stream.ts @@ -142,12 +142,12 @@ export class SyncStream implements BucketSource { } debugWriteOutputTables(result: Record): void { - result[this.data.table!] ??= []; + result[this.data.table!.sqlName] ??= []; const r = { query: this.data.sql }; - result[this.data.table!].push(r); + result[this.data.table!.sqlName].push(r); } evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] { diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 2cbfe6639..842f726f6 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -391,7 +391,15 @@ export interface ParameterValueClause { } export interface QuerySchema { + /** + * @param table The unaliased table, as it appears in the source schema. + * @param column Name of the column to look up. + */ getColumn(table: string, column: string): ColumnDefinition | undefined; + /** + * + * @param table The unaliased table, as it appears in the source schema. + */ getColumns(table: string): ColumnDefinition[]; } diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts index ec30f6585..688cb6aba 100644 --- a/packages/sync-rules/test/src/streams.test.ts +++ b/packages/sync-rules/test/src/streams.test.ts @@ -612,6 +612,70 @@ describe('streams', () => { ).toStrictEqual(['1#stream|1[]', '1#stream|0["issue_id"]']); }); }); + + describe('regression tests', () => { + test('table alias', async () => { + // Regression test for https://discord.com/channels/1138230179878154300/1422138173907144724/1427962895425208382 + const accountMember = new TestSourceTable('account_member'); + const schema = new StaticSchema([ + { + tag: DEFAULT_TAG, + schemas: [ + { + name: 'test_schema', + tables: [ + { + name: 'account_member', + columns: [ + { name: 'id', pg_type: 'uuid' }, + { name: 'account_id', pg_type: 'uuid' } + ] + } + ] + } + ] + } + ]); + + const stream = parseStream( + 'select * from account_member as "outer" where account_id in (select "inner".account_id from account_member as "inner" where "inner".id = auth.user_id())', + 'account_member', + { ...options, schema } + ); + const row = { id: 'id', account_id: 'account_id' }; + + expect(stream.tableSyncsData(accountMember)).toBeTruthy(); + expect(stream.tableSyncsParameters(accountMember)).toBeTruthy(); + + // Ensure lookup steps work. + expect(stream.evaluateParameterRow(accountMember, row)).toStrictEqual([ + { + lookup: ParameterLookup.normalized('account_member', '0', ['id']), + bucketParameters: [ + { + result: 'account_id' + } + ] + } + ]); + expect(evaluateBucketIds(stream, accountMember, row)).toStrictEqual(['1#account_member|0["account_id"]']); + expect( + await queryBucketIds(stream, { + token: { sub: 'id' }, + parameters: {}, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('account_member', '0', ['id'])]); + return [{ result: 'account_id' }]; + } + }) + ).toStrictEqual(['1#account_member|0["account_id"]']); + + // And that the data alias is respected for generated schemas. + const outputSchema = {}; + stream.resolveResultSets(schema, outputSchema); + expect(Object.keys(outputSchema)).toStrictEqual(['outer']); + }); + }); }); const USERS = new TestSourceTable('users'); @@ -705,7 +769,7 @@ async function createQueriers( }, {} ), - streams: { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] }, + streams: { [stream.name]: [{ opaque_id: 0, parameters: options?.parameters ?? null }] }, bucketIdTransformer }; @@ -750,8 +814,8 @@ async function queryBucketIds( return buckets; } -function parseStream(sql: string, name = 'stream') { - const [stream, errors] = syncStreamFromSql(name, sql, options); +function parseStream(sql: string, name = 'stream', parseOptions: StreamParseOptions = options) { + const [stream, errors] = syncStreamFromSql(name, sql, parseOptions); if (errors.length) { throw new Error(`Unexpected errors when parsing stream ${sql}: ${errors}`); } From 1c9a28f01f60e8541da05e6580613a1b97115e11 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Oct 2025 15:35:46 +0200 Subject: [PATCH 2/4] Add changeset --- .changeset/good-moles-confess.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/good-moles-confess.md diff --git a/.changeset/good-moles-confess.md b/.changeset/good-moles-confess.md new file mode 100644 index 000000000..9b4b287d3 --- /dev/null +++ b/.changeset/good-moles-confess.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-sync-rules': patch +--- + +Sync streams: Support table aliases in subqueries. From 8627e22980fd380e8a6022ae8da1fa8d4e52be0e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Oct 2025 15:47:17 +0200 Subject: [PATCH 3/4] Consistency --- packages/sync-rules/src/SqlParameterQuery.ts | 2 +- packages/sync-rules/src/events/SqlEventSourceQuery.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index c6fccb822..65d78c8f8 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -308,7 +308,7 @@ export class SqlParameterQuery { */ evaluateParameterRow(row: SqliteRow): EvaluatedParametersResult[] { const tables = { - [this.table.sqlName]: row + [this.table.schemaName]: row }; try { const filterParameters = this.filter.filterRow(tables); diff --git a/packages/sync-rules/src/events/SqlEventSourceQuery.ts b/packages/sync-rules/src/events/SqlEventSourceQuery.ts index e2233b949..3e635fc97 100644 --- a/packages/sync-rules/src/events/SqlEventSourceQuery.ts +++ b/packages/sync-rules/src/events/SqlEventSourceQuery.ts @@ -136,7 +136,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { evaluateRowWithErrors(table: SourceTableInterface, row: SqliteRow): EvaluatedEventRowWithErrors { try { - const tables = { [this.table!.sqlName]: this.addSpecialParameters(table, row) }; + const tables = { [this.table!.schemaName]: this.addSpecialParameters(table, row) }; const data = this.transformRow(tables); return { From 31c296b0a0cdfa8e947d497060c21875b2128793 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 15 Oct 2025 18:21:45 +0200 Subject: [PATCH 4/4] Review feedback --- packages/sync-rules/src/BaseSqlDataQuery.ts | 6 ++--- packages/sync-rules/src/SqlDataQuery.ts | 6 ++--- packages/sync-rules/src/SqlParameterQuery.ts | 5 ++-- packages/sync-rules/src/TableQuerySchema.ts | 4 ++-- .../TableValuedFunctionSqlParameterQuery.ts | 2 +- .../src/events/SqlEventSourceQuery.ts | 6 ++--- packages/sync-rules/src/sql_filters.ts | 24 +++++++++++-------- packages/sync-rules/src/streams/from_sql.ts | 6 ++--- 8 files changed, 32 insertions(+), 27 deletions(-) diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index 1dd569f9f..b426cea03 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -181,7 +181,7 @@ export class BaseSqlDataQuery { try { const { table, row, bucketIds } = options; - const tables = { [this.table.schemaName]: this.addSpecialParameters(table, row) }; + const tables = { [this.table.nameInSchema]: this.addSpecialParameters(table, row) }; const resolvedBucketIds = bucketIds(tables); const data = this.transformRow(tables); @@ -221,7 +221,7 @@ export class BaseSqlDataQuery { protected getColumnOutputsFor(schemaTable: SourceSchemaTable, output: Record) { const querySchema: QuerySchema = { getColumn: (table, column) => { - if (table == this.table.schemaName) { + if (table == this.table.nameInSchema) { return schemaTable.getColumn(column); } else { // TODO: bucket parameters? @@ -229,7 +229,7 @@ export class BaseSqlDataQuery { } }, getColumns: (table) => { - if (table == this.table.schemaName) { + if (table == this.table.nameInSchema) { return schemaTable.getColumns(); } else { return []; diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index 8a0922133..30f0a2802 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -123,7 +123,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias.schemaName]; + const row = tables[alias.nameInSchema]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -132,7 +132,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias.schemaName)) { + for (let column of schema.getColumns(alias.nameInSchema)) { into[column.name] ??= column; } } @@ -146,7 +146,7 @@ export class SqlDataQuery extends BaseSqlDataQuery { // Not performing schema-based validation - assume there is an id hasId = true; } else { - const idType = querySchema.getColumn(alias.schemaName, 'id')?.type ?? ExpressionType.NONE; + const idType = querySchema.getColumn(alias.nameInSchema, 'id')?.type ?? ExpressionType.NONE; if (!idType.isNone()) { hasId = true; } diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index 65d78c8f8..cfc465969 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -212,7 +212,8 @@ export class SqlParameterQuery { * The table name or alias, as referred to in the SQL query. * Not used directly outside the query. * - * Currently, this always matches sourceTable.name. + * Since aliases aren't allowed in parameter queries, this always matches sourceTable.name (checked by + * {@link fromSql}). */ readonly table: AvailableTable; @@ -308,7 +309,7 @@ export class SqlParameterQuery { */ evaluateParameterRow(row: SqliteRow): EvaluatedParametersResult[] { const tables = { - [this.table.schemaName]: row + [this.table.nameInSchema]: row }; try { const filterParameters = this.filter.filterRow(tables); diff --git a/packages/sync-rules/src/TableQuerySchema.ts b/packages/sync-rules/src/TableQuerySchema.ts index 910182298..05a42a199 100644 --- a/packages/sync-rules/src/TableQuerySchema.ts +++ b/packages/sync-rules/src/TableQuerySchema.ts @@ -13,7 +13,7 @@ export class TableQuerySchema implements QuerySchema { ) {} getColumn(table: string, column: string): ColumnDefinition | undefined { - if (table != this.alias.schemaName) { + if (table != this.alias.nameInSchema) { return undefined; } for (let table of this.tables) { @@ -26,7 +26,7 @@ export class TableQuerySchema implements QuerySchema { } getColumns(table: string): ColumnDefinition[] { - if (table != this.alias.schemaName) { + if (table != this.alias.nameInSchema) { return []; } let columns: Record = {}; diff --git a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts index 2e49e336c..b4b4430f6 100644 --- a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts +++ b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts @@ -232,7 +232,7 @@ export class TableValuedFunctionSqlParameterQuery { const mergedParams: ParameterValueSet = { ...parameters, lookup: (table, column) => { - if (table == this.callTable.schemaName) { + if (table == this.callTable.nameInSchema) { return row[column]!; } else { return parameters.lookup(table, column); diff --git a/packages/sync-rules/src/events/SqlEventSourceQuery.ts b/packages/sync-rules/src/events/SqlEventSourceQuery.ts index 3e635fc97..3558eb6d3 100644 --- a/packages/sync-rules/src/events/SqlEventSourceQuery.ts +++ b/packages/sync-rules/src/events/SqlEventSourceQuery.ts @@ -99,7 +99,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias.schemaName]; + const row = tables[alias.nameInSchema]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -108,7 +108,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias.schemaName)) { + for (let column of schema.getColumns(alias.nameInSchema)) { into[column.name] ??= column; } } @@ -136,7 +136,7 @@ export class SqlEventSourceQuery extends BaseSqlDataQuery { evaluateRowWithErrors(table: SourceTableInterface, row: SqliteRow): EvaluatedEventRowWithErrors { try { - const tables = { [this.table!.schemaName]: this.addSpecialParameters(table, row) }; + const tables = { [this.table!.nameInSchema]: this.addSpecialParameters(table, row) }; const data = this.transformRow(tables); return { diff --git a/packages/sync-rules/src/sql_filters.ts b/packages/sync-rules/src/sql_filters.ts index 6e6586101..dde2a73ea 100644 --- a/packages/sync-rules/src/sql_filters.ts +++ b/packages/sync-rules/src/sql_filters.ts @@ -46,6 +46,7 @@ import { } from './types.js'; import { isJsonValue } from './utils.js'; import { CompatibilityContext } from './compatibility.js'; +import { TablePattern } from './TablePattern.js'; export const MATCH_CONST_FALSE: TrueIfParametersMatch = []; export const MATCH_CONST_TRUE: TrueIfParametersMatch = [{}]; @@ -55,15 +56,18 @@ Object.freeze(MATCH_CONST_FALSE); /** * A table that has been made available to a result set by being included in a `FROM`. + * + * This is used to lookup references inside queries only, which is why this doesn't reference the schema name (that's + * covered by {@link TablePattern}). */ export class AvailableTable { /** * The name of the table in the schema. */ - schemaName: string; + nameInSchema: string; /** - * The alias under which the {@link schemaName} is made available to the current query. + * The alias under which the {@link nameInSchema} is made available to the current query. */ alias?: string; @@ -71,15 +75,15 @@ export class AvailableTable { * The name a table has in an SQL expression context. */ public get sqlName(): string { - return this.alias ?? this.schemaName; + return this.alias ?? this.nameInSchema; } get isAliased(): boolean { - return this.sqlName != this.schemaName; + return this.sqlName != this.nameInSchema; } constructor(schemaName: string, alias?: string) { - this.schemaName = schemaName; + this.nameInSchema = schemaName; this.alias = alias; } @@ -92,7 +96,7 @@ export class AvailableTable { } /** - * Finds the first table matching the given SQL name. + * Finds the first table matching the given name in SQL. */ static search( identifier: string | AvailableTable | undefined, @@ -268,10 +272,10 @@ export class SqlTools { this.checkRef(table, expr); return { evaluate(tables: QueryParameters): SqliteValue { - return tables[table.schemaName]?.[column]; + return tables[table.nameInSchema]?.[column]; }, getColumnDefinition(schema) { - return schema.getColumn(table.schemaName, column); + return schema.getColumn(table.nameInSchema, column); } } satisfies RowValueClause; } else { @@ -765,7 +769,7 @@ export class SqlTools { private checkRef(table: AvailableTable, ref: ExprRef) { if (this.schema) { - const type = this.schema.getColumn(table.schemaName, ref.name); + const type = this.schema.getColumn(table.nameInSchema, ref.name); if (type == null) { this.warn(`Column not found: ${ref.name}`, ref); } @@ -773,7 +777,7 @@ export class SqlTools { } private getParameterRefClause(expr: ExprRef): ParameterValueClause { - const table = AvailableTable.search(expr.table?.name ?? this.defaultTable!, this.parameterTables)!.schemaName; + const table = AvailableTable.search(expr.table?.name ?? this.defaultTable!, this.parameterTables)!.nameInSchema; const column = expr.name; return { key: `${table}.${column}`, diff --git a/packages/sync-rules/src/streams/from_sql.ts b/packages/sync-rules/src/streams/from_sql.ts index fa01a8864..648c377b9 100644 --- a/packages/sync-rules/src/streams/from_sql.ts +++ b/packages/sync-rules/src/streams/from_sql.ts @@ -143,7 +143,7 @@ class SyncStreamCompiler { } else { extractors.push({ extract: (tables, output) => { - const row = tables[alias.schemaName]; + const row = tables[alias.nameInSchema]; for (let key in row) { if (key.startsWith('_')) { continue; @@ -152,7 +152,7 @@ class SyncStreamCompiler { } }, getTypes(schema, into) { - for (let column of schema.getColumns(alias.schemaName)) { + for (let column of schema.getColumns(alias.nameInSchema)) { into[column.name] ??= column; } } @@ -166,7 +166,7 @@ class SyncStreamCompiler { // Not performing schema-based validation - assume there is an id hasId = true; } else { - const idType = querySchema.getColumn(alias.schemaName, 'id')?.type ?? ExpressionType.NONE; + const idType = querySchema.getColumn(alias.nameInSchema, 'id')?.type ?? ExpressionType.NONE; if (!idType.isNone()) { hasId = true; }