forked from dghubble/go-twitter
-
Notifications
You must be signed in to change notification settings - Fork 2
/
stream_utils.go
95 lines (88 loc) · 3.26 KB
/
stream_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package twitter
import (
"bufio"
"bytes"
"io"
"time"
)
// stopped returns true if the done channel receives, false otherwise.
func stopped(done <-chan struct{}) bool {
select {
case <-done:
return true
default:
return false
}
}
// sleepOrDone pauses the current goroutine until the done channel receives
// or until at least the duration d has elapsed, whichever comes first. This
// is similar to time.Sleep(d), except it can be interrupted.
func sleepOrDone(d time.Duration, done <-chan struct{}) {
sleep := time.NewTimer(d)
defer sleep.Stop()
select {
case <-sleep.C:
return
case <-done:
return
}
}
// streamResponseBodyReader is a buffered reader for Twitter stream response
// body. It can scan the arbitrary length of response body unlike bufio.Scanner.
type streamResponseBodyReader struct {
reader *bufio.Reader
buf bytes.Buffer
}
// newStreamResponseBodyReader returns an instance of streamResponseBodyReader
// for the given Twitter stream response body.
func newStreamResponseBodyReader(body io.Reader) *streamResponseBodyReader {
return &streamResponseBodyReader{reader: bufio.NewReader(body)}
}
// readNext reads Twitter stream response body and returns the next stream
// content if exists. Returns io.EOF error if we reached the end of the stream
// and there's no more message to read.
func (r *streamResponseBodyReader) readNext() ([]byte, error) {
// Discard all the bytes from buf and continue to use the allocated memory
// space for reading the next message.
r.buf.Truncate(0)
for {
// Twitter stream messages are separated with "\r\n", and a valid
// message may sometimes contain '\n' in the middle.
// bufio.Reader.Read() can accept one byte delimiter only, so we need to
// first break out each line on '\n' and then check whether the line ends
// with "\r\n" to find message boundaries.
// https://dev.twitter.com/streaming/overview/processing
line, err := r.reader.ReadBytes('\n')
// Non-EOF error should be propagated to callers immediately.
if err != nil && err != io.EOF {
return nil, err
}
// EOF error means that we reached the end of the stream body before finding
// delimiter '\n'. If "line" is empty, it means the reader didn't read any
// data from the stream before reaching EOF and there's nothing to append to
// buf.
if err == io.EOF && len(line) == 0 {
// if buf has no data, propagate io.EOF to callers and let them know that
// we've finished processing the stream.
if r.buf.Len() == 0 {
return nil, err
}
// Otherwise, we still have a remaining stream message to return.
break
}
// If the line ends with "\r\n", it's the end of one stream message data.
if bytes.HasSuffix(line, []byte("\r\n")) {
// reader.ReadBytes() returns a slice including the delimiter itself, so
// we need to trim '\n' as well as '\r' from the end of the slice.
r.buf.Write(bytes.TrimRight(line, "\r\n"))
break
}
// Otherwise, the line is not the end of a stream message, so we append
// the line to buf and continue to scan lines.
r.buf.Write(line)
}
// Get the stream message bytes from buf. Not that Bytes() won't mark the
// returned data as "read", and we need to explicitly call Truncate(0) to
// discard from buf before writing the next stream message to buf.
return r.buf.Bytes(), nil
}