Skip to content

Commit 3a6eded

Browse files
committed
feat(dialect-sqlite-worker): support stream
1 parent d88c1eb commit 3a6eded

File tree

7 files changed

+97
-43
lines changed

7 files changed

+97
-43
lines changed

packages/dialect-sqlite-worker/src/driver.ts

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Worker } from 'node:worker_threads'
22
import { join } from 'node:path'
33
import { EventEmitter } from 'node:events'
44
import type { DatabaseConnection, Driver, QueryResult } from 'kysely'
5-
import { CompiledQuery } from 'kysely'
5+
import { CompiledQuery, SelectQueryNode } from 'kysely'
66
import type { MainMsg, SqliteWorkerDialectConfig, WorkerMsg } from './type'
77

88
export class SqliteWorkerDriver implements Driver {
@@ -100,8 +100,48 @@ export class SqliteWorkerConnection implements DatabaseConnection {
100100
private emit?: EventEmitter,
101101
) { }
102102

103-
async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
104-
throw new Error('sqlite-worker driver doesn\'t support streaming')
103+
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
104+
const { parameters, sql, query } = compiledQuery
105+
if (!SelectQueryNode.is(query)) {
106+
throw new Error('WaSqlite dialect only supported SELECT queries')
107+
}
108+
this.worker.postMessage(['2', sql, parameters] satisfies MainMsg)
109+
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
110+
let rejecter: ((reason: any) => void) | null = null
111+
112+
this.emit!.on('2', (data, err) => {
113+
if (err && rejecter) {
114+
rejecter(err)
115+
}
116+
if (resolver) {
117+
resolver({ value: { rows: data! }, done: false })
118+
resolver = null
119+
}
120+
})
121+
122+
this.emit!.on('3', (_, err) => {
123+
if (err && rejecter) {
124+
rejecter(err)
125+
}
126+
if (resolver) {
127+
resolver({ value: undefined, done: true })
128+
}
129+
})
130+
131+
return {
132+
[Symbol.asyncIterator]() {
133+
return this
134+
},
135+
async next() {
136+
return new Promise<IteratorResult<any>>((resolve, reject) => {
137+
resolver = resolve
138+
rejecter = reject
139+
})
140+
},
141+
async return() {
142+
return { value: undefined, done: true }
143+
},
144+
}
105145
}
106146

