-
Notifications
You must be signed in to change notification settings - Fork 7
/
queries.go
153 lines (113 loc) · 3.68 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package rockset
import (
"context"
"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
"net/http"
)
// QueryState is the string form of a query's state. The values declared in
// this file are listed in the constant block below.
type QueryState string

// Query states declared by this package.
const (
	// QueryQueued is the "QUEUED" state.
	QueryQueued = QueryState("QUEUED")
	// QueryRunning is the "RUNNING" state.
	QueryRunning = QueryState("RUNNING")
	// QueryError is the "ERROR" state.
	QueryError = QueryState("ERROR")
	// QueryCompleted is the "COMPLETED" state.
	QueryCompleted = QueryState("COMPLETED")
	// QueryCancelled is the "CANCELLED" state.
	QueryCancelled = QueryState("CANCELLED")
)
// Query runs the given SQL statement and returns the response.
//
// A default query request is built with sql as its query text and an empty
// parameter list; every supplied option.QueryOption is then applied to that
// request, in order, before it is sent. The call is executed through rc.Retry.
// On failure the zero openapi.QueryResponse is returned with the error.
func (rc *RockClient) Query(ctx context.Context, sql string,
	options ...option.QueryOption) (openapi.QueryResponse, error) {
	var (
		httpResp *http.Response
		result   *openapi.QueryResponse
	)

	call := rc.QueriesApi.Query(ctx)

	request := openapi.NewQueryRequestWithDefaults()
	request.Sql = openapi.QueryRequestSql{Query: sql}
	request.Sql.Parameters = []openapi.QueryParameter{}
	for _, apply := range options {
		apply(request)
	}

	// The closure publishes the response through result so it is available
	// once rc.Retry returns.
	attempt := func() error {
		var execErr error
		result, httpResp, execErr = call.Body(*request).Execute()
		return NewErrorWithStatusCode(execErr, httpResp)
	}

	if retryErr := rc.Retry(ctx, attempt); retryErr != nil {
		return openapi.QueryResponse{}, retryErr
	}

	return *result, nil
}
// ValidateQuery submits the given SQL statement to the validate endpoint and
// returns the validation response.
//
// The request is assembled the same way as for Query: a default query request
// carrying sql and an empty parameter list, with each option.QueryOption
// applied in order. The call is executed through rc.Retry. On failure the zero
// openapi.ValidateQueryResponse is returned with the error.
func (rc *RockClient) ValidateQuery(ctx context.Context, sql string,
	options ...option.QueryOption) (openapi.ValidateQueryResponse, error) {
	var (
		httpResp *http.Response
		result   *openapi.ValidateQueryResponse
	)

	call := rc.QueriesApi.Validate(ctx)

	request := openapi.NewQueryRequestWithDefaults()
	request.Sql = openapi.QueryRequestSql{Query: sql}
	request.Sql.Parameters = []openapi.QueryParameter{}
	for _, apply := range options {
		apply(request)
	}

	// The closure publishes the response through result so it is available
	// once rc.Retry returns.
	attempt := func() error {
		var execErr error
		result, httpResp, execErr = call.Body(*request).Execute()
		return NewErrorWithStatusCode(execErr, httpResp)
	}

	if retryErr := rc.Retry(ctx, attempt); retryErr != nil {
		return openapi.ValidateQueryResponse{}, retryErr
	}

	return *result, nil
}
// GetQueryInfo fetches the information record for the query identified by
// queryID.
//
// The call is executed through rc.Retry. On failure the zero openapi.QueryInfo
// is returned with the error; on success the Data field of the response is
// dereferenced and returned.
func (rc *RockClient) GetQueryInfo(ctx context.Context, queryID string) (openapi.QueryInfo, error) {
	var (
		httpResp *http.Response
		result   *openapi.GetQueryResponse
	)

	call := rc.QueriesApi.GetQuery(ctx, queryID)

	fetch := func() error {
		var execErr error
		result, httpResp, execErr = call.Execute()
		return NewErrorWithStatusCode(execErr, httpResp)
	}

	if retryErr := rc.Retry(ctx, fetch); retryErr != nil {
		return openapi.QueryInfo{}, retryErr
	}

	return *result.Data, nil
}
// GetQueryResults fetches the results of the async query identified by
// queryID.
//
// The call is executed through rc.Retry. On failure the zero
// openapi.QueryPaginationResponse is returned with the error.
func (rc *RockClient) GetQueryResults(ctx context.Context, queryID string) (openapi.QueryPaginationResponse, error) {
	var (
		httpResp *http.Response
		result   *openapi.QueryPaginationResponse
	)

	call := rc.QueriesApi.GetQueryResults(ctx, queryID)

	fetch := func() error {
		var execErr error
		result, httpResp, execErr = call.Execute()
		return NewErrorWithStatusCode(execErr, httpResp)
	}

	if retryErr := rc.Retry(ctx, fetch); retryErr != nil {
		return openapi.QueryPaginationResponse{}, retryErr
	}

	return *result, nil
}
// ListActiveQueries lists all active queries, i.e. queued or running.
//
// The call is executed through rc.Retry. If it fails, a nil slice is returned
// together with the error; otherwise the Data field of the response is
// returned.
func (rc *RockClient) ListActiveQueries(ctx context.Context) ([]openapi.QueryInfo, error) {
	var err error
	var httpResp *http.Response
	var response *openapi.ListQueriesResponse

	q := rc.QueriesApi.ListActiveQueries(ctx)

	err = rc.Retry(ctx, func() error {
		response, httpResp, err = q.Execute()
		return NewErrorWithStatusCode(err, httpResp)
	})
	// The error must be checked before touching response: on failure response
	// may still be nil, and reading response.Data would panic.
	if err != nil {
		return nil, err
	}

	return response.Data, nil
}
// CancelQuery cancels a queued or running query.
//
// The call is executed through rc.Retry. If it fails, the zero
// openapi.QueryInfo is returned together with the error; otherwise the Data
// field of the response is dereferenced and returned.
func (rc *RockClient) CancelQuery(ctx context.Context, queryID string) (openapi.QueryInfo, error) {
	var err error
	var httpResp *http.Response
	var response *openapi.CancelQueryResponse

	q := rc.QueriesApi.CancelQuery(ctx, queryID)

	err = rc.Retry(ctx, func() error {
		response, httpResp, err = q.Execute()
		return NewErrorWithStatusCode(err, httpResp)
	})
	// The error must be checked before touching response: on failure response
	// may still be nil, and dereferencing response.Data would panic.
	if err != nil {
		return openapi.QueryInfo{}, err
	}

	return *response.Data, nil
}