Skip to content

Commit

Permalink
pkg/debuginfo: Allow uploads to be done via signed URLs
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Dec 8, 2022
1 parent 1505055 commit 73b6660
Show file tree
Hide file tree
Showing 34 changed files with 3,673 additions and 1,796 deletions.
969 changes: 718 additions & 251 deletions gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.go

Large diffs are not rendered by default.

296 changes: 204 additions & 92 deletions gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.gw.go

Large diffs are not rendered by default.

1,554 changes: 1,194 additions & 360 deletions gen/proto/go/parca/debuginfo/v1alpha1/debuginfo_vtproto.pb.go

Large diffs are not rendered by default.

59 changes: 28 additions & 31 deletions gen/proto/swagger/parca/debuginfo/v1alpha1/debuginfo.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
},
"tags": [
{
"name": "DebugInfoService"
"name": "DebuginfoService"
}
],
"consumes": [
Expand All @@ -17,15 +17,15 @@
],
"paths": {},
"definitions": {
"DownloadInfoSource": {
"InitiateUploadResponseUploadStrategy": {
"type": "string",
"enum": [
"SOURCE_UNKNOWN_UNSPECIFIED",
"SOURCE_UPLOAD",
"SOURCE_DEBUGINFOD"
"UPLOAD_STRATEGY_UNSPECIFIED",
"UPLOAD_STRATEGY_GRPC",
"UPLOAD_STRATEGY_SIGNED_URL"
],
"default": "SOURCE_UNKNOWN_UNSPECIFIED",
"description": "Source enum describes the source a debuginfo is from.\n\n - SOURCE_UNKNOWN_UNSPECIFIED: To understand when no source is set we have the unknown source.\n - SOURCE_UPLOAD: The debuginfo was uploaded by a user/agent.\n - SOURCE_DEBUGINFOD: The debuginfo was downloaded from a public debuginfod server."
"default": "UPLOAD_STRATEGY_UNSPECIFIED",
"description": "The strategy to use for uploading.\n\n - UPLOAD_STRATEGY_UNSPECIFIED: The upload is not allowed.\n - UPLOAD_STRATEGY_GRPC: The upload is allowed and should be done via the Upload RPC.\n - UPLOAD_STRATEGY_SIGNED_URL: The upload is allowed and should be done via a returned signed URL."
},
"parcadebuginfov1alpha1UploadResponse": {
"type": "object",
Expand Down Expand Up @@ -69,40 +69,37 @@
}
}
},
"v1alpha1DownloadInfo": {
"v1alpha1InitiateUploadResponse": {
"type": "object",
"properties": {
"source": {
"$ref": "#/definitions/DownloadInfoSource",
"description": "Source indicates the origin of the debuginfo being downloaded."
}
},
"description": "DownloadInfo metadata for the debug data that is being downloaded."
},
"v1alpha1DownloadResponse": {
"type": "object",
"properties": {
"info": {
"$ref": "#/definitions/v1alpha1DownloadInfo",
"title": "info is the metadata for the debug info"
"uploadId": {
"type": "string",
"description": "The upload_id to use for uploading."
},
"uploadStrategy": {
"$ref": "#/definitions/InitiateUploadResponseUploadStrategy",
"description": "The strategy to use for uploading."
},
"chunkData": {
"signedUrl": {
"type": "string",
"format": "byte",
"title": "chunk_data is the raw bytes of the debug info"
"description": "The signed url to use for uploading using a PUT request when the upload\nstrategy is SIGNED_STRATEGY_URL."
}
},
"description": "DownloadRequest returns chunked data of the debuginfo."
"description": "InitiateUploadResponse is the response to an InitiateUploadRequest."
},
"v1alpha1MarkUploadFinishedResponse": {
"type": "object",
"description": "MarkUploadFinishedResponse is the response to a MarkUploadFinishedRequest."
},
"v1alpha1ExistsResponse": {
"v1alpha1ShouldInitiateUploadResponse": {
"type": "object",
"properties": {
"exists": {
"shouldInitiateUpload": {
"type": "boolean",
"title": "exists indicates if there is debug data present for the given build_id"
"description": "Whether an upload should be initiated or not."
}
},
"title": "ExistsResponse returns whether the given build_id has debug info"
"description": "ShouldInitiateUploadResponse is the response for ShouldInitiateUpload."
},
"v1alpha1UploadInfo": {
"type": "object",
Expand All @@ -111,9 +108,9 @@
"type": "string",
"title": "build_id is a unique identifier for the debug data"
},
"hash": {
"uploadId": {
"type": "string",
"title": "hash is the hash of the source file that debug information extracted from"
"title": "upload_id is a unique identifier for the upload"
}
},
"title": "UploadInfo contains the build_id and other metadata for the debug data"
Expand Down
22 changes: 11 additions & 11 deletions gen/proto/swagger/parca/scrape/v1alpha1/scrape.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,6 @@
"description": "- HEALTH_UNKNOWN_UNSPECIFIED: HEALTH_UNKNOWN_UNSPECIFIED unspecified\n - HEALTH_GOOD: HEALTH_GOOD healthy target\n - HEALTH_BAD: HEALTH_BAD unhealthy target",
"title": "Health are the possible health values of a target"
},
"TargetsRequestState": {
"type": "string",
"enum": [
"STATE_ANY_UNSPECIFIED",
"STATE_ACTIVE",
"STATE_DROPPED"
],
"default": "STATE_ANY_UNSPECIFIED",
"description": "- STATE_ANY_UNSPECIFIED: STATE_ANY_UNSPECIFIED unspecified\n - STATE_ACTIVE: STATE_ACTIVE target active state\n - STATE_DROPPED: STATE_DROPPED target dropped state",
"title": "State represents the current state of a target"
},
"profilestorev1alpha1Label": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -180,6 +169,17 @@
},
"title": "Targets is a list of targets"
},
"v1alpha1TargetsRequestState": {
"type": "string",
"enum": [
"STATE_ANY_UNSPECIFIED",
"STATE_ACTIVE",
"STATE_DROPPED"
],
"default": "STATE_ANY_UNSPECIFIED",
"description": "- STATE_ANY_UNSPECIFIED: STATE_ANY_UNSPECIFIED unspecified\n - STATE_ACTIVE: STATE_ACTIVE target active state\n - STATE_DROPPED: STATE_DROPPED target dropped state",
"title": "State represents the current state of a target"
},
"v1alpha1TargetsResponse": {
"type": "object",
"properties": {
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/parca-dev/parca
go 1.19

require (
cloud.google.com/go/storage v1.27.0
github.com/alecthomas/kong v0.7.1
github.com/apache/arrow/go/v8 v8.0.1
github.com/cenkalti/backoff/v4 v4.2.0
Expand All @@ -18,6 +19,7 @@ require (
github.com/go-kit/log v0.2.1
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/goburrow/cache v0.1.4
github.com/golang/protobuf v1.5.2
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand All @@ -31,6 +33,7 @@ require (
github.com/klauspost/compress v1.15.12
github.com/nanmu42/limitio v1.0.0
github.com/oklog/run v1.1.0
github.com/pkg/errors v0.9.1
github.com/polarsignals/frostdb v0.0.0-20221206153157-a83160fb1ff1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
Expand All @@ -45,7 +48,9 @@ require (
go.opentelemetry.io/otel/trace v1.11.1
golang.org/x/exp v0.0.0-20221204150635-6dcec336b2bb
golang.org/x/net v0.2.0
golang.org/x/oauth2 v0.2.0
golang.org/x/sync v0.1.0
google.golang.org/api v0.103.0
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
Expand All @@ -57,7 +62,6 @@ require (
cloud.google.com/go/compute v1.13.0 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
cloud.google.com/go/iam v0.7.0 // indirect
cloud.google.com/go/storage v1.27.0 // indirect
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
Expand Down Expand Up @@ -132,7 +136,6 @@ require (
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v2.0.5+incompatible // indirect
Expand Down Expand Up @@ -198,7 +201,6 @@ require (
github.com/ovh/go-ovh v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand Down Expand Up @@ -226,14 +228,12 @@ require (
go.uber.org/goleak v1.2.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/oauth2 v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/term v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.1.0 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.103.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
Expand Down
10 changes: 7 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ type ObjectStorage struct {

// Validate returns an error if the config is not valid.
func (c *Config) Validate() error {
return validation.ValidateStruct(c,
validation.Field(&c.ObjectStorage, validation.Required, Valid),
)
if err := validation.ValidateStruct(c,
validation.Field(&c.ObjectStorage, validation.Required, ObjectStorageValid),
); err != nil {
return err
}

return nil
}

func trueValue() *bool {
Expand Down
12 changes: 6 additions & 6 deletions pkg/config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"github.com/thanos-io/objstore/client"
)

// Valid is the ValidRule.
var Valid = ValidRule{}
// ObjectStorageValid is the ValidRule.
var ObjectStorageValid = ObjectStorageValidRule{}

// ValidRule is a validation rule for the Config. It implements the validation.Rule interface.
type ValidRule struct{}
// ObjectStorageValidRule is a validation rule for the Config. It implements the validation.Rule interface.
type ObjectStorageValidRule struct{}

// Validate returns an error if the config is not valid.
func (v ValidRule) Validate(value interface{}) error {
// ObjectStorageValidate returns an error if the config is not valid.
func (v ObjectStorageValidRule) Validate(value interface{}) error {
c, ok := value.(*ObjectStorage)
if !ok {
return errors.New("DebugInfo is invalid")
Expand Down
95 changes: 7 additions & 88 deletions pkg/debuginfo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,23 @@ const (
MaxMsgSize = 1024 * 1024 * 64
)

type Client struct {
c debuginfopb.DebugInfoServiceClient
type UploadClient struct {
debuginfopb.DebuginfoServiceClient
}

func NewDebugInfoClient(conn *grpc.ClientConn) *Client {
return &Client{
c: debuginfopb.NewDebugInfoServiceClient(conn),
}
}

func (c *Client) Exists(ctx context.Context, buildID, hash string) (bool, error) {
res, err := c.c.Exists(ctx, &debuginfopb.ExistsRequest{
BuildId: buildID,
Hash: hash,
})
if err != nil {
return false, err
}

return res.Exists, nil
func NewDebuginfoClient(client debuginfopb.DebuginfoServiceClient) *UploadClient {
return &UploadClient{client}
}

func (c *Client) Upload(ctx context.Context, buildID, hash string, r io.Reader) (uint64, error) {
stream, err := c.c.Upload(ctx, grpc.MaxCallSendMsgSize(MaxMsgSize))
func (c *UploadClient) Upload(ctx context.Context, info *debuginfopb.UploadInfo, r io.Reader) (uint64, error) {
stream, err := c.DebuginfoServiceClient.Upload(ctx, grpc.MaxCallSendMsgSize(MaxMsgSize))
if err != nil {
return 0, fmt.Errorf("initiate upload: %w", err)
}

err = stream.Send(&debuginfopb.UploadRequest{
Data: &debuginfopb.UploadRequest_Info{
Info: &debuginfopb.UploadInfo{
BuildId: buildID,
Hash: hash,
},
Info: info,
},
})
if err != nil {
Expand Down Expand Up @@ -131,70 +114,6 @@ func (c *Client) Upload(ctx context.Context, buildID, hash string, r io.Reader)
return res.Size, nil
}

type Downloader struct {
stream debuginfopb.DebugInfoService_DownloadClient
info *debuginfopb.DownloadInfo
}

func (c *Client) Downloader(ctx context.Context, buildID string) (*Downloader, error) {
stream, err := c.c.Download(ctx, &debuginfopb.DownloadRequest{
BuildId: buildID,
}, grpc.MaxCallRecvMsgSize(MaxMsgSize))
if err != nil {
return nil, fmt.Errorf("initiate download: %w", err)
}

res, err := stream.Recv()
if err != nil {
return nil, fmt.Errorf("receive download info: %w", err)
}

info := res.GetInfo()
if info == nil {
return nil, fmt.Errorf("download info is nil")
}

return &Downloader{
stream: stream,
info: info,
}, nil
}

func (d *Downloader) Info() *debuginfopb.DownloadInfo {
return d.info
}

func (d *Downloader) Close() error {
// Note that CloseSend does not Recv, therefore is not guaranteed to release all resources
return d.stream.CloseSend()
}

func (d *Downloader) Download(ctx context.Context, w io.Writer) (int, error) {
bytesWritten := 0
for {
res, err := d.stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return bytesWritten, fmt.Errorf("receive next chunk: %w", err)
}

chunkData := res.GetChunkData()
if chunkData == nil {
return bytesWritten, fmt.Errorf("chunk does not contain data")
}

n, err := w.Write(chunkData)
if err != nil {
return bytesWritten, fmt.Errorf("write next chunk: %w", err)
}
bytesWritten += n
}

return bytesWritten, nil
}

// sentinelError checks underlying error for grpc.StatusCode and returns if it's a known and expected error.
func sentinelError(err error) error {
if sts, ok := status.FromError(err); ok {
Expand Down
Loading

0 comments on commit 73b6660

Please sign in to comment.