Skip to content

Commit

Permalink
fix: support stream cancelling (#2335)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThaUnknown committed Jun 23, 2022
1 parent e74a621 commit 2e4f91f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 64 deletions.
6 changes: 5 additions & 1 deletion index.js
Expand Up @@ -222,7 +222,11 @@ class WebTorrent extends EventEmitter {
this.workerPortCount++
port.postMessage(response)
})
cb(this.serviceWorker)
// test if browser supports cancelling sw Readable Streams
fetch(`${this.serviceWorker.scriptURL.slice(0, this.serviceWorker.scriptURL.lastIndexOf('/') + 1).slice(window.location.origin.length)}webtorrent/cancel/`).then(res => {
res.body.cancel()
})
cb(null, this.serviceWorker)
}

get downloadSpeed () { return this._downloadSpeed() }
Expand Down
129 changes: 68 additions & 61 deletions lib/worker-server.js
Expand Up @@ -2,74 +2,81 @@
/* eslint-env serviceworker */

const portTimeoutDuration = 5000
let cancellable = false

module.exports = event => {
const { request } = event
const { url, method, headers, destination } = request
const { url } = event.request
if (!url.includes(self.registration.scope + 'webtorrent/')) return null
if (url.includes(self.registration.scope + 'webtorrent/keepalive/')) return new Response()
if (url.includes(self.registration.scope + 'webtorrent/cancel/')) {
return new Response(new ReadableStream({
cancel () {
cancellable = true
}
}))
}
return serve(event)
}

async function serve ({ request }) {
const { url, method, headers, destination } = request
const clientlist = await clients.matchAll({ type: 'window', includeUncontrolled: true })

const [data, port] = await new Promise(resolve => {
// Use race condition for whoever controls the response stream
for (const client of clientlist) {
const messageChannel = new MessageChannel()
const { port1, port2 } = messageChannel
port1.onmessage = ({ data }) => {
resolve([data, port1])
}
client.postMessage({
url,
method,
headers: Object.fromEntries(headers.entries()),
scope: self.registration.scope,
destination,
type: 'webtorrent'
}, [port2])
}
})

if (data.body !== 'STREAM' && data.body !== 'DOWNLOAD') return new Response(data.body, data)

return clients.matchAll({ type: 'window', includeUncontrolled: true })
.then(clients => {
let timeOut = null
return new Response(new ReadableStream({
pull (controller) {
return new Promise(resolve => {
// Use race condition for whoever controls the response stream
for (const client of clients) {
const messageChannel = new MessageChannel()
const { port1, port2 } = messageChannel
port1.onmessage = event => {
resolve([event.data, messageChannel])
port.onmessage = ({ data }) => {
if (data) {
controller.enqueue(data) // data is Uint8Array
} else {
clearTimeout(timeOut)
controller.close() // data is null, means the stream ended
port.onmessage = null
}
client.postMessage({
url,
method,
headers: Object.fromEntries(headers.entries()),
scope: self.registration.scope,
destination,
type: 'webtorrent'
}, [port2])
resolve()
}
})
})
.then(([data, messageChannel]) => {
if (data.body === 'STREAM' || data.body === 'DOWNLOAD') {
let timeOut = null
return new Response(new ReadableStream({
pull (controller) {
return new Promise(resolve => {
messageChannel.port1.onmessage = event => {
if (event.data) {
controller.enqueue(event.data) // event.data is Uint8Array
} else {
clearTimeout(timeOut)
controller.close() // event.data is null, means the stream ended
messageChannel.port1.onmessage = null
}
resolve()
}

// 'media player' does NOT signal a close on the stream and we cannot close it because it's locked to the reader,
// so we just empty it after 5s of inactivity, the browser will request another port anyways
clearTimeout(timeOut)
if (data.body === 'STREAM') {
timeOut = setTimeout(() => {
controller.close()
messageChannel.port1.postMessage(false) // send timeout
messageChannel.port1.onmessage = null
resolve()
}, portTimeoutDuration)
}

messageChannel.port1.postMessage(true) // send a pull request
})
},
cancel () {
// This event is never executed
messageChannel.port1.postMessage(false) // send a cancel request
if (!cancellable) {
// firefox doesn't support cancelling of Readable Streams in service workers,
// so we just empty it after 5s of inactivity, the browser will request another port anyways
clearTimeout(timeOut)
if (data.body === 'STREAM') {
timeOut = setTimeout(() => {
controller.close()
port.postMessage(false) // send timeout
port.onmessage = null
resolve()
}, portTimeoutDuration)
}
}), data)
}

return new Response(data.body, data)
})
.catch(console.error)
}
port.postMessage(true) // send a pull request
})
},
cancel () {
port.postMessage(false) // send a cancel request
clearTimeout(timeOut)
port.onmessage = null
}
}), data)
}
4 changes: 2 additions & 2 deletions lib/worker.js
Expand Up @@ -11,6 +11,6 @@ self.addEventListener('fetch', event => {
if (res) event.respondWith(res)
})

self.addEventListener('activate', evt => {
evt.waitUntil(self.clients.claim())
self.addEventListener('activate', () => {
self.clients.claim()
})

0 comments on commit 2e4f91f

Please sign in to comment.