-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxy.go
192 lines (158 loc) · 5.32 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package engine
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"path"
"time"
"github.com/vnsoft2014/prisma-client-go/binaries"
"github.com/vnsoft2014/prisma-client-go/logger"
"github.com/vnsoft2014/prisma-client-go/runtime/types"
)
// NewDataProxyEngine builds a DataProxyEngine for the given prisma schema and
// datasource connection URL, backed by a fresh default http.Client.
//
// The remote URL and API key are left empty here; Connect fills them in.
func NewDataProxyEngine(schema, connectionURL string) *DataProxyEngine {
	engine := new(DataProxyEngine)
	engine.http = new(http.Client)
	engine.connectionURL = connectionURL
	engine.Schema = schema
	return engine
}
// DataProxyEngine is an engine that talks to a remote prisma data proxy over
// HTTPS. Create it with NewDataProxyEngine and call Connect before issuing
// requests, since Connect is what populates url and apiKey.
type DataProxyEngine struct {
	// http is the internal http client used for every request to the proxy
	http *http.Client
	// url holds the base query-engine url; it is set by Connect and request
	// paths such as "/schema" or "/graphql" are appended to it
	url string
	// connectionURL is the env var for the datasource url
	// this is needed internally to extract the api key and the proxy host
	connectionURL string
	// Schema contains the prisma Schema; it is hashed to build the remote url
	// and uploaded to the proxy in base64 form
	Schema string
	// apiKey contains the parsed prisma data proxy api key from the connection
	// string (its "api_key" query parameter); it is sent as a Bearer token
	apiKey string
}
// Connect prepares the engine for use. It hashes the local schema, parses the
// connection string to obtain the proxy host and the "api_key" query
// parameter, derives the remote engine URL from host and schema hash, and
// finally uploads the schema to the proxy.
//
// It returns an error when the connection string cannot be parsed, when it
// has no api key or no host, or when the schema upload fails.
func (e *DataProxyEngine) Connect() error {
	// Example uri: https://aws-eu-west-1.prisma-data.com/2.26.0/412bf0a1742a576d699fbd5102a4f725557eff3992995f2e18febce128794961/
	hash := hashSchema(e.Schema)
	logger.Debug.Printf("local schema hash %s", hash)

	u, err := url.Parse(e.connectionURL)
	if err != nil {
		return fmt.Errorf("parse prisma string: %w", err)
	}

	// Only the host is logged: the full connection string carries the api key
	// in its query string and must not end up in log output.
	logger.Debug.Printf("parsing connection string for data proxy host %s", u.Host)

	e.apiKey = u.Query().Get("api_key")
	if e.apiKey == "" {
		return errors.New("could not parse api key from data proxy prisma connection string")
	}

	// Without a host the derived URI would start with the version segment and
	// point at a bogus server, so fail early with a clear message instead.
	if u.Host == "" {
		return errors.New("could not parse host from data proxy prisma connection string")
	}

	e.url = getCloudURI(u.Host, hash)
	logger.Debug.Printf("using %s as remote URI", e.url)

	if err := e.uploadSchema(context.Background()); err != nil {
		return fmt.Errorf("upload schema: %w", err)
	}

	return nil
}
// uploadSchema sends the base64-encoded schema to the proxy with a PUT to
// "/schema" and decodes the JSON reply. The remote schema hash from the reply
// is only logged; it is not compared against the local hash.
//
// It returns an error when the request fails or the reply is not valid JSON.
func (e *DataProxyEngine) uploadSchema(ctx context.Context) error {
	logger.Debug.Printf("uploading schema...")

	encoded := []byte(encodeSchema(e.Schema))
	raw, err := e.request(ctx, "PUT", "/schema", encoded)
	if err != nil {
		return fmt.Errorf("put schema: %w", err)
	}
	logger.Debug.Printf("schema upload response: %s", raw)

	// Only the schemaHash field of the reply is of interest.
	var parsed struct {
		SchemaHash string `json:"schemaHash"`
	}
	if err = json.Unmarshal(raw, &parsed); err != nil {
		return fmt.Errorf("schema response err: %w", err)
	}

	logger.Debug.Printf("remote schema hash %s", parsed.SchemaHash)
	logger.Debug.Printf("schema upload done.")
	return nil
}
// Disconnect is a no-op for the data proxy engine and always returns nil.
func (e *DataProxyEngine) Disconnect() error {
	return nil
}
// Do sends a single query to the data proxy and decodes its result.
//
// payload is marshalled to JSON and POSTed to "/graphql" via
// retryableRequest. The reply is decoded as a GQLResponse; when it carries
// errors, only the first one is reported. The result data is then unmarshalled
// into into, which therefore has to be something json.Unmarshal accepts as a
// destination.
//
// It returns types.ErrNotFound when the first error's raw message equals
// internalUpdateNotFoundMessage or internalDeleteNotFoundMessage, and a
// wrapped error for marshal, request and unmarshal failures.
func (e *DataProxyEngine) Do(ctx context.Context, payload interface{}, into interface{}) error {
	startReq := time.Now()

	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("payload marshal: %w", err)
	}

	body, err := e.retryableRequest(ctx, "POST", "/graphql", data)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}

	logger.Debug.Printf("[timing] query engine request took %s", time.Since(startReq))

	startParse := time.Now()

	var response GQLResponse
	if err := json.Unmarshal(body, &response); err != nil {
		return fmt.Errorf("json gql response unmarshal: %w", err)
	}

	if len(response.Errors) > 0 {
		// Only the first reported error decides the outcome.
		msg := response.Errors[0].RawMessage()
		switch msg {
		case internalUpdateNotFoundMessage, internalDeleteNotFoundMessage:
			// Updates/deletes on missing records surface as a typed sentinel.
			return types.ErrNotFound
		}
		return fmt.Errorf("pql error: %s", msg)
	}

	if err := json.Unmarshal(response.Data.Result, into); err != nil {
		return fmt.Errorf("json data result unmarshal: %w", err)
	}

	logger.Debug.Printf("[timing] request unmarshal took %s", time.Since(startParse))

	return nil
}
// Batch sends payload as JSON in a POST to "/graphql" via retryableRequest and
// unmarshals the complete response body into into. Unlike Do, it does not
// inspect the reply for GraphQL errors.
//
// It returns a wrapped error for marshal, request and unmarshal failures.
func (e *DataProxyEngine) Batch(ctx context.Context, payload interface{}, into interface{}) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("payload marshal: %w", err)
	}

	raw, err := e.retryableRequest(ctx, "POST", "/graphql", encoded)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}

	// NOTE(review): this decodes through &into (a *interface{}), whereas Do
	// passes into directly — confirm the difference is intentional.
	if err = json.Unmarshal(raw, &into); err != nil {
		return fmt.Errorf("json body unmarshal: %w", err)
	}

	return nil
}
// Name returns the identifier of this engine, "data-proxy".
func (e *DataProxyEngine) Name() string {
	return "data-proxy"
}
// request performs a single authenticated call against the remote engine.
// path is appended verbatim to the engine's base url, and the api key is sent
// as a Bearer token in the Authorization header. The actual transfer is
// delegated to the package-level request helper.
func (e *DataProxyEngine) request(ctx context.Context, method string, path string, payload []byte) ([]byte, error) {
	target := e.url + path
	logger.Debug.Printf("requesting %s", target)

	withAuth := func(req *http.Request) {
		req.Header.Set("Authorization", "Bearer "+e.apiKey)
	}

	return request(ctx, e.http, method, target, payload, withAuth)
}
// retryableRequest issues the request and, if it fails with errNotFound,
// re-uploads the schema and repeats the request exactly once. Any other
// error is returned unchanged, and the outcome of the second attempt is
// returned as-is.
func (e *DataProxyEngine) retryableRequest(ctx context.Context, method string, path string, payload []byte) ([]byte, error) {
	body, err := e.request(ctx, method, path, payload)
	if err == nil {
		return body, nil
	}
	if !errors.Is(err, errNotFound) {
		return nil, err
	}

	logger.Debug.Printf("got status not found in data proxy request; re-uploading schema")
	if uploadErr := e.uploadSchema(ctx); uploadErr != nil {
		return nil, fmt.Errorf("upload schema after 400 request: %w", uploadErr)
	}
	logger.Debug.Printf("schema re-upload succeeded")

	return e.request(ctx, method, path, payload)
}
// hashSchema returns the lowercase hex SHA-256 digest of the schema in its
// encoded form (see encodeSchema), i.e. the hash is taken over the base64
// text rather than over the raw schema.
func hashSchema(schema string) string {
	hasher := sha256.New()
	hasher.Write([]byte(encodeSchema(schema)))
	return fmt.Sprintf("%x", hasher.Sum(nil))
}
// encodeSchema returns the standard base64 encoding of schema with a single
// trailing newline appended. The newline is always added, so an empty schema
// encodes to the base64 form of "\n".
func encodeSchema(schema string) string {
	return base64.StdEncoding.EncodeToString([]byte(schema + "\n"))
}
// getCloudURI builds the https URL of the remote engine by joining host,
// binaries.PrismaVersion and schemaHash as slash-separated path segments.
// Because path.Join is used, the result carries no trailing slash.
func getCloudURI(host, schemaHash string) string {
	location := path.Join(host, binaries.PrismaVersion, schemaHash)
	return "https://" + location
}