Skip to content

Commit

Permalink
Merge pull request #2993 from waysact/early-disposal
Browse files Browse the repository at this point in the history
Fix issues around early disposal
  • Loading branch information
Robert Mosolgo committed Jun 16, 2020
2 parents cda9597 + 1ea8059 commit 9094118
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
Expand Up @@ -216,6 +216,43 @@ describe("createAblyHandler", () => {
expect(nextInvokedWith).toBeUndefined()
})

it("detaches the channel when the subscription is disposed during initial response", async () => {
let detached = false

const ably = createDummyConsumer({
...channelTemplate,
detach() {
detached = true
}
})
const producer = createAblyHandler({
fetchOperation: () =>
new Promise(resolve =>
resolve({
headers: new Map([["X-Subscription-ID", "foo"]]),
body: { errors: {} }
})
),
ably
})

const { dispose } = producer(
dummyOperation,
{},
{},
{
onError: async () => {
dispose()
},
onNext: async () => {},
onCompleted: () => {}
}
)

await nextTick()
expect(detached).toBe(true)
})

describe("integration with Ably", () => {
const key = process.env.ABLY_KEY
const testWithAblyKey = key ? test : test.skip
Expand Down
42 changes: 19 additions & 23 deletions javascript_client/src/subscriptions/createAblyHandler.ts
Expand Up @@ -68,13 +68,11 @@ function createAblyHandler(options: AblyHandlerOptions) {
observer.onCompleted()
}
}

;(async () => {
try {
// POST the subscription like a normal query
const response = await fetchOperation(operation, variables, cacheConfig)

dispatchResult(response.body)
const channelName = response.headers.get("X-Subscription-ID")
if (!channelName) {
throw new Error("Missing X-Subscription-ID header")
Expand Down Expand Up @@ -123,6 +121,8 @@ function createAblyHandler(options: AblyHandlerOptions) {
}
// When you get an update from ably, give it to Relay
channel.subscribe("update", updateHandler)

dispatchResult(response.body)
} catch (error) {
observer.onError(error)
}
Expand All @@ -133,28 +133,25 @@ function createAblyHandler(options: AblyHandlerOptions) {
try {
if (channel) {
const disposedChannel = channel
disposedChannel.unsubscribe("update", updateHandler)

const leavePromise = new Promise((resolve, reject) => {
const callback = (err: Types.ErrorInfo) => {
if (err) {
reject(new AblyError(err))
} else {
resolve()
disposedChannel.unsubscribe()

// Ensure channel is no longer attaching, as otherwise detach does
// nothing
if (disposedChannel.state === "attaching") {
await new Promise((resolve, _reject) => {
const onStateChange = (
stateChange: Types.ChannelStateChange
) => {
if (stateChange.current !== "attaching") {
disposedChannel.off(onStateChange)
resolve()
}
}
}

if (isAnonymousClient()) {
disposedChannel.presence.leaveClient(
anonymousClientId,
callback
)
} else {
disposedChannel.presence.leave(callback)
}
})
disposedChannel.on(onStateChange)
})
}

const detachPromise = new Promise((resolve, reject) => {
await new Promise((resolve, reject) => {
disposedChannel.detach((err: Types.ErrorInfo) => {
if (err) {
reject(new AblyError(err))
Expand All @@ -164,7 +161,6 @@ function createAblyHandler(options: AblyHandlerOptions) {
})
})

await Promise.all([leavePromise, detachPromise])
ably.channels.release(disposedChannel.name)
}
} catch (error) {
Expand Down

0 comments on commit 9094118

Please sign in to comment.