107147
async executeQuery<R>(compiledQuery: CompiledQuery<unknown>): Promise<QueryResult<R>> {

packages/dialect-sqlite-worker/src/type.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ type RunMsg = [
2727
]
2828

2929
type CloseMsg = [type: '1']
30+
type StreamMsg = [type: '2', sql: string, parameters?: readonly unknown[]]
3031

31-
export type MainMsg = RunMsg | CloseMsg
32+
export type MainMsg = RunMsg | CloseMsg | StreamMsg
3233

3334
export type WorkerMsg =
3435
| [
@@ -41,3 +42,13 @@ export type WorkerMsg =
4142
data: null,
4243
err: unknown,
4344
]
45+
| [
46+
type: '2',
47+
data: QueryResult<any>[] | null,
48+
err: unknown,
49+
]
50+
| [
51+
type: '3',
52+
data: null,
53+
err: unknown,
54+
]

packages/dialect-sqlite-worker/src/worker.ts

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,42 @@ if (!parentPort) {
99
const { src, option } = workerData
1010
const db = new Database(src, option)
1111

12-
parentPort.on('message', (msg: MainMsg) => {
12+
parentPort.on('message', ([msg, data1, data2]: MainMsg) => {
1313
const ret: WorkerMsg = [
14-
msg[0],
14+
msg,
1515
null,
1616
null,
1717
]
1818

1919
try {
20-
if (msg[0] === '1') {
21-
db.close()
22-
parentPort!.postMessage(ret)
23-
return
24-
}
25-
const stmt = db.prepare(msg[1])
26-
if (stmt.reader) {
27-
ret[1] = {
28-
rows: stmt.all(msg[2]),
20+
switch (msg) {
21+
case '0': {
22+
const stmt = db.prepare(data1)
23+
if (stmt.reader) {
24+
ret[1] = {
25+
rows: stmt.all(data2),
26+
}
27+
} else {
28+
const { changes, lastInsertRowid } = stmt.run(data2)
29+
ret[1] = {
30+
rows: [],
31+
numAffectedRows: BigInt(changes),
32+
insertId: BigInt(lastInsertRowid),
33+
}
34+
}
35+
break
2936
}
30-
} else {
31-
const { changes, lastInsertRowid } = stmt.run(msg[2])
32-
ret[1] = {
33-
rows: [],
34-
numAffectedRows: BigInt(changes),
35-
insertId: BigInt(lastInsertRowid),
37+
case '1':
38+
db.close()
39+
break
40+
case '2': {
41+
const stmt = db.prepare(data1)
42+
const iter = stmt.iterate(data2)
43+
for (const row of iter) {
44+
parentPort!.postMessage([msg, [row as any], null] satisfies WorkerMsg)
45+
}
46+
ret[0] = '3'
47+
break
3648
}
3749
}
3850
} catch (error) {

packages/dialect-wasqlite-worker/src/driver.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export class WaSqliteWorkerDriver implements Driver {
3737
0,
3838
this.config.fileName,
3939
// if use OPFS, wasm should use sync version
40-
parseWorkerOrURL(this.config.url ?? defaultWasmURL, !useOPFS),
40+
parseWorkerOrURL(this.config.url ?? defaultWasmURL, !useOPFS) as string,
4141
useOPFS,
4242
] satisfies MainMsg)
4343
await new Promise<void>((resolve, reject) => {
@@ -123,12 +123,12 @@ class WaSqliteWorkerConnection implements DatabaseConnection {
123123
this.mitt = mitt
124124
}
125125

126-
streamQuery<R>(compiledQuery: CompiledQuery, chunkSize = 1): AsyncIterableIterator<QueryResult<R>> {
126+
streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
127127
const { parameters, sql, query } = compiledQuery
128128
if (!SelectQueryNode.is(query)) {
129129
throw new Error('WaSqlite dialect only supported SELECT queries')
130130
}
131-
this.worker.postMessage([3, chunkSize, sql, parameters] satisfies MainMsg)
131+
this.worker.postMessage([3, sql, parameters] satisfies MainMsg)
132132
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
133133
let rejecter: ((reason: any) => void) | null = null
134134

packages/dialect-wasqlite-worker/src/type.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ type RunMsg = [
5555
]
5656
type StreamMsg = [
5757
type: 3,
58-
chunkSize: number,
5958
sql: string,
6059
parameters?: readonly unknown[],
6160
]

packages/dialect-wasqlite-worker/src/worker.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,8 @@ async function exec(isSelect: boolean, sql: string, parameters?: readonly unknow
3030
async function stream(onData: (data: any) => void, sql: string, parameters?: readonly unknown[]): Promise<void> {
3131
await db.stream(onData, sql, parameters as any[])
3232
}
33-
onmessage = async ({ data }: MessageEvent<MainMsg>) => {
34-
const [msg, data1, data2, data3] = data
35-
const ret: WorkerMsg = [
36-
msg,
37-
null,
38-
null,
39-
]
33+
onmessage = async ({ data: [msg, data1, data2, data3] }: MessageEvent<MainMsg>) => {
34+
const ret: WorkerMsg = [msg, null, null]
4035
try {
4136
switch (msg) {
4237
case 0:
@@ -48,19 +43,10 @@ onmessage = async ({ data }: MessageEvent<MainMsg>) => {
4843
case 2:
4944
await db.close()
5045
break
51-
case 3: {
52-
let result: any[] = []
53-
await stream((val) => {
54-
if (result.length < data1) {
55-
result.push(val)
56-
} else {
57-
postMessage([3, result, null] satisfies WorkerMsg)
58-
result = []
59-
}
60-
}, data2, data3)
46+
case 3:
47+
await stream(val => postMessage([3, [val], null] satisfies WorkerMsg), data1, data2)
6148
ret[0] = 4
6249
break
63-
}
6450
}
6551
} catch (error) {
6652
ret[2] = error

test/dialect-sqlite-worker.test.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ describe('sqlite worker dialect test', () => {
3333
expect(age).toStrictEqual(18)
3434
expect(name).toStrictEqual(`test ${dialect.toString()}`)
3535
expect(int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
36+
const rows = db.selectFrom('test').selectAll().stream()
37+
for await (const row of rows) {
38+
expect(row.age).toStrictEqual(18)
39+
expect(row.name).toStrictEqual(`test ${dialect.toString()}`)
40+
expect(row.int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
41+
}
3642
}
3743
afterAll(async () => {
3844
await db.destroy()

0 commit comments

Comments
 (0)