Skip to content

Commit

Permalink
[bug] - Handle error when scanning s3 bucket. (#969)
Browse files Browse the repository at this point in the history
* Handle error when scanning s# bucket.

* move wait outside loop.

* Add logging.

* revert changes.

* remove.

* revert.
  • Loading branch information
ahrav committed Dec 12, 2022
1 parent 4020c40 commit 26befdd
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
12 changes: 6 additions & 6 deletions pkg/sources/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,21 @@ func (s *Source) newClient(region string) (*s3.S3, error) {

// Chunks emits chunks of bytes over a channel.
func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) error {
client, err := s.newClient("us-east-1")
const defaultAWSRegion = "us-east-1"

client, err := s.newClient(defaultAWSRegion)
if err != nil {
return errors.WrapPrefix(err, "could not create s3 client", 0)
}

bucketsToScan := []string{}
var bucketsToScan []string

switch s.conn.GetCredential().(type) {
case *sourcespb.S3_AccessKey, *sourcespb.S3_CloudEnvironment:
if len(s.conn.Buckets) == 0 {
res, err := client.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
s.log.Error(err, "could not list s3 buckets")
return errors.WrapPrefix(err, "could not list s3 buckets", 0)
return fmt.Errorf("could not list s3 buckets: %w", err)
}
buckets := res.Buckets
for _, bucket := range buckets {
Expand Down Expand Up @@ -150,7 +151,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
continue
}
var regionalClient *s3.S3
if region != "us-east-1" {
if region != defaultAWSRegion {
regionalClient, err = s.newClient(region)
if err != nil {
s.log.Error(err, "could not make regional s3 client")
Expand All @@ -170,7 +171,6 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err

if err != nil {
s.log.Error(err, "could not list objects in s3 bucket", "bucket", bucket)
return errors.WrapPrefix(err, fmt.Sprintf("could not list objects in s3 bucket: %s", bucket), 0)
}
}
s.SetProgressComplete(len(bucketsToScan), len(bucketsToScan), fmt.Sprintf("Completed scanning source %s", s.name), "")
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestSource_Chunks(t *testing.T) {
t.Fatal(err)
}

err = s.Init(ctx, tt.init.name, 0, 0, tt.init.verify, conn, 10)
err = s.Init(ctx, tt.init.name, 0, 0, tt.init.verify, conn, 8)
if (err != nil) != tt.wantErr {
t.Errorf("Source.Init() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down

0 comments on commit 26befdd

Please sign in to comment.