Skip to content

Commit

Permalink
Copy (#211)
Browse files Browse the repository at this point in the history
* Copy to and from stdout and stdin - fixes #170

* Should not discard last byte

* Fix copy when queued

* Handle backpressure for readable and writable

* Fix abort on .writable()
  • Loading branch information
porsager committed Aug 13, 2021
1 parent 8b405b3 commit 8e675a3
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 26 deletions.
18 changes: 11 additions & 7 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ function Backend({
parsers,
onauth,
onready,
oncopy,
ondata,
transform,
onnotice,
onnotify
Expand Down Expand Up @@ -92,7 +94,9 @@ function Backend({
}

/* c8 ignore next 3 */
function CopyDone() { /* No handling needed */ }
function CopyDone() {
backend.query.readable.push(null)
}

function DataRow(x) {
let index = 7
Expand Down Expand Up @@ -127,21 +131,21 @@ function Backend({
}

/* c8 ignore next 3 */
function CopyData() { /* No handling needed until implemented */ }
function CopyData(x) {
ondata(x.slice(5))
}

function ErrorResponse(x) {
onerror(errors.postgres(parseError(x)))
}

/* c8 ignore next 3 */
function CopyInResponse() {
backend.error = errors.notSupported('CopyInResponse')
oncopy()
}

/* c8 ignore next 3 */
function CopyOutResponse() {
backend.error = errors.notSupported('CopyOutResponse')
}
function CopyOutResponse() { /* No handling needed */ }

/* c8 ignore next 3 */
function EmptyQueryResponse() { /* No handling needed */ }
Expand Down Expand Up @@ -227,7 +231,7 @@ function Backend({

/* c8 ignore next 3 */
function CopyBothResponse() {
backend.error = errors.notSupported('CopyBothResponse')
oncopy()
}

function ReadyForQuery() {
Expand Down
7 changes: 6 additions & 1 deletion lib/bytes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const size = 256
let buffer = Buffer.allocUnsafe(size)

const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S'].reduce((acc, x) => {
const messages = ['B', 'C', 'Q', 'P', 'F', 'p', 'D', 'E', 'H', 'S', 'd', 'c', 'f'].reduce((acc, x) => {
const v = x.charCodeAt(0)
acc[x] = () => {
buffer[0] = v
Expand Down Expand Up @@ -45,6 +45,11 @@ const b = Object.assign(messages, {
b.i += x
return b
},
raw(x) {
buffer = Buffer.concat([buffer.slice(0, b.i), x])
b.i = buffer.length
return b
},
end(at = 1) {
buffer.writeUInt32BE(b.i - at, at)
const out = buffer.slice(0, b.i)
Expand Down
32 changes: 27 additions & 5 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ function Connection(options = {}) {
const queries = Queue()
, id = count++
, uid = Math.random().toString(36).slice(2)
, connection = { send, end, destroy }

const socket = postgresSocket(options, {
ready,
Expand All @@ -47,6 +46,8 @@ function Connection(options = {}) {
cleanup
})

const connection = { send, end, destroy, socket }

const backend = Backend({
onparse,
onparameter,
Expand All @@ -59,6 +60,8 @@ function Connection(options = {}) {
onnotice,
onready,
onauth,
oncopy,
ondata,
error
})

Expand Down Expand Up @@ -235,6 +238,21 @@ function Connection(options = {}) {
ready && ended && ended()
}

function oncopy() {
backend.query.writable.push = ({ chunk, error, callback }) => {
error
? socket.write(frontend.CopyFail(error))
: chunk === null
? socket.write(frontend.CopyDone())
: socket.write(frontend.CopyData(chunk), callback)
}
backend.query.writable.forEach(backend.query.writable.push)
}

function ondata(x) {
!backend.query.readable.push(x) && socket.pause()
}

function multi() {
if (next)
return (next = false, true)
Expand Down Expand Up @@ -378,11 +396,15 @@ function postgresSocket(options, {
succeeded = true
i >= options.host.length && (i = 0)
},
write: x => {
pause: () => socket.pause(),
resume: () => socket.resume(),
isPaused: () => socket.isPaused(),
write: (x, callback) => {
buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x)
if (buffer.length >= 1024)
return write()
return write(callback)
next === null && (next = setImmediate(write))
callback && callback()
},
destroy: () => {
socket && socket.destroy()
Expand All @@ -395,8 +417,8 @@ function postgresSocket(options, {
connect
}

function write() {
socket.write(buffer)
function write(callback) {
socket.write(buffer, callback)
next !== null && clearImmediate(next)
buffer = next = null
}
Expand Down
25 changes: 24 additions & 1 deletion lib/frontend.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ module.exports = {
Parse,
Query,
Close,
Execute
Execute,
CopyData,
CopyDone,
CopyFail
}

function StartupMessage({ user, database, connection }) {
Expand Down Expand Up @@ -147,6 +150,26 @@ function Query(x) {
.end()
}

function CopyData(x) {
return bytes
.d()
.raw(x)
.end()
}

function CopyDone() {
return bytes
.c()
.end()
}

function CopyFail(err) {
return bytes
.f()
.str(String(err) + N)
.end()
}

function Bind(name, args, rows = 0) {
let prev

Expand Down
70 changes: 64 additions & 6 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const fs = require('fs')
const Url = require('url')
const Stream = require('stream')
const Connection = require('./connection.js')
const Queue = require('./queue.js')
const { errors, PostgresError } = require('./errors.js')
Expand Down Expand Up @@ -146,7 +147,7 @@ function Postgres(a, b) {
})
.then(begin && (() => {
connections.push(connection)
next()
next(connection)
}))

function scoped(xs) {
Expand All @@ -158,7 +159,8 @@ function Postgres(a, b) {
let c
, x

while (queries.length && (c = getConnection(queries.peek().fn)) && (x = queries.shift())) {
while ((x = queries.peek()) && (c = x.query && x.query.connection || getConnection(queries.peek().fn)) && queries.shift()) {
x.query && x.query.connection && x.query.writable && (c.blocked = true)
x.fn
? transaction(x, c)
: send(c, x.query, x.xs, x.args)
Expand Down Expand Up @@ -205,9 +207,12 @@ function Postgres(a, b) {
}

function send(connection, query, xs, args) {
connection
? process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
: queries.push({ query, xs, args })
connection && (query.connection = connection)
if (!connection || connection.blocked)
return queries.push({ query, xs, args, connection })

connection.blocked = query.blocked
process.nextTick(connection.send, query, query.tagged ? parseTagged(query, xs, args) : parseUnsafe(query, xs, args))
}

function getConnection(reserve) {
Expand Down Expand Up @@ -325,9 +330,15 @@ function Postgres(a, b) {
}

function addMethods(promise, query) {
promise.readable = () => readable(promise, query)
promise.writable = () => writable(promise, query)
promise.raw = () => (query.raw = true, promise)
promise.stream = (fn) => (query.stream = fn, promise)
promise.cursor = (rows, fn) => {
promise.cursor = cursor(promise, query)
}

function cursor(promise, query) {
return (rows, fn) => {
if (typeof rows === 'function') {
fn = rows
rows = 1
Expand All @@ -339,6 +350,53 @@ function Postgres(a, b) {
}
}

function readable(promise, query) {
query.connection
? query.connection.blocked = true
: query.blocked = true

const read = () => query.connection.socket.isPaused() && query.connection.socket.resume()
promise.catch(err => query.readable.destroy(err)).then(() => {
query.connection.blocked = false
read()
next()
})
return query.readable = new Stream.Readable({ read })
}

function writable(promise, query) {
query.connection
? query.connection.blocked = true
: query.blocked = true
let error
query.prepare = false
query.simple = true
query.writable = []
promise.catch(err => error = err).then(() => {
query.connection.blocked = false
next()
})
return query.readable = new Stream.Duplex({
read() { /* backpressure handling not possible */ },
write(chunk, encoding, callback) {
error
? callback(error)
: query.writable.push({ chunk, callback })
},
destroy(error, callback) {
query.writable.push({ error })
callback(error)
},
final(callback) {
if (error)
return callback(error)

query.writable.push({ chunk: null })
promise.then(() => callback(), callback)
}
})
}

function listen(channel, fn) {
const listener = getListener()

Expand Down
2 changes: 2 additions & 0 deletions tests/copy.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
1 2 3
4 5 6

0 comments on commit 8e675a3

Please sign in to comment.