Skip to content

Commit

Permalink
rewrite SSEClient and implement postprocessing-finished event management
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexAndBear committed Oct 13, 2023
1 parent 1a317ad commit 0fedbac
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Handle postprocessing state via Server Sent Events

We've added the functionality to listen for events from the server that update the postprocessing state,
this allows the user to see if the postprocessing on a file is finished, without reloading the UI.

https://github.com/owncloud/web/pull/9771
https://github.com/owncloud/web/issues/9769
5 changes: 3 additions & 2 deletions packages/web-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
"generate-openapi": "rm -rf src/generated && docker run --rm -v \"${PWD}/src:/local\" openapitools/openapi-generator-cli generate -i https://raw.githubusercontent.com/owncloud/libre-graph-api/main/api/openapi-spec/v1.0.yaml -g typescript-axios -o /local/generated"
},
"peerDependencies": {
"@ownclouders/web-client": "workspace:*",
"@casl/ability": "^6.3.3",
"@ownclouders/web-client": "workspace:*",
"axios": "^0.27.2 || ^1.0.0",
"lodash-es": "^4.17.21",
"luxon": "^3.0.1"
"luxon": "^3.0.1",
"@microsoft/fetch-event-source": "^2.0.1"
}
}
123 changes: 123 additions & 0 deletions packages/web-client/src/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { fetchEventSource, FetchEventSourceInit } from '@microsoft/fetch-event-source'

export enum MESSAGE_TYPE {
NOTIFICATION = 'userlog-notification',
POSTPROCESSING_FINISHED = 'postprocessing-finished'
}

class RetriableError extends Error {
name = 'RetriableError'
}

const RECONNECT_RANDOM_OFFSET = 15000

export class SSEAdapter implements EventSource {
url: string
private fetchOptions: FetchEventSourceInit
private abortController: AbortController
private eventListenerMap: Record<string, ((event: MessageEvent) => any)[]>

readonly readyState: number
readonly withCredentials: boolean

readonly CONNECTING: 0
readonly OPEN: 1
readonly CLOSED: 2

onerror: ((this: EventSource, ev: Event) => any) | null
onmessage: ((this: EventSource, ev: MessageEvent) => any) | null
onopen: ((this: EventSource, ev: Event) => any) | null

constructor(url: string, fetchOptions: FetchEventSourceInit) {
this.url = url
this.fetchOptions = fetchOptions
this.abortController = new AbortController()
this.eventListenerMap = {}
this.connect()
}

private connect() {
return fetchEventSource(this.url, {
openWhenHidden: true,
signal: this.abortController.signal,
fetch: this.fetchProvider.bind(this),
onopen: async () => {
const event = new Event('open')
this.onopen?.bind(this)(event)
},
onmessage: (msg) => {
const event = new MessageEvent('message', { data: msg.data })
this.onmessage?.bind(this)(event)

const type = msg.event
const eventListeners = this.eventListenerMap[type]
eventListeners?.forEach((l) => l(event))
},
onclose: () => {
throw new RetriableError()
},
onerror: (err) => {
console.error(err)
const event = new CustomEvent('error', { detail: err })
this.onerror?.bind(this)(event)

/*
* Try to reconnect after 30 seconds plus random time in seconds.
* This prevents all clients try to reconnect concurrent on server error, to reduce load.
*/
return 30000 + Math.floor(Math.random() * RECONNECT_RANDOM_OFFSET)
}
})
}

private fetchProvider(...args) {
let [resource, config] = args
config = { ...config, ...this.fetchOptions }
return window.fetch(resource, config)
}

close() {
this.abortController.abort('closed')
}

addEventListener(type: string, listener: (this: EventSource, event: MessageEvent) => any): void {
this.eventListenerMap[type] = this.eventListenerMap[type] || []
this.eventListenerMap[type].push(listener)
}

removeEventListener(
type: string,
listener: (this: EventSource, event: MessageEvent) => any
): void {
this.eventListenerMap[type] = this.eventListenerMap[type]?.filter((func) => func !== listener)
}

dispatchEvent(event: Event): boolean {
throw new Error('Method not implemented.')
}

updateAccessToken(token: string) {
this.fetchOptions.headers['Authorization'] = `Bearer ${token}`
}

updateLanguage(language: string) {
this.fetchOptions.headers['Accept-Language'] = language

// Force reconnect, to make the language change effect instantly
this.close()
this.connect()
}
}

let eventSource: SSEAdapter = null

export const sse = (baseURI: string, fetchOptions: FetchEventSourceInit): EventSource => {
if (!eventSource) {
eventSource = new SSEAdapter(
new URL('ocs/v2.php/apps/notifications/api/v1/notifications/sse', baseURI).href,
fetchOptions
)
}

return eventSource
}
106 changes: 106 additions & 0 deletions packages/web-client/tests/unit/sse.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const fetchEventSourceMock = jest.fn()
jest.mock('@microsoft/fetch-event-source', () => ({
fetchEventSource: fetchEventSourceMock
}))

