-
Notifications
You must be signed in to change notification settings - Fork 82
/
sseReader.go
92 lines (73 loc) · 2.32 KB
/
sseReader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright (c) 2016, 2018, 2024, Oracle and/or its affiliates. All rights reserved.
// This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
package common
import (
"bufio"
"bytes"
"context"
"io"
"net/http"
)
type SseReader struct {
HttpBody io.ReadCloser
eventScanner bufio.Scanner
OnClose func(r *SseReader)
}
// InvalidSSEResponseError returned in the case that a nil response body was given
// to NewSSEReader()
type InvalidSSEResponseError struct {
}
const InvalidResponseErrorMessage = "invalid response struct given to NewSSEReader"
func (e InvalidSSEResponseError) Error() string {
return InvalidResponseErrorMessage
}
// NewSSEReader returns an SSE Reader given an sse response
func NewSSEReader(response *http.Response) (*SseReader, error) {
if response == nil || response.Body == nil {
return nil, InvalidSSEResponseError{}
}
reader := &SseReader{
HttpBody: response.Body,
eventScanner: *bufio.NewScanner(response.Body),
OnClose: func(r *SseReader) { r.HttpBody.Close() }, // Default on close function, ensures body is closed after use
}
return reader, nil
}
// Take the response in bytes and trim it if necessary
func processEvent(e []byte) []byte {
e = bytes.TrimPrefix(e, []byte("data: ")) // Text/event-stream always prefixed with 'data: '
return e
}
// ReadNextEvent reads the next event in the stream, return it unmarshalled
func (r *SseReader) ReadNextEvent() (event []byte, err error) {
if r.eventScanner.Scan() {
eventBytes := r.eventScanner.Bytes()
return processEvent(eventBytes), nil
} else {
// Close out the stream since we are finished reading from it
if r.OnClose != nil {
r.OnClose(r)
}
err := r.eventScanner.Err()
if err == context.Canceled || err == nil {
err = io.EOF
}
return nil, err
}
}
// ReadAllEvents reads all events from the response stream, and processes each with given event handler
func (r *SseReader) ReadAllEvents(eventHandler func(e []byte)) error {
for {
event, err := r.ReadNextEvent()
if err != nil {
if err == io.EOF {
err = nil
}
return err
}
// Ignore empty events
if len(event) > 0 {
eventHandler(event)
}
}
}