From e651da610f89aaf42e41fcf82c6a962810d0312c Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 19 Sep 2023 13:07:03 -0400 Subject: [PATCH 01/36] refactor supervisor to use sdk v2 and add checksum --- go.mod | 21 +++++++++- go.sum | 43 +++++++++++++++++-- pkg/supervisor/archived_snapshot.go | 65 +++++++++++++++++++++-------- pkg/supervisor/s3_uploader.go | 8 ++-- 4 files changed, 113 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index b6204e36..0e1cad6a 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,12 @@ go 1.20 require ( github.com/AlekSi/pointer v1.0.0 github.com/aws/aws-sdk-go v1.37.8 + github.com/aws/aws-sdk-go-v2/config v1.18.40 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.84 + github.com/aws/aws-sdk-go-v2/service/s3 v1.38.5 github.com/fsnotify/fsnotify v1.5.1 github.com/go-sql-driver/mysql v1.4.1 - github.com/google/go-cmp v0.5.6 + github.com/google/go-cmp v0.5.8 github.com/google/uuid v1.1.2 github.com/gorilla/mux v1.7.3 github.com/julienschmidt/httprouter v1.2.0 @@ -23,6 +26,22 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.21.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.38 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.42 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.35 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.14.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.16.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.22.0 // indirect + github.com/aws/smithy-go v1.14.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/mdlayher/genetlink v0.0.0-20190313224034-60417448a851 // indirect diff --git a/go.sum b/go.sum index 578f101a..64a25a05 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,44 @@ github.com/PuerkitoBio/goquery v1.8.1/go.mod h1:Q8ICL1kNUJ2sXGoAhPGUdYDJvgQgHzJs github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= github.com/aws/aws-sdk-go v1.37.8 h1:9kywcbuz6vQuTf+FD+U7FshafrHzmqUCjgAEiLuIJ8U= github.com/aws/aws-sdk-go v1.37.8/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= +github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13 h1:OPLEkmhXf6xFPiz0bLeDArZIDx1NNS4oJyG4nv3Gct0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13/go.mod h1:gpAbvyDGQFozTEmlTFO8XcQKHzubdq0LzRyJpG6MiXM= +github.com/aws/aws-sdk-go-v2/config v1.18.40 h1:dbu1llI/nTIL+r6sYHMeVLl99DM8J8/o1I4EPurnhLg= +github.com/aws/aws-sdk-go-v2/config v1.18.40/go.mod h1:JjrCZQwSPGCoZRQzKHyZNNueaKO+kFaEy2sR6mCzd90= +github.com/aws/aws-sdk-go-v2/credentials v1.13.38 h1:gDAuCdVlA4lmmgQhvpZlscwicloCqH44vkxLklGkQLA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.38/go.mod h1:sD4G/Ybgp6s89mWIES3Xn97CsRLpxvz9uVSdv0UxY8I= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.11 h1:uDZJF1hu0EVT/4bogChk8DyjSF6fof6uL/0Y26Ma7Fg= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.11/go.mod h1:TEPP4tENqBGO99KwVpV9MlOX4NSrSLP8u3KRy2CDwA8= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.84 h1:LENrVcqnWTyI8fbIUCvxAMe+fXbREIaXzcR8WPwco1U= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.84/go.mod h1:LHxCiYAStsgps4srke7HujyADd504MSkNXjLpOtICTc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 h1:22dGT7PneFMx4+b3pz7lMTRyN8ZKH7M2cW4GP9yUS2g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41/go.mod h1:CrObHAuPneJBlfEJ5T3szXOUkLEThaGfvnhTf33buas= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 h1:SijA0mgjV8E+8G45ltVHs0fvKpTj8xmZJ3VwhGKtUSI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35/go.mod h1:SJC1nEVVva1g3pHAIdCp7QsRIkMmLAgoDquQ9Rr8kYw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.42 h1:GPUcE/Yq7Ur8YSUk6lVkoIMWnJNO0HT18GUzCWCgCI0= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.42/go.mod h1:rzfdUlfA+jdgLDmPKjd3Chq9V7LVLYo1Nz++Wb91aRo= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.4 h1:6lJvvkQ9HmbHZ4h/IEwclwv2mrTW8Uq1SOB/kXy0mfw= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.4/go.mod h1:1PrKYwxTM+zjpw9Y41KFtoJCQrJ34Z47Y4VgVbfndjo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14 h1:m0QTSI6pZYJTk5WSKx3fm5cNW/DCicVzULBgU/6IyD0= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14/go.mod h1:dDilntgHy9WnHXsh7dDtUPgHKEfTJIBUTHM8OWm0f/0= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.36 h1:eev2yZX7esGRjqRbnVk1UxMLw4CyVZDpZXRCcy75oQk= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.36/go.mod h1:lGnOkH9NJATw0XEPcAknFBj3zzNTEGRHtSw+CwC1YTg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.35 h1:CdzPW9kKitgIiLV1+MHobfR5Xg25iYnyzWZhyQuSlDI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.35/go.mod h1:QGF2Rs33W5MaN9gYdEQOBBFPLwTZkEhRwI33f7KIG0o= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.4 h1:v0jkRigbSD6uOdwcaUQmgEwG1BkPfAPDqaeNt/29ghg= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.4/go.mod h1:LhTyt8J04LL+9cIt7pYJ5lbS/U98ZmXovLOR/4LUsk8= +github.com/aws/aws-sdk-go-v2/service/s3 v1.38.5 h1:A42xdtStObqy7NGvzZKpnyNXvoOmm+FENobZ0/ssHWk= +github.com/aws/aws-sdk-go-v2/service/s3 v1.38.5/go.mod h1:rDGMZA7f4pbmTtPOk5v5UM2lmX6UAbRnMDJeDvnH7AM= +github.com/aws/aws-sdk-go-v2/service/sso v1.14.0 h1:AR/hlTsCyk1CwlyKnPFvIMvnONydRjDDRT9OGb0i+/g= +github.com/aws/aws-sdk-go-v2/service/sso v1.14.0/go.mod h1:fIAwKQKBFu90pBxx07BFOMJLpRUGu8VOzLJakeY+0K4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.16.0 h1:vbgiXuhtn49+erlPrgIvQ+J32rg1HseaPf8lEpKbkxQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.16.0/go.mod h1:yygr8ACQRY2PrEcy3xsUI357stq2AxnFM6DIsR9lij4= +github.com/aws/aws-sdk-go-v2/service/sts v1.22.0 h1:s4bioTgjSFRwOoyEFzAVCmFmoowBgjTR8gkrF/sQ4wk= +github.com/aws/aws-sdk-go-v2/service/sts v1.22.0/go.mod h1:VC7JDqsqiwXukYEDjoHh9U0fOJtNWh04FPQz4ct4GGU= +github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ= +github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -24,8 +62,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= @@ -155,7 +193,6 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index 8b2b3bc9..bcffe1a9 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -3,14 +3,17 @@ package supervisor import ( "bufio" "context" + "crypto/sha256" + "fmt" "io" "net/url" "os" "strings" "time" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/pkg/errors" "github.com/segmentio/events/v2" "github.com/segmentio/stats/v4" @@ -52,7 +55,7 @@ type s3Snapshot struct { Bucket string Key string sendToS3Func sendToS3Func - s3Uploader S3Uploader + s3Client S3Client } func (c *s3Snapshot) Upload(ctx context.Context, path string) error { @@ -71,6 +74,13 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { key = key[1:] } var reader io.Reader = bufio.NewReaderSize(f, 1024*32) // use a 32K buffer for reading + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + events.Log("filed to generate snapshop hash value", err) + } + cs := string(h.Sum(nil)) + var gpr *gzipCompressionReader if strings.HasSuffix(key, ".gz") { events.Log("Compressing s3 payload with GZIP") @@ -80,7 +90,7 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { events.Log("Uploading %{file}s (%d bytes) to %{bucket}s/%{key}s", path, size, c.Bucket, key) start := time.Now() - if err = c.sendToS3(ctx, key, c.Bucket, reader); err != nil { + if err = c.sendToS3(ctx, key, c.Bucket, reader, cs); err != nil { return errors.Wrap(err, "send to s3") } stats.Observe("ldb-upload-time", time.Since(start), stats.T("compressed", isCompressed(gpr))) @@ -104,35 +114,56 @@ func isCompressed(gpr *gzipCompressionReader) string { return "true" } -func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, body io.Reader) error { +type BucketBasics struct { + S3Client S3Client +} + +func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, body io.Reader, cs string) error { if c.sendToS3Func != nil { return c.sendToS3Func(ctx, key, bucket, body) } - ul, err := c.getS3Uploader() + + client, err := c.getS3Client() if err != nil { return err } - output, err := ul.UploadWithContext(ctx, &s3manager.UploadInput{ - Bucket: &bucket, - Key: &key, - Body: body, + + var basics = BucketBasics{ + S3Client: client, + } + var partMiBs int64 = 16 + uploader := manager.NewUploader(basics.S3Client, func(u *manager.Uploader) { + u.PartSize = partMiBs * 1024 * 1024 + }) + + output, err := uploader.Upload(ctx, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &key, + Body: body, + ChecksumAlgorithm: "sha256", + ChecksumSHA256: &cs, }) if err == nil { events.Log("Wrote to S3 location: %s", output.Location) + } else { + events.Log("Couldn't upload s3 snapshot to %v:%v. Here's why: %v\n", + bucket, key, err) } return errors.Wrap(err, "upload with context") } -func (c *s3Snapshot) getS3Uploader() (S3Uploader, error) { - if c.s3Uploader != nil { - return c.s3Uploader, nil +func (c *s3Snapshot) getS3Client() (S3Client, error) { + if c.s3Client != nil { + return c.s3Client, nil } - sess, err := session.NewSession() + cfg, err := config.LoadDefaultConfig(context.Background()) + if err != nil { - return nil, errors.Wrap(err, "creating aws session") + panic(fmt.Sprintf("failed loading config, %v", err)) } - uploader := s3manager.NewUploader(sess) - return uploader, nil + + client := s3.NewFromConfig(cfg) + return client, nil } func archivedSnapshotFromURL(URL string) (archivedSnapshot, error) { diff --git a/pkg/supervisor/s3_uploader.go b/pkg/supervisor/s3_uploader.go index 44663092..46b996d3 100644 --- a/pkg/supervisor/s3_uploader.go +++ b/pkg/supervisor/s3_uploader.go @@ -1,8 +1,10 @@ package supervisor -import "github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface" +import ( + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" +) //counterfeiter:generate -o fakes/s3_uploader.go . S3Uploader -type S3Uploader interface { - s3manageriface.UploaderAPI +type S3Client interface { + manager.UploadAPIClient } From a09dc8dd1907a83c810f177ee2f9cdbc51f56231 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 19 Sep 2023 16:12:35 -0400 Subject: [PATCH 02/36] open another reader to get checksum --- pkg/supervisor/archived_snapshot.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index bcffe1a9..a0377a54 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -75,8 +75,14 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { } var reader io.Reader = bufio.NewReaderSize(f, 1024*32) // use a 32K buffer for reading + ff, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return errors.Wrap(err, "opening file") + } + defer ff.Close() + h := sha256.New() - if _, err := io.Copy(h, f); err != nil { + if _, err := io.Copy(h, ff); err != nil { events.Log("filed to generate snapshop hash value", err) } cs := string(h.Sum(nil)) From 38065709f3c970f123fcc81b25eaadf85057f9fb Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 19 Sep 2023 16:47:11 -0400 Subject: [PATCH 03/36] make generate --- pkg/supervisor/fakes/s3_uploader.go | 459 +++++++++++++++++++++------- pkg/supervisor/s3_uploader.go | 2 +- 2 files changed, 356 insertions(+), 105 deletions(-) diff --git a/pkg/supervisor/fakes/s3_uploader.go b/pkg/supervisor/fakes/s3_uploader.go index 93d0db9d..77aef812 100644 --- a/pkg/supervisor/fakes/s3_uploader.go +++ b/pkg/supervisor/fakes/s3_uploader.go @@ -5,57 +5,236 @@ import ( "context" "sync" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/segmentio/ctlstore/pkg/supervisor" ) -type FakeS3Uploader struct { - UploadStub func(*s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) - uploadMutex sync.RWMutex - uploadArgsForCall []struct { - arg1 *s3manager.UploadInput - arg2 []func(*s3manager.Uploader) +type FakeS3Client struct { + AbortMultipartUploadStub func(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) + abortMultipartUploadMutex sync.RWMutex + abortMultipartUploadArgsForCall []struct { + arg1 context.Context + arg2 *s3.AbortMultipartUploadInput + arg3 []func(*s3.Options) + } + abortMultipartUploadReturns struct { + result1 *s3.AbortMultipartUploadOutput + result2 error + } + abortMultipartUploadReturnsOnCall map[int]struct { + result1 *s3.AbortMultipartUploadOutput + result2 error + } + CompleteMultipartUploadStub func(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) + completeMultipartUploadMutex sync.RWMutex + completeMultipartUploadArgsForCall []struct { + arg1 context.Context + arg2 *s3.CompleteMultipartUploadInput + arg3 []func(*s3.Options) + } + completeMultipartUploadReturns struct { + result1 *s3.CompleteMultipartUploadOutput + result2 error + } + completeMultipartUploadReturnsOnCall map[int]struct { + result1 *s3.CompleteMultipartUploadOutput + result2 error + } + CreateMultipartUploadStub func(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) + createMultipartUploadMutex sync.RWMutex + createMultipartUploadArgsForCall []struct { + arg1 context.Context + arg2 *s3.CreateMultipartUploadInput + arg3 []func(*s3.Options) + } + createMultipartUploadReturns struct { + result1 *s3.CreateMultipartUploadOutput + result2 error } - uploadReturns struct { - result1 *s3manager.UploadOutput + createMultipartUploadReturnsOnCall map[int]struct { + result1 *s3.CreateMultipartUploadOutput result2 error } - uploadReturnsOnCall map[int]struct { - result1 *s3manager.UploadOutput + PutObjectStub func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) + putObjectMutex sync.RWMutex + putObjectArgsForCall []struct { + arg1 context.Context + arg2 *s3.PutObjectInput + arg3 []func(*s3.Options) + } + putObjectReturns struct { + result1 *s3.PutObjectOutput result2 error } - UploadWithContextStub func(context.Context, *s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) - uploadWithContextMutex sync.RWMutex - uploadWithContextArgsForCall []struct { + putObjectReturnsOnCall map[int]struct { + result1 *s3.PutObjectOutput + result2 error + } + UploadPartStub func(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) + uploadPartMutex sync.RWMutex + uploadPartArgsForCall []struct { arg1 context.Context - arg2 *s3manager.UploadInput - arg3 []func(*s3manager.Uploader) + arg2 *s3.UploadPartInput + arg3 []func(*s3.Options) } - uploadWithContextReturns struct { - result1 *s3manager.UploadOutput + uploadPartReturns struct { + result1 *s3.UploadPartOutput result2 error } - uploadWithContextReturnsOnCall map[int]struct { - result1 *s3manager.UploadOutput + uploadPartReturnsOnCall map[int]struct { + result1 *s3.UploadPartOutput result2 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeS3Uploader) Upload(arg1 *s3manager.UploadInput, arg2 ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { - fake.uploadMutex.Lock() - ret, specificReturn := fake.uploadReturnsOnCall[len(fake.uploadArgsForCall)] - fake.uploadArgsForCall = append(fake.uploadArgsForCall, struct { - arg1 *s3manager.UploadInput - arg2 []func(*s3manager.Uploader) - }{arg1, arg2}) - stub := fake.UploadStub - fakeReturns := fake.uploadReturns - fake.recordInvocation("Upload", []interface{}{arg1, arg2}) - fake.uploadMutex.Unlock() +func (fake *FakeS3Client) AbortMultipartUpload(arg1 context.Context, arg2 *s3.AbortMultipartUploadInput, arg3 ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + fake.abortMultipartUploadMutex.Lock() + ret, specificReturn := fake.abortMultipartUploadReturnsOnCall[len(fake.abortMultipartUploadArgsForCall)] + fake.abortMultipartUploadArgsForCall = append(fake.abortMultipartUploadArgsForCall, struct { + arg1 context.Context + arg2 *s3.AbortMultipartUploadInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.AbortMultipartUploadStub + fakeReturns := fake.abortMultipartUploadReturns + fake.recordInvocation("AbortMultipartUpload", []interface{}{arg1, arg2, arg3}) + fake.abortMultipartUploadMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) AbortMultipartUploadCallCount() int { + fake.abortMultipartUploadMutex.RLock() + defer fake.abortMultipartUploadMutex.RUnlock() + return len(fake.abortMultipartUploadArgsForCall) +} + +func (fake *FakeS3Client) AbortMultipartUploadCalls(stub func(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)) { + fake.abortMultipartUploadMutex.Lock() + defer fake.abortMultipartUploadMutex.Unlock() + fake.AbortMultipartUploadStub = stub +} + +func (fake *FakeS3Client) AbortMultipartUploadArgsForCall(i int) (context.Context, *s3.AbortMultipartUploadInput, []func(*s3.Options)) { + fake.abortMultipartUploadMutex.RLock() + defer fake.abortMultipartUploadMutex.RUnlock() + argsForCall := fake.abortMultipartUploadArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) AbortMultipartUploadReturns(result1 *s3.AbortMultipartUploadOutput, result2 error) { + fake.abortMultipartUploadMutex.Lock() + defer fake.abortMultipartUploadMutex.Unlock() + fake.AbortMultipartUploadStub = nil + fake.abortMultipartUploadReturns = struct { + result1 *s3.AbortMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) AbortMultipartUploadReturnsOnCall(i int, result1 *s3.AbortMultipartUploadOutput, result2 error) { + fake.abortMultipartUploadMutex.Lock() + defer fake.abortMultipartUploadMutex.Unlock() + fake.AbortMultipartUploadStub = nil + if fake.abortMultipartUploadReturnsOnCall == nil { + fake.abortMultipartUploadReturnsOnCall = make(map[int]struct { + result1 *s3.AbortMultipartUploadOutput + result2 error + }) + } + fake.abortMultipartUploadReturnsOnCall[i] = struct { + result1 *s3.AbortMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) CompleteMultipartUpload(arg1 context.Context, arg2 *s3.CompleteMultipartUploadInput, arg3 ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + fake.completeMultipartUploadMutex.Lock() + ret, specificReturn := fake.completeMultipartUploadReturnsOnCall[len(fake.completeMultipartUploadArgsForCall)] + fake.completeMultipartUploadArgsForCall = append(fake.completeMultipartUploadArgsForCall, struct { + arg1 context.Context + arg2 *s3.CompleteMultipartUploadInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.CompleteMultipartUploadStub + fakeReturns := fake.completeMultipartUploadReturns + fake.recordInvocation("CompleteMultipartUpload", []interface{}{arg1, arg2, arg3}) + fake.completeMultipartUploadMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) CompleteMultipartUploadCallCount() int { + fake.completeMultipartUploadMutex.RLock() + defer fake.completeMultipartUploadMutex.RUnlock() + return len(fake.completeMultipartUploadArgsForCall) +} + +func (fake *FakeS3Client) CompleteMultipartUploadCalls(stub func(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)) { + fake.completeMultipartUploadMutex.Lock() + defer fake.completeMultipartUploadMutex.Unlock() + fake.CompleteMultipartUploadStub = stub +} + +func (fake *FakeS3Client) CompleteMultipartUploadArgsForCall(i int) (context.Context, *s3.CompleteMultipartUploadInput, []func(*s3.Options)) { + fake.completeMultipartUploadMutex.RLock() + defer fake.completeMultipartUploadMutex.RUnlock() + argsForCall := fake.completeMultipartUploadArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) CompleteMultipartUploadReturns(result1 *s3.CompleteMultipartUploadOutput, result2 error) { + fake.completeMultipartUploadMutex.Lock() + defer fake.completeMultipartUploadMutex.Unlock() + fake.CompleteMultipartUploadStub = nil + fake.completeMultipartUploadReturns = struct { + result1 *s3.CompleteMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) CompleteMultipartUploadReturnsOnCall(i int, result1 *s3.CompleteMultipartUploadOutput, result2 error) { + fake.completeMultipartUploadMutex.Lock() + defer fake.completeMultipartUploadMutex.Unlock() + fake.CompleteMultipartUploadStub = nil + if fake.completeMultipartUploadReturnsOnCall == nil { + fake.completeMultipartUploadReturnsOnCall = make(map[int]struct { + result1 *s3.CompleteMultipartUploadOutput + result2 error + }) + } + fake.completeMultipartUploadReturnsOnCall[i] = struct { + result1 *s3.CompleteMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) CreateMultipartUpload(arg1 context.Context, arg2 *s3.CreateMultipartUploadInput, arg3 ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + fake.createMultipartUploadMutex.Lock() + ret, specificReturn := fake.createMultipartUploadReturnsOnCall[len(fake.createMultipartUploadArgsForCall)] + fake.createMultipartUploadArgsForCall = append(fake.createMultipartUploadArgsForCall, struct { + arg1 context.Context + arg2 *s3.CreateMultipartUploadInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.CreateMultipartUploadStub + fakeReturns := fake.createMultipartUploadReturns + fake.recordInvocation("CreateMultipartUpload", []interface{}{arg1, arg2, arg3}) + fake.createMultipartUploadMutex.Unlock() if stub != nil { - return stub(arg1, arg2...) + return stub(arg1, arg2, arg3...) } if specificReturn { return ret.result1, ret.result2 @@ -63,63 +242,129 @@ func (fake *FakeS3Uploader) Upload(arg1 *s3manager.UploadInput, arg2 ...func(*s3 return fakeReturns.result1, fakeReturns.result2 } -func (fake *FakeS3Uploader) UploadCallCount() int { - fake.uploadMutex.RLock() - defer fake.uploadMutex.RUnlock() - return len(fake.uploadArgsForCall) +func (fake *FakeS3Client) CreateMultipartUploadCallCount() int { + fake.createMultipartUploadMutex.RLock() + defer fake.createMultipartUploadMutex.RUnlock() + return len(fake.createMultipartUploadArgsForCall) } -func (fake *FakeS3Uploader) UploadCalls(stub func(*s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)) { - fake.uploadMutex.Lock() - defer fake.uploadMutex.Unlock() - fake.UploadStub = stub +func (fake *FakeS3Client) CreateMultipartUploadCalls(stub func(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)) { + fake.createMultipartUploadMutex.Lock() + defer fake.createMultipartUploadMutex.Unlock() + fake.CreateMultipartUploadStub = stub +} + +func (fake *FakeS3Client) CreateMultipartUploadArgsForCall(i int) (context.Context, *s3.CreateMultipartUploadInput, []func(*s3.Options)) { + fake.createMultipartUploadMutex.RLock() + defer fake.createMultipartUploadMutex.RUnlock() + argsForCall := fake.createMultipartUploadArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeS3Uploader) UploadArgsForCall(i int) (*s3manager.UploadInput, []func(*s3manager.Uploader)) { - fake.uploadMutex.RLock() - defer fake.uploadMutex.RUnlock() - argsForCall := fake.uploadArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 +func (fake *FakeS3Client) CreateMultipartUploadReturns(result1 *s3.CreateMultipartUploadOutput, result2 error) { + fake.createMultipartUploadMutex.Lock() + defer fake.createMultipartUploadMutex.Unlock() + fake.CreateMultipartUploadStub = nil + fake.createMultipartUploadReturns = struct { + result1 *s3.CreateMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) CreateMultipartUploadReturnsOnCall(i int, result1 *s3.CreateMultipartUploadOutput, result2 error) { + fake.createMultipartUploadMutex.Lock() + defer fake.createMultipartUploadMutex.Unlock() + fake.CreateMultipartUploadStub = nil + if fake.createMultipartUploadReturnsOnCall == nil { + fake.createMultipartUploadReturnsOnCall = make(map[int]struct { + result1 *s3.CreateMultipartUploadOutput + result2 error + }) + } + fake.createMultipartUploadReturnsOnCall[i] = struct { + result1 *s3.CreateMultipartUploadOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) PutObject(arg1 context.Context, arg2 *s3.PutObjectInput, arg3 ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + fake.putObjectMutex.Lock() + ret, specificReturn := fake.putObjectReturnsOnCall[len(fake.putObjectArgsForCall)] + fake.putObjectArgsForCall = append(fake.putObjectArgsForCall, struct { + arg1 context.Context + arg2 *s3.PutObjectInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.PutObjectStub + fakeReturns := fake.putObjectReturns + fake.recordInvocation("PutObject", []interface{}{arg1, arg2, arg3}) + fake.putObjectMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) PutObjectCallCount() int { + fake.putObjectMutex.RLock() + defer fake.putObjectMutex.RUnlock() + return len(fake.putObjectArgsForCall) +} + +func (fake *FakeS3Client) PutObjectCalls(stub func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)) { + fake.putObjectMutex.Lock() + defer fake.putObjectMutex.Unlock() + fake.PutObjectStub = stub +} + +func (fake *FakeS3Client) PutObjectArgsForCall(i int) (context.Context, *s3.PutObjectInput, []func(*s3.Options)) { + fake.putObjectMutex.RLock() + defer fake.putObjectMutex.RUnlock() + argsForCall := fake.putObjectArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeS3Uploader) UploadReturns(result1 *s3manager.UploadOutput, result2 error) { - fake.uploadMutex.Lock() - defer fake.uploadMutex.Unlock() - fake.UploadStub = nil - fake.uploadReturns = struct { - result1 *s3manager.UploadOutput +func (fake *FakeS3Client) PutObjectReturns(result1 *s3.PutObjectOutput, result2 error) { + fake.putObjectMutex.Lock() + defer fake.putObjectMutex.Unlock() + fake.PutObjectStub = nil + fake.putObjectReturns = struct { + result1 *s3.PutObjectOutput result2 error }{result1, result2} } -func (fake *FakeS3Uploader) UploadReturnsOnCall(i int, result1 *s3manager.UploadOutput, result2 error) { - fake.uploadMutex.Lock() - defer fake.uploadMutex.Unlock() - fake.UploadStub = nil - if fake.uploadReturnsOnCall == nil { - fake.uploadReturnsOnCall = make(map[int]struct { - result1 *s3manager.UploadOutput +func (fake *FakeS3Client) PutObjectReturnsOnCall(i int, result1 *s3.PutObjectOutput, result2 error) { + fake.putObjectMutex.Lock() + defer fake.putObjectMutex.Unlock() + fake.PutObjectStub = nil + if fake.putObjectReturnsOnCall == nil { + fake.putObjectReturnsOnCall = make(map[int]struct { + result1 *s3.PutObjectOutput result2 error }) } - fake.uploadReturnsOnCall[i] = struct { - result1 *s3manager.UploadOutput + fake.putObjectReturnsOnCall[i] = struct { + result1 *s3.PutObjectOutput result2 error }{result1, result2} } -func (fake *FakeS3Uploader) UploadWithContext(arg1 context.Context, arg2 *s3manager.UploadInput, arg3 ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { - fake.uploadWithContextMutex.Lock() - ret, specificReturn := fake.uploadWithContextReturnsOnCall[len(fake.uploadWithContextArgsForCall)] - fake.uploadWithContextArgsForCall = append(fake.uploadWithContextArgsForCall, struct { +func (fake *FakeS3Client) UploadPart(arg1 context.Context, arg2 *s3.UploadPartInput, arg3 ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + fake.uploadPartMutex.Lock() + ret, specificReturn := fake.uploadPartReturnsOnCall[len(fake.uploadPartArgsForCall)] + fake.uploadPartArgsForCall = append(fake.uploadPartArgsForCall, struct { arg1 context.Context - arg2 *s3manager.UploadInput - arg3 []func(*s3manager.Uploader) + arg2 *s3.UploadPartInput + arg3 []func(*s3.Options) }{arg1, arg2, arg3}) - stub := fake.UploadWithContextStub - fakeReturns := fake.uploadWithContextReturns - fake.recordInvocation("UploadWithContext", []interface{}{arg1, arg2, arg3}) - fake.uploadWithContextMutex.Unlock() + stub := fake.UploadPartStub + fakeReturns := fake.uploadPartReturns + fake.recordInvocation("UploadPart", []interface{}{arg1, arg2, arg3}) + fake.uploadPartMutex.Unlock() if stub != nil { return stub(arg1, arg2, arg3...) } @@ -129,58 +374,64 @@ func (fake *FakeS3Uploader) UploadWithContext(arg1 context.Context, arg2 *s3mana return fakeReturns.result1, fakeReturns.result2 } -func (fake *FakeS3Uploader) UploadWithContextCallCount() int { - fake.uploadWithContextMutex.RLock() - defer fake.uploadWithContextMutex.RUnlock() - return len(fake.uploadWithContextArgsForCall) +func (fake *FakeS3Client) UploadPartCallCount() int { + fake.uploadPartMutex.RLock() + defer fake.uploadPartMutex.RUnlock() + return len(fake.uploadPartArgsForCall) } -func (fake *FakeS3Uploader) UploadWithContextCalls(stub func(context.Context, *s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)) { - fake.uploadWithContextMutex.Lock() - defer fake.uploadWithContextMutex.Unlock() - fake.UploadWithContextStub = stub +func (fake *FakeS3Client) UploadPartCalls(stub func(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)) { + fake.uploadPartMutex.Lock() + defer fake.uploadPartMutex.Unlock() + fake.UploadPartStub = stub } -func (fake *FakeS3Uploader) UploadWithContextArgsForCall(i int) (context.Context, *s3manager.UploadInput, []func(*s3manager.Uploader)) { - fake.uploadWithContextMutex.RLock() - defer fake.uploadWithContextMutex.RUnlock() - argsForCall := fake.uploadWithContextArgsForCall[i] +func (fake *FakeS3Client) UploadPartArgsForCall(i int) (context.Context, *s3.UploadPartInput, []func(*s3.Options)) { + fake.uploadPartMutex.RLock() + defer fake.uploadPartMutex.RUnlock() + argsForCall := fake.uploadPartArgsForCall[i] return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeS3Uploader) UploadWithContextReturns(result1 *s3manager.UploadOutput, result2 error) { - fake.uploadWithContextMutex.Lock() - defer fake.uploadWithContextMutex.Unlock() - fake.UploadWithContextStub = nil - fake.uploadWithContextReturns = struct { - result1 *s3manager.UploadOutput +func (fake *FakeS3Client) UploadPartReturns(result1 *s3.UploadPartOutput, result2 error) { + fake.uploadPartMutex.Lock() + defer fake.uploadPartMutex.Unlock() + fake.UploadPartStub = nil + fake.uploadPartReturns = struct { + result1 *s3.UploadPartOutput result2 error }{result1, result2} } -func (fake *FakeS3Uploader) UploadWithContextReturnsOnCall(i int, result1 *s3manager.UploadOutput, result2 error) { - fake.uploadWithContextMutex.Lock() - defer fake.uploadWithContextMutex.Unlock() - fake.UploadWithContextStub = nil - if fake.uploadWithContextReturnsOnCall == nil { - fake.uploadWithContextReturnsOnCall = make(map[int]struct { - result1 *s3manager.UploadOutput +func (fake *FakeS3Client) UploadPartReturnsOnCall(i int, result1 *s3.UploadPartOutput, result2 error) { + fake.uploadPartMutex.Lock() + defer fake.uploadPartMutex.Unlock() + fake.UploadPartStub = nil + if fake.uploadPartReturnsOnCall == nil { + fake.uploadPartReturnsOnCall = make(map[int]struct { + result1 *s3.UploadPartOutput result2 error }) } - fake.uploadWithContextReturnsOnCall[i] = struct { - result1 *s3manager.UploadOutput + fake.uploadPartReturnsOnCall[i] = struct { + result1 *s3.UploadPartOutput result2 error }{result1, result2} } -func (fake *FakeS3Uploader) Invocations() map[string][][]interface{} { +func (fake *FakeS3Client) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.uploadMutex.RLock() - defer fake.uploadMutex.RUnlock() - fake.uploadWithContextMutex.RLock() - defer fake.uploadWithContextMutex.RUnlock() + fake.abortMultipartUploadMutex.RLock() + defer fake.abortMultipartUploadMutex.RUnlock() + fake.completeMultipartUploadMutex.RLock() + defer fake.completeMultipartUploadMutex.RUnlock() + fake.createMultipartUploadMutex.RLock() + defer fake.createMultipartUploadMutex.RUnlock() + fake.putObjectMutex.RLock() + defer fake.putObjectMutex.RUnlock() + fake.uploadPartMutex.RLock() + defer fake.uploadPartMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value @@ -188,7 +439,7 @@ func (fake *FakeS3Uploader) Invocations() map[string][][]interface{} { return copiedInvocations } -func (fake *FakeS3Uploader) recordInvocation(key string, args []interface{}) { +func (fake *FakeS3Client) recordInvocation(key string, args []interface{}) { fake.invocationsMutex.Lock() defer fake.invocationsMutex.Unlock() if fake.invocations == nil { @@ -200,4 +451,4 @@ func (fake *FakeS3Uploader) recordInvocation(key string, args []interface{}) { fake.invocations[key] = append(fake.invocations[key], args) } -var _ supervisor.S3Uploader = new(FakeS3Uploader) +var _ supervisor.S3Client = new(FakeS3Client) diff --git a/pkg/supervisor/s3_uploader.go b/pkg/supervisor/s3_uploader.go index 46b996d3..601ceacb 100644 --- a/pkg/supervisor/s3_uploader.go +++ b/pkg/supervisor/s3_uploader.go @@ -4,7 +4,7 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/s3/manager" ) -//counterfeiter:generate -o fakes/s3_uploader.go . S3Uploader +//counterfeiter:generate -o fakes/s3_uploader.go . S3Client type S3Client interface { manager.UploadAPIClient } From 4b4de0d985d7c04bdf88f9fef5311bed2d6cc8c6 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 20 Sep 2023 09:46:59 -0400 Subject: [PATCH 04/36] refactoring --- pkg/supervisor/archived_snapshot.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index a0377a54..4591f29e 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -75,17 +75,10 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { } var reader io.Reader = bufio.NewReaderSize(f, 1024*32) // use a 32K buffer for reading - ff, err := os.OpenFile(path, os.O_RDONLY, 0) + cs, err := getCheckSum(path) if err != nil { - return errors.Wrap(err, "opening file") - } - defer ff.Close() - - h := sha256.New() - if _, err := io.Copy(h, ff); err != nil { - events.Log("filed to generate snapshop hash value", err) + return errors.Wrap(err, "generate file CheckSum") } - cs := string(h.Sum(nil)) var gpr *gzipCompressionReader if strings.HasSuffix(key, ".gz") { @@ -113,6 +106,22 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { return nil } +func getCheckSum(path string) (string, error) { + f, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return "", errors.Wrap(err, "opening file") + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + events.Log("failed to generate sha256", err) + } + cs := string(h.Sum(nil)) + + return cs, nil +} + func isCompressed(gpr *gzipCompressionReader) string { if gpr == nil { return "false" From f1452df4fe0058c91431fd392b0f5011fbcbf7db Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 20 Sep 2023 15:53:39 -0400 Subject: [PATCH 05/36] hex encoding --- pkg/supervisor/archived_snapshot.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index 4591f29e..9c8baf97 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "crypto/sha256" + "encoding/hex" "fmt" "io" "net/url" @@ -117,7 +118,7 @@ func getCheckSum(path string) (string, error) { if _, err := io.Copy(h, f); err != nil { events.Log("failed to generate sha256", err) } - cs := string(h.Sum(nil)) + cs := hex.EncodeToString(h.Sum(nil)) return cs, nil } From 9e84749ea37ebbe67c0e15814862e0c776524734 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 20 Sep 2023 16:48:44 -0400 Subject: [PATCH 06/36] don't calculate --- pkg/supervisor/archived_snapshot.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index 9c8baf97..2f04581b 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -4,7 +4,7 @@ import ( "bufio" "context" "crypto/sha256" - "encoding/hex" + "encoding/base64" "fmt" "io" "net/url" @@ -118,7 +118,10 @@ func getCheckSum(path string) (string, error) { if _, err := io.Copy(h, f); err != nil { events.Log("failed to generate sha256", err) } - cs := hex.EncodeToString(h.Sum(nil)) + + cs := base64.StdEncoding.EncodeToString(h.Sum(nil)) + + events.Log("base64", cs) return cs, nil } @@ -157,7 +160,7 @@ func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, bo Key: &key, Body: body, ChecksumAlgorithm: "sha256", - ChecksumSHA256: &cs, + //ChecksumSHA256: &cs, }) if err == nil { events.Log("Wrote to S3 location: %s", output.Location) From 6cf8f0ba29c7d0fd45de67e0377bca906b6c407c Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 20 Sep 2023 17:42:02 -0400 Subject: [PATCH 07/36] use metadata --- pkg/supervisor/archived_snapshot.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index 2f04581b..f2fd1389 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -120,8 +120,7 @@ func getCheckSum(path string) (string, error) { } cs := base64.StdEncoding.EncodeToString(h.Sum(nil)) - - events.Log("base64", cs) + events.Log("base64 encoding: %s", cs) return cs, nil } @@ -160,7 +159,9 @@ func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, bo Key: &key, Body: body, ChecksumAlgorithm: "sha256", - //ChecksumSHA256: &cs, + Metadata: map[string]string{ + "checksum": cs, + }, }) if err == nil { events.Log("Wrote to S3 location: %s", output.Location) From aaca5d0e8576bc52dd5feb7f83bccd8607413d8e Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 11:10:48 -0400 Subject: [PATCH 08/36] WIP test --- scripts/download.sh | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/scripts/download.sh b/scripts/download.sh index 500eb7bf..4e4b7d7b 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -7,6 +7,7 @@ CONCURRENCY=${2:-20} DOWNLOADED="false" COMPRESSED="false" METRICS="/var/spool/ctlstore/metrics.json" +SHASUM="" START=$(date +%s) END=$(date +%s) @@ -16,8 +17,22 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then mkdir -p /var/spool/ctlstore cd /var/spool/ctlstore + + + PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" + URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" + BUCKET="$(echo $URL | grep / | cut -d/ -f1)" + KEY="$(echo $URL | grep / | cut -d/ -f2)" + + aws s3api head-object \ + --bucket "${BUCKET}" \ + --key "${KEY}" + s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . + SHASUM=$(shasum -a 256 $CTLSTORE_BOOTSTRAP_URL | cut -f1 -d\ | xxd -r -p | base64) + echo "Sha value of the downloaded file: $(($SHASUM))" + DOWNLOADED="true" if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then echo "Decompressing" @@ -29,6 +44,18 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then END=$(date +%s) echo "ldb.db ready in $(($END - $START)) seconds" else + + CTLSTORE_BOOTSTRAP_URL="s3://segment-ctlstore-snapshots-stage/snapshot.db" + + PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" + URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" + BUCKET="$(echo $URL | grep / | cut -d/ -f1)" + KEY="$(echo $URL | grep / | cut -d/ -f2)" + + aws s3api head-object \ + --bucket "${BUCKET}" \ + --key "${KEY}" + echo "Snapshot already present" fi From 0fe8ff665dbf317b3e07522aefe3c9181033b691 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 16:14:45 -0400 Subject: [PATCH 09/36] wip --- Dockerfile | 1 + scripts/download.sh | 3 +++ 2 files changed, 4 insertions(+) diff --git a/Dockerfile b/Dockerfile index 72e9c3d8..706b61d5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,5 +24,6 @@ RUN apk --no-cache add sqlite pigz COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber COPY --from=0 /bin/s5cmd /bin/s5cmd +COPY --from=0 /bin/aws /bin/aws COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ diff --git a/scripts/download.sh b/scripts/download.sh index 4e4b7d7b..1d611ccf 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -52,6 +52,9 @@ else BUCKET="$(echo $URL | grep / | cut -d/ -f1)" KEY="$(echo $URL | grep / | cut -d/ -f2)" + SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) + echo "Sha value of the downloaded file: $(($SHASUM))" + aws s3api head-object \ --bucket "${BUCKET}" \ --key "${KEY}" From 126e1eefbee06ef49b305efc7154f7aa9af0bac3 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 16:51:00 -0400 Subject: [PATCH 10/36] wip --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index 706b61d5..e792adfc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,7 @@ ARG VERSION RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ + && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip && ./aws/install \ && chmod +x /bin/chamber \ && chmod +x /bin/s5cmd From fe3b1a3f2cd3c2f37f3d5fdd4281cfed71e6db2f Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 17:38:38 -0400 Subject: [PATCH 11/36] WIP --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index e792adfc..3baa0a54 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,6 +25,5 @@ RUN apk --no-cache add sqlite pigz COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber COPY --from=0 /bin/s5cmd /bin/s5cmd -COPY --from=0 /bin/aws /bin/aws COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ From 9a8fd2970cd7e59f5e9096688ea18ef2d16eb8bb Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 18:22:02 -0400 Subject: [PATCH 12/36] WIP --- Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Dockerfile b/Dockerfile index 3baa0a54..87130a53 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,6 +2,9 @@ FROM golang:1.20-alpine ENV SRC github.com/segmentio/ctlstore ARG VERSION +RUN yum update && yum install perl-Digest-SHA +RUN shasum -v + RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ From 4d7e765d84dfbc321a04b266ea3349afd15da1b6 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 21 Sep 2023 18:28:09 -0400 Subject: [PATCH 13/36] WIP --- Dockerfile | 4 ++-- scripts/download.sh | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 87130a53..54acea6a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,8 +2,8 @@ FROM golang:1.20-alpine ENV SRC github.com/segmentio/ctlstore ARG VERSION -RUN yum update && yum install perl-Digest-SHA -RUN shasum -v +#RUN yum update && yum install perl-Digest-SHA +#RUN shasum -v RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ diff --git a/scripts/download.sh b/scripts/download.sh index 1d611ccf..01e730cb 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -52,8 +52,8 @@ else BUCKET="$(echo $URL | grep / | cut -d/ -f1)" KEY="$(echo $URL | grep / | cut -d/ -f2)" - SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $(($SHASUM))" +# SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) +# echo "Sha value of the downloaded file: $(($SHASUM))" aws s3api head-object \ --bucket "${BUCKET}" \ From f12a80a33b7dc8028a36579c423ea8f234d69da7 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 10:40:35 -0400 Subject: [PATCH 14/36] fix install --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 54acea6a..1983060c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ ARG VERSION RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ - && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip && ./aws/install \ + && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin && /bin/aws/install \ && chmod +x /bin/chamber \ && chmod +x /bin/s5cmd @@ -28,5 +28,6 @@ RUN apk --no-cache add sqlite pigz COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber COPY --from=0 /bin/s5cmd /bin/s5cmd +COPY --from=0 /bin/aws /bin/aws COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ From e6bd136f155a1e75752531b214ab0a2b3c93c7e5 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 11:28:08 -0400 Subject: [PATCH 15/36] WIP --- scripts/download.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index 01e730cb..5c426266 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -55,9 +55,9 @@ else # SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) # echo "Sha value of the downloaded file: $(($SHASUM))" - aws s3api head-object \ - --bucket "${BUCKET}" \ - --key "${KEY}" +# aws s3api head-object \ +# --bucket "${BUCKET}" \ +# --key "${KEY}" echo "Snapshot already present" fi From fb8f501258759da5b5ca20846b7b35ef0b369488 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 12:03:54 -0400 Subject: [PATCH 16/36] wip --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 1983060c..c0cb8117 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,7 +27,7 @@ RUN apk --no-cache add sqlite pigz COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber -COPY --from=0 /bin/s5cmd /bin/s5cmd +#COPY --from=0 /bin/s5cmd /bin/s5cmd COPY --from=0 /bin/aws /bin/aws COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ From e3e213cd03661e95a6c528e309d0c562523dc2e0 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 14:05:33 -0400 Subject: [PATCH 17/36] wip --- Dockerfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index c0cb8117..b59bc282 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ ARG VERSION RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ - && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin && /bin/aws/install \ + && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin \ && chmod +x /bin/chamber \ && chmod +x /bin/s5cmd @@ -27,7 +27,8 @@ RUN apk --no-cache add sqlite pigz COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber -#COPY --from=0 /bin/s5cmd /bin/s5cmd +COPY --from=0 /bin/s5cmd /bin/s5cmd COPY --from=0 /bin/aws /bin/aws +RUN /bin/aws/install COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ From 5aa21300bfc8fa83174405fa744dea95cad9f29f Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 15:49:07 -0400 Subject: [PATCH 18/36] chmod --- Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index b59bc282..9fe212a9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,9 +8,10 @@ ARG VERSION RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ - && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin \ + && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin && /bin/aws/install \ && chmod +x /bin/chamber \ - && chmod +x /bin/s5cmd + && chmod +x /bin/s5cmd \ + && chmod +x /bin/aws COPY . /go/src/${SRC} @@ -29,6 +30,5 @@ COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber COPY --from=0 /bin/s5cmd /bin/s5cmd COPY --from=0 /bin/aws /bin/aws -RUN /bin/aws/install COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ From 3625b81f2085682d5ea1c893cab89d822c3857cb Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 17:00:46 -0400 Subject: [PATCH 19/36] install s3cmd from alpine --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 9fe212a9..7e46e999 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,8 @@ RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/vers && cp ${GOPATH}/bin/ctlstore-cli /usr/local/bin FROM alpine -RUN apk --no-cache add sqlite pigz +RUN apk --no-cache add sqlite pigz py-pip \ + && pip install s3cmd COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber From 2b1a032f494116f31ae3d85b334a1d9cdcacf3ff Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 17:46:17 -0400 Subject: [PATCH 20/36] install aws-cli in alpine --- Dockerfile | 12 +++++------- scripts/download.sh | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7e46e999..b8fdff03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,16 +2,12 @@ FROM golang:1.20-alpine ENV SRC github.com/segmentio/ctlstore ARG VERSION -#RUN yum update && yum install perl-Digest-SHA -#RUN shasum -v RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ - && curl -sL https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip -o awscliv2.zip && unzip awscliv2.zip -d /bin && /bin/aws/install \ && chmod +x /bin/chamber \ - && chmod +x /bin/s5cmd \ - && chmod +x /bin/aws + && chmod +x /bin/s5cmd COPY . /go/src/${SRC} @@ -23,13 +19,15 @@ RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/vers RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/version.version=$VERSION" ${SRC}/pkg/cmd/ctlstore-cli \ && cp ${GOPATH}/bin/ctlstore-cli /usr/local/bin +#FROM fedora:34 +#RUN yum -y install perl-Digest-SHA + FROM alpine -RUN apk --no-cache add sqlite pigz py-pip \ +RUN apk --no-cache add sqlite pigz aws-cli py-pip \ && pip install s3cmd COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber COPY --from=0 /bin/s5cmd /bin/s5cmd -COPY --from=0 /bin/aws /bin/aws COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/ COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/ diff --git a/scripts/download.sh b/scripts/download.sh index 5c426266..e83d8ad8 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -45,7 +45,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then echo "ldb.db ready in $(($END - $START)) seconds" else - CTLSTORE_BOOTSTRAP_URL="s3://segment-ctlstore-snapshots-stage/snapshot.db" + CTLSTORE_BOOTSTRAP_URL="s3://segment-ctlstore-snapshots-stage-euw1/snapshot.db" PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" From d33084b78e56b8a2f387ab0afd659458b5d7152e Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 18:34:31 -0400 Subject: [PATCH 21/36] install shasum in alpine --- Dockerfile | 2 +- scripts/download.sh | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index b8fdff03..0c849066 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/vers #RUN yum -y install perl-Digest-SHA FROM alpine -RUN apk --no-cache add sqlite pigz aws-cli py-pip \ +RUN apk --no-cache add sqlite pigz aws-cli py-pip perl-utils \ && pip install s3cmd COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . diff --git a/scripts/download.sh b/scripts/download.sh index e83d8ad8..b8337d7c 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -55,9 +55,9 @@ else # SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) # echo "Sha value of the downloaded file: $(($SHASUM))" -# aws s3api head-object \ -# --bucket "${BUCKET}" \ -# --key "${KEY}" + aws s3api head-object \ + --bucket "${BUCKET}" \ + --key "${KEY}" echo "Snapshot already present" fi From b63675a657b07543bdee00ee820173675221dd36 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Fri, 22 Sep 2023 18:55:24 -0400 Subject: [PATCH 22/36] wip --- scripts/download.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index b8337d7c..b36e04c3 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -30,9 +30,6 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . - SHASUM=$(shasum -a 256 $CTLSTORE_BOOTSTRAP_URL | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $(($SHASUM))" - DOWNLOADED="true" if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then echo "Decompressing" @@ -40,6 +37,9 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then COMPRESSED="true" fi + SHASUM=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) + echo "Sha value of the downloaded file: $(($SHASUM))" + mv snapshot.db ldb.db END=$(date +%s) echo "ldb.db ready in $(($END - $START)) seconds" @@ -52,8 +52,8 @@ else BUCKET="$(echo $URL | grep / | cut -d/ -f1)" KEY="$(echo $URL | grep / | cut -d/ -f2)" -# SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) -# echo "Sha value of the downloaded file: $(($SHASUM))" + SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) + echo "Sha value of the downloaded file: $(($SHASUM))" aws s3api head-object \ --bucket "${BUCKET}" \ From 0311249f61008cfe7d576ff77428d32aad32aad7 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Mon, 25 Sep 2023 18:00:08 -0400 Subject: [PATCH 23/36] test if sha value matches --- scripts/download.sh | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/scripts/download.sh b/scripts/download.sh index b36e04c3..2f47ab8b 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -24,12 +24,20 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then BUCKET="$(echo $URL | grep / | cut -d/ -f1)" KEY="$(echo $URL | grep / | cut -d/ -f2)" + + echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL} before downloading the snapshot" aws s3api head-object \ --bucket "${BUCKET}" \ --key "${KEY}" s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . + echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL} after downloading the snapshot" + aws s3api head-object \ + --bucket "${BUCKET}" \ + --key "${KEY}" + + DOWNLOADED="true" if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then echo "Decompressing" @@ -37,8 +45,12 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then COMPRESSED="true" fi + START_SHASUM=$(date +%s) SHASUM=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Sha value of the downloaded file: $(($SHASUM))" + END_SHASUM=$(date +%s) + echo "Sha value calculation took $(($END - $START)) seconds" + mv snapshot.db ldb.db END=$(date +%s) From 6e353243b402e6dd90eda3dd621d502faf394cc8 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Mon, 25 Sep 2023 18:44:32 -0400 Subject: [PATCH 24/36] wip --- scripts/download.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/download.sh b/scripts/download.sh index 2f47ab8b..de4b1821 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -64,7 +64,7 @@ else BUCKET="$(echo $URL | grep / | cut -d/ -f1)" KEY="$(echo $URL | grep / | cut -d/ -f2)" - SHASUM=$(shasum -a 256 ldb.db | cut -f1 -d\ | xxd -r -p | base64) + SHASUM=$(shasum -a 256 /var/spool/ctlstore/ldb.db | cut -f1 -d\ | xxd -r -p | base64) echo "Sha value of the downloaded file: $(($SHASUM))" aws s3api head-object \ From d3a81f7790c2864e2446bc46b09454bdace906af Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Mon, 25 Sep 2023 18:56:19 -0400 Subject: [PATCH 25/36] fix shell syntax error --- scripts/download.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index de4b1821..acf3e02b 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -47,7 +47,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then START_SHASUM=$(date +%s) SHASUM=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $(($SHASUM))" + echo "Sha value of the downloaded file: $SHASUM" END_SHASUM=$(date +%s) echo "Sha value calculation took $(($END - $START)) seconds" @@ -65,7 +65,7 @@ else KEY="$(echo $URL | grep / | cut -d/ -f2)" SHASUM=$(shasum -a 256 /var/spool/ctlstore/ldb.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $(($SHASUM))" + echo "Sha value of the downloaded file: $SHASUM" aws s3api head-object \ --bucket "${BUCKET}" \ From bf5ee2071627b9c8a2f8e3c0e56602f81896cb16 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 26 Sep 2023 11:34:12 -0400 Subject: [PATCH 26/36] add 5 attempts threshold --- Dockerfile | 3 +- scripts/download.sh | 87 ++++++++++++++++++++++++--------------------- 2 files changed, 48 insertions(+), 42 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0c849066..3a2526fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,8 +23,7 @@ RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/vers #RUN yum -y install perl-Digest-SHA FROM alpine -RUN apk --no-cache add sqlite pigz aws-cli py-pip perl-utils \ - && pip install s3cmd +RUN apk --no-cache add sqlite pigz aws-cli perl-utils jq COPY --from=0 /go/src/github.com/segmentio/ctlstore/scripts/download.sh . COPY --from=0 /bin/chamber /bin/chamber diff --git a/scripts/download.sh b/scripts/download.sh index acf3e02b..b90a4ee1 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -3,6 +3,10 @@ set -eo pipefail CTLSTORE_BOOTSTRAP_URL=$1 +PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" +URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" +BUCKET="$(echo $URL | grep / | cut -d/ -f1)" +KEY="$(echo $URL | grep / | cut -d/ -f2)" CONCURRENCY=${2:-20} DOWNLOADED="false" COMPRESSED="false" @@ -11,6 +15,16 @@ SHASUM="" START=$(date +%s) END=$(date +%s) + +download_snapshot() { + s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . +} + +get_remote_checksum() { + remote_checksum=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}" | jq -r '.Metadata.checksum') + echo "$remote_checksum" +} + if [ ! -f /var/spool/ctlstore/ldb.db ]; then # busybox does not support sub-second resolution START=$(date +%s) @@ -18,59 +32,52 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then mkdir -p /var/spool/ctlstore cd /var/spool/ctlstore + COUNTER=0 + while true; do + COUNTER=$(($COUNTER+1)) - PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" - URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" - BUCKET="$(echo $URL | grep / | cut -d/ -f1)" - KEY="$(echo $URL | grep / | cut -d/ -f2)" - + echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" + checksum_before=$(get_remote_checksum) + echo "Remote checksum before downloading snapshot: $checksum_before" - echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL} before downloading the snapshot" - aws s3api head-object \ - --bucket "${BUCKET}" \ - --key "${KEY}" + echo "Downloading snapshot from ${CTLSTORE_BOOTSTRAP_URL}" + download_snapshot - s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . + echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" + checksum_after=$(get_remote_checksum) + echo "Remote checksum after downloading snapshot: $checksum_after" - echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL} after downloading the snapshot" - aws s3api head-object \ - --bucket "${BUCKET}" \ - --key "${KEY}" + DOWNLOADED="true" + if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then + echo "Decompressing" + pigz -d snapshot.db.gz + COMPRESSED="true" + fi + local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) + echo "Local snapshot checksum: $local_checksum" - DOWNLOADED="true" - if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then - echo "Decompressing" - pigz -d snapshot.db.gz - COMPRESSED="true" - fi + if [[ "$local_checksum" == "$checksum_before" ]] || [[ "$local_checksum" == "$checksum_after" ]]; then + echo "Checksum matches" + break + else + echo "Checksum mismatch, retrying in 1 second" + DOWNLOADED="false" + COMPRESSED="false" + sleep 1 + fi - START_SHASUM=$(date +%s) - SHASUM=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $SHASUM" - END_SHASUM=$(date +%s) - echo "Sha value calculation took $(($END - $START)) seconds" + if [ $COUNTER -gt 5 ]; then + echo "Failed to download intact snapshot after 5 attempts" + exit 1 + fi + done mv snapshot.db ldb.db END=$(date +%s) echo "ldb.db ready in $(($END - $START)) seconds" else - - CTLSTORE_BOOTSTRAP_URL="s3://segment-ctlstore-snapshots-stage-euw1/snapshot.db" - - PREFIX="$(echo $CTLSTORE_BOOTSTRAP_URL | grep :// | sed -e's,^\(.*://\).*,\1,g')" - URL="$(echo $CTLSTORE_BOOTSTRAP_URL | sed -e s,$PREFIX,,g)" - BUCKET="$(echo $URL | grep / | cut -d/ -f1)" - KEY="$(echo $URL | grep / | cut -d/ -f2)" - - SHASUM=$(shasum -a 256 /var/spool/ctlstore/ldb.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Sha value of the downloaded file: $SHASUM" - - aws s3api head-object \ - --bucket "${BUCKET}" \ - --key "${KEY}" - echo "Snapshot already present" fi From 7d1deee9c1292b64032c169fa00c2c746f55fd4b Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 26 Sep 2023 12:12:57 -0400 Subject: [PATCH 27/36] skip checksum validation if null --- scripts/download.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index b90a4ee1..c205b64e 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -21,7 +21,7 @@ download_snapshot() { } get_remote_checksum() { - remote_checksum=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}" | jq -r '.Metadata.checksum') + remote_checksum=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}" | jq -r '.Metadata.checksum // empty') echo "$remote_checksum" } @@ -54,6 +54,11 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then COMPRESSED="true" fi + if [ -z $checksum_after ]; then + echo "Checksum is null, skipping checksum validation" + break + fi + local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Local snapshot checksum: $local_checksum" @@ -71,7 +76,6 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then echo "Failed to download intact snapshot after 5 attempts" exit 1 fi - done mv snapshot.db ldb.db From 6a43640d731677687b5582203b3fd4dbb558ad63 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 26 Sep 2023 14:41:53 -0400 Subject: [PATCH 28/36] test unhappy path --- Dockerfile | 4 ---- scripts/download.sh | 9 ++++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3a2526fc..a70bf5d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,6 @@ FROM golang:1.20-alpine ENV SRC github.com/segmentio/ctlstore ARG VERSION - RUN apk --update add gcc git curl alpine-sdk libc6-compat ca-certificates sqlite \ && curl -SsL https://github.com/segmentio/chamber/releases/download/v2.13.2/chamber-v2.13.2-linux-amd64 -o /bin/chamber \ && curl -sL https://github.com/peak/s5cmd/releases/download/v2.1.0/s5cmd_2.1.0_Linux-64bit.tar.gz -o s5cmd.gz && tar -xzf s5cmd.gz -C /bin \ @@ -19,9 +18,6 @@ RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/vers RUN CGO_ENABLED=1 go install -ldflags="-X github.com/segmentio/ctlstore/pkg/version.version=$VERSION" ${SRC}/pkg/cmd/ctlstore-cli \ && cp ${GOPATH}/bin/ctlstore-cli /usr/local/bin -#FROM fedora:34 -#RUN yum -y install perl-Digest-SHA - FROM alpine RUN apk --no-cache add sqlite pigz aws-cli perl-utils jq diff --git a/scripts/download.sh b/scripts/download.sh index c205b64e..0a0c28d6 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -11,7 +11,6 @@ CONCURRENCY=${2:-20} DOWNLOADED="false" COMPRESSED="false" METRICS="/var/spool/ctlstore/metrics.json" -SHASUM="" START=$(date +%s) END=$(date +%s) @@ -63,8 +62,12 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then echo "Local snapshot checksum: $local_checksum" if [[ "$local_checksum" == "$checksum_before" ]] || [[ "$local_checksum" == "$checksum_after" ]]; then - echo "Checksum matches" - break +# echo "Checksum matches" +# break + echo "Checksum mismatch, retrying in 1 second" + DOWNLOADED="false" + COMPRESSED="false" + sleep 1 else echo "Checksum mismatch, retrying in 1 second" DOWNLOADED="false" From d432e9a01852cf4e50432f9f68e0144c196de2a9 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 26 Sep 2023 15:40:15 -0400 Subject: [PATCH 29/36] revert false negative --- scripts/download.sh | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index 0a0c28d6..e4b897a3 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -62,12 +62,8 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then echo "Local snapshot checksum: $local_checksum" if [[ "$local_checksum" == "$checksum_before" ]] || [[ "$local_checksum" == "$checksum_after" ]]; then -# echo "Checksum matches" -# break - echo "Checksum mismatch, retrying in 1 second" - DOWNLOADED="false" - COMPRESSED="false" - sleep 1 + echo "Checksum matches" + break else echo "Checksum mismatch, retrying in 1 second" DOWNLOADED="false" From fa5dfad2b90cc24e8ea3974c25738981beb22e09 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Mon, 2 Oct 2023 19:31:09 -0400 Subject: [PATCH 30/36] address feedbacks --- scripts/download.sh | 64 +++++++++++++++++---------------------------- 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index e4b897a3..48870bca 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -15,13 +15,9 @@ METRICS="/var/spool/ctlstore/metrics.json" START=$(date +%s) END=$(date +%s) -download_snapshot() { - s5cmd -r 0 --log debug cp --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . -} - -get_remote_checksum() { - remote_checksum=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}" | jq -r '.Metadata.checksum // empty') - echo "$remote_checksum" +get_head_object() { + head_object=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}") + echo "$head_object" } if [ ! -f /var/spool/ctlstore/ldb.db ]; then @@ -31,51 +27,39 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then mkdir -p /var/spool/ctlstore cd /var/spool/ctlstore - COUNTER=0 - while true; do - COUNTER=$(($COUNTER+1)) + echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" + head_object=$(get_head_object) - echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" - checksum_before=$(get_remote_checksum) - echo "Remote checksum before downloading snapshot: $checksum_before" + remote_checksum=$(jq -r '.Metadata.checksum // empty' <<< $head_object) + echo "Remote checksum: $remote_checksum" - echo "Downloading snapshot from ${CTLSTORE_BOOTSTRAP_URL}" - download_snapshot + remote_version=$(jq -r '.VersionId // empty' <<< $head_object) + echo "Remote version: $remote_version" - echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" - checksum_after=$(get_remote_checksum) - echo "Remote checksum after downloading snapshot: $checksum_after" + echo "Downloading snapshot from ${CTLSTORE_BOOTSTRAP_URL} with VersionID: ${remote_version}" + s5cmd -r 0 --log debug cp --version-id $remote_version --concurrency $CONCURRENCY $CTLSTORE_BOOTSTRAP_URL . - DOWNLOADED="true" - if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then - echo "Decompressing" - pigz -d snapshot.db.gz - COMPRESSED="true" - fi - - if [ -z $checksum_after ]; then - echo "Checksum is null, skipping checksum validation" - break - fi + DOWNLOADED="true" + if [[ ${CTLSTORE_BOOTSTRAP_URL: -2} == gz ]]; then + echo "Decompressing" + pigz -d snapshot.db.gz + COMPRESSED="true" + fi + if [ -z $remote_checksum ]; then + echo "Remote checksum is null, skipping checksum validation" + else local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Local snapshot checksum: $local_checksum" - if [[ "$local_checksum" == "$checksum_before" ]] || [[ "$local_checksum" == "$checksum_after" ]]; then + if [[ "$local_checksum" == "$remote_checksum" ]]; then echo "Checksum matches" - break else - echo "Checksum mismatch, retrying in 1 second" - DOWNLOADED="false" - COMPRESSED="false" - sleep 1 - fi - - if [ $COUNTER -gt 5 ]; then - echo "Failed to download intact snapshot after 5 attempts" + echo "Checksum does not match" + echo "Failed to download intact snapshot" exit 1 fi - done + fi mv snapshot.db ldb.db END=$(date +%s) From 215f474d230efa3ee569ead247fcb90bff27dc8b Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Mon, 2 Oct 2023 23:19:13 -0400 Subject: [PATCH 31/36] fix script --- scripts/download.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/download.sh b/scripts/download.sh index 48870bca..96eb9c36 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -30,10 +30,10 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then echo "Downloading head object from ${CTLSTORE_BOOTSTRAP_URL}" head_object=$(get_head_object) - remote_checksum=$(jq -r '.Metadata.checksum // empty' <<< $head_object) + remote_checksum=$(printf '%s\n' "$head_object" | jq -r '.Metadata.checksum // empty') echo "Remote checksum: $remote_checksum" - remote_version=$(jq -r '.VersionId // empty' <<< $head_object) + remote_version=$(printf '%s\n' "$head_object" | jq -r '.VersionId // empty') echo "Remote version: $remote_version" echo "Downloading snapshot from ${CTLSTORE_BOOTSTRAP_URL} with VersionID: ${remote_version}" From 2ac82cfca78fd2aecb791303b65342c6b83398a4 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 3 Oct 2023 18:09:06 -0400 Subject: [PATCH 32/36] test unhappy path --- scripts/download.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/download.sh b/scripts/download.sh index 96eb9c36..31e1c9a8 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -52,7 +52,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Local snapshot checksum: $local_checksum" - if [[ "$local_checksum" == "$remote_checksum" ]]; then + if [[ "$local_checksum" != "$remote_checksum" ]]; then echo "Checksum matches" else echo "Checksum does not match" From 51099287d30cff7c51685cd583a44491d24bee15 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Tue, 3 Oct 2023 18:47:31 -0400 Subject: [PATCH 33/36] revert false negative --- scripts/download.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/download.sh b/scripts/download.sh index 31e1c9a8..96eb9c36 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -52,7 +52,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Local snapshot checksum: $local_checksum" - if [[ "$local_checksum" != "$remote_checksum" ]]; then + if [[ "$local_checksum" == "$remote_checksum" ]]; then echo "Checksum matches" else echo "Checksum does not match" From 728965c20fcdb24a31836168732f5b1973fb2ac1 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 4 Oct 2023 23:05:03 -0400 Subject: [PATCH 34/36] using sha1 --- pkg/supervisor/archived_snapshot.go | 4 ++-- scripts/download.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index f2fd1389..a1ddcfbe 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -3,7 +3,7 @@ package supervisor import ( "bufio" "context" - "crypto/sha256" + "crypto/sha1" "encoding/base64" "fmt" "io" @@ -114,7 +114,7 @@ func getCheckSum(path string) (string, error) { } defer f.Close() - h := sha256.New() + h := sha1.New() if _, err := io.Copy(h, f); err != nil { events.Log("failed to generate sha256", err) } diff --git a/scripts/download.sh b/scripts/download.sh index 96eb9c36..da070d35 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -49,7 +49,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then if [ -z $remote_checksum ]; then echo "Remote checksum is null, skipping checksum validation" else - local_checksum=$(shasum -a 256 snapshot.db | cut -f1 -d\ | xxd -r -p | base64) + local_checksum=$(shasum snapshot.db | cut -f1 -d\ | xxd -r -p | base64) echo "Local snapshot checksum: $local_checksum" if [[ "$local_checksum" == "$remote_checksum" ]]; then From be05588a67d6e8af63f3c49b3f0129ac58f2f303 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Wed, 4 Oct 2023 23:58:31 -0400 Subject: [PATCH 35/36] update log to avoid confusion --- pkg/supervisor/archived_snapshot.go | 4 ++-- scripts/download.sh | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index a1ddcfbe..d378a401 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -116,11 +116,11 @@ func getCheckSum(path string) (string, error) { h := sha1.New() if _, err := io.Copy(h, f); err != nil { - events.Log("failed to generate sha256", err) + events.Log("failed to generate sha1 of snapshot", err) } cs := base64.StdEncoding.EncodeToString(h.Sum(nil)) - events.Log("base64 encoding: %s", cs) + events.Log("base64 encoding of sha1: %s", cs) return cs, nil } diff --git a/scripts/download.sh b/scripts/download.sh index da070d35..98559c99 100755 --- a/scripts/download.sh +++ b/scripts/download.sh @@ -14,6 +14,8 @@ METRICS="/var/spool/ctlstore/metrics.json" START=$(date +%s) END=$(date +%s) +SHA_START=$(date +%s) +SHA_END=$(date +%s) get_head_object() { head_object=$(aws s3api head-object --bucket "${BUCKET}" --key "${KEY}") @@ -31,7 +33,7 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then head_object=$(get_head_object) remote_checksum=$(printf '%s\n' "$head_object" | jq -r '.Metadata.checksum // empty') - echo "Remote checksum: $remote_checksum" + echo "Remote checksum in sha1: $remote_checksum" remote_version=$(printf '%s\n' "$head_object" | jq -r '.VersionId // empty') echo "Remote version: $remote_version" @@ -46,11 +48,12 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then COMPRESSED="true" fi + SHA_START=$(date +%s) if [ -z $remote_checksum ]; then - echo "Remote checksum is null, skipping checksum validation" + echo "Remote checksum sha1 is null, skipping checksum validation" else local_checksum=$(shasum snapshot.db | cut -f1 -d\ | xxd -r -p | base64) - echo "Local snapshot checksum: $local_checksum" + echo "Local snapshot checksum in sha1: $local_checksum" if [[ "$local_checksum" == "$remote_checksum" ]]; then echo "Checksum matches" @@ -60,6 +63,8 @@ if [ ! -f /var/spool/ctlstore/ldb.db ]; then exit 1 fi fi + SHA_END=$(date +%s) + echo "Local checksum calculation took $(($SHA_END - $SHA_START)) seconds" mv snapshot.db ldb.db END=$(date +%s) From b42aac54c16266e52575af6d7ae760e4a50b1928 Mon Sep 17 00:00:00 2001 From: Hongyu Zhou Date: Thu, 5 Oct 2023 16:00:44 -0400 Subject: [PATCH 36/36] fix wording --- pkg/supervisor/archived_snapshot.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/supervisor/archived_snapshot.go b/pkg/supervisor/archived_snapshot.go index d378a401..bd5ace73 100644 --- a/pkg/supervisor/archived_snapshot.go +++ b/pkg/supervisor/archived_snapshot.go @@ -76,9 +76,9 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { } var reader io.Reader = bufio.NewReaderSize(f, 1024*32) // use a 32K buffer for reading - cs, err := getCheckSum(path) + cs, err := getChecksum(path) if err != nil { - return errors.Wrap(err, "generate file CheckSum") + return errors.Wrap(err, "generate file Checksum") } var gpr *gzipCompressionReader @@ -107,7 +107,7 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error { return nil } -func getCheckSum(path string) (string, error) { +func getChecksum(path string) (string, error) { f, err := os.OpenFile(path, os.O_RDONLY, 0) if err != nil { return "", errors.Wrap(err, "opening file")