import { SSEAdapter, sse, MESSAGE_TYPE, RetriableError } from '../../src/sse'

const url = 'https://owncloud.test/'
describe('SSEAdapter', () => {
let mockFetch

beforeEach(() => {
mockFetch = jest.fn()

// Mock fetchEventSource and window.fetch

global.window.fetch = mockFetch
})

afterEach(() => {
jest.clearAllMocks()
})

test('it should initialize the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }

const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(sseAdapter.url).toBe(url)
expect(sseAdapter.fetchOptions).toBe(fetchOptions)
expect(sseAdapter.readyState).toBe(sseAdapter.CONNECTING)
})

test('it should call connect and set up event listeners', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(fetchEventSourceMock).toHaveBeenCalledWith(url, expect.any(Object))
expect(fetchEventSourceMock.mock.calls[0][1].onopen).toEqual(expect.any(Function))

fetchEventSourceMock.mock.calls[0][1].onopen()

expect(sseAdapter.readyState).toBe(sseAdapter.OPEN)
})

test('it should handle onmessage events', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)
const message = { data: 'Message data', event: MESSAGE_TYPE.NOTIFICATION }

const messageListener = jest.fn()
sseAdapter.addEventListener(MESSAGE_TYPE.NOTIFICATION, messageListener)

fetchEventSourceMock.mock.calls[0][1].onmessage(message)

expect(messageListener).toHaveBeenCalledWith(expect.any(Object))
})

test('it should handle onclose events and throw RetriableError', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(() => {
// Simulate onclose
fetchEventSourceMock.mock.calls[0][1].onclose()
}).toThrow(RetriableError)
})

test('it should call fetchProvider with fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.fetchProvider(url, fetchOptions)

expect(mockFetch).toHaveBeenCalledWith(url, { ...fetchOptions })
})

test('it should update the access token in fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

const token = 'new-token'
sseAdapter.updateAccessToken(token)

expect(sseAdapter.fetchOptions.headers.Authorization).toBe(`Bearer ${token}`)
})

test('it should close the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.close()

expect(sseAdapter.readyState).toBe(sseAdapter.CLOSED)
})
})

describe('sse', () => {
test('it should create and return an SSEAdapter instance', () => {
const fetchOptions = { method: 'GET' }
const eventSource = sse(url, fetchOptions)

expect(eventSource).toBeInstanceOf(SSEAdapter)
expect(eventSource.url).toBe(`${url}ocs/v2.php/apps/notifications/api/v1/notifications/sse`)
})
})
3 changes: 2 additions & 1 deletion packages/web-pkg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"vue-concurrency": "4.0.1",
"vue-router": "4.2.0",
"vue3-gettext": "2.5.0-alpha.1",
"vuex": "4.1.0"
"vuex": "4.1.0",
"@microsoft/fetch-event-source": "^2.0.1"
}
}
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export * from './service'
export * from './sideBar'
export * from './sort'
export * from './spaces'
export * from './sse'
export * from './store'
export * from './upload'
export * from './viewMode'
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/sse/index.ts

This file was deleted.

79 changes: 0 additions & 79 deletions packages/web-pkg/src/composables/sse/useServerSentEvents.ts

This file was deleted.

20 changes: 20 additions & 0 deletions packages/web-pkg/src/services/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { OwnCloudSdk } from '@ownclouders/web-client/src/types'
import { ConfigurationManager } from '../../configuration'
import { Store } from 'vuex'
import { Language } from 'vue3-gettext'
import { FetchEventSourceInit } from '@microsoft/fetch-event-source'
import { sse } from '@ownclouders/web-client/src/sse'

interface OcClient {
token: string
Expand All @@ -22,6 +24,17 @@ interface HttpClient {
token?: string
}

const createFetchOptions = (authParams: AuthParameters, language: string): FetchEventSourceInit => {
return {
headers: {
Authorization: `Bearer ${authParams.accessToken}`,
'Accept-Language': language,
'X-Request-ID': uuidV4(),
'X-Requested-With': 'XMLHttpRequest'
}
}
}

const createAxiosInstance = (authParams: AuthParameters, language: string): AxiosInstance => {
const auth = new Auth(authParams)
const axiosClient = axios.create({
Expand Down Expand Up @@ -84,6 +97,13 @@ export class ClientService {
return this.ocUserContextClient.graph
}

public get sseAuthenticated(): EventSource {
return sse(
this.configurationManager.serverUrl,
createFetchOptions({ accessToken: this.token }, this.currentLanguage)
)
}

public get ocsUserContext(): OCS {
if (this.clientNeedsInit(this.ocUserContextClient)) {
this.ocUserContextClient = this.getOcsClient({ accessToken: this.token })
Expand Down

0 comments on commit 0fedbac

Please sign in to comment.