-
Notifications
You must be signed in to change notification settings - Fork 25
/
csclient.go
1379 lines (1265 loc) · 44.7 KB
/
csclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
// Package csclient provides access to the charm store API.
//
// Errors returned from the remote API server with an associated error
// code will have a cause of type params.ErrorCode holding that code.
//
// If a call to the API returns an error because authorization has been
// denied, an error with a cause satisfying IsAuthorizationError will be
// returned. Note that these errors can also include errors returned by
// httpbakery when it attempts to discharge macaroons.
package csclient // import "github.com/juju/charmrepo/v6/csclient"
import (
"bytes"
"crypto/sha512"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
"unicode"
"github.com/go-macaroon-bakery/macaroon-bakery/v3/httpbakery"
"github.com/juju/charm/v8"
"gopkg.in/errgo.v1"
"gopkg.in/httprequest.v1"
"github.com/juju/charmrepo/v6/csclient/params"
)
const (
// userAgentKey is presumably the name of the HTTP header under which
// the client's user agent is sent (its use is outside this view).
userAgentKey = "User-Agent"
// userAgentValue is the default user agent string; New uses it when
// Params.UserAgentValue is empty.
userAgentValue = "Golang_CSClient/4.0"
)
// apiVersion names the charm store API version; presumably it is inserted
// into request URLs after the root endpoint - TODO confirm against Do.
const apiVersion = "v5"
// defaultMinMultipartUploadSize is the default resource size in bytes
// (5 * 1024 * 1024) at or above which a multipart upload is attempted.
// It can be overridden per client with SetMinMultipartUploadSize.
const defaultMinMultipartUploadSize = 5 * 1024 * 1024
// ServerURL holds the default location of the global charm store.
// New uses it when Params.URL is empty; an alternate location can be
// configured by setting the URL field in the Params struct.
// For live testing or QAing the application, a different charm store
// location should be used, for instance "https://api.staging.jujucharms.com".
var ServerURL = "https://api.jujucharms.com/charmstore"
// Client represents the client side of a charm store.
// Use New to create one.
type Client struct {
// params holds the parameters the client was created with, with the
// default URL filled in by New when none was given.
params Params
// bclient holds the HTTP client selected by New: the bakery client
// from Params, or a newly created one.
bclient httpClient
// header holds the custom HTTP headers installed by SetHTTPHeader.
header http.Header
// statsDisabled is set by DisableStats; when true, archive requests
// carry the "stats=0" query parameter.
statsDisabled bool
// channel holds the channel selected with WithChannel.
channel params.Channel
// minMultipartUploadSize is the resource size at or above which a
// multipart upload is attempted.
minMultipartUploadSize int64
// userAgentValue holds the user agent string chosen by New.
userAgentValue string
}
// Params holds parameters for creating a new charm store client.
type Params struct {
// URL holds the root endpoint URL of the charmstore,
// with no trailing slash, not including the version.
// For example https://api.jujucharms.com/charmstore
// If empty, the default charm store client location is used.
URL string
// User holds the name to authenticate as for the client. If User is empty,
// no credentials will be sent.
User string
// Password holds the password for the given user, for authenticating the
// client.
Password string
// BakeryClient holds the bakery client to use when making
// requests to the store. If it is nil, New creates a default
// bakery client with a web browser interactor.
BakeryClient *httpbakery.Client
// UserAgentValue allows the overriding of the user agent value.
// If it is empty, New uses the package default.
UserAgentValue string
}
// httpClient is the minimal interface the Client needs from its
// underlying HTTP client: the ability to perform a single request.
// New stores a *httpbakery.Client in a field of this type.
type httpClient interface {
Do(*http.Request) (*http.Response, error)
}
// New returns a new charm store client configured from p.
//
// Empty fields in p are defaulted: URL falls back to ServerURL,
// UserAgentValue falls back to the package default, and a nil
// BakeryClient is replaced by a new bakery client that can interact
// through a web browser.
func New(p Params) *Client {
	if p.URL == "" {
		p.URL = ServerURL
	}
	client := &Client{
		params:                 p,
		minMultipartUploadSize: defaultMinMultipartUploadSize,
		userAgentValue:         userAgentValue,
	}
	if p.UserAgentValue != "" {
		client.userAgentValue = p.UserAgentValue
	}
	// Only store the caller's bakery client when it is non-nil, so the
	// interface field never wraps a nil pointer.
	if p.BakeryClient != nil {
		client.bclient = p.BakeryClient
		return client
	}
	bakeryClient := httpbakery.NewClient()
	bakeryClient.AddInteractor(httpbakery.WebBrowserInteractor{})
	client.bclient = bakeryClient
	return client
}
// SetMinMultipartUploadSize sets the minimum size of resource upload
// that will trigger a multipart upload: resources of at least n bytes
// are uploaded in parts, smaller ones in a single request.
// This is mainly useful for testing.
func (c *Client) SetMinMultipartUploadSize(n int64) {
c.minMultipartUploadSize = n
}
// ServerURL returns the charm store URL used by the client, that is
// the URL from the Params given to New, or the default ServerURL if
// that was empty.
func (c *Client) ServerURL() string {
return c.params.URL
}
// DisableStats disables incrementing download stats when retrieving archives
// from the charm store. After it has been called, archive requests made
// by GetArchive and GetFileFromArchive include the "stats=0" query parameter.
func (c *Client) DisableStats() {
c.statsDisabled = true
}
// WithChannel returns a copy of the client whose requests are done
// using the given channel. The receiver itself is left unchanged.
func (c *Client) WithChannel(channel params.Channel) *Client {
	derived := new(Client)
	*derived = *c
	derived.channel = channel
	return derived
}
// Channel returns the currently set channel, as selected by
// WithChannel. It is the zero channel if none has been set.
func (c *Client) Channel() params.Channel {
return c.channel
}
// SetHTTPHeader sets custom HTTP headers that will be sent to the charm store
// on each request. The header value is stored as given, replacing
// any headers set by a previous call.
func (c *Client) SetHTTPHeader(header http.Header) {
c.header = header
}
// GetArchive retrieves the archive for the given charm or bundle.
//
// On success it returns a reader from which the archive data can be read
// (the caller must close it), the fully qualified id of the entity as
// reported by the server, the content hash reported by the server (the
// hex-encoded SHA384 hash of the data) and the size of the data.
//
// An error is returned if the request fails, if the server asks for terms
// to be agreed to, or if the response lacks a fully qualified entity id,
// a content hash or a content length.
func (c *Client) GetArchive(id *charm.URL) (r io.ReadCloser, eid *charm.URL, hash string, size int64, err error) {
	fail := func(err error) (io.ReadCloser, *charm.URL, string, int64, error) {
		return nil, nil, "", 0, err
	}

	req, err := http.NewRequest("GET", "", nil)
	if err != nil {
		return fail(errgo.Notef(err, "cannot make new request"))
	}

	// Build the archive location, opting out of download stats if requested.
	query := url.Values{}
	if c.statsDisabled {
		query.Set("stats", "0")
	}
	archiveURL := url.URL{
		Path:     "/" + id.Path() + "/archive",
		RawQuery: query.Encode(),
	}

	resp, err := c.Do(req, archiveURL.String())
	if err != nil {
		termsErr, needsTerms := errgo.Cause(params.MaybeTermsAgreementError(err)).(*params.TermAgreementRequiredError)
		if needsTerms {
			return fail(errgo.Newf(`cannot get archive because some terms have not been agreed to. Try "juju agree %s"`, strings.Join(termsErr.Terms, " ")))
		}
		return fail(errgo.NoteMask(err, "cannot get archive", isAPIError))
	}

	// From here on, any failure must also release the response body.
	failAndClose := func(err error) (io.ReadCloser, *charm.URL, string, int64, error) {
		resp.Body.Close()
		return fail(err)
	}

	// Validate the response headers.
	entityID := resp.Header.Get(params.EntityIdHeader)
	if entityID == "" {
		return failAndClose(errgo.Newf("no %s header found in response", params.EntityIdHeader))
	}
	eid, err = charm.ParseURL(entityID)
	switch {
	case err != nil:
		// The server did not return a valid id.
		return failAndClose(errgo.Notef(err, "invalid entity id found in response"))
	case eid.Revision == -1:
		// The server did not return a fully qualified entity id.
		return failAndClose(errgo.Newf("archive get returned not fully qualified entity id %q", eid))
	}
	hash = resp.Header.Get(params.ContentHashHeader)
	if hash == "" {
		return failAndClose(errgo.Newf("no %s header found in response", params.ContentHashHeader))
	}

	// Validate the response contents.
	if resp.ContentLength < 0 {
		// TODO frankban: handle the case the contents are chunked.
		return failAndClose(errgo.Newf("no content length found in response"))
	}
	return resp.Body, eid, hash, resp.ContentLength, nil
}
// GetFileFromArchive streams the contents of the file with the given name
// from the archive of the given charm or bundle. The returned reader
// provides the file data and must be closed by the caller.
//
// If the server reports that terms must be agreed to first, the returned
// error names those terms.
func (c *Client) GetFileFromArchive(id *charm.URL, filename string) (io.ReadCloser, error) {
	req, err := http.NewRequest("GET", "", nil)
	if err != nil {
		return nil, errgo.Notef(err, "cannot make new request")
	}

	// Build the file location, opting out of download stats if requested.
	query := make(url.Values)
	if c.statsDisabled {
		query.Set("stats", "0")
	}
	fileURL := url.URL{
		Path:     "/" + id.Path() + "/archive/" + filename,
		RawQuery: query.Encode(),
	}

	resp, err := c.Do(req, fileURL.String())
	if err == nil {
		return resp.Body, nil
	}
	termsErr, needsTerms := errgo.Cause(params.MaybeTermsAgreementError(err)).(*params.TermAgreementRequiredError)
	if needsTerms {
		return nil, errgo.Newf(`cannot get file from archive because some terms have not been agreed to. Try "juju agree %s"`, strings.Join(termsErr.Terms, " "))
	}
	return nil, errgo.NoteMask(err, "cannot get file from archive", isAPIError)
}
// ListResources retrieves the metadata about the resources of the charm
// with the given id. It returns one element per resource reported by
// the charm store for that charm.
func (c *Client) ListResources(id *charm.URL) ([]params.Resource, error) {
	var resources []params.Resource
	err := c.Get("/"+id.Path()+"/meta/resources", &resources)
	if err != nil {
		return nil, errgo.NoteMask(err, "cannot get resource metadata from the charm store", isAPIError)
	}
	return resources, nil
}
// Progress lets an upload notify a caller about the progress of the upload.
// A nil Progress may be passed to the upload methods, in which case
// no notifications are made.
type Progress interface {
// Start is called with the upload id when the upload starts.
// The upload id will be empty when multipart upload is not
// being used (when the upload is small or the server does not
// support multipart upload). For a multipart upload, expires
// holds the expiry time reported by the server for the upload;
// otherwise it is the zero time.
Start(uploadId string, expires time.Time)
// Transferred is called periodically to notify the caller that
// the given number of bytes have been uploaded. Note that the
// number may decrease - for example when most of a file has
// been transferred before a network error occurs.
Transferred(total int64)
// Error is called when a non-fatal error (any non-API error) has
// been encountered when uploading. The upload of the affected
// part is retried afterwards.
Error(err error)
// Finalizing is called when all the parts of a multipart upload
// are being stitched together into the final resource.
// This will not be called if the upload is not split into
// multiple parts.
Finalizing()
}
// UploadResource uploads the contents of a resource of the given name
// attached to a charm with the given id. The given path will be used as
// the resource path metadata and the contents will be read from the
// given file, which must have the given size. If progress is not nil, it will
// be called to inform the caller of the progress of the upload.
//
// It returns the revision of the uploaded resource. It is equivalent to
// calling ResumeUploadResource with an empty upload id.
func (c *Client) UploadResource(id *charm.URL, name, path string, file io.ReaderAt, size int64, progress Progress) (revision int, err error) {
return c.ResumeUploadResource("", id, name, path, file, size, progress)
}
// UploadResourceWithRevision is like UploadResource except that it
// puts the resource at a known revision, useful when transferring
// resources between charm store instances.
//
// It is equivalent to calling ResumeUploadResourceWithRevision with an
// empty upload id.
func (c *Client) UploadResourceWithRevision(
id *charm.URL,
name string,
rev int,
path string,
file io.ReaderAt,
size int64,
progress Progress,
) (revision int, err error) {
return c.ResumeUploadResourceWithRevision("", id, name, rev, path, file, size, progress)
}
// AddDockerResource registers a docker image, available in a docker
// registry, as a resource of the charm with the given id.
//
// If imageName is non-empty, it names the image in some registry not
// associated with the charm store; otherwise the image should already have
// been uploaded to the charm store's associated registry (see
// DockerResourceUploadInfo for details on how to do that). The digest
// should hold the digest of the image (in "sha256:hex" format).
//
// AddDockerResource returns the revision of the newly added resource.
func (c *Client) AddDockerResource(id *charm.URL, resourceName string, imageName, digest string) (revision int, err error) {
	uploadReq := params.DockerResourceUploadRequest{
		Digest:    digest,
		ImageName: imageName,
	}
	var uploadResp params.ResourceUploadResponse
	endpoint := "/" + id.Path() + "/resource/" + resourceName
	if err := c.DoWithResponse("POST", endpoint, uploadReq, &uploadResp); err != nil {
		return 0, errgo.Mask(err)
	}
	return uploadResp.Revision, nil
}
// DockerResourceDownloadInfo returns information on how to download the
// given resource of the given Kubernetes charm from a docker registry.
// The returned information includes the image name to use and the
// username and password to use for authentication.
//
// If revision is negative, no revision is included in the request.
func (c *Client) DockerResourceDownloadInfo(id *charm.URL, resourceName string, revision int) (*params.DockerInfoResponse, error) {
	endpoint := "/" + id.Path() + "/resource/" + resourceName
	if revision >= 0 {
		endpoint += fmt.Sprintf("/%d", revision)
	}
	info := new(params.DockerInfoResponse)
	if err := c.Get(endpoint, info); err != nil {
		return nil, errgo.Mask(err)
	}
	return info, nil
}
// DockerResourceUploadInfo returns information on how to upload an image
// for the named resource of the given charm to the charm store's
// associated docker registry. The returned information includes a tag to
// associate with the image and the username and password to use for push
// authentication.
func (c *Client) DockerResourceUploadInfo(id *charm.URL, resourceName string) (*params.DockerInfoResponse, error) {
	// The resource name travels in the query string, so it must be escaped.
	endpoint := "/" + id.Path() + "/docker-resource-upload-info?resource-name=" + url.QueryEscape(resourceName)
	info := new(params.DockerInfoResponse)
	if err := c.Get(endpoint, info); err != nil {
		return nil, errgo.Mask(err)
	}
	return info, nil
}
// ErrUploadNotFound is the cause of the error returned when an attempt
// is made to resume an upload whose id is not known to the charm store.
var ErrUploadNotFound = errgo.Newf("upload not found")
// ResumeUploadResource is like UploadResource except that if uploadId is non-empty,
// it specifies the id of an existing upload to resume; if an upload with this ID is not
// found, an error with an ErrUploadNotFound cause is returned.
//
// It delegates to ResumeUploadResourceWithRevision with a revision of -1,
// meaning that no specific revision is requested.
func (c *Client) ResumeUploadResource(uploadId string, id *charm.URL, resourceName, path string, content io.ReaderAt, size int64, progress Progress) (revision int, err error) {
return c.ResumeUploadResourceWithRevision(uploadId, id, resourceName, -1, path, content, size, progress)
}
// ResumeUploadResourceWithRevision is like UploadResourceWithRevision
// except that if uploadId is non-empty, it specifies the id of an existing
// upload to resume; if an upload with this ID is not found, an error with
// an ErrUploadNotFound cause is returned.
//
// A rev of -1 means that no specific revision is requested. Content at
// least as large as the client's minimum multipart upload size is uploaded
// in parts; smaller content is sent in a single request, in which case
// uploadId is not used.
func (c *Client) ResumeUploadResourceWithRevision(
	uploadId string,
	id *charm.URL,
	resourceName string,
	rev int,
	path string,
	content io.ReaderAt,
	size int64,
	progress Progress,
) (revision int, err error) {
	// Substitute a no-op reporter so the upload code never has to
	// check for a nil Progress.
	if progress == nil {
		progress = noProgress{}
	}
	upload := uploadInfo{
		id:           id,
		resourceName: resourceName,
		revision:     rev,
		path:         path,
		size:         size,
		progress:     progress,
		content:      content,
	}
	if size < c.minMultipartUploadSize {
		return c.uploadSinglePartResource(&upload)
	}
	return c.uploadMultipartResource(uploadId, &upload)
}
// uploadSinglePartResource uploads the resource described by info in a
// single HTTP request and returns the revision reported by the server.
//
// The content is read twice: once to compute its hash and once as the
// request body. POST is used unless info.revision names a specific
// revision, in which case the resource is PUT at that revision.
func (c *Client) uploadSinglePartResource(info *uploadInfo) (revision int, err error) {
	info.progress.Start("", time.Time{})

	// First pass over the content: hash it and check the size is as advertised.
	hash, hashedSize, err := readerHashAndSize(io.NewSectionReader(info.content, 0, info.size))
	if err != nil {
		return -1, errgo.Mask(err)
	}
	if hashedSize != info.size {
		return 0, errgo.Newf("resource file changed underfoot? (initial size %d, then %d)", info.size, hashedSize)
	}

	// Work out the method and path, which depend on whether a
	// specific revision was requested.
	method := "POST"
	path := fmt.Sprintf("/%s/resource/%s", info.id.Path(), info.resourceName)
	if info.revision != -1 {
		method = "PUT"
		path += fmt.Sprintf("/%d", info.revision)
	}

	// Second pass over the content: stream it as the request body,
	// reporting progress as it is read.
	body := newProgressReader(io.NewSectionReader(info.content, 0, info.size), info.progress, 0)
	req, err := http.NewRequest(method, "", body)
	if err != nil {
		return 0, errgo.Notef(err, "cannot make new request")
	}
	req.Header.Set("Content-Type", "application/octet-stream")
	req.ContentLength = info.size
	if info.size == 0 {
		// Setting body to nil is the only way to make sure that
		// an explicit 0 Content-Length header is set.
		req.Body = nil
	}

	target := fmt.Sprintf("%s?hash=%s&filename=%s", path, url.QueryEscape(hash), url.QueryEscape(info.path))
	resp, err := c.Do(req, target)
	if err != nil {
		return 0, errgo.NoteMask(err, "cannot post resource", isAPIError)
	}
	defer resp.Body.Close()

	// Parse the response.
	var uploaded params.ResourceUploadResponse
	if err := httprequest.UnmarshalJSONResponse(resp, &uploaded); err != nil {
		return 0, errgo.Mask(err)
	}
	return uploaded.Revision, nil
}
// uploadInfo holds the state of a single resource upload, shared
// between the single-part and multipart upload code paths.
type uploadInfo struct {
// id holds the id of the charm that the resource is attached to.
id *charm.URL
// resourceName holds the name of the resource being uploaded.
resourceName string
// revision holds the revision to put the resource at, or -1 if
// no specific revision is requested.
revision int
// path is sent to the server as the "filename" query parameter.
path string
// size holds the expected size of content in bytes.
size int64
// progress is notified of upload progress; it is never nil.
progress Progress
// content holds the resource data to upload.
content io.ReaderAt
// The following fields are only set for multipart uploads.
// UploadInfoResponse is filled in from the server's response when
// the upload is created or resumed.
params.UploadInfoResponse
// preferredPartSize holds the part size used when the client is free
// to choose the size of a part.
preferredPartSize int64
}
// uploadMultipartResource uploads the resource described by info as a
// multipart upload and returns the revision of the resulting resource.
//
// If uploadId is empty a new upload is created; otherwise the existing
// upload with that id is resumed, and an error with an ErrUploadNotFound
// cause is returned if the server does not know it. If the server does not
// support creating uploads at all, the resource is uploaded in a single
// part instead.
func (c *Client) uploadMultipartResource(uploadId string, info *uploadInfo) (int, error) {
	if uploadId == "" {
		// Create the upload.
		if err := c.DoWithResponse("POST", "/upload", nil, &info.UploadInfoResponse); err != nil {
			if errgo.Cause(err) == params.ErrNotFound {
				// An earlier version of the API - try single part upload even though it's big.
				return c.uploadSinglePartResource(info)
			}
			return 0, errgo.Mask(err)
		}
	} else {
		// Resume an existing upload.
		if err := c.Get("/upload/"+uploadId, &info.UploadInfoResponse); err != nil {
			if errgo.Cause(err) == params.ErrNotFound {
				return 0, errgo.WithCausef(nil, ErrUploadNotFound, "")
			}
			return 0, errgo.Mask(err)
		}
		if info.UploadId != uploadId {
			return 0, errgo.Newf("unexpected upload id in response (got %q want %q)", info.UploadId, uploadId)
		}
	}
	info.progress.Start(info.UploadId, info.Expires)

	// The part count comes from the server; refuse a non-positive value
	// rather than panicking on the division below.
	if info.MaxParts <= 0 {
		return 0, errgo.Newf("invalid upload information from server (max parts %d)", info.MaxParts)
	}

	// Calculate the part size, but round up so that we have
	// enough parts to cover the remainder at the end.
	info.preferredPartSize = (info.size + int64(info.MaxParts) - 1) / int64(info.MaxParts)
	if info.preferredPartSize > info.MaxPartSize {
		return 0, errgo.Newf("resource too big (allowed %.3fGB)", float64(info.MaxPartSize)*float64(info.MaxParts)/1e9)
	}
	if info.preferredPartSize < info.MinPartSize {
		info.preferredPartSize = info.MinPartSize
	}
	revision, err := c.uploadParts(info)
	if err != nil {
		return 0, errgo.Mask(err)
	}
	return revision, nil
}
// uploadParts uploads every part of the multipart upload described by
// info that has not been uploaded yet, completes the upload, and then
// creates the resource that refers to it. It returns the revision of the
// resulting resource.
func (c *Client) uploadParts(info *uploadInfo) (int, error) {
	parts := info.Parts
	offset := int64(0)
loop:
	for i := 0; offset < info.size; i++ {
		p0, p1, err := choosePartRange(i, offset, info)
		offset = p1
		if err != nil {
			switch errgo.Cause(err) {
			case errAlreadyUploaded:
				// Nothing to send for this part; just account for it.
				info.progress.Transferred(p1)
				continue
			case errFinished:
				break loop
			default:
				return 0, errgo.Mask(err)
			}
		}
		// TODO concurrent part upload?
		hash, err := c.uploadPart(info.UploadId, i, info.content, p0, p1, info.progress)
		if err != nil {
			return 0, errgo.Mask(err)
		}
		part := params.Part{
			Hash:     hash,
			Complete: true,
		}
		if i < len(parts.Parts) {
			parts.Parts[i] = part
		} else {
			// We can just append to parts because we know that if i >= len(parts.Parts),
			// we always call uploadPart and append to parts.Parts, because choosePartRange
			// will never return errAlreadyUploaded for a nonexistent part.
			parts.Parts = append(parts.Parts, part)
		}
	}
	info.progress.Finalizing()
	// All parts uploaded, now complete the upload.
	var finishResponse params.FinishUploadResponse
	if err := c.PutWithResponse("/upload/"+info.UploadId, parts, &finishResponse); err != nil {
		return 0, errgo.Mask(err)
	}
	method := "POST"
	path := fmt.Sprintf("/%s/resource/%s", info.id.Path(), info.resourceName)
	if info.revision != -1 {
		path += fmt.Sprintf("/%d", info.revision)
		method = "PUT"
	}
	// The upload id and file name are query parameter values, so they
	// must be escaped (as is done for single part uploads); otherwise a
	// file name containing characters such as '&', '#' or a space would
	// corrupt the request URL.
	resourceURL := fmt.Sprintf("%s?upload-id=%s&filename=%s", path, url.QueryEscape(info.UploadId), url.QueryEscape(info.path))
	// The multipart upload has now been uploaded.
	// Create the resource that uses it.
	var resourceResp params.ResourceUploadResponse
	if err := c.DoWithResponse(method, resourceURL, nil, &resourceResp); err != nil {
		return -1, errgo.NoteMask(err, "cannot post resource", isAPIError)
	}
	return resourceResp.Revision, nil
}
// Sentinel errors returned by choosePartRange; uploadParts tells them
// apart by comparing the error cause.
var (
// errAlreadyUploaded signals that the part is already complete and
// does not need to be uploaded again.
errAlreadyUploaded = errgo.Newf("resource part already uploaded")
// errFinished signals that the offset has reached the end of the
// content, so there are no more parts to upload.
errFinished = errgo.Newf("all resource parts uploaded")
)
// choosePartRange returns the file range to use for the part with the given index.
// It returns errAlreadyUploaded if the part is complete and errFinished if the part is
// at the end.
//
// The offset parameter holds the position in the content at which the
// part starts. The returned range is [p0, p1). When errAlreadyUploaded is
// returned, the range is that of the existing part; when errFinished is
// returned, both p0 and p1 are info.size.
func choosePartRange(partIndex int, offset int64, info *uploadInfo) (p0, p1 int64, err error) {
if offset >= info.size {
return info.size, info.size, errFinished
}
if partIndex < len(info.Parts.Parts) {
if part := info.Parts.Parts[partIndex]; part.Complete {
// The part is already on the server; it must start exactly
// where the previous part ended.
if part.Offset != offset {
return 0, 0, errgo.Newf("offset mismatch at part %d (want %d got %d)", partIndex, offset, part.Offset)
}
return offset, offset + part.Size, errAlreadyUploaded
}
}
// By default the part may extend to the end of the content.
nextOffset := info.size
nextUploadedPart := -1
// Find the offset of the next uploaded part, if any.
for i := partIndex + 1; i < len(info.Parts.Parts); i++ {
if info.Parts.Parts[i].Valid() {
nextOffset = info.Parts.Parts[i].Offset
nextUploadedPart = i
break
}
}
if nextUploadedPart == partIndex+1 {
// Exactly one part to fill in.
p0, p1 = offset, nextOffset
if p1-p0 < info.MinPartSize {
return 0, 0, errgo.Newf("remaining part is too small")
}
if p1-p0 > info.MaxPartSize {
return 0, 0, errgo.Newf("remaining part is too large")
}
return p0, p1, nil
}
if nextUploadedPart == -1 {
// No next part, so we can choose for ourselves.
p0 = offset
p1 = offset + info.preferredPartSize
if p1 > info.size {
p1 = info.size
}
return p0, p1, nil
}
// There's an already-uploaded part more than one away, so
// divide it equally (rounding errors will be allocated to the last
// part, which should be dealt with by the "exactly one part" case
// above).
// NOTE(review): the resulting part size is not checked against
// info.MinPartSize or info.MaxPartSize here - confirm that is intended.
partSize := (nextOffset - offset) / int64(nextUploadedPart-partIndex)
return offset, offset + partSize, nil
}
// progressReader implements an io.Reader that informs a Progress
// implementation of progress as data is transferred. Note that this
// will not work correctly if two uploads are made concurrently.
type progressReader struct {
// r is the underlying data source.
r io.ReadSeeker
// p is notified every time the position changes.
p Progress
// pos holds the total number of bytes reported as transferred,
// including the bytes counted in start.
pos int64
// start holds the number of bytes that had already been transferred
// before this reader was created.
start int64
}
// newProgressReader returns a reader that reads from r and calls
// p.Transferred with the number of bytes that have been transferred.
// The start parameter holds the number of bytes that have already been
// transferred; the totals passed to p.Transferred include it.
func newProgressReader(r io.ReadSeeker, p Progress, start int64) io.ReadSeeker {
return &progressReader{
r: r,
p: p,
pos: start,
start: start,
}
}
// Read implements io.Reader.Read. Whenever any bytes are read, the
// running total is advanced and reported through Transferred.
func (p *progressReader) Read(pb []byte) (int, error) {
	count, readErr := p.r.Read(pb)
	if count <= 0 {
		// Nothing was read, so there is no progress to report.
		return count, readErr
	}
	p.pos += int64(count)
	p.p.Transferred(p.pos)
	return count, readErr
}
// Seek implements io.Seeker.Seek. The running total is reset to match
// the new position (plus the initial start count) and reported through
// Transferred, which is why reported totals can decrease.
func (p *progressReader) Seek(offset int64, whence int) (int64, error) {
	newPos, seekErr := p.r.Seek(offset, whence)
	p.pos = p.start + newPos
	p.p.Transferred(p.pos)
	return newPos, seekErr
}
// uploadPart uploads a single part of a multipart upload
// and returns the hash of the part.
//
// The part consists of the bytes of r in the range [p0, p1). Its
// hex-encoded SHA384 hash is computed first and sent along with the data.
// Errors that do not come from the charm store API are reported to
// progress and the upload is retried, up to a fixed number of attempts;
// API errors are returned immediately.
func (c *Client) uploadPart(uploadId string, part int, r io.ReaderAt, p0, p1 int64, progress Progress) (string, error) {
	const maxAttempts = 10

	h := sha512.New384()
	if _, err := io.Copy(h, io.NewSectionReader(r, p0, p1-p0)); err != nil {
		return "", errgo.Notef(err, "cannot read resource")
	}
	hash := fmt.Sprintf("%x", h.Sum(nil))
	var lastError error
	section := newProgressReader(io.NewSectionReader(r, p0, p1-p0), progress, p0)
	for i := 0; i < maxAttempts; i++ {
		req, err := http.NewRequest("PUT", "", section)
		if err != nil {
			return "", errgo.Mask(err)
		}
		req.Header.Set("Content-Type", "application/octet-stream")
		req.ContentLength = p1 - p0
		resp, err := c.Do(req, fmt.Sprintf("/upload/%s/%d?hash=%s&offset=%d", uploadId, part, hash, p0))
		if err == nil {
			// Success
			resp.Body.Close()
			return hash, nil
		}
		if isAPIError(err) {
			// It's a genuine error from the charm store, so
			// stop trying.
			return "", errgo.Mask(err, isAPIError)
		}
		progress.Error(err)
		lastError = err
		// Rewind to the start of the part before trying again. If that
		// fails, a retry would send a truncated body, so give up.
		if _, err := section.Seek(0, io.SeekStart); err != nil {
			return "", errgo.Notef(err, "cannot rewind resource part for retry")
		}
	}
	return "", errgo.Notef(lastError, "too many attempts; last error")
}
// Publish tells the charmstore to mark the given charm as published with the
// given resource revisions to the given channels.
func (c *Client) Publish(id *charm.URL, channels []params.Channel, resources map[string]int) error {
if len(channels) == 0 {
return nil
}
val := ¶ms.PublishRequest{
Resources: resources,
Channels: channels,
}
if err := c.Put("/"+id.Path()+"/publish", val); err != nil {
return errgo.Mask(err, isAPIError)
}
return nil
}
// ResourceData holds information about a resource, as returned by
// GetResource. It must be closed after use.
type ResourceData struct {
// ReadCloser provides the resource data.
io.ReadCloser
// Size holds the content length reported in the server's response.
Size int64
// Hash holds the content hash reported in the server's response.
Hash string
}
// GetResource retrieves the bytes of the resource with the given name and
// revision for the given charm. The result provides a reader the data can
// be read from, together with the size and the SHA384 hash of the data as
// reported by the server.
//
// If revision is negative, the currently published resource for the Client's
// channel will be returned.
//
// Note that the result must be closed after use.
func (c *Client) GetResource(id *charm.URL, name string, revision int) (result ResourceData, err error) {
	req, err := http.NewRequest("GET", "", nil)
	if err != nil {
		return result, errgo.Notef(err, "cannot make new request")
	}

	// Only name a revision when a specific one has been asked for.
	path := "/" + id.Path() + "/resource/" + name
	if revision >= 0 {
		path += "/" + strconv.Itoa(revision)
	}
	resp, err := c.Do(req, path)
	if err != nil {
		return result, errgo.NoteMask(err, "cannot get resource", isAPIError)
	}

	// Validate the response headers, releasing the body on failure.
	hash := resp.Header.Get(params.ContentHashHeader)
	if hash == "" {
		resp.Body.Close()
		return result, errgo.Newf("no %s header found in response", params.ContentHashHeader)
	}

	result.ReadCloser = resp.Body
	result.Size = resp.ContentLength
	result.Hash = hash
	return result, nil
}
// ResourceMeta returns the metadata for the resource on charm id with the
// given name and revision. If the revision is negative, no revision is
// included in the request and the latest version of the resource will
// be returned.
func (c *Client) ResourceMeta(id *charm.URL, name string, revision int) (params.Resource, error) {
	path := "/" + id.Path() + "/meta/resources/" + name
	if revision >= 0 {
		path += fmt.Sprintf("/%d", revision)
	}
	var meta params.Resource
	err := c.Get(path, &meta)
	if err != nil {
		return meta, errgo.NoteMask(err, fmt.Sprintf("cannot get %q", path), isAPIError)
	}
	return meta, nil
}
// StatsUpdate updates the download stats in the charm store by sending
// the given request with a PUT to the "/stats/update" endpoint.
// The request presumably identifies the entities and times to record -
// see params.StatsUpdateRequest.
func (c *Client) StatsUpdate(req params.StatsUpdateRequest) error {
return c.Put("/stats/update", req)
}
// UploadCharm uploads the given charm to the charm store with the given id,
// which must not specify a revision.
// The accepted charm implementations are charm.CharmDir and
// charm.CharmArchive.
//
// UploadCharm returns the id that the charm has been given in the
// store - this will be the same as id except the revision.
func (c *Client) UploadCharm(id *charm.URL, ch charm.Charm) (*charm.URL, error) {
	// -1 tells UploadArchive that no promulgated revision is requested.
	const noPromulgatedRevision = -1

	if id.Revision != -1 {
		return nil, errgo.Newf("revision specified in %q, but should not be specified", id)
	}
	archive, sum, length, err := openArchive(ch)
	if err != nil {
		return nil, errgo.Notef(err, "cannot open charm archive")
	}
	defer archive.Close()
	return c.UploadArchive(id, archive, sum, length, noPromulgatedRevision, nil)
}
// UploadCharmWithRevision uploads the given charm to the
// given id in the charm store, which must contain a revision.
// If promulgatedRevision is not -1, it specifies that the charm
// should be marked as promulgated with that revision.
//
// This method is provided only for testing and should not
// generally be used otherwise.
func (c *Client) UploadCharmWithRevision(id *charm.URL, ch charm.Charm, promulgatedRevision int) error {
	if id.Revision == -1 {
		return errgo.Newf("revision not specified in %q", id)
	}
	archive, sum, length, err := openArchive(ch)
	if err != nil {
		return errgo.Notef(err, "cannot open charm archive")
	}
	defer archive.Close()
	// The id assigned by the store is already known, so it is discarded.
	if _, err := c.UploadArchive(id, archive, sum, length, promulgatedRevision, nil); err != nil {
		return errgo.Mask(err, isAPIError)
	}
	return nil
}
// UploadBundle uploads the given bundle to the charm store with the given id,
// which must not specify a revision.
// The accepted bundle implementations are charm.BundleDir and
// charm.BundleArchive.
//
// UploadBundle returns the id that the bundle has been given in the
// store - this will be the same as id except the revision.
func (c *Client) UploadBundle(id *charm.URL, b charm.Bundle) (*charm.URL, error) {
	// -1 tells UploadArchive that no promulgated revision is requested.
	const noPromulgatedRevision = -1

	if id.Revision != -1 {
		return nil, errgo.Newf("revision specified in %q, but should not be specified", id)
	}
	archive, sum, length, err := openArchive(b)
	if err != nil {
		return nil, errgo.Notef(err, "cannot open bundle archive")
	}
	defer archive.Close()
	return c.UploadArchive(id, archive, sum, length, noPromulgatedRevision, nil)
}
// UploadBundleWithRevision uploads the given bundle to the
// given id in the charm store, which must contain a revision.
// If promulgatedRevision is not -1, it specifies that the bundle
// should be marked as promulgated with that revision.
//
// This method is provided only for testing and should not
// generally be used otherwise.
func (c *Client) UploadBundleWithRevision(id *charm.URL, b charm.Bundle, promulgatedRevision int) error {
	if id.Revision == -1 {
		return errgo.Newf("revision not specified in %q", id)
	}
	r, hash, size, err := openArchive(b)
	if err != nil {
		// Report the failure as a bundle problem, matching UploadBundle.
		return errgo.Notef(err, "cannot open bundle archive")
	}
	defer r.Close()
	_, err = c.UploadArchive(id, r, hash, size, promulgatedRevision, nil)
	return errgo.Mask(err, isAPIError)
}
// UploadArchive pushes the archive for the charm or bundle represented by
// the given body, its hex-encoded SHA384 hash and its size. It returns
// the resulting entity reference. The given id should include the series
// and should not usually include a revision, unless a specific revision is
// required (for example when synchronizing between charmstores). If a
// revision is specified, then PUT will be used instead of POST, and a
// promulgatedRevision other than -1 additionally asks for the entity to
// be promulgated with that revision. Each channel in chans is sent as a
// "channel" query parameter.
//
// This is the method used internally by UploadBundle, UploadCharm and UploadCharmWithRevision;
// one of those methods should usually be used in preference.
func (c *Client) UploadArchive(id *charm.URL, body io.ReadSeeker, hash string, size int64, promulgatedRevision int, chans []params.Channel) (*charm.URL, error) {
	// When uploading archives, it can be a problem that an error
	// response is returned while we are still writing the body data.
	// To avoid this, we log in first so that we don't need to
	// do the macaroon exchange after POST.
	// Unfortunately this won't help matters if the user is logged in but
	// doesn't have privileges to write to the stated charm.
	// A better solution would be to fix https://github.com/golang/go/issues/3665
	// and use the 100-Continue client functionality.
	//
	// We only need to do this when basic auth credentials are not provided.
	if c.params.User == "" {
		if err := c.Login(); err != nil {
			return nil, errgo.NoteMask(err, "cannot log in", isAPIError)
		}
	}

	// Assemble the query parameters and choose the method.
	query := url.Values{}
	query.Set("hash", hash)
	method := "POST"
	if id.Revision != -1 {
		method = "PUT"
		if promulgatedRevision != -1 {
			promulgated := *id
			promulgated.User = ""
			promulgated.Revision = promulgatedRevision
			query.Set("promulgated", promulgated.Path())
		}
	}
	for _, channel := range chans {
		query.Add("channel", string(channel))
	}

	// Prepare the request.
	req, err := http.NewRequest(method, "", body)
	if err != nil {
		return nil, errgo.Notef(err, "cannot make new request")
	}
	req.Header.Set("Content-Type", "application/zip")
	req.ContentLength = size

	// Send the request.
	resp, err := c.Do(req, "/"+id.Path()+"/archive?"+query.Encode())
	if err != nil {
		return nil, errgo.NoteMask(err, "cannot post archive", isAPIError)
	}
	defer resp.Body.Close()

	// Parse the response.
	var uploaded params.ArchiveUploadResponse
	if err := httprequest.UnmarshalJSONResponse(resp, &uploaded); err != nil {
		return nil, errgo.NoteMask(err, "cannot unmarshal response", errgo.Any)
	}
	return uploaded.Id, nil
}
// PutExtraInfo stores extra-info data for the given id.
// For every key in the info map, the extra-info value with that key
// is set to the associated value.
// Keys that are absent from the map are left unchanged.
func (c *Client) PutExtraInfo(id *charm.URL, info map[string]interface{}) error {
	endpoint := "/" + id.Path() + "/meta/extra-info"
	return c.Put(endpoint, info)
}
// PutCommonInfo stores common-info data for the given id.
// For every key in the info map, the common-info value with that key
// is set to the associated value.
// Keys that are absent from the map are left unchanged.
func (c *Client) PutCommonInfo(id *charm.URL, info map[string]interface{}) error {
	endpoint := "/" + id.Path() + "/meta/common-info"
	return c.Put(endpoint, info)
}
// Meta fetches metadata on the charm or bundle with the
// given id, using the channel configured on the client.
// The result value provides a value to be filled in with
// the result, which must be a pointer to a struct containing
// members corresponding to possible metadata include parameters
// (see https://github.com/juju/charmstore/blob/v4/docs/API.md#get-idmeta).
//
// It returns the fully qualified id of the entity.
//
// The name of the struct member is translated to
// a lower case hyphen-separated form; for example,
// ArchiveSize becomes "archive-size", and BundleMachineCount
// becomes "bundle-machine-count", but may also
// be specified in the field's tag.
//
// This example will fill in the result structure with information
// about the given id, including information on its archive
// size (include archive-size), upload time (include archive-upload-time)
// and digest (include extra-info/digest).
//
//	var result struct {
//		ArchiveSize params.ArchiveSizeResponse
//		ArchiveUploadTime params.ArchiveUploadTimeResponse
//		Digest string `csclient:"extra-info/digest"`
//	}
//	id, err := client.Meta(id, &result)
func (c *Client) Meta(id *charm.URL, result interface{}) (*charm.URL, error) {
	// Delegate to MetaWithChannel with the client's own channel.
	channel := c.channel
	return c.MetaWithChannel(id, result, channel)
}
// MetaWithChannel behaves the same as a call to Meta but allows the caller to
// specify a channel which will be passed to the metadata endpoint only for
// this particular lookup.
func (c *Client) MetaWithChannel(id *charm.URL, result interface{}, channel params.Channel) (*charm.URL, error) {
if result == nil {
return nil, fmt.Errorf("expected valid result pointer, not nil")
}
resultv := reflect.ValueOf(result)
resultt := resultv.Type()
if resultt.Kind() != reflect.Ptr {
return nil, fmt.Errorf("expected pointer, not %T", result)
}
resultt = resultt.Elem()
if resultt.Kind() != reflect.Struct {
return nil, fmt.Errorf("expected pointer to struct, not %T", result)
}
resultv = resultv.Elem()
// At this point, resultv refers to the struct value pointed
// to by result, and resultt is its type.
numField := resultt.NumField()
includes := make([]string, 0, numField)
// If a channel override is specified add it to the query parameters.
if channel != params.NoChannel {
includes = append(includes, "channel="+string(channel))
}
// results holds an entry for each field in the result value,