Skip to content

Commit

Permalink
Merge 78759ef into 983f356
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Oct 8, 2018
2 parents 983f356 + 78759ef commit e1851b2
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 194 deletions.
11 changes: 6 additions & 5 deletions cloud/bq/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (

// WaitForStableTable loops checking until table exists and has no streaming buffer.
// TODO - move these functions to go/bqext package
func WaitForStableTable(tt *bigquery.Table) error {
func WaitForStableTable(ctx context.Context, tt *bigquery.Table) error {
errorTimeout := 2 * time.Minute
if testMode {
errorTimeout = 100 * time.Millisecond
Expand All @@ -44,7 +44,7 @@ ErrorTimeout:
// Check table status until streaming buffer is empty, OR there is
// an error condition we don't expect to recover from.
for {
ctx, cf := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cf := context.WithTimeout(ctx, 10*time.Second)
defer cf()
meta, err = tt.Metadata(ctx)
if err == nil && ctx.Err() != nil {
Expand Down Expand Up @@ -153,9 +153,10 @@ var dedupTemplateSwitch = `
// NOTE: If destination table is partitioned, destTable MUST include the partition
// suffix to avoid accidentally overwriting the entire table.
// TODO - move these functions to go/bqext package
func Dedup(dsExt *bqext.Dataset, src string, destTable *bigquery.Table) (*bigquery.Job, error) {
// TODO - should we get the context from the dsExt?
func Dedup(ctx context.Context, dsExt *bqext.Dataset, src string, destTable *bigquery.Table) (*bigquery.Job, error) {
if !strings.Contains(destTable.TableID, "$") {
meta, err := destTable.Metadata(context.Background())
meta, err := destTable.Metadata(ctx)
if err == nil && meta.TimePartitioning != nil {
return nil, errors.New("Destination table must specify partition")
}
Expand All @@ -179,7 +180,7 @@ func Dedup(dsExt *bqext.Dataset, src string, destTable *bigquery.Table) (*bigque
if query.QueryConfig.Dst == nil && query.QueryConfig.DryRun == false {
return nil, errors.New("query must be a destination or dry run")
}
job, err := query.Run(context.Background())
job, err := query.Run(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cloud/bq/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func SanityCheckAndCopy(ctx context.Context, src, dest *AnnotatedTable) error {
return err
}

err = WaitForJob(context.Background(), job, 10*time.Second)
err = WaitForJob(ctx, job, 10*time.Second)
log.Println("SanityCheckAndCopy Done")
return err
}
17 changes: 9 additions & 8 deletions cloud/tq/tq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/google-cloud-go-testing/storage/stiface"
"github.com/m-lab/etl-gardener/cloud"
"google.golang.org/api/iterator"
"google.golang.org/appengine/taskqueue"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (qh *QueueHandler) IsEmpty() error {
// GetTaskqueueStats gets stats for a single task queue.
func GetTaskqueueStats(config cloud.Config, name string) (stats taskqueue.QueueStatistics, err error) {
// Would prefer to use this, but it does not work from flex.
// stats, err := taskqueue.QueueStats(context.Background(), queueNames)
// stats, err := taskqueue.QueueStats(config.Context, queueNames)
resp, err := config.Client.Get(fmt.Sprintf(`https://queue-pusher-dot-%s.appspot.com/stats?queuename=%s`, config.Project, name))
if err != nil {
return
Expand Down Expand Up @@ -155,7 +156,7 @@ func (qh *QueueHandler) postWithRetry(bucket, filepath string) error {
}

// PostAll posts all normal file items in an ObjectIterator into the appropriate queue.
func (qh *QueueHandler) PostAll(bucket string, it *storage.ObjectIterator) (int, error) {
func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, error) {
fileCount := 0
qpErrCount := 0
gcsErrCount := 0
Expand Down Expand Up @@ -190,14 +191,15 @@ func (qh *QueueHandler) PostAll(bucket string, it *storage.ObjectIterator) (int,
// PostDay fetches an iterator over the objects with ndt/YYYY/MM/DD prefix,
// and passes the iterator to PostAll, which posts each object to this handler's queue.
// This typically takes about 10 minutes for a 20K task NDT day.
func (qh *QueueHandler) PostDay(bucket *storage.BucketHandle, bucketName, prefix string) (int, error) {
func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, error) {
log.Println("Adding ", prefix, " to ", qh.Queue)
qry := storage.Query{
Delimiter: "/",
Prefix: prefix,
}
// TODO - can this error? Or do errors only occur on iterator ops?
it := bucket.Objects(context.Background(), &qry)
// TODO - handle timeout errors?
// TODO - should we add a deadline?
it := bucket.Objects(ctx, &qry)
return qh.PostAll(bucketName, it)
}

Expand All @@ -208,13 +210,12 @@ func (qh *QueueHandler) PostDay(bucket *storage.BucketHandle, bucketName, prefix

// GetBucket gets a storage bucket.
// opts - ClientOptions, e.g. credentials, for tests that need to access storage buckets.
func GetBucket(sClient *storage.Client, project, bucketName string, dryRun bool) (*storage.BucketHandle, error) {

func GetBucket(ctx context.Context, sClient stiface.Client, project, bucketName string, dryRun bool) (stiface.BucketHandle, error) {
bucket := sClient.Bucket(bucketName)
// Check that the bucket is valid, by fetching its attributes.
// Bypass check if we are running travis tests.
if !dryRun {
_, err := bucket.Attrs(context.Background())
_, err := bucket.Attrs(ctx)
if err != nil {
return nil, err
}
Expand Down
19 changes: 12 additions & 7 deletions cloud/tq/tq_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/google-cloud-go-testing/storage/stiface"
"github.com/m-lab/etl-gardener/cloud"
"github.com/m-lab/etl-gardener/cloud/tq"
"google.golang.org/api/iterator"
Expand All @@ -33,14 +34,16 @@ func TestGetTaskqueueStats(t *testing.T) {
// check that the bucket content has not been changed.
// TODO - this currently leaks goroutines.
func TestGetBucket(t *testing.T) {
ctx := context.Background()

storageClient, err := storage.NewClient(context.Background())
sc, err := storage.NewClient(context.Background())
if err != nil {
t.Error(err)
t.Fatal(err)
}
storageClient := stiface.AdaptClient(sc)

bucketName := "archive-mlab-testing"
bucket, err := tq.GetBucket(storageClient, "mlab-testing", bucketName, false)
bucket, err := tq.GetBucket(ctx, stiface.AdaptClient(sc), "mlab-testing", bucketName, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -88,30 +91,32 @@ func TestIsEmpty(t *testing.T) {
func TestPostDay(t *testing.T) {
// Use a fake queue client.
client, counter := cloud.DryRunClient()
ctx := context.Background()
config := cloud.Config{Client: client, Project: "fake-project"}
q, err := tq.NewQueueHandler(config, "test-queue")
if err != nil {
t.Fatal(err)
}

// Use a real storage bucket.
storageClient, err := storage.NewClient(context.Background())
sc, err := storage.NewClient(ctx)
if err != nil {
t.Error(err)
}
storageClient := stiface.AdaptClient(sc)
bucketName := "archive-mlab-testing"
bucket, err := tq.GetBucket(storageClient, "mlab-testing", bucketName, false)
bucket, err := tq.GetBucket(ctx, storageClient, "mlab-testing", bucketName, false)
if err != nil {
t.Fatal(err)
}
n, err := q.PostDay(bucket, bucketName, "ndt/2017/09/24/")
n, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Error("Should have posted 3 items", n)
}
n, err = q.PostDay(bucket, bucketName, "ndt/2018/05/01/")
n, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/")
if err != nil {
t.Fatal(err)
}
Expand Down
53 changes: 32 additions & 21 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/m-lab/etl-gardener/cloud"
"github.com/m-lab/etl-gardener/reproc"
"github.com/m-lab/etl-gardener/rex"
"google.golang.org/api/option"

"github.com/m-lab/etl-gardener/state"

Expand Down Expand Up @@ -145,7 +144,9 @@ func NewBQConfig(config cloud.Config) cloud.BQConfig {

// taskHandlerFromEnv creates a TaskHandler initialized from environment variables.
// It uses PROJECT, QUEUE_BASE, and NUM_QUEUES.
func taskHandlerFromEnv(client *http.Client) (*reproc.TaskHandler, error) {
// NOTE: ctx should only be used within the function scope, and not reused later.
// Not currently clear if that is true.
func taskHandlerFromEnv(ctx context.Context, client *http.Client) (*reproc.TaskHandler, error) {
if env.Error != nil {
log.Println(env.Error)
log.Println(env)
Expand All @@ -157,24 +158,28 @@ func taskHandlerFromEnv(client *http.Client) (*reproc.TaskHandler, error) {
Client: client}

bqConfig := NewBQConfig(config)
exec := rex.ReprocessingExecutor{BQConfig: bqConfig, BucketOpts: []option.ClientOption{}}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
return nil, err
}
// TODO - exec.StorageClient should be closed.
queues := make([]string, env.NumQueues)
for i := 0; i < env.NumQueues; i++ {
queues[i] = fmt.Sprintf("%s%d", env.QueueBase, i)
}

// TODO move DatastoreSaver to another package?
saver, err := state.NewDatastoreSaver(env.Project)
saver, err := state.NewDatastoreSaver(ctx, env.Project)
if err != nil {
return nil, err
}
return reproc.NewTaskHandler(&exec, queues, saver), nil
return reproc.NewTaskHandler(exec, queues, saver), nil
}

// doDispatchLoop just sequences through archives in date order.
// It will generally be blocked on the queues.
// It will start processing at startDate, and when it catches up to "now" it will restart at restartDate.
func doDispatchLoop(handler *reproc.TaskHandler, startDate time.Time, restartDate time.Time, bucket string, experiment string) {
func doDispatchLoop(ctx context.Context, handler *reproc.TaskHandler, startDate time.Time, restartDate time.Time, bucket string, experiment string) {
log.Println("(Re)starting at", startDate)
next := startDate

Expand All @@ -183,7 +188,7 @@ func doDispatchLoop(handler *reproc.TaskHandler, startDate time.Time, restartDat
prefix := next.Format(fmt.Sprintf("gs://%s/%s/2006/01/02/", bucket, experiment))

// Note that this blocks until a queue is available.
err := handler.AddTask(prefix)
err := handler.AddTask(ctx, prefix)
if err != nil {
// Only error expected here is ErrTerminating
log.Println(err)
Expand Down Expand Up @@ -244,7 +249,8 @@ func Status(w http.ResponseWriter, r *http.Request) {

fmt.Fprintf(w, "</br></br>\n")

state.WriteHTMLStatusTo(w, env.Project, env.Experiment)
// TODO - attach the environment to the context.
state.WriteHTMLStatusTo(r.Context(), w, env.Project, env.Experiment)
fmt.Fprintf(w, "</br>\n")

env := os.Environ()
Expand All @@ -267,7 +273,7 @@ func healthCheck(w http.ResponseWriter, r *http.Request) {

// setupService prepares the setting for a service.
// The configuration info comes from environment variables.
func setupService() error {
func setupService(ctx context.Context) error {
// Enable block profiling
runtime.SetBlockProfileRate(1000000) // One event per msec.

Expand All @@ -282,7 +288,8 @@ func setupService() error {
http.HandleFunc("/alive", healthCheck)
http.HandleFunc("/ready", healthCheck)

handler, err := taskHandlerFromEnv(http.DefaultClient)
// TODO - this creates a storage client, which should be closed on termination.
handler, err := taskHandlerFromEnv(ctx, http.DefaultClient)

if err != nil {
log.Println(err)
Expand All @@ -291,32 +298,33 @@ func setupService() error {

startDate := env.StartDate

ds, err := state.NewDatastoreSaver(env.Project)
ds, err := state.NewDatastoreSaver(ctx, env.Project)
if err != nil {
log.Println(err)
return err
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)

tasks, err := ds.GetStatus(ctx, env.Experiment)
// Move the timeout into GetStatus?
taskCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
tasks, err := ds.GetStatus(taskCtx, env.Experiment)
cancel()

if err != nil {
log.Println(err)
return err
}
maxDate, err := handler.RestartTasks(tasks)
maxDate, err := handler.RestartTasks(ctx, tasks)
if err != nil {
log.Println(err)
return err
} else {
if maxDate.After(startDate) {
startDate = maxDate.AddDate(0, 0, 1)
}
}

if maxDate.After(startDate) {
startDate = maxDate.AddDate(0, 0, 1)
}

log.Println("Using start date of", startDate)
go doDispatchLoop(handler, startDate, env.StartDate, env.Bucket, env.Experiment)
go doDispatchLoop(ctx, handler, startDate, env.StartDate, env.Bucket, env.Experiment)

return nil
}
Expand All @@ -331,12 +339,15 @@ func init() {
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Check if invoked as a service.
isService, _ := strconv.ParseBool(os.Getenv("GARDENER_SERVICE"))
if isService {
// If setupService() returns an err instead of nil, healthy will be
// set as false and eventually it will cause kubernetes to roll back.
err := setupService()
err := setupService(ctx)
if err != nil {
healthy = false
log.Println("Running as unhealthy service")
Expand Down
Binary file added gardener
Binary file not shown.
17 changes: 10 additions & 7 deletions reproc/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package reproc

import (
"context"
"errors"
"log"
"strings"
Expand Down Expand Up @@ -89,7 +90,7 @@ func NewTaskHandler(exec state.Executor, queues []string, saver state.Saver) *Ta
var ErrTerminating = errors.New("TaskHandler is terminating")

// StartTask starts a single task. It should be properly initialized except for saver.
func (th *TaskHandler) StartTask(t state.Task) {
func (th *TaskHandler) StartTask(ctx context.Context, t state.Task) {
t.SetSaver(th.saver)

// WARNING: There is a race here when terminating, if a task gets
Expand All @@ -105,15 +106,15 @@ func (th *TaskHandler) StartTask(t state.Task) {
log.Println("Returning", t.Queue)
th.taskQueues <- t.Queue
}
go t.Process(th.exec, doneWithQueue, th.Terminator)
go t.Process(ctx, th.exec, doneWithQueue, th.Terminator)
}

// AddTask adds a new task, blocking until the task has been accepted.
// This will typically be called repeatedly by another goroutine responsible
// for driving the reprocessing.
// May return ErrTerminating, if th has started termination.
// TODO: Add prometheus metrics.
func (th *TaskHandler) AddTask(prefix string) error {
func (th *TaskHandler) AddTask(ctx context.Context, prefix string) error {
log.Println("Waiting for a queue")
select {
// Wait until there is an available task queue.
Expand All @@ -124,7 +125,7 @@ func (th *TaskHandler) AddTask(prefix string) error {
return err
}
log.Println("Adding:", t.Name)
th.StartTask(*t)
th.StartTask(ctx, *t)
return nil

// Or until we start termination.
Expand All @@ -137,7 +138,9 @@ func (th *TaskHandler) AddTask(prefix string) error {
// RestartTasks restarts all the tasks, allocating queues as needed.
// SHOULD ONLY be called at startup.
// Returns date of next jobs to process.
func (th *TaskHandler) RestartTasks(tasks []state.Task) (time.Time, error) {
// NOTE: The ctx parameter will be used for all rpc operations for all tasks, and must be
// long lived.
func (th *TaskHandler) RestartTasks(ctx context.Context, tasks []state.Task) (time.Time, error) {
// Retrieve all task queues from the pool.
queues := make(map[string]struct{}, 20)
queueLoop:
Expand Down Expand Up @@ -171,7 +174,7 @@ queueLoop:
if ok {
delete(queues, t.Queue)
log.Println("Restarting", t)
th.StartTask(t)
th.StartTask(ctx, t)
} else {
log.Println("Queue", t.Queue, "already in use. Skipping", t)
metrics.FailCount.WithLabelValues("queue not available").Inc()
Expand All @@ -180,7 +183,7 @@ queueLoop:
} else {
// No queue, just restart...
log.Println("Restarting", t)
th.StartTask(t)
th.StartTask(ctx, t)
}
if t.Date.After(maxDate) {
maxDate = t.Date
Expand Down

0 comments on commit e1851b2

Please sign in to comment.