forked from nats-io/nats.go
/
api.go
151 lines (121 loc) · 4.41 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2022-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jetstream
import (
"context"
"encoding/json"
"strings"
)
type (
apiResponse struct {
Type string `json:"type"`
Error *APIError `json:"error,omitempty"`
}
// apiPaged includes variables used to create paged responses from the JSON API
apiPaged struct {
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
)
// Request API subjects for JetStream.
const (
// DefaultAPIPrefix is the default prefix for the JetStream API.
DefaultAPIPrefix = "$JS.API."
// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."
// jsExtDomainT is used to create a StreamSource External APIPrefix
jsExtDomainT = "$JS.%s.API"
// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"
// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
// apiConsumerCreateT is used to create consumers.
// it accepts stream name, consumer name and filter subject
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
// apiConsumerInfoT is used to create consumers.
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
// apiConsumerDeleteT is used to delete consumers.
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"
// apiConsumerNamesT is used to return a list with all consumer names for the stream.
apiConsumerNamesT = "CONSUMER.NAMES.%s"
// apiStreams can lookup a stream by subject.
apiStreams = "STREAM.NAMES"
// apiStreamCreateT is the endpoint to create new streams.
apiStreamCreateT = "STREAM.CREATE.%s"
// apiStreamInfoT is the endpoint to get information on a stream.
apiStreamInfoT = "STREAM.INFO.%s"
// apiStreamUpdateT is the endpoint to update existing streams.
apiStreamUpdateT = "STREAM.UPDATE.%s"
// apiStreamDeleteT is the endpoint to delete streams.
apiStreamDeleteT = "STREAM.DELETE.%s"
// apiStreamPurgeT is the endpoint to purge streams.
apiStreamPurgeT = "STREAM.PURGE.%s"
// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamListT = "STREAM.LIST"
// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"
// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"
// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"
// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
)
func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp any, data ...[]byte) (*jetStreamMsg, error) {
jsMsg, err := js.apiRequest(ctx, subject, data...)
if err != nil {
return nil, err
}
if err := json.Unmarshal(jsMsg.Data(), resp); err != nil {
return nil, err
}
return jsMsg, nil
}
// a RequestWithContext with tracing via TraceCB
func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte) (*jetStreamMsg, error) {
var req []byte
if len(data) > 0 {
req = data[0]
}
if js.clientTrace != nil {
ctrace := js.clientTrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(subj, req)
}
}
resp, err := js.conn.RequestWithContext(ctx, subj, req)
if err != nil {
return nil, err
}
if js.clientTrace != nil {
ctrace := js.clientTrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
}
return js.toJSMsg(resp), nil
}
func apiSubj(prefix, subject string) string {
if prefix == "" {
return subject
}
var b strings.Builder
b.WriteString(prefix)
b.WriteString(subject)
return b.String()
}