Skip to content

Commit b4480bd

Browse files
committed
fix(dialect-worker): fix stream issue
1 parent 417504c commit b4480bd

File tree

9 files changed

+159
-146
lines changed

9 files changed

+159
-146
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
"release": "pnpm i && bumpp package.json packages/dialect*/package.json packages/plugin*/package.json",
1818
"serve": "cd playground && pnpm run dev",
1919
"publish": "pnpm -r -F \"./packages/*\" publish --no-git-checks",
20-
"test": "vitest run --root ./test --passWithNoTests",
20+
"test": "pnpm run build && vitest run --root ./test --passWithNoTests",
2121
"test:dev": "vitest --root ./packages/sqlite-builder",
22+
"test:bun": "bun test test-bun",
2223
"typecheck": "tsc --noEmit",
2324
"format": "eslint . --fix",
2425
"lint": "eslint .",

packages/dialect-bun-worker/src/driver.ts

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -110,44 +110,44 @@ class BunWorkerConnection implements DatabaseConnection {
110110
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
111111
const { parameters, sql, query } = compiledQuery
112112
if (!SelectQueryNode.is(query)) {
113-
throw new Error('WaSqlite dialect only supported SELECT queries')
113+
throw new Error('bun:sqlite worker dialect only supported `SELECT` queries')
114114
}
115115
this.worker.postMessage([3, sql, parameters] satisfies MainToWorkerMsg)
116-
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
117-
let rejecter: ((reason: any) => void) | null = null
118-
119-
this.mitt!.on(3, (data, err) => {
120-
if (err && rejecter) {
121-
rejecter(err)
122-
}
123-
if (resolver) {
124-
resolver({ value: { rows: data! }, done: false })
125-
resolver = null
116+
let done = false
117+
let resolveFn: (value: IteratorResult<QueryResult<R>>) => void
118+
let rejectFn: (reason?: any) => void
119+
120+
const dataListener = (data: QueryResult<any>[] | null, err: unknown): void => {
121+
if (err) {
122+
rejectFn(err)
123+
} else {
124+
resolveFn({ value: { rows: data as any }, done: false })
126125
}
127-
})
126+
}
127+
this.mitt!.on(3/* data */, dataListener)
128128

129-
this.mitt!.on(4, (_, err) => {
130-
if (err && rejecter) {
131-
rejecter(err)
132-
}
133-
if (resolver) {
134-
resolver({ value: undefined, done: true })
129+
const endListener = (_: null, err: unknown): void => {
130+
if (err) {
131+
rejectFn(err)
132+
} else {
133+
resolveFn({ value: undefined, done: true })
135134
}
136-
})
135+
}
136+
this.mitt!.on(4/* end */, endListener)
137137

138-
return {
139-
[Symbol.asyncIterator]() {
140-
return this
141-
},
142-
async next() {
143-
return new Promise<IteratorResult<any>>((resolve, reject) => {
144-
resolver = resolve
145-
rejecter = reject
146-
})
147-
},
148-
async return() {
149-
return { value: undefined, done: true }
150-
},
138+
while (!done) {
139+
const result = await new Promise<IteratorResult<QueryResult<R>>>((res, rej) => {
140+
resolveFn = res
141+
rejectFn = rej
142+
})
143+
144+
if (result.done) {
145+
done = true
146+
this.mitt?.off(3/* data */, dataListener)
147+
this.mitt?.off(4/* end */, endListener)
148+
} else {
149+
yield result.value
150+
}
151151
}
152152
}
153153

packages/dialect-bun-worker/tsup.config.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { defineConfig } from 'tsup'
22

33
export default defineConfig({
4-
entry: [
5-
'src/index.ts',
6-
'src/worker.ts',
7-
],
4+
entry: {
5+
index: 'src/index.ts',
6+
worker: 'src/worker/index.ts',
7+
},
88
clean: true,
99
format: ['cjs', 'esm'],
1010
shims: true,

packages/dialect-sqlite-worker/src/driver.ts

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class ConnectionMutex {
9696
export class SqliteWorkerConnection implements DatabaseConnection {
9797
constructor(
9898
private worker: Worker,
99-
private emit?: EventEmitter,
99+
private mitt?: EventEmitter,
100100
) { }
101101

102102
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
@@ -105,52 +105,52 @@ export class SqliteWorkerConnection implements DatabaseConnection {
105105
throw new Error('Sqlite worker dialect only supported SELECT queries')
106106
}
107107
this.worker.postMessage(['2', sql, parameters] satisfies MainMsg)
108-
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
109-
let rejecter: ((reason: any) => void) | null = null
110-
111-
this.emit!.on('2', (data, err) => {
112-
if (err && rejecter) {
113-
rejecter(err)
114-
}
115-
if (resolver) {
116-
resolver({ value: { rows: data! }, done: false })
117-
resolver = null
108+
let done = false
109+
let resolveFn: (value: IteratorResult<QueryResult<R>>) => void
110+
let rejectFn: (reason?: any) => void
111+
112+
const dataListener = (data: QueryResult<any>[] | null, err: unknown): void => {
113+
if (err) {
114+
rejectFn(err)
115+
} else {
116+
resolveFn({ value: { rows: data as any }, done: false })
118117
}
119-
})
118+
}
119+
this.mitt!.on('2'/* data */, dataListener)
120120

121-
this.emit!.on('3', (_, err) => {
122-
if (err && rejecter) {
123-
rejecter(err)
124-
}
125-
if (resolver) {
126-
resolver({ value: undefined, done: true })
121+
const endListener = (_: null, err: unknown): void => {
122+
if (err) {
123+
rejectFn(err)
124+
} else {
125+
resolveFn({ value: undefined, done: true })
127126
}
128-
})
127+
}
128+
this.mitt!.on('3'/* end */, endListener)
129129

130-
return {
131-
[Symbol.asyncIterator]() {
132-
return this
133-
},
134-
async next() {
135-
return new Promise<IteratorResult<any>>((resolve, reject) => {
136-
resolver = resolve
137-
rejecter = reject
138-
})
139-
},
140-
async return() {
141-
return { value: undefined, done: true }
142-
},
130+
while (!done) {
131+
const result = await new Promise<IteratorResult<QueryResult<R>>>((res, rej) => {
132+
resolveFn = res
133+
rejectFn = rej
134+
})
135+
136+
if (result.done) {
137+
done = true
138+
this.mitt?.off('2'/* data */, dataListener)
139+
this.mitt?.off('3'/* end */, endListener)
140+
} else {
141+
yield result.value
142+
}
143143
}
144144
}
145145

146146
async executeQuery<R>(compiledQuery: CompiledQuery<unknown>): Promise<QueryResult<R>> {
147147
const { parameters, sql } = compiledQuery
148148
this.worker.postMessage(['0', sql, parameters] satisfies MainMsg)
149149
return new Promise((resolve, reject) => {
150-
if (!this.emit) {
150+
if (!this.mitt) {
151151
reject(new Error('kysely instance has been destroyed'))
152152
}
153-
this.emit!.once('0', (data: QueryResult<any>, err) => (data && !err) ? resolve(data) : reject(err))
153+
this.mitt!.once('0', (data: QueryResult<any>, err) => (data && !err) ? resolve(data) : reject(err))
154154
})
155155
}
156156
}

packages/dialect-sqlite-worker/tsup.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export default defineConfig([
44
{
55
entry: {
66
index: 'src/index.ts',
7-
worker: 'src/worker.ts',
7+
worker: 'src/worker/index.ts',
88
},
99
clean: true,
1010
format: ['cjs', 'esm'],

packages/dialect-wasqlite-worker/src/driver.ts

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -122,47 +122,45 @@ class WaSqliteWorkerConnection implements DatabaseConnection {
122122
this.mitt = mitt
123123
}
124124

125-
streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
125+
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
126126
const { parameters, sql, query } = compiledQuery
127127
if (!SelectQueryNode.is(query)) {
128128
throw new Error('WaSqlite dialect only supported SELECT queries')
129129
}
130130
this.worker.postMessage([3, sql, parameters] satisfies MainToWorkerMsg)
131-
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
132-
let rejecter: ((reason: any) => void) | null = null
133-
134-
this.mitt!.on(3, (data, err) => {
135-
if (err && rejecter) {
136-
rejecter(err)
137-
}
138-
if (resolver) {
139-
resolver({ value: { rows: data! }, done: false })
140-
resolver = null
131+
let done = false
132+
let resolveFn: (value: IteratorResult<QueryResult<R>>) => void
133+
let rejectFn: (reason?: any) => void
134+
135+
this.mitt!.on(3/* data */, (data, err): void => {
136+
if (err) {
137+
rejectFn(err)
138+
} else {
139+
resolveFn({ value: { rows: data as any }, done: false })
141140
}
142141
})
143142

144-
this.mitt!.on(4, (_, err) => {
145-
if (err && rejecter) {
146-
rejecter(err)
147-
}
148-
if (resolver) {
149-
resolver({ value: undefined, done: true })
143+
this.mitt!.on(4/* end */, (_, err): void => {
144+
if (err) {
145+
rejectFn(err)
146+
} else {
147+
resolveFn({ value: undefined, done: true })
150148
}
151149
})
152150

153-
return {
154-
[Symbol.asyncIterator]() {
155-
return this
156-
},
157-
async next() {
158-
return new Promise<IteratorResult<any>>((resolve, reject) => {
159-
resolver = resolve
160-
rejecter = reject
161-
})
162-
},
163-
async return() {
164-
return { value: undefined, done: true }
165-
},
151+
while (!done) {
152+
const result = await new Promise<IteratorResult<QueryResult<R>>>((res, rej) => {
153+
resolveFn = res
154+
rejectFn = rej
155+
})
156+
157+
if (result.done) {
158+
done = true
159+
this.mitt?.off(3/* data */)
160+
this.mitt?.off(4/* end */)
161+
} else {
162+
yield result.value
163+
}
166164
}
167165
}
168166

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// @ts-expect-error bun type
2+
import { describe, expect, it } from 'bun:test'
3+
import { BunWorkerDialect } from '../packages/dialect-bun-worker/src'
4+
import { testCase } from '../test/utils'
5+
6+
describe('bun worker dialect test', () => {
7+
it('test bun worker', async () => {
8+
const dialect = new BunWorkerDialect({
9+
url: ':memory:',
10+
})
11+
await testCase(dialect, expect)
12+
})
13+
})

test/dialect-sqlite-worker.test.ts

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,12 @@
1-
import type { Dialect, Generated } from 'kysely'
2-
import { Kysely } from 'kysely'
3-
import { afterAll, describe, expect, it } from 'vitest'
1+
import { describe, expect, it } from 'vitest'
42
import { SqliteWorkerDialect } from '../packages/dialect-sqlite-worker'
3+
import { testCase } from './utils'
54

6-
interface DB {
7-
test: TestTable
8-
}
9-
interface TestTable {
10-
id: Generated<number>
11-
name: string
12-
age: number
13-
int8: Uint8Array
14-
}
155
describe('sqlite worker dialect test', () => {
16-
let db: Kysely<DB>
17-
async function init(dialect: Dialect) {
18-
db = new Kysely<DB>({ dialect })
19-
await db.schema.createTable('test')
20-
.addColumn('id', 'integer', builder => builder.autoIncrement().primaryKey())
21-
.addColumn('name', 'text')
22-
.addColumn('age', 'integer')
23-
.addColumn('int8', 'blob')
24-
.execute()
25-
await db.insertInto('test')
26-
.values({
27-
age: 18,
28-
name: `test ${dialect.toString()}`,
29-
int8: new Uint8Array([1, 2, 3]),
30-
})
31-
.execute()
32-
const { age, name, int8 } = await db.selectFrom('test').selectAll().limit(1).executeTakeFirstOrThrow()
33-
expect(age).toStrictEqual(18)
34-
expect(name).toStrictEqual(`test ${dialect.toString()}`)
35-
expect(int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
36-
const rows = db.selectFrom('test').selectAll().stream()
37-
for await (const row of rows) {
38-
expect(row.age).toStrictEqual(18)
39-
expect(row.name).toStrictEqual(`test ${dialect.toString()}`)
40-
expect(row.int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
41-
}
42-
}
43-
afterAll(async () => {
44-
await db.destroy()
45-
})
46-
it('test', async () => {
6+
it('test sqlite worker', async () => {
477
const dialect = new SqliteWorkerDialect({
488
source: ':memory:',
499
})
50-
await init(dialect)
10+
await testCase(dialect, expect)
5111
})
5212
})

test/utils.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { type Dialect, type Generated, Kysely } from 'kysely'
2+
3+
interface DB {
4+
test: TestTable
5+
}
6+
interface TestTable {
7+
id: Generated<number>
8+
name: string
9+
age: number
10+
int8: Uint8Array
11+
}
12+
export async function testCase(dialect: Dialect, expect: any): Promise<void> {
13+
const db = new Kysely<DB>({ dialect })
14+
await db.schema.createTable('test')
15+
.addColumn('id', 'integer', builder => builder.autoIncrement().primaryKey())
16+
.addColumn('name', 'text')
17+
.addColumn('age', 'integer')
18+
.addColumn('int8', 'blob')
19+
.execute()
20+
await db.insertInto('test')
21+
.values({
22+
age: 18,
23+
name: `test ${dialect.toString()}`,
24+
int8: new Uint8Array([1, 2, 3]),
25+
})
26+
.execute()
27+
const { age, name, int8 } = await db.selectFrom('test').selectAll().limit(1).executeTakeFirstOrThrow()
28+
expect(age).toStrictEqual(18)
29+
expect(name).toStrictEqual(`test ${dialect.toString()}`)
30+
expect(int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
31+
const rows = db.selectFrom('test').selectAll().stream()
32+
let count = 0
33+
for await (const row of rows) {
34+
count++
35+
expect(row.age).toStrictEqual(18)
36+
expect(row.name).toStrictEqual(`test ${dialect.toString()}`)
37+
expect(row.int8).toStrictEqual(Uint8Array.from([1, 2, 3]))
38+
}
39+
expect(count).toStrictEqual(1)
40+
await db.destroy()
41+
}

0 commit comments

Comments
 (0)