Skip to content

Commit

Permalink
Change datatypes for annotation2 and hopannotation2 (#420)
Browse files Browse the repository at this point in the history
* Update annotation2 and hopannotation2
* Process staging in staging to verify pipeline before production
  • Loading branch information
stephen-soltesz committed Mar 30, 2023
1 parent 68e819d commit 27ccc01
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 17 deletions.
5 changes: 3 additions & 2 deletions apply-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ CLUSTER=${CLOUDSDK_CONTAINER_CLUSTER:?Please provide cluster name: $USAGE}
DATE_SKIP=${DATE_SKIP:-"0"} # Number of dates to skip between each processed date (for sandbox).
TASK_FILE_SKIP=${TASK_FILE_SKIP:-"0"} # Number of files to skip between each processed file (for sandbox).

# Use sandbox in sandbox, measurement-lab in staging and oti.
# Use sandbox in sandbox, staging in staging, measurement-lab in oti.
SOURCE_PROJECT=${PROJECT_ID/mlab-oti/measurement-lab}
SOURCE_PROJECT=${SOURCE_PROJECT/mlab-staging/measurement-lab}
# TODO(soltesz): restore or remove.
#SOURCE_PROJECT=${SOURCE_PROJECT/mlab-staging/measurement-lab}
sed -i \
-e 's/{{ANNOTATION_SOURCE_PROJECT}}/'${SOURCE_PROJECT}'/g' \
config/config.yml
Expand Down
6 changes: 3 additions & 3 deletions cloud/bq/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func NewTableOpsWithClient(client bqiface.Client, job tracker.Job, project strin
switch job.Datatype {
case "switch":
fallthrough
case "annotation":
case "annotation2":
fallthrough
case "hopannotation1":
case "hopannotation2":
fallthrough
case "pcap":
fallthrough
Expand Down Expand Up @@ -176,7 +176,7 @@ const tmpTable = "`{{.Project}}.{{.Job.Datasets.Tmp}}.{{.Job.Datatype}}`"
const rawTable = "`{{.Project}}.{{.Job.Datasets.Raw}}.{{.Job.Datatype}}`"

// NOTE: experiment annotations must come from the same raw experiment dataset.
const annoTable = "`{{.Project}}.{{.Job.Datasets.Raw}}.annotation`"
const annoTable = "`{{.Project}}.{{.Job.Datasets.Raw}}.annotation2`"

var dedupTemplate = template.Must(template.New("").Parse(`
#standardSQL
Expand Down
6 changes: 3 additions & 3 deletions cloud/bq/ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func TestTemplate(t *testing.T) {
job := jobtest.NewJob("bucket", "ndt", "annotation", time.Date(2019, 3, 4, 0, 0, 0, 0, time.UTC))
job := jobtest.NewJob("bucket", "ndt", "annotation2", time.Date(2019, 3, 4, 0, 0, 0, 0, time.UTC))
q, err := bq.NewTableOps(context.Background(), job, "fake-project", "")
rtx.Must(err, "NewTableOps failed")
qs := bq.DedupQuery(*q)
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestValidateQueries(t *testing.T) {
{
Bucket: "bucket",
Experiment: "ndt",
Datatype: "annotation",
Datatype: "annotation2",
Date: d,
Datasets: tracker.Datasets{Tmp: "tmp_ndt", Raw: "raw_ndt"},
},
Expand All @@ -66,7 +66,7 @@ func TestValidateQueries(t *testing.T) {
{
Bucket: "bucket",
Experiment: "ndt",
Datatype: "hopannotation1",
Datatype: "hopannotation2",
Date: d,
Datasets: tracker.Datasets{Tmp: "tmp_ndt", Raw: "raw_ndt"},
},
Expand Down
11 changes: 9 additions & 2 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,27 @@ sources:
# NOTE: It now matters what order these are in.
- bucket: archive-{{ANNOTATION_SOURCE_PROJECT}}
experiment: ndt
datatype: annotation
datatype: annotation2
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: ndt5
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
join: ndt
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: ndt7
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
join: ndt
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: pcap
Expand All @@ -35,27 +38,31 @@ sources:
daily_only: true
- bucket: archive-{{ANNOTATION_SOURCE_PROJECT}}
experiment: ndt
datatype: hopannotation1
datatype: hopannotation2
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
daily_only: true
- bucket: archive-measurement-lab
experiment: ndt
datatype: scamper1
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
join: ndt
daily_only: true
- bucket: archive-measurement-lab
experiment: utilization
datatype: switch
target_datasets:
tmp: tmp_utilization
raw: raw_utilization
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: tcpinfo
target_datasets:
tmp: tmp_ndt
raw: raw_ndt
join: ndt
daily_only: true
10 changes: 5 additions & 5 deletions ops/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ func newStateFunc(detail string) ActionFunc {
// annotation job in the tracker. Used to gate the join action.
func newJoinConditionFunc(tk *tracker.Tracker, detail string) ConditionFunc {
return func(ctx context.Context, j tracker.Job) bool {
if j.Datatype == "annotation" {
if j.Datatype == "annotation2" {
// Annotation does not require joining, so the check is
// not needed.
log.Println(j, "condition met")
return true
}
// All other types currently depend only on the annotation table.
// So, we check whether the annotation table is complete.
// All other types currently depend only on the annotation2 table.
// So, we check whether the annotation2 table is complete.
// (Technically, we only need to know whether the copy has completed.)
ann := j
ann.Datatype = "annotation"
ann.Datatype = "annotation2"
status, err := tk.GetStatus(ann.Key())
if err != nil {
// For early dates, there is no annotation job, so if the job
// For early dates, there is no annotation2 job, so if the job
// is absent, we proceed with the join.
log.Println(ann, "is absent")
return true
Expand Down
4 changes: 2 additions & 2 deletions ops/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestStandardMonitor(t *testing.T) {
{
Bucket: "bucket",
Experiment: "ndt",
Datatype: "annotation",
Datatype: "annotation2",
Date: d,
Datasets: tracker.Datasets{Tmp: "tmp_ndt", Raw: "raw_ndt"},
},
Expand All @@ -63,7 +63,7 @@ func TestStandardMonitor(t *testing.T) {
{
Bucket: "bucket",
Experiment: "ndt",
Datatype: "hopannotation1",
Datatype: "hopannotation2",
Date: d,
Datasets: tracker.Datasets{Tmp: "tmp_ndt", Raw: "raw_ndt"},
},
Expand Down

0 comments on commit 27ccc01

Please sign in to comment.