forked from textileio/go-textile
/
api_sub.go
79 lines (71 loc) · 1.84 KB
/
api_sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package core
import (
"errors"
"io"
"net/http"
"strings"
"github.com/gin-gonic/gin"
)
func (a *api) getThreadsSub(g *gin.Context) {
opts, err := a.readOpts(g)
if err != nil {
a.abort500(g, err)
return
}
// Expects or'd list of event types (e.g., FILES|COMMENTS|LIKES).
types := strings.Split(strings.TrimSpace(strings.ToUpper(opts["type"])), "|")
threadId := g.Param("id")
if threadId == "default" {
threadId = a.node.config.Threads.Defaults.ID
} // If id wasn't supplied, it will be an empty string
listener := a.node.GetThreadUpdateListener()
g.Stream(func(w io.Writer) bool {
select {
case update, ok := <-listener.Ch:
if !ok {
return false
}
if data, ok := update.(ThreadUpdate); ok {
if threadId != "" && data.ThreadId != threadId {
break
}
for _, t := range types {
if t == "" || data.Block.Type == t {
info, err := addBlockInfo(a, data)
if err != nil {
log.Error(err)
}
if opts["events"] == "true" {
// Support events option to emit Server-Sent Events (SSEvent),
// otherwise, emit JSON responses. SSEvents enable browsers/clients
// to consume the stream using EventSource.
g.SSEvent("update", info)
} else {
g.JSON(http.StatusOK, info)
g.Writer.Write([]byte("\n"))
}
}
}
}
}
return true
})
listener.Close()
}
func addBlockInfo(a *api, update ThreadUpdate) (ThreadUpdate, error) {
switch update.Block.Type {
case "FILES":
info, err := a.node.ThreadFile(update.Block.Id)
if err != nil {
return update, errors.New("error getting thread file: " + err.Error())
}
return ThreadUpdate{
Block: update.Block,
ThreadId: update.ThreadId,
ThreadName: update.ThreadName,
Info: info,
}, nil
default: // For everything else... we've already go block info
return update, nil
}
}