Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/old-buckets-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-sync-rules': patch
---

Fix compiling streams with multiple parameter matchers in a subquery.
59 changes: 52 additions & 7 deletions packages/sync-rules/src/streams/from_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,18 +407,63 @@ class SyncStreamCompiler {
tools.error('This subquery must return exactly one column', query);
}

const column = tools.compileRowValueExtractor(query.columns?.[0]?.expr);
if (isClauseError(column)) {
const columnOrError = tools.compileRowValueExtractor(query.columns?.[0]?.expr);
if (isClauseError(columnOrError)) {
return;
}
const column = columnOrError;

const where = tools.compileClause(query.where);
const where = this.whereClauseToFilters(tools, query.where);
const filter = where.toDisjunctiveNormalForm(tools);

function checkValidSubqueryFilter(
operator: FilterOperator
): CompareRowValueWithStreamParameter | EvaluateSimpleCondition | null {
if (operator instanceof CompareRowValueWithStreamParameter || operator instanceof EvaluateSimpleCondition) {
return operator;
}

tools.error('Unsupported condition for stream subqueries', operator.location ?? undefined);
return null;
}

function constructSubquery(filter: FilterOperator) {
if (filter instanceof Or) {
// Subqueries can't have variants, so the DNF must be a single conjunction.
if (filter.inner.length != 1) {
tools.error("Stream subqueries can't use OR filters", filter.location ?? undefined);
}

return constructSubquery(filter.inner[0]);
} else if (filter instanceof And) {
const first = checkValidSubqueryFilter(filter.inner[0]);
if (!first) {
return;
}
const subquery = new Subquery(sourceTable, column, first);
for (const rest of filter.inner.slice(1)) {
const checked = checkValidSubqueryFilter(rest);
if (checked) {
subquery.addFilter(checked);
}
}

return subquery;
} else {
const validated = checkValidSubqueryFilter(filter);
if (validated) {
return new Subquery(sourceTable, column, validated);
}
}
}

const compiledSubquery = constructSubquery(filter);
this.errors.push(...tools.errors);
return [
new Subquery(sourceTable, column, this.compiledClauseToFilter(tools, query.where?._location, where)),
tools
];

if (!compiledSubquery) {
return;
}
return [compiledSubquery, tools];
}

private checkValidSelectStatement(stmt: Statement) {
Expand Down
6 changes: 4 additions & 2 deletions packages/sync-rules/src/streams/variant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ export class StreamVariant {
return [];
}

// This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters.
return this.partiallyEvaluateParameters(params) as SqliteJsonValue[][];
return this.cartesianProductOfParameterInstantiations(
// This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters.
this.partiallyEvaluateParameters(params) as SqliteJsonValue[][]
);
}

/**
Expand Down
116 changes: 116 additions & 0 deletions packages/sync-rules/test/src/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,33 @@ describe('streams', () => {
]);
});

test('OR in subquery', () => {
const [_, errors] = syncStreamFromSql(
's',
`select * from comments where issue_id in (select id from issues where owner_id = auth.user_id() or name = 'test')`,
options
);

expect(errors).toMatchObject([
expect.toBeSqlRuleError(`Stream subqueries can't use OR filters`, `owner_id = auth.user_id() or name = 'test'`)
]);
});

test('nested subqueries', () => {
const [_, errors] = syncStreamFromSql(
's',
`select * from comments where issue_id in (select id from issues where owner_id in (select id from users where is_admin))`,
options
);

expect(errors).toMatchObject([
expect.toBeSqlRuleError(
`Unsupported condition for stream subqueries`,
`owner_id in (select id from users where is_admin`
)
]);
});

test('subquery with two columns', () => {
const [_, errors] = syncStreamFromSql(
's',
Expand Down Expand Up @@ -719,6 +746,95 @@ describe('streams', () => {
stream.resolveResultSets(schema, outputSchema);
expect(Object.keys(outputSchema)).toStrictEqual(['outer']);
});

test('multiple matchers in subquery', async () => {
// https://discord.com/channels/1138230179878154300/1422138173907144724/1443338137660031117
const scene = new TestSourceTable('Scene');
const projectInvitation = new TestSourceTable('ProjectInvitation');
const schema = new StaticSchema([
{
tag: DEFAULT_TAG,
schemas: [
{
name: 'test_schema',
tables: [
{
name: 'Scene',
columns: [
{ name: '_id', pg_type: 'uuid' },
{ name: 'project', pg_type: 'uuid' }
]
},
{
name: 'ProjectInvitation',
columns: [
{ name: 'project', pg_type: 'uuid' },
{ name: 'appliedTo', pg_type: 'text' },
{ name: 'appliedTo', pg_type: 'text' },
{ name: 'status', pg_type: 'text' }
]
}
]
}
]
}
]);

const desc = parseStream(
`SELECT _id as id, *
FROM "Scene"
WHERE
project IN (
SELECT project
FROM "ProjectInvitation"
WHERE "appliedTo" != ''
AND (auth.parameters() ->> 'haystack_id') IN "appliedTo"
AND project = subscription.parameter('project')
AND "status" = 'CLAIMED'
)
`,
'stream',
{ ...options, schema }
);

expect(evaluateBucketIds(desc, scene, { _id: 'scene', project: 'foo' })).toStrictEqual(['1#stream|0["foo"]']);

expect(
desc.evaluateParameterRow(projectInvitation, {
project: 'foo',
appliedTo: '[1,2]',
status: 'CLAIMED'
})
).toStrictEqual([
{
lookup: ParameterLookup.normalized('stream', '0', [1n, 'foo']),
bucketParameters: [
{
result: 'foo'
}
]
},
{
lookup: ParameterLookup.normalized('stream', '0', [2n, 'foo']),
bucketParameters: [
{
result: 'foo'
}
]
}
]);

expect(
await queryBucketIds(desc, {
token: { sub: 'user1', haystack_id: 1 },
parameters: { project: 'foo' },
getParameterSets(lookups) {
expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', [1n, 'foo'])]);
return [{ result: 'foo' }];
}
})
).toStrictEqual(['1#stream|0["foo"]']);
});
});
});

Expand Down