Skip to content

Commit

Permalink
persistent StorageClient
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Oct 9, 2018
1 parent e0cadd4 commit bbc35c5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
10 changes: 7 additions & 3 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/m-lab/etl-gardener/cloud"
"github.com/m-lab/etl-gardener/reproc"
"github.com/m-lab/etl-gardener/rex"
"google.golang.org/api/option"

"github.com/m-lab/etl-gardener/state"

Expand Down Expand Up @@ -159,7 +158,11 @@ func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskH
Client: client}

bqConfig := NewBQConfig(config)
exec := rex.ReprocessingExecutor{BQConfig: bqConfig, BucketOpts: []option.ClientOption{}}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
return nil, err
}
// TODO - exec.StorageClient should be closed.
queues := make([]string, env.NumQueues)
for i := 0; i < env.NumQueues; i++ {
queues[i] = fmt.Sprintf("%s%d", env.QueueBase, i)
Expand All @@ -170,7 +173,7 @@ func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskH
if err != nil {
return nil, err
}
return reproc.NewTaskHandler(&exec, queues, saver), nil
return reproc.NewTaskHandler(exec, queues, saver), nil
}

// doDispatchLoop just sequences through archives in date order.
Expand Down Expand Up @@ -285,6 +288,7 @@ func setupService(ctx context.Context) error {
http.HandleFunc("/alive", healthCheck)
http.HandleFunc("/ready", healthCheck)

// TODO - this creates a storage client, which should be closed on termination.
handler, err := taskHandlerFromEnv(ctx, http.DefaultClient)

if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions rex/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ func init() {
// ReprocessingExecutor handles all reprocessing steps.
type ReprocessingExecutor struct {
cloud.BQConfig
BucketOpts []option.ClientOption
StorageClient stiface.Client
}

// NewReprocessingExecutor creates a new exec.
// NOTE: The context is used to create a persistent storage Client!
func NewReprocessingExecutor(ctx context.Context, config cloud.BQConfig, bucketOpts ...option.ClientOption) (*ReprocessingExecutor, error) {
storageClient, err := storage.NewClient(ctx, bucketOpts...)
if err != nil {
return nil, err
}
return &ReprocessingExecutor{config, stiface.AdaptClient(storageClient)}, nil
}

// GetBatchDS constructs an appropriate Dataset for BQ operations.
Expand Down Expand Up @@ -230,16 +240,8 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,

// Use a real storage bucket.
// TODO - add a persistent storageClient to the rex object?
sc, err := storage.NewClient(ctx, rex.BucketOpts...)
if err != nil {
log.Println(err)
t.SetError(ctx, err, "StorageClientError")
return 0, err
}
storageClient := stiface.AdaptClient(sc)
// TODO - try cancelling the context instead?
defer storageClient.Close()
bucket, err := tq.GetBucket(ctx, storageClient, rex.Project, bucketName, false)
bucket, err := tq.GetBucket(ctx, rex.StorageClient, rex.Project, bucketName, false)
if err != nil {
if err == io.EOF && env.TestMode {
log.Println("Using fake client, ignoring EOF error")
Expand Down
8 changes: 6 additions & 2 deletions rex/rex_bb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ func TestRealBucket(t *testing.T) {
client, counter := cloud.DryRunClient()
config := cloud.Config{Project: "mlab-testing", Client: client}
bqConfig := cloud.BQConfig{Config: config, BQProject: "mlab-testing", BQBatchDataset: "batch"}
exec := rex.ReprocessingExecutor{BQConfig: bqConfig}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
t.Fatal(err)
}
defer exec.StorageClient.Close()
saver := newTestSaver()
th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, saver)
th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)

// We submit tasks corresponding to real buckets...
th.AddTask(ctx, "gs://archive-mlab-testing/ndt/2017/09/22/")
Expand Down
8 changes: 6 additions & 2 deletions rex/rex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ func TestWithTaskQueue(t *testing.T) {
config := cloud.Config{Project: "mlab-testing", Client: client}
bqConfig := cloud.BQConfig{Config: config, BQProject: "bqproject", BQBatchDataset: "dataset"}
bucketOpts := []option.ClientOption{option.WithHTTPClient(client)}
exec := rex.ReprocessingExecutor{BQConfig: bqConfig, BucketOpts: bucketOpts}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig, bucketOpts...)
if err != nil {
t.Fatal(err)
}
defer exec.StorageClient.Close()
saver := newTestSaver()
th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, saver)
th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)

th.AddTask(ctx, "gs://foo/bar/2001/01/01/")

Expand Down

0 comments on commit bbc35c5

Please sign in to comment.