-
Notifications
You must be signed in to change notification settings - Fork 48
/
types.go
126 lines (104 loc) · 2.87 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package messages
import (
"encoding/json"
"fmt"
"time"
)
// See https://developers.signalfx.com/signalflow_analytics/rest_api_messages/stream_messages_specification.html
const (
AuthenticatedType = "authenticated"
ControlMessageType = "control-message"
ErrorType = "error"
MetadataType = "metadata"
MessageType = "message"
DataType = "data"
EventType = "event"
WebsocketErrorType = "websocket-error"
ExpiredTSIDType = "expired-tsid"
)
type BaseMessage struct {
Typ string `json:"type"`
}
func (bm *BaseMessage) Type() string {
return bm.Typ
}
func (bm *BaseMessage) String() string {
return fmt.Sprintf("%s message", bm.Typ)
}
func (bm *BaseMessage) Base() *BaseMessage {
return bm
}
var _ Message = &BaseMessage{}
type Message interface {
Type() string
Base() *BaseMessage
}
type ChannelMessage interface {
Channel() string
}
type BaseChannelMessage struct {
Chan string `json:"channel,omitempty"`
}
func (bcm *BaseChannelMessage) Channel() string {
return bcm.Chan
}
type JSONMessage interface {
Message
JSONBase() *BaseJSONMessage
RawData() map[string]interface{}
}
type BaseJSONMessage struct {
BaseMessage
rawMessage []byte
rawData map[string]interface{}
}
func (bjm *BaseJSONMessage) JSONBase() *BaseJSONMessage {
return bjm
}
// The raw message deserialized from JSON. Only applicable for JSON
// Useful if the message type doesn't have a concrete struct type implemented
// in this library (e.g. due to an upgrade to the SignalFlow protocol).
func (j *BaseJSONMessage) RawData() map[string]interface{} {
if j.rawData == nil {
if err := json.Unmarshal(j.rawMessage, &j.rawData); err != nil {
// This shouldn't ever error since it wouldn't have been initially
// deserialized if there were parse errors. But in case it does
// just return nil.
return nil
}
}
return j.rawData
}
func (j *BaseJSONMessage) String() string {
return j.BaseMessage.String() + string(j.rawMessage)
}
type BaseJSONChannelMessage struct {
BaseJSONMessage
BaseChannelMessage
}
func (j *BaseJSONChannelMessage) String() string {
return fmt.Sprintf("%s", j.BaseJSONMessage.rawMessage)
}
type TimestampedMessage struct {
TimestampMillis uint64 `json:"timestampMs"`
}
func (tsm *TimestampedMessage) Timestamp() time.Time {
return time.Unix(0, int64(tsm.TimestampMillis*uint64(time.Millisecond)))
}
type AuthenticatedMessage struct {
BaseJSONMessage
OrgID string `json:"orgId"`
UserID string `json:"userId"`
}
// The way to distinguish between JSON and binary messages is the websocket
// message type.
func ParseMessage(msg []byte, isText bool) (Message, error) {
if isText {
var baseMessage BaseMessage
if err := json.Unmarshal(msg, &baseMessage); err != nil {
return nil, fmt.Errorf("couldn't unmarshal JSON websocket message: %v", err)
}
return parseJSONMessage(&baseMessage, msg)
}
return parseBinaryMessage(msg)
}