-
Notifications
You must be signed in to change notification settings - Fork 166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Addition of AWS S3 uploader #1276
Changes from all commits
3b33207
8a11750
8d6eda9
d344727
f8b2410
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package uploader | ||
|
||
import (
	"bytes"
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/engine/execution"
)
|
||
// Compile-time check that S3Uploader satisfies the Uploader interface.
var _ Uploader = (*S3Uploader)(nil)

// S3Uploader is a S3 implementation of the uploader interface.
type S3Uploader struct {
	// ctx is used for all uploads because Uploader.Upload takes no context.
	// NOTE(review): storing a context in a struct is a Go anti-pattern
	// (contexts should flow through call chains) — kept here only because
	// the Uploader interface does not accept one; consider changing the
	// interface instead.
	ctx    context.Context
	log    zerolog.Logger // logger scoped to this uploader
	client *s3.Client     // configured AWS SDK v2 S3 client
	bucket string         // destination bucket for uploaded results
}
|
||
// NewS3Uploader returns a new S3 uploader instance. | ||
func NewS3Uploader(ctx context.Context, client *s3.Client, bucket string, log zerolog.Logger) *S3Uploader { | ||
return &S3Uploader{ | ||
ctx: ctx, | ||
log: log, | ||
client: client, | ||
bucket: bucket, | ||
} | ||
} | ||
|
||
// Upload uploads the given computation result to the configured S3 bucket. | ||
func (u *S3Uploader) Upload(result *execution.ComputationResult) error { | ||
uploader := manager.NewUploader(u.client) | ||
key := GCPBlockDataObjectName(result) | ||
buf := &bytes.Buffer{} | ||
err := WriteComputationResultsTo(result, buf) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
_, err = uploader.Upload(u.ctx, &s3.PutObjectInput{ | ||
Bucket: &u.bucket, | ||
Key: &key, | ||
Body: buf, | ||
}) | ||
|
||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |
|
||
jsoncdc "github.com/onflow/cadence/encoding/json" | ||
"github.com/rs/zerolog" | ||
"golang.org/x/sync/errgroup" | ||
|
||
"github.com/onflow/flow-go/engine/execution/computation/computer/uploader" | ||
|
||
|
@@ -52,7 +53,7 @@ type Manager struct { | |
blockComputer computer.BlockComputer | ||
programsCache *ProgramsCache | ||
scriptLogThreshold time.Duration | ||
uploader uploader.Uploader | ||
uploaders []uploader.Uploader | ||
} | ||
|
||
func New( | ||
|
@@ -66,7 +67,7 @@ func New( | |
programsCacheSize uint, | ||
committer computer.ViewCommitter, | ||
scriptLogThreshold time.Duration, | ||
uploader uploader.Uploader, | ||
uploaders []uploader.Uploader, | ||
) (*Manager, error) { | ||
log := logger.With().Str("engine", "computation").Logger() | ||
|
||
|
@@ -98,7 +99,7 @@ func New( | |
blockComputer: blockComputer, | ||
programsCache: programsCache, | ||
scriptLogThreshold: scriptLogThreshold, | ||
uploader: uploader, | ||
uploaders: uploaders, | ||
} | ||
|
||
return &e, nil | ||
|
@@ -214,8 +215,19 @@ func (e *Manager) ComputeBlock( | |
|
||
e.programsCache.Set(block.ID(), toInsert) | ||
|
||
if e.uploader != nil { | ||
err = e.uploader.Upload(result) | ||
if len(e.uploaders) > 0 { | ||
var g errgroup.Group | ||
|
||
for _, uploader := range e.uploaders { | ||
uploader := uploader | ||
|
||
g.Go(func() error { | ||
return uploader.Upload(result) | ||
}) | ||
} | ||
|
||
err := g.Wait() | ||
|
||
if err != nil { | ||
return nil, fmt.Errorf("failed to upload block result: %w", err) | ||
} | ||
Comment on lines
231
to
233
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: it may be useful to output information on which specific uploader failed here, e.g. by wrapping each uploader's error with an identifying label. |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if, instead of a slice of uploaders, we should create a new uploader that calls multiple ones —
a MultiUploader
of sorts.