Skip to content

Commit

Permalink
Merge 9076d3d into 8809272
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Apr 7, 2020
2 parents 8809272 + 9076d3d commit a2f9fb2
Show file tree
Hide file tree
Showing 26 changed files with 525 additions and 279 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ deploy:

- provider: script
script:
TARGET_BASE="gs://etl-mlab-sandbox"
TARGET_BASE="mlab-sandbox"
$TRAVIS_BUILD_DIR/travis/kubectl.sh mlab-sandbox data-processing ./apply-cluster.sh
skip_cleanup: true
on:
Expand All @@ -143,7 +143,7 @@ deploy:
script:
$TRAVIS_BUILD_DIR/travis/kubectl.sh mlab-staging data-processing-cluster ./apply-cluster.sh
&&
TARGET_BASE="gs://etl-mlab-staging"
TARGET_BASE="mlab-staging"
$TRAVIS_BUILD_DIR/travis/kubectl.sh mlab-staging data-processing ./apply-cluster.sh
on:
repo: m-lab/etl-gardener
Expand Down
11 changes: 7 additions & 4 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

var (
jobExpirationTime = flag.Duration("job_expiration_time", 24*time.Hour, "Time after which stale jobs will be purged")
jobCleanupDelay = flag.Duration("job_cleanup_delay", 3*time.Hour, "Time after which completed jobs will be removed from tracker")
shutdownTimeout = flag.Duration("shutdown_timeout", 1*time.Minute, "Graceful shutdown time allowance")
statusPort = flag.String("status_port", ":0", "The public interface port where status (and pprof) will be published")

Expand Down Expand Up @@ -311,7 +312,7 @@ func mustStandardTracker() *tracker.Tracker {
tk, err := tracker.InitTracker(
context.Background(),
dsiface.AdaptClient(client), dsKey,
time.Minute, *jobExpirationTime)
time.Minute, *jobExpirationTime, *jobCleanupDelay)
rtx.Must(err, "tracker init")
if tk == nil {
log.Fatal("nil tracker")
Expand Down Expand Up @@ -372,10 +373,10 @@ func main() {
globalTracker = mustStandardTracker()

// TODO - refactor this block.
config := cloud.Config{
cloudCfg := cloud.Config{
Project: env.Project,
Client: nil}
bqConfig := NewBQConfig(config)
bqConfig := NewBQConfig(cloudCfg)
bqConfig.BQFinalDataset = "base_tables"
bqConfig.BQBatchDataset = "batch"
monitor, err := ops.NewStandardMonitor(mainCtx, bqConfig, globalTracker)
Expand All @@ -386,7 +387,9 @@ func main() {
handler.Register(mux)

// For now, we just start in Aug 2019, and handle only new data.
svc, err := job.NewJobService(globalTracker, env.Bucket, time.Date(2019, 8, 1, 0, 0, 0, 0, time.UTC))

svc, err := job.NewJobService(globalTracker,
time.Time{}, os.Getenv("PROJECT"), config.Sources())
rtx.Must(err, "Could not initialize job service")
mux.HandleFunc("/job", svc.JobHandler)
healthy = true
Expand Down
7 changes: 6 additions & 1 deletion cmd/gardener/gardener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"github.com/m-lab/go/osx"
)

// Package-level initializer that points the gardener at the test fixture
// config before any test runs. Declared as a blank var (rather than init())
// so it executes during package variable initialization, which happens
// before init functions and before flag.Parse is called by the test binary.
var _ = func() error {
log.Println("Setting config path")
// NOTE(review): flag.Set's error is ignored — assumes the "config_path"
// flag is registered by the package under test; verify against gardener.go.
flag.Set("config_path", "testdata/config.yml")
return nil
}()

// Retries for up to 10 seconds.
func waitFor(url string) (resp *http.Response, err error) {
for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -69,7 +75,6 @@ func TestLegacyModeSetup(t *testing.T) {
}

func TestManagerMode(t *testing.T) {
flag.Set("config_path", "../../config/testdata/config.yml")

mainCtx, mainCancel = context.WithCancel(context.Background())

Expand Down
18 changes: 18 additions & 0 deletions cmd/gardener/testdata/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
tracker:
timeout: 5h
monitor:
polling_interval: 5m
sources:
- bucket: archive-measurement-lab
experiment: ndt
datatype: tcpinfo
start: 2019-08-01
filter: .*T??:??:00.*Z
target: tmp_ndt.tcpinfo
- bucket: archive-measurement-lab
experiment: ndt
datatype: ndt5
start: 2019-08-01
filter: .*T??:??:00.*Z
target: tmp_ndt.ndt5
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ type MonitorConfig struct {

// SourceConfig holds the config that defines all data sources to be processed.
type SourceConfig struct {
Bucket string `yaml:"bucket"`
ArchivePath string `yaml:"archive_path"`
Filter string `yaml:"filter"`
Start time.Time `yaml:"start"`
Target string `yaml:"target"`
Bucket string `yaml:"bucket"`
Experiment string `yaml:"experiment"`
Datatype string `yaml:"datatype"`
Filter string `yaml:"filter"`
Start time.Time `yaml:"start"`
Target string `yaml:"target"`
}

// Gardener is the full config for a Gardener instance.
Expand Down
10 changes: 6 additions & 4 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ monitor:
polling_interval: 5m
sources:
- bucket: archive-measurement-lab
archive_path: ndt/tcpinfo
filter: .*T??:??:00.*Z
experiment: ndt
datatype: tcpinfo
start: 2019-08-01
filter: .*T??:??:00.*Z
target: tmp_ndt.tcpinfo
- bucket: archive-measurement-lab
archive_path: ndt/ndt5
filter: .*T??:??:00.*Z
experiment: ndt
datatype: ndt5
start: 2019-08-01
filter: .*T??:??:00.*Z
target: tmp_ndt.ndt5
6 changes: 4 additions & 2 deletions config/testdata/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ monitor:
polling_interval: 5m
sources:
- bucket: archive-measurement-lab
archive_path: ndt/tcpinfo
experiment: ndt
datatype: tcpinfo
filter: .*T??:??:00.*Z
start: 2019-08-01
target: ndt.tcpinfo
- bucket: archive-measurement-lab
archive_path: ndt/ndt5
experiment: ndt
datatype: ndt5
filter: .*T??:??:00.*Z
start: 2019-08-01
target: ndt.ndt5
132 changes: 98 additions & 34 deletions dedup/dedup.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,129 @@
// Package dedup provides functions for deduplicating bigquery table partitions.
package dedup

// TODO consider using query parameters to simplify the template.

import (
"bytes"
"context"
"errors"
"html/template"
"log"

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"

"github.com/m-lab/go/dataset"

"github.com/m-lab/etl-gardener/tracker"
)

// TODO remove _tmp when we are ready to transition.
const table = "`{{.Project}}.{{.Job.Experiment}}_tmp.{{.Job.Datatype}}`"
// TODO get the tmp_ from the job Target.
const table = "`{{.Project}}.tmp_{{.Job.Experiment}}.{{.Job.Datatype}}`"

var dedupTemplate = template.Must(template.New("").Parse(`
#standardSQL
# Delete all duplicate rows based on key and preferred priority ordering.
# This is resource intensive for tcpinfo - 20 slot hours for 12M rows with 250M snapshots,
# roughly proportional to the memory footprint of the table partition.
# The query is very cheap if there are no duplicates.
DELETE
FROM ` + table + ` AS target
WHERE Date(TestTime) = "{{.Job.Date.Format "2006-01-02"}}"
# This identifies all rows that don't match rows to preserve.
AND NOT EXISTS (
# This creates list of rows to preserve, based on key and priority.
WITH keep AS (
SELECT * EXCEPT(row_number) FROM (
SELECT {{.Key}}, ParseInfo.ParseTime
ROW_NUMBER() OVER (PARTITION BY {{.Key}} ORDER BY {{.Order}}) row_number
FROM (
SELECT * FROM ` + table + `
WHERE Date(TestTime) = "{{.Job.Date.Format "2006-01-02"}}"
)
WHERE Date({{.TestTime}}) = "{{.Job.Date.Format "2006-01-02"}}"
# This identifies all rows that don't match rows to preserve.
AND NOT EXISTS (
# This creates list of rows to preserve, based on key and priority.
WITH keep AS (
SELECT * EXCEPT(row_number) FROM (
SELECT
{{range $k, $v := .Partition}}{{$v}}, {{end}}
{{range $k, $v := .Select}}{{$v}}, {{end}}
ROW_NUMBER() OVER (
PARTITION BY {{range $k, $v := .Partition}}{{$v}}, {{end}}TRUE
ORDER BY {{.Order}}
) row_number
FROM (
SELECT * FROM ` + table + `
WHERE Date({{.TestTime}}) = "{{.Job.Date.Format "2006-01-02"}}"
)
WHERE row_number = 1
)
SELECT * FROM keep
# This matches against the keep table based on key and parsetime, so it should retain
# only the rows that were selected in the keep table. Parsetime is used in lieu of
# any other distinguishing traits that create different row numbers in the keep query.
# Without the parsetime, it keeps all the rows.
WHERE target.{{.Key}} = keep.{{.Key}} AND target.ParseInfo.ParseTime = keep.ParseTime
)`))
WHERE row_number = 1
)
SELECT * FROM keep
# This matches against the keep table based on keys. Sufficient select keys must be
# used to distinguish the preferred row from the others.
WHERE
{{range $k, $v := .Partition}}target.{{$v}} = keep.{{$k}} AND {{end}}
{{range $k, $v := .Select}}target.{{$v}} = keep.{{$k}} AND {{end}}TRUE
)`))

// QueryParams is used to construct a dedup query.
type QueryParams struct {
Project string
Job tracker.Job
Key string
Order string
Project string
TestTime string // Name of the partition field
Job tracker.Job
// map key is the single field name, value is fully qualified name
Partition map[string]string
Order string
Select map[string]string // Derived from Order.
}

func (params QueryParams) String() string {
out := bytes.NewBuffer(nil)
dedupTemplate.Execute(out, params)
err := dedupTemplate.Execute(out, params)
if err != nil {
log.Println(err)
}
return out.String()
}

func tcpinfoQuery(job tracker.Job, project string) string {
return QueryParams{
Project: project,
Job: job,
Key: "uuid",
Order: "ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ParseInfo.ParseTime DESC",
}.String()
// ErrDatatypeNotSupported is returned by Query for unsupported datatypes.
var ErrDatatypeNotSupported = errors.New("Datatype not supported")

// Query builds the QueryParams used to generate a dedup query for job.
// Only the "ndt5" and "tcpinfo" datatypes are recognized; any other
// datatype yields the zero QueryParams and ErrDatatypeNotSupported.
func Query(job tracker.Job, project string) (QueryParams, error) {
	// Fields common to every supported datatype.
	params := QueryParams{
		Project: project,
		Job:     job,
	}

	switch job.Datatype {
	case "ndt5":
		params.TestTime = "log_time"
		params.Partition = map[string]string{"test_id": "test_id"}
		params.Order = "ParseInfo.ParseTime DESC"
		params.Select = map[string]string{"ParseTime": "ParseInfo.ParseTime"}
	case "tcpinfo":
		params.TestTime = "TestTime"
		params.Partition = map[string]string{"uuid": "uuid", "Timestamp": "FinalSnapshot.Timestamp"}
		params.Order = "ARRAY_LENGTH(Snapshots) DESC, ParseInfo.TaskFileName, ParseInfo.ParseTime DESC"
		params.Select = map[string]string{"ParseTime": "ParseInfo.ParseTime"}
	default:
		return QueryParams{}, ErrDatatypeNotSupported
	}
	return params, nil
}

// Dedup executes a query that deletes duplicates from the destination table.
// The table name is derived from params.Project and the job fields embedded
// in the query template. When dryRun is true, the query is submitted with
// BigQuery's DryRun flag so it is validated but no rows are deleted.
// TODO add cost accounting to status page?
// TODO inject fake bqclient for testing?
func (params QueryParams) Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error) {
	client, err := bigquery.NewClient(ctx, params.Project)
	if err != nil {
		return nil, err
	}
	q := bqiface.AdaptClient(client).Query(params.String())
	if q == nil {
		return nil, dataset.ErrNilQuery
	}
	if dryRun {
		cfg := bqiface.QueryConfig{
			QueryConfig: bigquery.QueryConfig{DryRun: true, Q: params.String()},
		}
		q.SetQueryConfig(cfg)
	}
	return q.Run(ctx)
}

0 comments on commit a2f9fb2

Please sign in to comment.