worker: added config to limit parallel uploads/downloads
adamstruck committed Nov 1, 2019
1 parent 5a151d5 commit 879f0d8
Showing 13 changed files with 80 additions and 60 deletions.
17 changes: 17 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions config/config.go
@@ -147,6 +147,8 @@ type Worker struct {
 	// Normally the worker cleans up its working directory after executing.
 	// This option disables that behavior.
 	LeaveWorkDir bool
+	// Limit the number of concurrent downloads/uploads
+	MaxParallelTransfers int
 }

 // HPCBackend describes the configuration for a HPC scheduler backend such as
3 changes: 3 additions & 0 deletions config/default-config.yaml
@@ -115,6 +115,9 @@ Worker:
   # This option disables that behavior.
   LeaveWorkDir: false

+  # Limit the number of concurrent downloads/uploads
+  MaxParallelTransfers: 10
+
 #-------------------------------------------------------------------------------
 # Databases and/or Event Writers/Handlers
 #-------------------------------------------------------------------------------
9 changes: 5 additions & 4 deletions config/default.go
@@ -56,10 +56,11 @@ func DefaultConfig() Config {
 			Metadata: map[string]string{},
 		},
 		Worker: Worker{
-			WorkDir:       workDir,
-			PollingRate:   Duration(time.Second * 5),
-			LogUpdateRate: Duration(time.Second * 5),
-			LogTailSize:   10000,
+			WorkDir:              workDir,
+			PollingRate:          Duration(time.Second * 5),
+			LogUpdateRate:        Duration(time.Second * 5),
+			LogTailSize:          10000,
+			MaxParallelTransfers: 10,
 		},
 		Logger: logger.DefaultConfig(),
 		// databases / event handlers
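
As a usage sketch (not part of this commit), the new setting can also be overridden programmatically on top of the defaults above. config.DefaultConfig() and the Worker.MaxParallelTransfers field come from this diff; the override value of 4 is purely illustrative.

package main

import (
	"fmt"

	"github.com/ohsu-comp-bio/funnel/config"
)

func main() {
	// Start from Funnel's built-in defaults (see config/default.go above),
	// then tighten the worker's transfer concurrency.
	conf := config.DefaultConfig()
	conf.Worker.MaxParallelTransfers = 4 // illustrative value, not a recommendation

	fmt.Println("max parallel transfers:", conf.Worker.MaxParallelTransfers)
}
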
34 changes: 13 additions & 21 deletions storage/transfer.go
@@ -2,8 +2,8 @@ package storage

 import (
 	"context"
-	"sync"

+	"github.com/gammazero/workerpool"
 	"github.com/ohsu-comp-bio/funnel/util/fsutil"
 )

@@ -23,52 +23,44 @@ type Transfer interface {
 //
 // Transfer events (started, failed, finished, etc) are communicated
 // via the Transfer interface.
-func Download(ctx context.Context, store Storage, transfers []Transfer) {
-	wg := &sync.WaitGroup{}
-	wg.Add(len(transfers))
-
+func Download(ctx context.Context, store Storage, transfers []Transfer, parallelLimit int) {
+	wp := workerpool.New(parallelLimit)
 	for _, x := range transfers {
-		go func(x Transfer) {
-			defer wg.Done()
+		x := x
+		wp.Submit(func() {
 			x.Started()

 			var obj *Object
 			err := fsutil.EnsurePath(x.Path())
 			if err == nil {
 				obj, err = store.Get(ctx, x.URL(), x.Path())
 			}

 			if err != nil {
 				x.Failed(err)
 			} else {
 				x.Finished(obj)
 			}
-		}(x)
+		})
 	}
-	wg.Wait()
+	wp.StopWait()
 }

 // Upload uploads a list of transfers to storage, in parallel.
 //
 // Transfer events (started, failed, finished, etc) are communicated
 // via the Transfer interface.
-func Upload(ctx context.Context, store Storage, transfers []Transfer) {
-	wg := &sync.WaitGroup{}
-	wg.Add(len(transfers))
-
+func Upload(ctx context.Context, store Storage, transfers []Transfer, parallelLimit int) {
+	wp := workerpool.New(parallelLimit)
 	for _, x := range transfers {
-		go func(x Transfer) {
-			defer wg.Done()
-
+		x := x
+		wp.Submit(func() {
 			x.Started()
 			obj, err := store.Put(ctx, x.URL(), x.Path())

 			if err != nil {
 				x.Failed(err)
 			} else {
 				x.Finished(obj)
 			}
-		}(x)
+		})
 	}
-	wg.Wait()
+	wp.StopWait()
 }
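
For readers unfamiliar with github.com/gammazero/workerpool, the pattern the new Download and Upload follow is: submit one closure per transfer, let a fixed-size pool cap how many run concurrently, then block until all of them finish. The sketch below is a minimal, self-contained illustration of that pattern; the simulated tasks, the limit of 3, and the in-flight counter are illustrative and not part of this commit.

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/gammazero/workerpool"
)

func main() {
	const parallelLimit = 3 // plays the role of Worker.MaxParallelTransfers

	var inFlight int64
	wp := workerpool.New(parallelLimit)

	for i := 0; i < 10; i++ {
		i := i // capture a per-iteration copy, as the diff does with "x := x"
		wp.Submit(func() {
			n := atomic.AddInt64(&inFlight, 1)
			fmt.Printf("transfer %d started (%d in flight)\n", i, n)
			time.Sleep(50 * time.Millisecond) // stand-in for a real upload/download
			atomic.AddInt64(&inFlight, -1)
		})
	}

	// StopWait blocks until every submitted task has run, replacing the
	// sync.WaitGroup bookkeeping removed by this commit.
	wp.StopWait()
}

The "i := i" line mirrors the "x := x" re-declaration in the diff: prior to Go 1.22, a closure submitted inside a loop would otherwise capture the single shared loop variable rather than the value for that iteration.
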
9 changes: 5 additions & 4 deletions tests/storage/amazon_s3_test.go
@@ -29,6 +29,7 @@ func TestAmazonS3Storage(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 	ctx := context.Background()
+	parallelXfer := 10

 	client, err := newS3Test(conf.AmazonS3)
 	if err != nil {
@@ -54,7 +55,7 @@ func TestAmazonS3Storage(t *testing.T) {
 	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inFileURL, Path: fPath},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -64,7 +65,7 @@ func TestAmazonS3Storage(t *testing.T) {
 	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inDirURL, Path: dPath, Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
@@ -129,7 +130,7 @@ func TestAmazonS3Storage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
@@ -148,7 +149,7 @@ func TestAmazonS3Storage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outDirURL, Path: "./test_tmp/test-s3-directory", Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download directory:", err)
 	}
20 changes: 10 additions & 10 deletions tests/storage/ftp_test.go
@@ -28,7 +28,7 @@ func TestFTPStorage(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "bob:bob@localhost:8021"
 	ctx := context.Background()
-
+	parallelXfer := 10
 	protocol := "ftp://"

 	store, err := storage.NewMux(conf)
@@ -40,7 +40,7 @@ func TestFTPStorage(t *testing.T) {
 	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inFileURL, Path: fPath},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -49,7 +49,7 @@ func TestFTPStorage(t *testing.T) {
 	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inDirURL, Path: dPath, Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
@@ -111,7 +111,7 @@ func TestFTPStorage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
@@ -130,7 +130,7 @@ func TestFTPStorage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download directory:", err)
 	}
@@ -218,7 +218,7 @@ func TestFTPStorageConfigAuth(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "localhost:8021"
 	ctx := context.Background()
-
+	parallelXfer := 10
 	protocol := "ftp://"

 	store, err := storage.NewMux(conf)
@@ -230,7 +230,7 @@ func TestFTPStorageConfigAuth(t *testing.T) {
 	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inFileURL, Path: fPath},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -239,7 +239,7 @@ func TestFTPStorageConfigAuth(t *testing.T) {
 	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inDirURL, Path: dPath, Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
@@ -301,7 +301,7 @@ func TestFTPStorageConfigAuth(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
@@ -320,7 +320,7 @@ func TestFTPStorageConfigAuth(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download directory:", err)
 	}
9 changes: 5 additions & 4 deletions tests/storage/generic_s3_test.go
@@ -31,6 +31,7 @@ func TestGenericS3Storage(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 	ctx := context.Background()
+	parallelXfer := 10

 	client, err := newMinioTest(conf.GenericS3[0])
 	if err != nil {
@@ -55,7 +56,7 @@ func TestGenericS3Storage(t *testing.T) {
 	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inFileURL, Path: fPath},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -64,7 +65,7 @@ func TestGenericS3Storage(t *testing.T) {
 	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inDirURL, Path: dPath, Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
@@ -126,7 +127,7 @@ func TestGenericS3Storage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
@@ -145,7 +146,7 @@ func TestGenericS3Storage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outDirURL, Path: "./test_tmp/test-s3-directory", Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download directory:", err)
 	}
9 changes: 5 additions & 4 deletions tests/storage/gs_test.go
@@ -38,6 +38,7 @@ func TestGoogleStorage(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 	ctx := context.Background()
+	parallelXfer := 10

 	client, err := newGsTest()
 	if err != nil {
@@ -62,7 +63,7 @@ func TestGoogleStorage(t *testing.T) {
 	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inFileURL, Path: fPath},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -71,7 +72,7 @@ func TestGoogleStorage(t *testing.T) {
 	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: inDirURL, Path: dPath, Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
@@ -133,7 +134,7 @@ func TestGoogleStorage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
@@ -152,7 +153,7 @@ func TestGoogleStorage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory},
-	}, store, ev)
+	}, store, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download directory:", err)
 	}
7 changes: 4 additions & 3 deletions tests/storage/multi_s3_test.go
@@ -28,6 +28,7 @@ func TestMultiS3Storage(t *testing.T) {
 	ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log})
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 	ctx := context.Background()
+	parallelXfer := 10

 	// Generic S3 backend setup
 	gconf1 := conf.GenericS3[0]
@@ -63,15 +64,15 @@ func TestMultiS3Storage(t *testing.T) {
 	g1FileURL := protocol + gconf1.Endpoint + "/" + testBucket + "/" + fPath + tests.RandomString(6)
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: g1FileURL, Path: fPath},
-	}, gclient1.fcli, ev)
+	}, gclient1.fcli, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}

 	g2FileURL := protocol + gconf2.Endpoint + "/" + testBucket + "/" + fPath + tests.RandomString(6)
 	_, err = worker.UploadOutputs(ctx, []*tes.Output{
 		{Url: g2FileURL, Path: fPath, Type: tes.Directory},
-	}, gclient2.fcli, ev)
+	}, gclient2.fcli, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
@@ -127,7 +128,7 @@ func TestMultiS3Storage(t *testing.T) {

 	err = worker.DownloadInputs(ctx, []*tes.Input{
 		{Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"},
-	}, gclient1.fcli, ev)
+	}, gclient1.fcli, ev, parallelXfer)
 	if err != nil {
 		t.Fatal("Failed to download file:", err)
 	}
Diffs for the remaining 3 changed files are not shown here.
