Skip to content

Commit

Permalink
feat: implements unlisten functionality (#155)
Browse files Browse the repository at this point in the history
* feat: implements unlisten functionality

Relates to #133

* fix: return listening result from subscecuent `listen` calls

So, the client can unlisten any of its subscription

* fix: makes sure unlisten can be called even after there is no events for the bound topic

* fix: makes sure unlisten clears the right subscription
  • Loading branch information
stalniy committed Mar 5, 2021
1 parent 995bc7e commit 6ef71c8
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
35 changes: 31 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,31 +337,58 @@ function Postgres(a, b) {
}

function listen(channel, fn) {
const listener = getListener();

if (channel in listeners) {
listeners[channel].push(fn)
return Promise.resolve(channel)
return Promise.resolve(Object.assign({}, listener.result, {
unlisten,
}))
}

listeners[channel] = [fn]
return query({ raw: true }, getListener(), 'listen ' + escape(channel))
return query({ raw: true }, listener.conn, 'listen ' + escape(channel))
.then((result) => {
listener.result = result
return Object.assign({}, listener.result, {
unlisten,
})
})

function unlisten() {
if (!listeners[channel]) {
return Promise.resolve()
}

listeners[channel] = listeners[channel].filter(handler => handler !== fn)

if (!listeners[channel].length) {
delete listeners[channel]
return query({ raw: true }, getListener().conn, 'unlisten ' + escape(channel))
.then(() => {})
}
return Promise.resolve()
}
}

function getListener() {
if (listener)
return listener

listener = Connection(Object.assign({
const conn = Connection(Object.assign({
onnotify: (c, x) => c in listeners && listeners[c].forEach(fn => fn(x)),
onclose: () => {
Object.entries(listeners).forEach(([channel, fns]) => {
delete listeners[channel]
Promise.all(fns.map(fn => listen(channel, fn).catch(() => { /* noop */ })))
})
listener = null;
}
},
options
))
all.push(listener)
listener = { conn, result: null };
all.push(conn);
return listener
}

Expand Down
49 changes: 49 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,55 @@ t('listen reconnects', async() => {
return ['ab', xs.join('')]
})

t('unlisten removes subscription', async() => {
const listener = postgres(options)
, xs = []

const { unlisten } = await listener.listen('test', x => xs.push(x))
await listener.notify('test', 'a')
await delay(50)
await unlisten()
await listener.notify('test', 'b')
await delay(50)
listener.end()

return ['a', xs.join('')]
})

t('listen after unlisten', async() => {
const listener = postgres(options)
, xs = []

const { unlisten } = await listener.listen('test', x => xs.push(x))
await listener.notify('test', 'a')
await delay(50)
await unlisten()
await listener.notify('test', 'b')
await delay(50)
await listener.listen('test', x => xs.push(x))
await listener.notify('test', 'c')
await delay(50)
listener.end();

return ['ac', xs.join('')]
})

t('multiple listeners and unlisten one', async() => {
const listener = postgres(options)
, xs = []

await listener.listen('test', x => xs.push('1', x))
const s2 = await listener.listen('test', x => xs.push('2', x))
await listener.notify('test', 'a')
await delay(50)
await s2.unlisten()
await listener.notify('test', 'b')
await delay(50)
listener.end();

return ['1a2a1b', xs.join('')]
})

t('responds with server parameters (application_name)', async() =>
['postgres.js', await new Promise((resolve, reject) => postgres({
...options,
Expand Down
9 changes: 7 additions & 2 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ declare namespace postgres {

interface PendingRequest extends Promise<[] & ResultMeta<null>> { }

interface ListenRequest extends Promise<ListenMeta> { }
interface ListenMeta extends ResultMeta<null> {
unlisten(): Promise<void>
}

interface Helper<T, U extends any[] = T[]> {
first: T;
rest: U;
Expand Down Expand Up @@ -336,7 +341,7 @@ declare namespace postgres {
file<T extends any[] = Row[]>(path: string, options?: { cache?: boolean }): PendingQuery<AsRowList<T>>;
file<T extends any[] = Row[]>(path: string, args: SerializableParameter[], options?: { cache?: boolean }): PendingQuery<AsRowList<T>>;
json(value: any): Parameter;
listen(channel: string, cb: (value?: string) => void): PendingRequest;
listen(channel: string, cb: (value?: string) => void): ListenRequest;
notify(channel: string, payload: string): PendingRequest;
options: ParsedOptions<TTypes>;
parameters: ConnectionParameters;
Expand All @@ -355,4 +360,4 @@ declare namespace postgres {

}

export = postgres;
export = postgres;

0 comments on commit 6ef71c8

Please sign in to comment.