Skip to content

Commit

Permalink
S3 scanner improvements (#938)
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin-decker committed Nov 22, 2022
1 parent 4409210 commit 28dd25b
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 46 deletions.
3 changes: 1 addition & 2 deletions go.mod
Expand Up @@ -34,7 +34,6 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/go-github/v42 v42.0.0
github.com/gorilla/mux v1.8.0
github.com/h2non/filetype v1.1.3
github.com/hashicorp/go-retryablehttp v0.7.1
github.com/jlaffaye/ftp v0.1.0
github.com/joho/godotenv v1.4.0
Expand All @@ -46,6 +45,7 @@ require (
github.com/mholt/archiver/v4 v4.0.0-alpha.7
github.com/paulbellamy/ratecounter v0.2.0
github.com/pkg/errors v0.9.1
github.com/rabbitmq/amqp091-go v1.5.0
github.com/sergi/go-diff v1.2.0
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
Expand Down Expand Up @@ -122,7 +122,6 @@ require (
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pkg/diff v0.0.0-20200914180035-5b29258ca4f7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rabbitmq/amqp091-go v1.5.0 // indirect
github.com/therootcompany/xz v1.0.1 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/xanzy/ssh-agent v0.3.0 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Expand Up @@ -270,8 +270,6 @@ github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg=
github.com/h2non/filetype v1.1.3/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand Down Expand Up @@ -459,7 +457,6 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
Expand Down
12 changes: 2 additions & 10 deletions pkg/common/vars.go
Expand Up @@ -2,22 +2,14 @@ package common

import (
"path/filepath"

"github.com/h2non/filetype"
)

var (
KB, MB, GB, TB, PB = 1e3, 1e6, 1e9, 1e12, 1e15
IGNORED_EXTENSIONS = []string{"pdf", "mp4", "avi", "mpeg", "mpg", "mov", "wmv", "m4p", "swf", "mp2", "flv", "vob", "webm", "hdv", "3gp", "ogg", "mp3", "wav", "flac", "tif", "tiff", "jpg", "jpeg", "png", "gif", "zip", "webp"}
IGNORED_EXTENSIONS = []string{"mp4", "avi", "mpeg", "mpg", "mov", "wmv", "m4p", "swf", "mp2", "flv", "vob", "webm", "hdv", "3gp", "ogg", "mp3", "wav", "flac", "webp"}
)

func SkipFile(filename string, data []byte) bool {
if filepath.Ext(filename) == "" {
//no specified extension, check mimetype
if filetype.IsArchive(data[:256]) {
return true
}
}
func SkipFile(filename string) bool {
for _, ext := range IGNORED_EXTENSIONS {
if filepath.Ext(filename) == ext {
return true
Expand Down
76 changes: 45 additions & 31 deletions pkg/sources/s3/s3.go
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
diskbufferreader "github.com/bill-rich/disk-buffer-reader"
"github.com/go-errors/errors"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -35,7 +35,7 @@ type Source struct {
verify bool
concurrency int
aCtx context.Context
log *log.Entry
log logr.Logger
sources.Progress
errorCount *sync.Map
conn *sourcespb.S3
Expand All @@ -59,7 +59,7 @@ func (s *Source) JobID() int64 {

// Init returns an initialized AWS source
func (s *Source) Init(aCtx context.Context, name string, jobId, sourceId int64, verify bool, connection *anypb.Any, concurrency int) error {
s.log = log.WithField("source", s.Type()).WithField("name", name)
s.log = context.WithValues(aCtx, "source", s.Type(), "name", name).Logger()

s.aCtx = aCtx
s.name = name
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
if len(s.conn.Buckets) == 0 {
res, err := client.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
s.log.Errorf("could not list s3 buckets: %s", err)
s.log.Error(err, "could not list s3 buckets")
return errors.WrapPrefix(err, "could not list s3 buckets", 0)
}
buckets := res.Buckets
Expand All @@ -143,23 +143,22 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err

s.SetProgressComplete(i, len(bucketsToScan), fmt.Sprintf("Bucket: %s", bucket), "")

s.log.Debugf("Scanning bucket: %s", bucket)
s.log.Info("Scanning bucket", "bucket", bucket)
region, err := s3manager.GetBucketRegionWithClient(context.Background(), client, bucket)
if err != nil {
s.log.WithError(err).Errorf("could not get s3 region for bucket: %s", bucket)
s.log.Error(err, "could not get s3 region for bucket", "bucket", bucket)
continue
}
var regionalClient *s3.S3
if region != "us-east-1" {
regionalClient, err = s.newClient(region)
if err != nil {
s.log.WithError(err).Error("could not make regional s3 client")
s.log.Error(err, "could not make regional s3 client")
}
} else {
regionalClient = client
}
// Forced prefix for testing
// pf := "public"

errorCount := sync.Map{}

err = regionalClient.ListObjectsV2PagesWithContext(
Expand All @@ -170,7 +169,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
})

if err != nil {
s.log.WithError(err).Errorf("could not list objects in s3 bucket: %s", bucket)
s.log.Error(err, "could not list objects in s3 bucket", "bucket", bucket)
return errors.WrapPrefix(err, fmt.Sprintf("could not list objects in s3 bucket: %s", bucket), 0)
}
}
Expand All @@ -188,22 +187,48 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
return
}

if obj == nil {
continue
}

// skip GLACIER and GLACIER_IR objects
if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
s.log.V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass, "object", *obj.Key)
continue
}

// ignore large files
if *obj.Size > int64(250*common.MB) {
s.log.V(3).Info("Skipping %d byte file (over 250MB limit)", "object", *obj.Key)
return
}

// skip empty file
if *obj.Size == 0 {
s.log.V(5).Info("Skipping 0 byte file", "object", *obj.Key)
return
}

// skip incompatible extensions
if common.SkipFile(*obj.Key) {
s.log.V(5).Info("Skipping file with incompatible extension", "object", *obj.Key)
return
}

err := sem.Acquire(ctx, 1)
if err != nil {
log.WithError(err).Error("could not acquire semaphore")
s.log.Error(err, "could not acquire semaphore")
continue
}
wg.Add(1)
go func(ctx context.Context, wg *sync.WaitGroup, sem *semaphore.Weighted, obj *s3.Object) {
defer common.RecoverWithExit(ctx)
defer sem.Release(1)
defer wg.Done()
// defer log.Debugf("DONE - %s", *obj.Key)

if (*obj.Key)[len(*obj.Key)-1:] == "/" {
return
}
// log.Debugf("Object: %s", *obj.Key)

path := strings.Split(*obj.Key, "/")
prefix := strings.Join(path[:len(path)-1], "/")
Expand All @@ -213,17 +238,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
nErr = 0
}
if nErr.(int) > 3 {
log.Debugf("Skipped: %s", *obj.Key)
return
}

// ignore large files
if *obj.Size > int64(10*common.MB) {
return
}

// file is 0 bytes - likely no permissions - skipping
if *obj.Size == 0 {
s.log.V(2).Info("Skipped due to excessive errors", "object", *obj.Key)
return
}

Expand All @@ -237,31 +252,30 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
})
if err != nil {
if !strings.Contains(err.Error(), "AccessDenied") {
s.log.WithError(err).Errorf("could not get S3 object: %s", *obj.Key)
s.log.Error(err, "could not get S3 object", "object", *obj.Key)
}

nErr, ok := errorCount.Load(prefix)
if !ok {
nErr = 0
}
if nErr.(int) > 3 {
log.Debugf("Skipped: %s", *obj.Key)
s.log.V(3).Info("Skipped due to excessive errors", "object", *obj.Key)
return
}
nErr = nErr.(int) + 1
errorCount.Store(prefix, nErr)
// too many consecutive errors on this page
if nErr.(int) > 3 {
s.log.Warnf("Too many consecutive errors. Excluding %s", prefix)
s.log.V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
}
log.Debugf("Error Counts: %s:%d", prefix, nErr)
return
}

defer res.Body.Close()
reader, err := diskbufferreader.New(res.Body)
if err != nil {
log.WithError(err).Error("Could not create reader.")
s.log.Error(err, "Could not create reader.")
return
}

Expand Down Expand Up @@ -292,14 +306,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
}

if err := reader.Reset(); err != nil {
log.WithError(err).Error("Error resetting reader to start.")
s.log.Error(err, "Error resetting reader to start.")
}
reader.Stop()

chunk := *chunkSkel
chunkData, err := io.ReadAll(reader)
if err != nil {
log.WithError(err).Error("Could not read file data.")
s.log.Error(err, "Could not read file data.")
return
}
chunk.Data = chunkData
Expand Down

0 comments on commit 28dd25b

Please sign in to comment.