forked from grafana/grafana-plugin-sdk-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
173 lines (149 loc) · 5.59 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package backend
import (
"context"
"encoding/json"
"fmt"
"github.com/kosimas/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/kosimas/grafana-plugin-sdk-go/data"
)
// StreamHandler handles streams: subscription requests, publication requests
// and running the stream itself.
// This is EXPERIMENTAL and is subject to change till Grafana 8.
type StreamHandler interface {
// SubscribeStream is called when a user tries to subscribe to a plugin/datasource
// managed channel path, so the plugin can check subscribe permissions and
// communicate options to Grafana Core. As soon as the first subscriber joins
// the channel, RunStream will be called.
SubscribeStream(context.Context, *SubscribeStreamRequest) (*SubscribeStreamResponse, error)
// PublishStream is called when a user tries to publish to a plugin/datasource
// managed channel path. Here the plugin can check publish permissions and
// modify the publication data if required.
PublishStream(context.Context, *PublishStreamRequest) (*PublishStreamResponse, error)
// RunStream is initiated by Grafana to consume a stream. RunStream is called
// once for the first client that successfully subscribes to a channel path.
// When Grafana detects that there are no longer any subscribers in a channel,
// the call is terminated until the next active subscriber appears. Call
// termination can happen with a delay.
RunStream(context.Context, *RunStreamRequest, *StreamSender) error
}
// SubscribeStreamRequest is the request passed to StreamHandler.SubscribeStream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type SubscribeStreamRequest struct {
// PluginContext is the plugin context that accompanies the request.
PluginContext PluginContext
// Path is the channel path of the request.
Path string
// Data is a raw JSON payload attached to the request.
// NOTE(review): its schema is not defined in this file - confirm with the caller.
Data json.RawMessage
}
// SubscribeStreamStatus is the status carried by a subscription response.
type SubscribeStreamStatus int32

// Subscription statuses. The values are assigned by iota in declaration order
// (0, 1, 2), so entries must not be reordered or inserted in the middle.
const (
	// SubscribeStreamStatusOK means subscription is allowed.
	SubscribeStreamStatusOK SubscribeStreamStatus = iota
	// SubscribeStreamStatusNotFound means stream does not exist at all.
	SubscribeStreamStatusNotFound
	// SubscribeStreamStatusPermissionDenied means that user is not allowed to subscribe.
	SubscribeStreamStatusPermissionDenied
)
// SubscribeStreamResponse is the response returned from StreamHandler.SubscribeStream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type SubscribeStreamResponse struct {
// Status is the status of the subscription response (OK, not found or
// permission denied).
Status SubscribeStreamStatus
// InitialData is the data to send to a client upon a successful subscription.
// It is a pointer and may be nil.
InitialData *InitialData
}
// InitialData holds the payload to send to a client upon a successful
// subscription to a channel. Build it with NewInitialFrame or NewInitialData.
type InitialData struct {
	// data is the prepared payload as raw bytes.
	data []byte
}

// Data returns the prepared bytes of the initial data.
//
// It is safe to call on a nil *InitialData, in which case it returns nil.
// This matters because SubscribeStreamResponse.InitialData is a pointer that
// may be left unset, and a getter that dereferences a nil receiver would
// panic.
func (d *InitialData) Data() []byte {
	if d == nil {
		return nil
	}
	return d.data
}
// NewInitialFrame builds subscription InitialData from a data frame.
//
// The frame is encoded with data.FrameToJSON using the given include option.
// If encoding fails the error is returned unchanged together with a nil
// InitialData.
func NewInitialFrame(frame *data.Frame, include data.FrameInclude) (*InitialData, error) {
	encoded, err := data.FrameToJSON(frame, include)
	if err != nil {
		return nil, err
	}
	initial := new(InitialData)
	initial.data = encoded
	return initial, nil
}
// NewInitialData builds subscription InitialData from raw JSON.
//
// The input is checked with json.Valid; input that is not valid JSON yields
// an error and a nil InitialData. The given slice is stored as is (it is not
// copied).
func NewInitialData(data json.RawMessage) (*InitialData, error) {
	if wellFormed := json.Valid(data); !wellFormed {
		return nil, fmt.Errorf("invalid JSON data")
	}
	initial := &InitialData{data: data}
	return initial, nil
}
// PublishStreamRequest is the request passed to StreamHandler.PublishStream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type PublishStreamRequest struct {
// PluginContext is the plugin context that accompanies the request.
PluginContext PluginContext
// Path is the channel path of the request.
Path string
// Data is a raw JSON payload attached to the request.
// NOTE(review): presumably the data the user wants to publish - confirm with the caller.
Data json.RawMessage
}
// PublishStreamStatus is the status carried by a publication response.
type PublishStreamStatus int32

