Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/hot-socks-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/drizzle-driver': minor
'@powersync/node': minor
---

Add support for concurrent read queries with Drizzle.
4 changes: 2 additions & 2 deletions packages/drizzle-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@
"@powersync/web": "workspace:*",
"@journeyapps/wa-sqlite": "^1.3.2",
"@types/node": "^20.17.6",
"drizzle-orm": "^0.35.2",
"drizzle-orm": "^0.44.7",
"vite": "^6.1.0",
"vite-plugin-top-level-await": "^1.4.4",
"vite-plugin-wasm": "^3.3.0"
}
}
}
25 changes: 17 additions & 8 deletions packages/drizzle-driver/src/sqlite/PowerSyncSQLiteBaseSession.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { LockContext, QueryResult } from '@powersync/common';
import type { QueryResult } from '@powersync/common';
import type { WithCacheConfig } from 'drizzle-orm/cache/core/types';
import { entityKind } from 'drizzle-orm/entity';
import type { Logger } from 'drizzle-orm/logger';
import { NoopLogger } from 'drizzle-orm/logger';
Expand All @@ -7,13 +8,13 @@ import { type Query } from 'drizzle-orm/sql/sql';
import type { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
import type { SelectedFieldsOrdered } from 'drizzle-orm/sqlite-core/query-builders/select.types';
import {
type PreparedQueryConfig as PreparedQueryConfigBase,
type SQLiteExecuteMethod,
SQLiteSession,
SQLiteTransaction,
type PreparedQueryConfig as PreparedQueryConfigBase,
type SQLiteExecuteMethod,
type SQLiteTransactionConfig
} from 'drizzle-orm/sqlite-core/session';
import { PowerSyncSQLitePreparedQuery } from './PowerSyncSQLitePreparedQuery.js';
import { PowerSyncSQLitePreparedQuery, type ContextProvider } from './PowerSyncSQLitePreparedQuery.js';

export interface PowerSyncSQLiteSessionOptions {
logger?: Logger;
Expand All @@ -39,7 +40,7 @@ export class PowerSyncSQLiteBaseSession<
protected logger: Logger;

constructor(
protected db: LockContext,
protected contextProvider: ContextProvider,
protected dialect: SQLiteAsyncDialect,
protected schema: RelationalSchemaConfig<TSchema> | undefined,
protected options: PowerSyncSQLiteSessionOptions = {}
Expand All @@ -53,16 +54,24 @@ export class PowerSyncSQLiteBaseSession<
fields: SelectedFieldsOrdered | undefined,
executeMethod: SQLiteExecuteMethod,
isResponseInArrayMode: boolean,
customResultMapper?: (rows: unknown[][], mapColumnValue?: (value: unknown) => unknown) => unknown
customResultMapper?: (rows: unknown[][], mapColumnValue?: (value: unknown) => unknown) => unknown,
queryMetadata?: {
type: 'select' | 'update' | 'delete' | 'insert';
tables: string[];
},
cacheConfig?: WithCacheConfig
): PowerSyncSQLitePreparedQuery<T> {
return new PowerSyncSQLitePreparedQuery(
this.db,
this.contextProvider,
query,
this.logger,
fields,
executeMethod,
isResponseInArrayMode,
customResultMapper
customResultMapper,
undefined, // cache not supported yet
queryMetadata,
cacheConfig
);
}

Expand Down
41 changes: 39 additions & 2 deletions packages/drizzle-driver/src/sqlite/PowerSyncSQLiteDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import {
createTableRelationsHelpers,
extractTablesRelationalConfig,
ExtractTablesWithRelations,
TableRelationalConfig,
type RelationalSchemaConfig,
type TablesRelationalConfig
} from 'drizzle-orm/relations';
import { SQLiteTransaction } from 'drizzle-orm/sqlite-core';
import { SQLiteSession, SQLiteTable, SQLiteTransaction } from 'drizzle-orm/sqlite-core';
import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db';
import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
import { RelationalQueryBuilder } from 'drizzle-orm/sqlite-core/query-builders/query';
import type { DrizzleConfig } from 'drizzle-orm/utils';
import { toCompilableQuery } from './../utils/compilableQuery.js';
import { PowerSyncSQLiteTransactionConfig } from './PowerSyncSQLiteBaseSession.js';
import { PowerSyncSQLiteBaseSession, PowerSyncSQLiteTransactionConfig } from './PowerSyncSQLiteBaseSession.js';
import { PowerSyncSQLiteSession } from './PowerSyncSQLiteSession.js';

export type DrizzleQuery<T> = { toSQL(): Query; execute(): Promise<T | T[]> };
Expand Down Expand Up @@ -54,6 +56,41 @@ export class PowerSyncSQLiteDatabase<

super('async', dialect, session as any, schema as any);
this.db = db;

/**
* A hack in order to use read locks for `db.query.users.findMany()` etc queries.
* We don't currently get queryMetadata for these queries, so we can't use the regular session.
* This session always uses read locks.
*/
const querySession = new PowerSyncSQLiteBaseSession(
{
useReadContext: (callback) => db.readLock(callback),
useWriteContext: (callback) => db.readLock(callback)
},
dialect,
schema,
{
logger
}
);
if (this._.schema) {
// https://github.com/drizzle-team/drizzle-orm/blob/ad4ddd444d066b339ffd5765cb6ec3bf49380189/drizzle-orm/src/sqlite-core/db.ts#L72
const query = this.query as {
[K in keyof TSchema]: RelationalQueryBuilder<'async', any, any, any>;
};
for (const [tableName, columns] of Object.entries(this._.schema)) {
query[tableName as keyof TSchema] = new RelationalQueryBuilder(
'async',
schema!.fullSchema,
this._.schema,
this._.tableNamesMap,
schema!.fullSchema[tableName] as SQLiteTable,
columns as TableRelationalConfig,
dialect,
querySession as SQLiteSession<'async', any, any, any>
);
}
}
}

transaction<T>(
Expand Down
71 changes: 57 additions & 14 deletions packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
import { LockContext, QueryResult } from '@powersync/common';
import { Column, DriverValueDecoder, getTableName, SQL } from 'drizzle-orm';
import type { LockContext, QueryResult } from '@powersync/common';
import { Column, DriverValueDecoder, SQL, getTableName } from 'drizzle-orm';
import type { Cache } from 'drizzle-orm/cache/core';
import type { WithCacheConfig } from 'drizzle-orm/cache/core/types';
import { entityKind, is } from 'drizzle-orm/entity';
import type { Logger } from 'drizzle-orm/logger';
import { fillPlaceholders, type Query } from 'drizzle-orm/sql/sql';
import { SQLiteColumn } from 'drizzle-orm/sqlite-core';
import type { SelectedFieldsOrdered } from 'drizzle-orm/sqlite-core/query-builders/select.types';
import {
SQLitePreparedQuery,
type PreparedQueryConfig as PreparedQueryConfigBase,
type SQLiteExecuteMethod,
SQLitePreparedQuery
type SQLiteExecuteMethod
} from 'drizzle-orm/sqlite-core/session';

type PreparedQueryConfig = Omit<PreparedQueryConfigBase, 'statement' | 'run'>;

/**
* Callback which uses a LockContext for database operations.
*/
export type LockCallback<T> = (ctx: LockContext) => Promise<T>;

/**
* Provider for specific database contexts.
* Handlers are provided a context to the provided callback.
* This does not necessarily need to acquire a database lock for each call.
* Calls might use the same lock context for multiple operations.
* The read/write context may relate to a single read OR write context.
*/
export type ContextProvider = {
useReadContext: <T>(fn: LockCallback<T>) => Promise<T>;
useWriteContext: <T>(fn: LockCallback<T>) => Promise<T>;
};

export class PowerSyncSQLitePreparedQuery<
T extends PreparedQueryConfig = PreparedQueryConfig
> extends SQLitePreparedQuery<{
Expand All @@ -25,36 +44,48 @@ export class PowerSyncSQLitePreparedQuery<
}> {
static readonly [entityKind]: string = 'PowerSyncSQLitePreparedQuery';

private readOnly = false;

constructor(
private db: LockContext,
private contextProvider: ContextProvider,
query: Query,
private logger: Logger,
private fields: SelectedFieldsOrdered | undefined,
executeMethod: SQLiteExecuteMethod,
private _isResponseInArrayMode: boolean,
private customResultMapper?: (rows: unknown[][]) => unknown
private customResultMapper?: (rows: unknown[][]) => unknown,
cache?: Cache | undefined,
queryMetadata?:
| {
type: 'select' | 'update' | 'delete' | 'insert';
tables: string[];
}
| undefined,
cacheConfig?: WithCacheConfig | undefined
) {
super('async', executeMethod, query);
super('async', executeMethod, query, cache, queryMetadata, cacheConfig);
this.readOnly = queryMetadata?.type == 'select';
}

async run(placeholderValues?: Record<string, unknown>): Promise<QueryResult> {
const params = fillPlaceholders(this.query.params, placeholderValues ?? {});
this.logger.logQuery(this.query.sql, params);
const rs = await this.db.execute(this.query.sql, params);
return rs;
return this.useContext(async (ctx) => {
return await ctx.execute(this.query.sql, params);
});
}

async all(placeholderValues?: Record<string, unknown>): Promise<T['all']> {
const { fields, query, logger, customResultMapper } = this;
if (!fields && !customResultMapper) {
const params = fillPlaceholders(query.params, placeholderValues ?? {});
logger.logQuery(query.sql, params);
const rs = await this.db.execute(this.query.sql, params);
return rs.rows?._array ?? [];
return await this.useContext(async (ctx) => {
return await ctx.getAll(this.query.sql, params);
});
}

const rows = (await this.values(placeholderValues)) as unknown[][];

if (customResultMapper) {
const mapped = customResultMapper(rows) as T['all'];
return mapped;
Expand All @@ -69,7 +100,9 @@ export class PowerSyncSQLitePreparedQuery<
const { fields, customResultMapper } = this;
const joinsNotNullableMap = (this as any).joinsNotNullableMap;
if (!fields && !customResultMapper) {
return this.db.get(this.query.sql, params);
return this.useContext(async (ctx) => {
return await ctx.get(this.query.sql, params);
});
}

const rows = (await this.values(placeholderValues)) as unknown[][];
Expand All @@ -90,12 +123,22 @@ export class PowerSyncSQLitePreparedQuery<
const params = fillPlaceholders(this.query.params, placeholderValues ?? {});
this.logger.logQuery(this.query.sql, params);

return await this.db.executeRaw(this.query.sql, params);
return await this.useContext(async (ctx) => {
return await ctx.executeRaw(this.query.sql, params);
});
}

isResponseInArrayMode(): boolean {
return this._isResponseInArrayMode;
}

protected useContext<T>(callback: LockCallback<T>): Promise<T> {
if (this.readOnly) {
return this.contextProvider.useReadContext(callback);
} else {
return this.contextProvider.useWriteContext(callback);
}
}
}

/**
Expand Down
26 changes: 22 additions & 4 deletions packages/drizzle-driver/src/sqlite/PowerSyncSQLiteSession.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AbstractPowerSyncDatabase, DBAdapter } from '@powersync/common';
import { AbstractPowerSyncDatabase, LockContext } from '@powersync/common';
import { entityKind } from 'drizzle-orm/entity';
import type { RelationalSchemaConfig, TablesRelationalConfig } from 'drizzle-orm/relations';
import type { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
Expand All @@ -21,7 +21,16 @@ export class PowerSyncSQLiteSession<
schema: RelationalSchemaConfig<TSchema> | undefined,
options: PowerSyncSQLiteSessionOptions = {}
) {
super(db, dialect, schema, options);
super(
// Top level operations use the respective locks.
{
useReadContext: (callback) => db.readLock(callback),
useWriteContext: (callback) => db.writeLock(callback)
},
dialect,
schema,
options
);
this.client = db;
}

Expand All @@ -39,14 +48,23 @@ export class PowerSyncSQLiteSession<
}

protected async internalTransaction<T>(
connection: DBAdapter,
connection: LockContext,
fn: (tx: PowerSyncSQLiteTransaction<TFullSchema, TSchema>) => T,
config: PowerSyncSQLiteTransactionConfig = {}
): Promise<T> {
const tx = new PowerSyncSQLiteTransaction<TFullSchema, TSchema>(
'async',
(this as any).dialect,
new PowerSyncSQLiteBaseSession(connection, this.dialect, this.schema, this.options),
new PowerSyncSQLiteBaseSession(
{
// We already have a fixed context here. We need to use it for both "read" and "write" operations.
useReadContext: (callback) => callback(connection),
useWriteContext: (callback) => callback(connection)
},
this.dialect,
this.schema,
this.options
),
this.schema
);

Expand Down
Loading
Loading