/
document_service.go
204 lines (170 loc) · 5 KB
/
document_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package http
import (
"context"
"fmt"
"net/http"
"path"
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/pkg/httpc"
"go.uber.org/zap"
)
// prefixDocuments is the URL path prefix of the documents API; buildDocumentsPath
// joins a namespace onto it.
const prefixDocuments = "/api/v2/documents"
// DocumentService is the HTTP-exposed portion of the document service.
type DocumentService interface {
// GetDocuments returns the documents stored under namespace for the
// organization identified by orgID.
GetDocuments(ctx context.Context, namespace string, orgID platform.ID) ([]*influxdb.Document, error)
}
// DocumentBackend is all services and associated parameters required to construct
// the DocumentHandler.
type DocumentBackend struct {
// log is the logger passed on to the DocumentHandler.
log *zap.Logger
// HTTPErrorHandler is embedded; NewDocumentHandler passes it to both the
// router and the handler.
errors.HTTPErrorHandler
// DocumentService is the service the handler uses to find document stores.
DocumentService influxdb.DocumentService
}
// NewDocumentBackend builds a DocumentBackend that logs through log and takes
// its error handler and document service from the APIBackend b.
func NewDocumentBackend(log *zap.Logger, b *APIBackend) *DocumentBackend {
	backend := new(DocumentBackend)
	backend.log = log
	backend.HTTPErrorHandler = b.HTTPErrorHandler
	backend.DocumentService = b.DocumentService
	return backend
}
// DocumentHandler represents an HTTP API handler for documents.
type DocumentHandler struct {
// Router is embedded; NewDocumentHandler registers the handler's routes on it.
*httprouter.Router
// log receives the debug and encoding-error messages emitted by the handler.
log *zap.Logger
// HTTPErrorHandler is embedded and used to handle errors that occur while
// serving a request.
errors.HTTPErrorHandler
// DocumentService is used to find the document store for the requested namespace.
DocumentService influxdb.DocumentService
// NOTE(review): LabelService is neither set by NewDocumentHandler nor read by
// the handler code visible in this file — confirm whether it is still needed.
LabelService influxdb.LabelService
}
const (
// documentsPath is the route pattern for listing documents; the ":ns" parameter
// is the namespace, read back with ByName("ns") when the request is decoded.
documentsPath = "/api/v2/documents/:ns"
)
// NewDocumentHandler builds a DocumentHandler from the backend b and registers
// the GET route that lists the documents of a namespace.
// TODO(desa): this should probably take a namespace
func NewDocumentHandler(b *DocumentBackend) *DocumentHandler {
	handler := new(DocumentHandler)
	handler.Router = NewRouter(b.HTTPErrorHandler)
	handler.HTTPErrorHandler = b.HTTPErrorHandler
	handler.log = b.log
	handler.DocumentService = b.DocumentService

	handler.HandlerFunc("GET", documentsPath, handler.handleGetDocuments)
	return handler
}
// documentResponse is the response representation of a single document: the
// embedded document together with its links.
type documentResponse struct {
// Links maps a relation name to a URL path; newDocumentResponse fills in "self".
Links map[string]string `json:"links"`
*influxdb.Document
}
// newDocumentResponse wraps d in a documentResponse whose "self" link points at
// the document inside namespace ns.
//
// Note that d itself is modified: a nil Labels slice is replaced with an empty
// one (presumably so labels encode as [] rather than null — confirm).
func newDocumentResponse(ns string, d *influxdb.Document) *documentResponse {
	if d.Labels == nil {
		d.Labels = []*influxdb.Label{}
	}

	links := map[string]string{
		"self": fmt.Sprintf("/api/v2/documents/%s/%s", ns, d.ID),
	}
	return &documentResponse{Links: links, Document: d}
}
// documentsResponse is the response body for a list of documents. It is written
// by handleGetDocuments and decoded by the client's GetDocuments.
type documentsResponse struct {
Documents []*documentResponse `json:"documents"`
}
// newDocumentsResponse converts docs into a documentsResponse, wrapping every
// document with newDocumentResponse for namespace ns. The resulting Documents
// slice is never nil, even when docs is empty.
func newDocumentsResponse(ns string, docs []*influxdb.Document) *documentsResponse {
	wrapped := make([]*documentResponse, len(docs))
	for i := range docs {
		wrapped[i] = newDocumentResponse(ns, docs[i])
	}
	return &documentsResponse{Documents: wrapped}
}
// handleGetDocuments is the HTTP handler for the GET /api/v2/documents/:ns route.
//
// It decodes the namespace and organization ID from the request, finds the
// document store for that namespace, fetches the store's documents for the
// organization and writes them with status 200 through encodeResponse. A
// failure in any step before encoding is passed to HandleHTTPError; an
// encoding failure is only logged.
func (h *DocumentHandler) handleGetDocuments(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	req, err := decodeGetDocumentsRequest(ctx, r)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	s, err := h.DocumentService.FindDocumentStore(ctx, req.Namespace)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	ds, err := s.FindDocuments(ctx, req.OrgID)
	if err != nil {
		h.HandleHTTPError(ctx, err, w)
		return
	}

	// Formatting the whole document slice is wasted work unless debug logging
	// is enabled, so ask the logger first and only then build the field.
	if ce := h.log.Check(zap.DebugLevel, "Documents retrieved"); ce != nil {
		ce.Write(zap.String("documents", fmt.Sprint(ds)))
	}

	if err := encodeResponse(ctx, w, http.StatusOK, newDocumentsResponse(req.Namespace, ds)); err != nil {
		logEncodingError(h.log, r, err)
	}
}
// getDocumentsRequest holds the parameters decoded from a GET documents request.
type getDocumentsRequest struct {
// Namespace is the "ns" URL parameter; decoding fails when it is empty.
Namespace string
// Org is the "org" query parameter. NOTE(review): handleGetDocuments does not
// read it — only Namespace and OrgID are used there.
Org string
// OrgID is the parsed "orgID" query parameter; it keeps its zero value when
// the parameter is absent or empty.
OrgID platform.ID
}
// decodeGetDocumentsRequest builds a getDocumentsRequest from the "ns" route
// parameter stored in ctx and the "org" / "orgID" query parameters of r.
//
// It returns an EInvalid error when the namespace is empty or when a non-empty
// orgID cannot be parsed. An absent or empty orgID leaves OrgID at its zero value.
func decodeGetDocumentsRequest(ctx context.Context, r *http.Request) (*getDocumentsRequest, error) {
	namespace := httprouter.ParamsFromContext(ctx).ByName("ns")
	if namespace == "" {
		return nil, &errors.Error{Code: errors.EInvalid, Msg: "url missing namespace"}
	}

	query := r.URL.Query()
	decoded := &getDocumentsRequest{
		Namespace: namespace,
		Org:       query.Get("org"),
	}

	rawOrgID := query.Get("orgID")
	if rawOrgID == "" {
		// No organization ID supplied; nothing more to parse.
		return decoded, nil
	}

	orgID, err := platform.IDFromString(rawOrgID)
	if err != nil {
		return nil, &errors.Error{Code: errors.EInvalid, Msg: "Invalid orgID"}
	}
	decoded.OrgID = *orgID
	return decoded, nil
}
// documentService implements DocumentService by issuing requests through an
// HTTP client.
type documentService struct {
// Client is the HTTP client used to perform the requests.
Client *httpc.Client
}
// NewDocumentService returns a DocumentService that talks to Influx over HTTP
// using client.
func NewDocumentService(client *httpc.Client) DocumentService {
	svc := new(documentService)
	svc.Client = client
	return svc
}
// buildDocumentsPath returns the URL path for the documents of namespace,
// formed by joining namespace onto prefixDocuments with path.Join.
func buildDocumentsPath(namespace string) string {
	elems := []string{prefixDocuments, namespace}
	return path.Join(elems...)
}
// GetDocuments returns the documents for a `namespace` and an `orgID`.
// Returned documents do not contain their content.
//
// It sends a GET request to the documents path of namespace with orgID as the
// "orgID" query parameter, decodes the JSON body into a documentsResponse and
// returns the documents it carries. Any error from performing the request is
// returned unchanged together with a nil slice.
func (s *documentService) GetDocuments(ctx context.Context, namespace string, orgID platform.ID) ([]*influxdb.Document, error) {
	// Continue with the context returned alongside the span rather than the
	// caller's context, so the request below is issued under this span.
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	var resp documentsResponse
	err := s.Client.
		Get(buildDocumentsPath(namespace)).
		DecodeJSON(&resp).
		QueryParams([2]string{"orgID", orgID.String()}).
		Do(ctx)
	if err != nil {
		return nil, err
	}

	// Unwrap each response entry to the document it embeds.
	docs := make([]*influxdb.Document, len(resp.Documents))
	for i, d := range resp.Documents {
		docs[i] = d.Document
	}
	return docs, nil
}