Skip to content

Commit

Permalink
Support transform in subscribe messages
Browse files Browse the repository at this point in the history
  • Loading branch information
porsager committed Sep 30, 2022
1 parent 2fd6edd commit 4e28de9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 24 deletions.
52 changes: 30 additions & 22 deletions src/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export default function Subscribe(postgres, options) {

// Dispatch one raw copy-stream message from the walsender.
// x is a Buffer whose first byte is the replication-protocol message type.
function data(x) {
  if (x[0] === 0x77) // 'w' = XLogData: logical payload begins after the 25-byte header
    parse(x.subarray(25), state, sql.options.parsers, handle, options.transform)
  else if (x[0] === 0x6b && x[17]) // 'k' = keepalive; byte 17 set means "reply now"
    pong()
}
Expand Down Expand Up @@ -137,15 +137,15 @@ function Time(x) {
return new Date(Date.UTC(2000, 0, 1) + Number(x / BigInt(1000)))
}

function parse(x, state, parsers, handle) {
function parse(x, state, parsers, handle, transform) {
const char = (acc, [k, v]) => (acc[k.charCodeAt(0)] = v, acc)

Object.entries({
R: x => { // Relation
let i = 1
const r = state[x.readUInt32BE(i)] = {
schema: String(x.subarray(i += 4, i = x.indexOf(0, i))) || 'pg_catalog',
table: String(x.subarray(i + 1, i = x.indexOf(0, i + 1))),
schema: x.toString('utf8', i += 4, i = x.indexOf(0, i)) || 'pg_catalog',
table: x.toString('utf8', i + 1, i = x.indexOf(0, i + 1)),
columns: Array(x.readUInt16BE(i += 2)),
keys: []
}
Expand All @@ -157,7 +157,9 @@ function parse(x, state, parsers, handle) {
while (i < x.length) {
column = r.columns[columnIndex++] = {
key: x[i++],
name: String(x.subarray(i, i = x.indexOf(0, i))),
name: transform.column.from
? transform.column.from(x.toString('utf8', i, i = x.indexOf(0, i)))
: x.toString('utf8', i, i = x.indexOf(0, i)),
type: x.readUInt32BE(i += 1),
parser: parsers[x.readUInt32BE(i)],
atttypmod: x.readUInt32BE(i += 4)
Expand All @@ -176,8 +178,7 @@ function parse(x, state, parsers, handle) {
I: x => { // Insert
let i = 1
const relation = state[x.readUInt32BE(i)]
const row = {}
tuples(x, row, relation.columns, i += 7)
const { row } = tuples(x, relation.columns, i += 7, transform)

handle(row, {
command: 'insert',
Expand All @@ -189,13 +190,10 @@ function parse(x, state, parsers, handle) {
const relation = state[x.readUInt32BE(i)]
i += 4
const key = x[i] === 75
const row = key || x[i] === 79
? {}
handle(key || x[i] === 79
? tuples(x, key ? relation.keys : relation.columns, i += 3, transform).row
: null

tuples(x, row, key ? relation.keys : relation.columns, i += 3)

handle(row, {
, {
command: 'delete',
relation,
key
Expand All @@ -206,35 +204,36 @@ function parse(x, state, parsers, handle) {
const relation = state[x.readUInt32BE(i)]
i += 4
const key = x[i] === 75
const old = key || x[i] === 79
? {}
const xs = key || x[i] === 79
? tuples(x, key ? relation.keys : relation.columns, i += 3, transform)
: null

old && (i = tuples(x, old, key ? relation.keys : relation.columns, i += 3))
xs && (i = xs.i)

const row = {}
tuples(x, row, relation.columns, i + 3)
const { row } = tuples(x, relation.columns, i + 3, transform)

handle(row, {
command: 'update',
relation,
key,
old
old: xs && xs.row
})
},
T: () => { /* noop */ }, // Truncate,
C: () => { /* noop */ } // Commit
}).reduce(char, {})[x[0]](x)
}

// Decode one TupleData section of a logical-replication message.
//
// x         - Buffer containing the message
// columns   - relation column descriptors ({ name, parser, ... })
// xi        - byte offset where the tuple data starts
// transform - user transform options; value.from / row.from hooks are applied,
//             and transform.raw switches output to a positional array
//
// Returns { i, row }: i is the offset just past the tuple, row is the decoded
// (and transformed) row.
function tuples(x, columns, xi, transform) {
  let type
    , column
    , value

  // raw mode yields a positional array instead of a name-keyed object
  const row = transform.raw ? new Array(columns.length) : {}
  for (let i = 0; i < columns.length; i++) {
    type = x[xi++]
    column = columns[i]
    value = type === 110 // n = SQL NULL
      ? null
      : type === 117 // u = unchanged TOAST value (not sent)
        ? undefined
        : column.parser === undefined
          ? x.toString('utf8', xi + 4, xi += 4 + x.readUInt32BE(xi))
          : column.parser.array === true
            ? column.parser(x.toString('utf8', xi + 5, xi += 4 + x.readUInt32BE(xi)))
            : column.parser(x.toString('utf8', xi + 4, xi += 4 + x.readUInt32BE(xi)))

    transform.raw
      ? (row[i] = transform.raw === true
        ? value
        : transform.value.from ? transform.value.from(value, column) : value)
      : (row[column.name] = transform.value.from
        ? transform.value.from(value, column)
        : value
      )
  }

  return { i: xi, row: transform.row.from ? transform.row.from(row) : row }
}

function parseEvent(x) {
Expand Down
50 changes: 48 additions & 2 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1859,8 +1859,7 @@ t('multiple queries before connect', async() => {
t('subscribe', { timeout: 2 }, async() => {
const sql = postgres({
database: 'postgres_js_test',
publications: 'alltables',
fetch_types: false
publications: 'alltables'
})

await sql.unsafe('create publication alltables for all tables')
Expand Down Expand Up @@ -1899,6 +1898,53 @@ t('subscribe', { timeout: 2 }, async() => {
]
})

// Verifies that the configured column transform (toCamel/fromCamel) is applied
// to rows delivered through sql.subscribe(), including the `old` row image on
// updates once the table uses `replica identity full`.
t('subscribe with transform', { timeout: 2 }, async() => {
const sql = postgres({
transform: {
column: {
from: postgres.toCamel,
to: postgres.fromCamel
}
},
database: 'postgres_js_test',
publications: 'alltables'
})

await sql.unsafe('create publication alltables for all tables')

const result = []

// Record "command,name,oldName" per event; the snake_case column
// name_in_camel must arrive as nameInCamel on both row and old.
const { unsubscribe } = await sql.subscribe('*', (row, { command, old }) =>
result.push(command, row.nameInCamel || row.id, old && old.nameInCamel)
)

await sql`
create table test (
id serial primary key,
name_in_camel text
)
`

// Default replica identity: deletes carry only the key, so the first delete
// reports the id (1) and updates have no old image.
await sql`insert into test (name_in_camel) values ('Murray')`
await sql`update test set name_in_camel = 'Rothbard'`
await sql`delete from test`
// Full replica identity: updates/deletes now carry the full (transformed) old row.
await sql`alter table test replica identity full`
await sql`insert into test (name_in_camel) values ('Murray')`
await sql`update test set name_in_camel = 'Rothbard'`
await sql`delete from test`
await delay(10)
await unsubscribe()
// After unsubscribe, this insert must not reach the handler.
await sql`insert into test (name_in_camel) values ('Oh noes')`
await delay(10)
return [
'insert,Murray,,update,Rothbard,,delete,1,,insert,Murray,,update,Rothbard,Murray,delete,Rothbard,',
result.join(','),
await sql`drop table test`,
await sql`drop publication alltables`,
await sql.end()
]
})

t('subscribe reconnects and calls onsubscribe', { timeout: 4 }, async() => {
const sql = postgres({
database: 'postgres_js_test',
Expand Down

0 comments on commit 4e28de9

Please sign in to comment.