// Publication statuses. The values are assigned by iota in declaration order
// (0, 1, 2), so entries must not be reordered or inserted in the middle.
const (
	// PublishStreamStatusOK means publication is allowed.
	PublishStreamStatusOK PublishStreamStatus = iota
	// PublishStreamStatusNotFound means stream does not exist at all.
	PublishStreamStatusNotFound
	// PublishStreamStatusPermissionDenied means that user is not allowed to publish.
	PublishStreamStatusPermissionDenied
)
// PublishStreamResponse is the response returned from StreamHandler.PublishStream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type PublishStreamResponse struct {
// Status is the status of the publication response (OK, not found or
// permission denied).
Status PublishStreamStatus
// Data is a raw JSON payload returned by the plugin.
// NOTE(review): presumably the (possibly modified) publication data - confirm with the consumer.
Data json.RawMessage
}
// RunStreamRequest is the request passed to StreamHandler.RunStream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type RunStreamRequest struct {
// PluginContext is the plugin context that accompanies the request.
PluginContext PluginContext
// Path is the channel path of the request.
Path string
// Data is a raw JSON payload attached to the request.
// NOTE(review): its schema is not defined in this file - confirm with the caller.
Data json.RawMessage
}
// StreamPacket is one message handed to a StreamPacketSender.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type StreamPacket struct {
	// Data is the payload of the packet.
	Data json.RawMessage
}

// StreamPacketSender is the transport a StreamSender writes packets to.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type StreamPacketSender interface {
	// Send delivers a single packet and reports any delivery error.
	Send(*StreamPacket) error
}

// StreamSender allows sending data to a stream.
// It is EXPERIMENTAL and is subject to change till Grafana 8.
type StreamSender struct {
	// packetSender receives every packet produced by the Send* methods.
	packetSender StreamPacketSender
}

// NewStreamSender returns a StreamSender that forwards packets to
// packetSender. The argument is stored as given; it is not checked for nil.
func NewStreamSender(packetSender StreamPacketSender) *StreamSender {
	sender := new(StreamSender)
	sender.packetSender = packetSender
	return sender
}
// SendFrame sends a data.Frame to the stream.
//
// The frame is encoded with data.FrameToJSON using the given include option
// and the resulting bytes are handed to SendBytes. An encoding error is
// returned unchanged and nothing is sent.
func (s *StreamSender) SendFrame(frame *data.Frame, include data.FrameInclude) error {
	encoded, err := data.FrameToJSON(frame, include)
	if err != nil {
		return err
	}
	return s.SendBytes(encoded)
}
// SendJSON sends arbitrary JSON to the stream. When sending a data.Frame,
// prefer the SendFrame method.
//
// The input is checked with json.Valid first; input that is not valid JSON is
// rejected with an error and nothing is sent. Valid input is handed to
// SendBytes.
func (s *StreamSender) SendJSON(data []byte) error {
	if wellFormed := json.Valid(data); !wellFormed {
		return fmt.Errorf("invalid JSON data")
	}
	return s.SendBytes(data)
}
// SendBytes sends arbitrary bytes to the stream without validating them.
// When sending a data.Frame, prefer the SendFrame method; when sending raw
// JSON, prefer the SendJSON method.
//
// The bytes are wrapped in a pluginv2.StreamPacket, converted with
// FromProto().StreamPacket and passed to the underlying packet sender, whose
// error is returned.
func (s *StreamSender) SendBytes(data []byte) error {
	converted := FromProto().StreamPacket(&pluginv2.StreamPacket{Data: data})
	return s.packetSender.Send(converted)
}