Skip to content

Commit

Permalink
Merge bbc35c5 into 2349632
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Oct 9, 2018
2 parents 2349632 + bbc35c5 commit 75e2898
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 22 deletions.
7 changes: 4 additions & 3 deletions cloud/tq/tq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/google-cloud-go-testing/storage/stiface"
"github.com/m-lab/etl-gardener/cloud"
"google.golang.org/api/iterator"
"google.golang.org/appengine/taskqueue"
Expand Down Expand Up @@ -155,7 +156,7 @@ func (qh *QueueHandler) postWithRetry(bucket, filepath string) error {
}

// PostAll posts all normal file items in an ObjectIterator into the appropriate queue.
func (qh *QueueHandler) PostAll(bucket string, it *storage.ObjectIterator) (int, error) {
func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, error) {
fileCount := 0
qpErrCount := 0
gcsErrCount := 0
Expand Down Expand Up @@ -190,7 +191,7 @@ func (qh *QueueHandler) PostAll(bucket string, it *storage.ObjectIterator) (int,
// PostDay fetches an iterator over the objects with ndt/YYYY/MM/DD prefix,
// and passes the iterator to postDay with appropriate queue.
// This typically takes about 10 minutes for a 20K task NDT day.
func (qh *QueueHandler) PostDay(ctx context.Context, bucket *storage.BucketHandle, bucketName, prefix string) (int, error) {
func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, error) {
log.Println("Adding ", prefix, " to ", qh.Queue)
qry := storage.Query{
Delimiter: "/",
Expand All @@ -209,7 +210,7 @@ func (qh *QueueHandler) PostDay(ctx context.Context, bucket *storage.BucketHandl

// GetBucket gets a storage bucket.
// opts - ClientOptions, e.g. credentials, for tests that need to access storage buckets.
func GetBucket(ctx context.Context, sClient *storage.Client, project, bucketName string, dryRun bool) (*storage.BucketHandle, error) {
func GetBucket(ctx context.Context, sClient stiface.Client, project, bucketName string, dryRun bool) (stiface.BucketHandle, error) {
bucket := sClient.Bucket(bucketName)
// Check that the bucket is valid, by fetching it's attributes.
// Bypass check if we are running travis tests.
Expand Down
9 changes: 6 additions & 3 deletions cloud/tq/tq_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/google-cloud-go-testing/storage/stiface"
"github.com/m-lab/etl-gardener/cloud"
"github.com/m-lab/etl-gardener/cloud/tq"
"google.golang.org/api/iterator"
Expand All @@ -35,13 +36,14 @@ func TestGetTaskqueueStats(t *testing.T) {
func TestGetBucket(t *testing.T) {
ctx := context.Background()

storageClient, err := storage.NewClient(context.Background())
sc, err := storage.NewClient(context.Background())
if err != nil {
t.Fatal(err)
}
storageClient := stiface.AdaptClient(sc)

bucketName := "archive-mlab-testing"
bucket, err := tq.GetBucket(ctx, storageClient, "mlab-testing", bucketName, false)
bucket, err := tq.GetBucket(ctx, stiface.AdaptClient(sc), "mlab-testing", bucketName, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -97,10 +99,11 @@ func TestPostDay(t *testing.T) {
}

// Use a real storage bucket.
storageClient, err := storage.NewClient(ctx)
sc, err := storage.NewClient(ctx)
if err != nil {
t.Error(err)
}
storageClient := stiface.AdaptClient(sc)
bucketName := "archive-mlab-testing"
bucket, err := tq.GetBucket(ctx, storageClient, "mlab-testing", bucketName, false)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/m-lab/etl-gardener/cloud"
"github.com/m-lab/etl-gardener/reproc"
"github.com/m-lab/etl-gardener/rex"
"google.golang.org/api/option"

"github.com/m-lab/etl-gardener/state"

Expand Down Expand Up @@ -159,7 +158,11 @@ func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskH
Client: client}

bqConfig := NewBQConfig(config)
exec := rex.ReprocessingExecutor{BQConfig: bqConfig, BucketOpts: []option.ClientOption{}}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
return nil, err
}
// TODO - exec.StorageClient should be closed.
queues := make([]string, env.NumQueues)
for i := 0; i < env.NumQueues; i++ {
queues[i] = fmt.Sprintf("%s%d", env.QueueBase, i)
Expand All @@ -170,7 +173,7 @@ func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskH
if err != nil {
return nil, err
}
return reproc.NewTaskHandler(&exec, queues, saver), nil
return reproc.NewTaskHandler(exec, queues, saver), nil
}

// doDispatchLoop just sequences through archives in date order.
Expand Down Expand Up @@ -285,6 +288,7 @@ func setupService(ctx context.Context) error {
http.HandleFunc("/alive", healthCheck)
http.HandleFunc("/ready", healthCheck)

// TODO - this creates a storage client, which should be closed on termination.
handler, err := taskHandlerFromEnv(ctx, http.DefaultClient)

if err != nil {
Expand Down
23 changes: 14 additions & 9 deletions rex/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"strings"
"time"

"github.com/GoogleCloudPlatform/google-cloud-go-testing/storage/stiface"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/m-lab/etl-gardener/cloud"
Expand Down Expand Up @@ -45,7 +47,17 @@ func init() {
// ReprocessingExecutor handles all reprocessing steps.
type ReprocessingExecutor struct {
cloud.BQConfig
BucketOpts []option.ClientOption
StorageClient stiface.Client
}

// NewReprocessingExecutor creates a new exec.
// NOTE: The context is used to create a persistent storage Client!
func NewReprocessingExecutor(ctx context.Context, config cloud.BQConfig, bucketOpts ...option.ClientOption) (*ReprocessingExecutor, error) {
storageClient, err := storage.NewClient(ctx, bucketOpts...)
if err != nil {
return nil, err
}
return &ReprocessingExecutor{config, stiface.AdaptClient(storageClient)}, nil
}

// GetBatchDS constructs an appropriate Dataset for BQ operations.
Expand Down Expand Up @@ -228,15 +240,8 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,

// Use a real storage bucket.
// TODO - add a persistent storageClient to the rex object?
storageClient, err := storage.NewClient(ctx, rex.BucketOpts...)
if err != nil {
log.Println(err)
t.SetError(ctx, err, "StorageClientError")
return 0, err
}
// TODO - try cancelling the context instead?
defer storageClient.Close()
bucket, err := tq.GetBucket(ctx, storageClient, rex.Project, bucketName, false)
bucket, err := tq.GetBucket(ctx, rex.StorageClient, rex.Project, bucketName, false)
if err != nil {
if err == io.EOF && env.TestMode {
log.Println("Using fake client, ignoring EOF error")
Expand Down
8 changes: 6 additions & 2 deletions rex/rex_bb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ func TestRealBucket(t *testing.T) {
client, counter := cloud.DryRunClient()
config := cloud.Config{Project: "mlab-testing", Client: client}
bqConfig := cloud.BQConfig{Config: config, BQProject: "mlab-testing", BQBatchDataset: "batch"}
exec := rex.ReprocessingExecutor{BQConfig: bqConfig}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
t.Fatal(err)
}
defer exec.StorageClient.Close()
saver := newTestSaver()
th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, saver)
th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)

// We submit tasks corresponding to real buckets...
th.AddTask(ctx, "gs://archive-mlab-testing/ndt/2017/09/22/")
Expand Down
8 changes: 6 additions & 2 deletions rex/rex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ func TestWithTaskQueue(t *testing.T) {
config := cloud.Config{Project: "mlab-testing", Client: client}
bqConfig := cloud.BQConfig{Config: config, BQProject: "bqproject", BQBatchDataset: "dataset"}
bucketOpts := []option.ClientOption{option.WithHTTPClient(client)}
exec := rex.ReprocessingExecutor{BQConfig: bqConfig, BucketOpts: bucketOpts}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig, bucketOpts...)
if err != nil {
t.Fatal(err)
}
defer exec.StorageClient.Close()
saver := newTestSaver()
th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, saver)
th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)

th.AddTask(ctx, "gs://foo/bar/2001/01/01/")

Expand Down

0 comments on commit 75e2898

Please sign in to comment.