Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/stctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func main() {
rtx.Must(err, "Failed to create")
pretty.Print(job)
case "sync":
rtx.Must(cmd.Sync(ctx), "Failed to sync")
job, err := cmd.Sync(ctx)
rtx.Must(err, "Failed to sync")
pretty.Print(job)
case "disable":
name := mustArg(1)
job, err := cmd.Disable(ctx, name)
Expand Down
8 changes: 0 additions & 8 deletions internal/stctl/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ func fmtTime(t *storagetransfer.TimeOfDay) string {
return fmt.Sprintf("%02d:%02d:%02d", t.Hours, t.Minutes, t.Seconds)
}

// Sync guarantees that a job exists matching the current command parameters.
// If a job with matching command parameters already exists, no action is taken.
// If a matching description is found with different values for IncludePrefixes
// or StartTimeOfDay, then the original job is disabled and a new job created.
func (c *Command) Sync(ctx context.Context) error {
return nil
}

// The Metadata field of storagetransfer.TransferOperation must be parsed from a
// JSON blob. The structs below are a subset of fields available.
func parseJobMetadata(m googleapi.RawMessage) *jobMetadata {
Expand Down
6 changes: 5 additions & 1 deletion internal/stctl/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ type fakeTJ struct {
listJobResp *storagetransfer.ListTransferJobsResponse
listOpsResp *storagetransfer.ListOperationsResponse
job *storagetransfer.TransferJob
listErr error
getErr error
updateErr error
createErr error
}

func (f *fakeTJ) Jobs(ctx context.Context, visit func(resp *storagetransfer.ListTransferJobsResponse) error) error {
if f.listErr != nil {
return f.listErr
}
return visit(f.listJobResp)
}

func (f *fakeTJ) Create(ctx context.Context, create *storagetransfer.TransferJob) (*storagetransfer.TransferJob, error) {
if f.createErr != nil {
return nil, f.createErr
}
create.Name = "THIS-IS-A-FAKE-JOB-NAME"
create.Name = "THIS-IS-A-FAKE-ASSIGNED-JOB-NAME"
return create, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/stctl/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestCommand_Create(t *testing.T) {
ts := time.Now().UTC()
expected := &storagetransfer.TransferJob{
Description: "STCTL: daily copy of src-bucket to dest-bucket",
Name: "THIS-IS-A-FAKE-JOB-NAME",
Name: "THIS-IS-A-FAKE-ASSIGNED-JOB-NAME",
ProjectId: "fake-mlab-testing",
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: (*storagetransfer.Date)(nil),
Expand Down
101 changes: 101 additions & 0 deletions internal/stctl/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package stctl

import (
"context"
"fmt"

"github.com/m-lab/go/flagx"
"github.com/m-lab/go/logx"
"github.com/stephen-soltesz/pretty"

"sort"

"google.golang.org/api/storagetransfer/v1"
)

// Sync guarantees that a job exists matching the current command parameters. If
// a job with matching command parameters already exists, no action is taken. If
// a matching description is found with different values for IncludePrefixes or
// StartTimeOfDay, then the original job is disabled and a new job created. In
// either case, the found or newly created job is returned on success.
func (c *Command) Sync(ctx context.Context) (*storagetransfer.TransferJob, error) {
var found *storagetransfer.TransferJob
notFound := fmt.Errorf("no matching job found")

// Generate canonical description from current config.
desc := getDesc(c.SourceBucket, c.TargetBucket)

// List jobs and find first that matches canonical description.
logx.Debug.Println("Listing jobs")
findJob := func(resp *storagetransfer.ListTransferJobsResponse) error {
for _, job := range resp.TransferJobs {
if job.Schedule.ScheduleEndDate != nil {
// We only manage jobs without an end date.
continue
}
logx.Debug.Print(pretty.Sprint(job))
if desc == job.Description {
// Sync depends on the convention for storage transfer job managment where
// only a single transfer job exists between two buckets. So, the first
// matching job should be the only matching job.
found = job
return nil
}
}
// Job was not found.
return notFound
}

err := c.Client.Jobs(ctx, findJob)
if err != notFound && err != nil {
return nil, err
}
if found != nil {
logx.Debug.Println("Found job!")
logx.Debug.Print(pretty.Sprint(found))
if specMatches(found, c.StartTime, c.Prefixes) {
// We found a matching job, do nothing, return success.
logx.Debug.Println("Specs match!")
return found, nil
}
// We found a managed job and it does not match the new spec, so disable it.
_, err := c.Disable(ctx, found.Name)
if err != nil {
return nil, err
}
}
// Create new job matching the preferred spec.
return c.Create(ctx)
}

// Verify that the two times are equal.
func timesEqual(scheduled *storagetransfer.TimeOfDay, desired flagx.Time) bool {
return fmtTime(scheduled) == desired.String()
}

// Verify that the two arrays have the same values.
func includesEqual(configured []string, desired []string) bool {
if len(configured) != len(desired) {
return false
}
sort.Strings(configured)
sort.Strings(desired)
for i := 0; i < len(configured); i++ {
if configured[i] != desired[i] {
return false
}
}
return true
}

func specMatches(job *storagetransfer.TransferJob, start flagx.Time, prefixes []string) bool {
if job.Schedule.StartTimeOfDay == nil ||
!timesEqual(job.Schedule.StartTimeOfDay, start) {
return false
}
if job.TransferSpec.ObjectConditions == nil ||
!includesEqual(job.TransferSpec.ObjectConditions.IncludePrefixes, prefixes) {
return false
}
return true
}
234 changes: 234 additions & 0 deletions internal/stctl/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package stctl

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/go-test/deep"
"github.com/m-lab/go/flagx"
storagetransfer "google.golang.org/api/storagetransfer/v1"
)

func TestCommand_Sync(t *testing.T) {
ts := time.Now().UTC()
tests := []struct {
name string
c *Command
expected *storagetransfer.TransferJob
wantErr bool
}{
{
name: "success-job-found-specs-match",
c: &Command{
Client: &fakeTJ{
listJobResp: &storagetransfer.ListTransferJobsResponse{
TransferJobs: []*storagetransfer.TransferJob{
{
Name: "transferOperations/ignore-job-with-end-date",
Description: "ignore-job-with-end-date",
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: &storagetransfer.Date{Day: 1, Month: 2, Year: 2019},
},
},
{
Name: "transferOperations/description-matches-gcs-buckets",
Description: getDesc("fake-source", "fake-target"),
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: nil,
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{IncludePrefixes: []string{"a", "b"}},
},
},
},
},
},
SourceBucket: "fake-source",
TargetBucket: "fake-target",
Prefixes: []string{"a", "b"},
StartTime: flagx.Time{Hour: 1, Minute: 2, Second: 3},
},
expected: &storagetransfer.TransferJob{
Description: "STCTL: daily copy of fake-source to fake-target",
Name: "transferOperations/description-matches-gcs-buckets",
Schedule: &storagetransfer.Schedule{
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{
IncludePrefixes: []string{"a", "b"},
},
},
},
},
{
name: "success-disable-and-create",
c: &Command{
SourceBucket: "fake-source",
TargetBucket: "fake-target",
Prefixes: []string{"a", "b"},
StartTime: flagx.Time{Hour: 1, Minute: 2, Second: 3},
Client: &fakeTJ{
// fake jobs that are listed to search for one that matches the current Command spec.
listJobResp: &storagetransfer.ListTransferJobsResponse{
TransferJobs: []*storagetransfer.TransferJob{
{
Name: "transferOperations/description-matches-ObjectConditions-does-not",
Description: getDesc("fake-source", "fake-target"),
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: nil,
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{}, // Empty object conditions specified.
},
},
},
},
// a fake job that is disabled.
job: &storagetransfer.TransferJob{},
},
},
expected: &storagetransfer.TransferJob{
Description: "STCTL: daily copy of fake-source to fake-target",
Name: "THIS-IS-A-FAKE-ASSIGNED-JOB-NAME",
Schedule: &storagetransfer.Schedule{
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
ScheduleStartDate: &storagetransfer.Date{
Day: int64(ts.Day()),
Month: int64(ts.Month()),
Year: int64(ts.Year()),
},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{
IncludePrefixes: []string{"a", "b"},
},
},
Status: "ENABLED",
},
},
{
name: "success-job-not-found-then-created",
c: &Command{
SourceBucket: "source",
TargetBucket: "target",
StartTime: flagx.Time{Hour: 1, Minute: 2, Second: 3},
Client: &fakeTJ{
listJobResp: &storagetransfer.ListTransferJobsResponse{},
},
},
expected: &storagetransfer.TransferJob{
Description: "STCTL: daily copy of source to target",
Name: "THIS-IS-A-FAKE-ASSIGNED-JOB-NAME",
Schedule: &storagetransfer.Schedule{
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
ScheduleStartDate: &storagetransfer.Date{
Day: int64(ts.Day()),
Month: int64(ts.Month()),
Year: int64(ts.Year()),
},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "target"},
},
Status: "ENABLED",
},
},
{
name: "error-list-jobs",
c: &Command{
Client: &fakeTJ{
listErr: errors.New("Fake list error"),
},
},
wantErr: true,
},
{
name: "error-found-and-disable-error-different-IncludePrefixes",
c: &Command{
SourceBucket: "fake-source",
TargetBucket: "fake-target",
Prefixes: []string{"a", "b"},
StartTime: flagx.Time{Hour: 1, Minute: 2, Second: 3},
Client: &fakeTJ{
// fake jobs that are listed to search for one that matches the current Command spec.
listJobResp: &storagetransfer.ListTransferJobsResponse{
TransferJobs: []*storagetransfer.TransferJob{
{
Name: "transferOperations/description-matches",
Description: getDesc("fake-source", "fake-target"),
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: nil,
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{IncludePrefixes: []string{"c", "d"}}, // IncludePrefixes do not match command.Prefixes.
},
},
},
},
getErr: errors.New("fake get error causes Disable() to fail"),
},
},
wantErr: true,
},
{
name: "error-found-and-disable-error-different-start-times",
c: &Command{
SourceBucket: "fake-source",
TargetBucket: "fake-target",
StartTime: flagx.Time{Hour: 3, Minute: 2, Second: 1},
Client: &fakeTJ{
// fake jobs that are listed to search for one that matches the current Command spec.
listJobResp: &storagetransfer.ListTransferJobsResponse{
TransferJobs: []*storagetransfer.TransferJob{
{
Name: "transferOperations/description-matches",
Description: getDesc("fake-source", "fake-target"),
Schedule: &storagetransfer.Schedule{
ScheduleEndDate: nil,
StartTimeOfDay: &storagetransfer.TimeOfDay{Hours: 1, Minutes: 2, Seconds: 3},
},
TransferSpec: &storagetransfer.TransferSpec{
GcsDataSource: &storagetransfer.GcsData{BucketName: "fake-source"},
GcsDataSink: &storagetransfer.GcsData{BucketName: "fake-target"},
ObjectConditions: &storagetransfer.ObjectConditions{},
},
},
},
},
getErr: errors.New("fake get error causes Disable() to fail"),
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
job, err := tt.c.Sync(ctx)
if (err != nil) != tt.wantErr {
t.Errorf("Command.Sync() error = %v, wantErr %v", err, tt.wantErr)
}
if diff := deep.Equal(job, tt.expected); diff != nil && !tt.wantErr {
t.Errorf("Command.Sync() job did not match expected;\n%s", strings.Join(diff, "\n"))
}
})
}
}