Skip to content

Commit

Permalink
Copy to and from stdout and stdin - fixes #170
Browse files Browse the repository at this point in the history
  • Loading branch information
porsager committed Aug 12, 2021
1 parent d55e8d5 commit 2cf1b8f
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 20 deletions.
15 changes: 9 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function Backend({
parsers,
onauth,
onready,
oncopy,
transform,
onnotice,
onnotify
Expand Down Expand Up @@ -92,7 +93,9 @@ function Backend({
}

/* c8 ignore next 3 */
function CopyDone() { /* No handling needed */ }
function CopyDone() {
backend.query.readable.push(null)
}

function DataRow(x) {
let index = 7
Expand Down Expand Up @@ -127,21 +130,21 @@ function Backend({
}

/* c8 ignore next 3 */
function CopyData() { /* No handling needed until implemented */ }
function CopyData(x) {
backend.query.readable.push(x.slice(5, -1))
}

function ErrorResponse(x) {
onerror(errors.postgres(parseError(x)))
}

/* c8 ignore next 3 */
function CopyInResponse() {
backend.error = errors.notSupported('CopyInResponse')
oncopy()
}

/* c8 ignore next 3 */
function CopyOutResponse() {
backend.error = errors.notSupported('CopyOutResponse')
}
function CopyOutResponse() { /* No handling needed */ }

/* c8 ignore next 3 */
function EmptyQueryResponse() { /* No handling needed */ }
Expand Down
7 changes: 6 additions & 1 deletion lib/bytes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const size = 256
let buffer = Buffer.allocUnsafe(size)

const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => {
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S', 'd', 'c', 'f'].reduce((acc, x) => {
const v = x.charCodeAt(0)
acc[x] = () => {
buffer[0] = v
Expand Down Expand Up @@ -45,6 +45,11 @@ const b = Object.assign(messages, {
b.i += x
return b
},
raw(x) {
buffer = Buffer.concat([buffer.slice(0, b.i), x])
b.i = buffer.length
return b
},
end(at = 1) {
buffer.writeUInt32BE(b.i - at, at)
const out = buffer.slice(0, b.i)
Expand Down
10 changes: 10 additions & 0 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ function Connection(options = {}) {
onnotice,
onready,
onauth,
oncopy,
error
})

Expand Down Expand Up @@ -235,6 +236,15 @@ function Connection(options = {}) {
ready && ended && ended()
}

function oncopy() {
backend.query.writable.push = x => socket.write(
x === null
? frontend.CopyDone()
: frontend.CopyData(x)
)
backend.query.writable.forEach(backend.query.writable.push)
}

function multi() {
if (next)
return (next = false, true)
Expand Down
25 changes: 24 additions & 1 deletion lib/frontend.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ module.exports = {
Parse,
Query,
Close,
Execute
Execute,
CopyData,
CopyDone,
CopyFail
}

function StartupMessage({ user, database, connection }) {
Expand Down Expand Up @@ -147,6 +150,26 @@ function Query(x) {
.end()
}

function CopyData(x) {
return bytes
.d()
.raw(x)
.end()
}

function CopyDone() {
return bytes
.c()
.end()
}

function CopyFail(err) {
return bytes
.f()
.str(String(err) + N)
.end()
}

function Bind(name, args, rows = 0) {
let prev

Expand Down
61 changes: 55 additions & 6 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const fs = require('fs')
const Url = require('url')
const Stream = require('stream')
const Connection = require('./connection.js')
const Queue = require('./queue.js')
const { errors, PostgresError } = require('./errors.js')
Expand Down Expand Up @@ -146,7 +147,7 @@ function Postgres(a, b) {
})
.then(begin && (() => {
connections.push(connection)
next()
next(connection)
}))

function scoped(xs) {
Expand All @@ -158,7 +159,8 @@ function Postgres(a, b) {
let c
, x

while (queries.length && (c = getConnection(queries.peek().fn)) && (x = queries.shift())) {
while ((x = queries.peek()) && (c = x.query && x.query.connection || getConnection(queries.peek().fn)) && queries.shift()) {
x.query && x.query.connection && x.query.writable && (x.query.connection.blocked = true)
x.fn
? transaction(x, c)
: send(c, x.query, x.xs, x.args)
Expand Down Expand Up @@ -205,9 +207,11 @@ function Postgres(a, b) {
}

function send(connection, query, xs, args) {
connection
? process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
: queries.push({ query, xs, args })
connection && (query.connection = connection)
if (!connection || connection.blocked)
return queries.push({ query, xs, args, connection })

process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
}

function getConnection(reserve) {
Expand Down Expand Up @@ -325,9 +329,15 @@ function Postgres(a, b) {
}

function addMethods(promise, query) {
promise.readable = () => readable(promise, query)
promise.writable = () => writable(promise, query)
promise.raw = () => (query.raw = true, promise)
promise.stream = (fn) => (query.stream = fn, promise)
promise.cursor = (rows, fn) => {
promise.cursor = cursor(promise, query)
}

function cursor(promise, query) {
return (rows, fn) => {
if (typeof rows === 'function') {
fn = rows
rows = 1
Expand All @@ -339,6 +349,45 @@ function Postgres(a, b) {
}
}

function readable(promise, query) {
query.connection.blocked = true
promise.catch(err => query.readable.destroy(err)).then(() => {
query.connection.blocked = false
next()
})
return query.readable = new Stream.Readable({
read() { /* backpressure handling not possible */ }
})
}

function writable(promise, query) {
query.connection.blocked = true
let error
query.prepare = false
query.simple = true
query.writable = []
promise.catch(err => error = err).then(() => {
query.connection.blocked = false
next()
})
return new Stream.Writable({
write(chunk, encoding, callback) {
if (error)
return callback(error)

query.writable.push(chunk)
callback()
},
final(callback) {
if (error)
return callback(error)

query.writable.push(null)
promise.then(() => callback(), callback)
}
})
}

function listen(channel, fn) {
const listener = getListener()

Expand Down
2 changes: 2 additions & 0 deletions tests/copy.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
1 2 3
4 5 6
74 changes: 68 additions & 6 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,6 @@ t('Connection destroyed with query before', async() => {
return ['CONNECTION_DESTROYED', await error]
})

t('Message not supported', async() => {
await sql`create table test (x int)`
return ['MESSAGE_NOT_SUPPORTED', await sql`copy test to stdout`.catch(x => x.code), await sql`drop table test`]
})

t('transform column', async() => {
const sql = postgres({
...options,
Expand Down Expand Up @@ -925,7 +920,11 @@ t('bytea serializes and parses', async() => {
await sql`create table test (x bytea)`
await sql`insert into test values (${ buf })`

return [0, Buffer.compare(buf, (await sql`select x from test`)[0].x)]
return [
0,
Buffer.compare(buf, (await sql`select x from test`)[0].x),
await sql`drop table test`
]
})

t('Stream works', async() => {
Expand Down Expand Up @@ -1337,3 +1336,66 @@ t('Raw method returns values unparsed as Buffer', async() => {
true
]
})

t('Copy read works', async() => {
const result = []

await sql`create table test (x int)`
await sql`insert into test select * from generate_series(1,10)`
const readable = sql`copy test to stdout`.readable()
readable.on('data', x => result.push(x))
readable.on('error', x => p('error', x))
await new Promise(r => readable.on('end', r))

return [
result.length,
10,
await sql`drop table test`
]
})

t('Copy write works', async() => {
await sql`create table test (x int)`
const writable = sql`copy test from stdin`.writable()

writable.write('1\n')
writable.write('1\n')
writable.end()

await new Promise(r => writable.on('finish', r))

return [
(await sql`select 1 from test`).length,
2,
await sql`drop table test`
]
})


t('Copy from file works', async() => {
await sql`create table test (x int, y int, z int)`
await new Promise(r => require('fs')
.createReadStream('copy.csv')
.pipe(sql`copy test from stdin`.writable())
.on('finish', r)
)

return [
JSON.stringify(await sql`select * from test`),
'[{"x":1,"y":2,"z":3},{"x":4,"y":5,"z":6}]',
await sql`drop table test`
]
})

t('Copy from works in transaction', async() => {
await sql`create table test(x int)`
const xs = await sql.begin(async sql => {
sql`copy test from stdin`.writable().end('1\n2')
return sql`select 1 from test`
})

return [
xs.length,
2
]
})

0 comments on commit 2cf1b8f

Please sign in to comment.