Skip to content

Commit

Permalink
injectable Dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Oct 10, 2018
1 parent 9717a4b commit 9b1d721
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 48 deletions.
7 changes: 1 addition & 6 deletions cloud/tq/tq.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,7 @@ var (
ErrInvalidQueueName = errors.New("invalid queue name")
)

// QueueHandler is much like tq.Queuer, but for a single queue. We want
// independent single queue handlers to avoid thread safety issues, among
// other things.
// It needs:
// bucket
// strategies for enqueuing.
// QueueHandler provides methods for adding tasks, and getting stats and status.
type QueueHandler struct {
cloud.Config
Queue string // task queue name
Expand Down
63 changes: 23 additions & 40 deletions rex/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,30 @@ func init() {
// ReprocessingExecutor handles all reprocessing steps.
type ReprocessingExecutor struct {
	cloud.BQConfig
	// These collectively allow injection of various fakes for testing.
	StorageClient stiface.Client  // GCS client, wrapped in stiface so a fake can be injected.
	BatchDS       dataset.Dataset // Dataset holding intermediate (batch) tables; dedup runs here.
	FinalDS       dataset.Dataset // Dataset that receives sanity-checked copies of dedup'd tables.
	// TODO need to allow injection of QueueHandler factory.
	// QHFactory func(config cloud.Config, queue string) (*tq.QueueHandler, error)
}

// NewReprocessingExecutor creates a new executor with a persistent storage
// client and pre-built batch and final Datasets.
// NOTE: The context is used to create a persistent storage Client!
// NOTE: Datasets may be injected after construction (e.g. fakes for testing).
func NewReprocessingExecutor(ctx context.Context, config cloud.BQConfig, bucketOpts ...option.ClientOption) (*ReprocessingExecutor, error) {
	storageClient, err := storage.NewClient(ctx, bucketOpts...)
	if err != nil {
		return nil, err
	}
	// Build both datasets up front so later operations need not construct
	// them (and so tests can replace them after construction).
	batchDS, err := dataset.NewDataset(ctx, config.BQProject, config.BQBatchDataset, config.Options...)
	if err != nil {
		// Avoid leaking the storage client on the error path.
		storageClient.Close()
		return nil, err
	}
	finalDS, err := dataset.NewDataset(ctx, config.BQProject, config.BQFinalDataset, config.Options...)
	if err != nil {
		storageClient.Close()
		return nil, err
	}
	return &ReprocessingExecutor{config, stiface.AdaptClient(storageClient), batchDS, finalDS}, nil
}

var (
Expand Down Expand Up @@ -119,13 +121,7 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin

case state.Stabilizing:
// Wait for the streaming buffer to be nil.
ds, err := rex.GetBatchDS(ctx)
if err != nil {
// SetError also pushes to datastore, like Update(ctx, )
t.SetError(ctx, err, "rex.GetBatchDS")
return err
}
s, _, err := t.SourceAndDest(&ds)
s, _, err := t.SourceAndDest(&rex.BatchDS)
if err != nil {
// SetError also pushes to datastore, like Update(ctx, )
t.SetError(ctx, err, "task.SourceAndDest")
Expand Down Expand Up @@ -264,11 +260,7 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,

func (rex *ReprocessingExecutor) dedup(ctx context.Context, t *state.Task) error {
// Launch the dedup request, and save the JobID
ds, err := rex.GetBatchDS(ctx)
if err != nil {
t.SetError(ctx, err, "GetBatchDS")
return err
}
ds := rex.BatchDS
src, dest, err := t.SourceAndDest(&ds)
if err != nil {
t.SetError(ctx, err, "SourceAndDest")
Expand Down Expand Up @@ -339,17 +331,12 @@ func waitForJob(ctx context.Context, job bqiface.Job, maxBackoff time.Duration,

func (rex *ReprocessingExecutor) finish(ctx context.Context, t *state.Task, terminate <-chan struct{}) error {
// TODO use a simple client instead of creating dataset?
srcDs, err := rex.GetBatchDS(ctx)
if err != nil {
t.SetError(ctx, err, "GetBatchDS")
return err
}
src, copy, err := t.SourceAndDest(&srcDs)
src, copy, err := t.SourceAndDest(&rex.BatchDS)
if err != nil {
t.SetError(ctx, err, "SourceAndDest")
return err
}
job, err := srcDs.BqClient.JobFromID(ctx, t.JobID)
job, err := rex.BatchDS.BqClient.JobFromID(ctx, t.JobID)
if err != nil {
t.SetError(ctx, err, "JobFromID")
return err
Expand Down Expand Up @@ -396,23 +383,19 @@ func (rex *ReprocessingExecutor) finish(ctx context.Context, t *state.Task, term
t.SetError(ctx, ErrDatasetNamesMatch, "Cannot SanityCheckAndCopy to same dataset")
return ErrDatasetNamesMatch
}
destDs, err := rex.GetFinalDS(ctx)
if err != nil {
t.SetError(ctx, err, "GetFinalDS")
return err
}

// Destination table has the same ID as source.
dest := destDs.Table(copy.TableID())
dest := rex.FinalDS.Table(copy.TableID())
log.Printf("Sanity checking dedup'd data before final copy from %s to %s",
copy.FullyQualifiedName(), dest.FullyQualifiedName())

// TODO: how long can this actually take???
copyCtx, cf := context.WithTimeout(ctx, 30*time.Minute)
defer cf()

srcAt := bq.NewAnnotatedTable(copy, &srcDs)
destAt := bq.NewAnnotatedTable(dest, &destDs)
// TODO - move the AnnotatedTable stuff inside SanityCheckAndCopy?
srcAt := bq.NewAnnotatedTable(copy, &rex.BatchDS)
destAt := bq.NewAnnotatedTable(dest, &rex.FinalDS)

// Copy to Final Dataset tables.
err = bq.SanityCheckAndCopy(copyCtx, srcAt, destAt)
Expand Down
9 changes: 7 additions & 2 deletions rex/rex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,17 @@ func TestWithTaskQueue(t *testing.T) {
ctx := context.Background()
client, counter := cloud.DryRunClient()
config := cloud.Config{Project: "mlab-testing", Client: client}
bqConfig := cloud.BQConfig{Config: config, BQProject: "bqproject", BQBatchDataset: "dataset"}
bqConfig := cloud.BQConfig{Config: config, BQProject: "bqproject", BQBatchDataset: "batch", BQFinalDataset: "final"}
fc := fakeClient{objects: []*storage.ObjectAttrs{
&storage.ObjectAttrs{Name: "obj1"},
&storage.ObjectAttrs{Name: "obj2"},
}}
exec := &rex.ReprocessingExecutor{BQConfig: bqConfig, StorageClient: fc}
exec, err := rex.NewReprocessingExecutor(ctx, bqConfig)
if err != nil {
log.Fatal(err)
}
exec.StorageClient = fc

defer exec.StorageClient.Close()
saver := newTestSaver()
th := reproc.NewTaskHandler(exec, []string{"queue-1"}, saver)
Expand Down

0 comments on commit 9b1d721

Please sign in to comment.