Skip to content

Commit

Permalink
Merge 7fb2cee into 16fd7a2
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-soltesz committed Jul 8, 2022
2 parents 16fd7a2 + 7fb2cee commit a3cb042
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 101 deletions.
14 changes: 8 additions & 6 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
shutdownTimeout = flag.Duration("shutdown_timeout", 1*time.Minute, "Graceful shutdown time allowance")
statusPort = flag.String("status_port", ":0", "The public interface port where status (and pprof) will be published")
gardenerAddr = flag.String("gardener_addr", ":8080", "The listen address for the gardener jobs service")
configPath = flag.String("config_path", "config.yml", "Path to the config file.")

// Context and injected variables to allow smoke testing of main()
mainCtx, mainCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -214,7 +215,7 @@ func mustStandardTracker() *tracker.Tracker {
return tk
}

func mustCreateJobService(ctx context.Context) *job.Service {
func mustCreateJobService(ctx context.Context, g *config.Gardener) *job.Service {
var saver persistence.Saver
storageClient, err := storage.NewClient(ctx)
rtx.Must(err, "Could not create storage client for job service")
Expand All @@ -227,8 +228,8 @@ func mustCreateJobService(ctx context.Context) *job.Service {
saver = persistence.NewLocalSaver(saverDir)
}
svc, err := job.NewJobService(
ctx, globalTracker, config.StartDate(),
project, config.Sources(), saver,
ctx, globalTracker, g.Start(),
project, g.Sources, saver,
stiface.AdaptClient(storageClient))
rtx.Must(err, "Could not initialize job service")
return svc
Expand Down Expand Up @@ -279,9 +280,10 @@ func main() {
// This is the v2 "manager" mode, in which Gardener provides the "jobs" API
// for parsers to request work and report progress.
// TODO Once the legacy deployments are turned down, this should move to head of main().
config.ParseConfig()
gcfg, err := config.ParseConfig(*configPath)
rtx.Must(err, "Failed to parse config: %q", *configPath)

for _, src := range config.Sources() {
for _, src := range gcfg.Sources {
metrics.ConfigDatatypes.WithLabelValues(src.Experiment, src.Datatype)
}

Expand All @@ -297,7 +299,7 @@ func main() {
rtx.Must(err, "NewStandardMonitor failed")
go monitor.Watch(mainCtx, 5*time.Second)

js := mustCreateJobService(mainCtx)
js := mustCreateJobService(mainCtx, gcfg)
handler := tracker.NewHandler(globalTracker, js)
handler.Register(mux)

Expand Down
62 changes: 19 additions & 43 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ package config
// Modelled on https://dev.to/ilyakaznacheev/a-clean-way-to-pass-configs-in-a-go-application-1g64

import (
"flag"
"fmt"
"errors"
"log"
"os"
"time"

"github.com/kelseyhightower/envconfig"
"gopkg.in/yaml.v2"
)

Expand All @@ -32,6 +30,7 @@ type SourceConfig struct {
Datatype string `yaml:"datatype"`
Filter string `yaml:"filter"`
Target string `yaml:"target"`
DailyOnly bool `yaml:"daily_only"`
}

// Gardener is the full config for a Gardener instance.
Expand All @@ -42,57 +41,34 @@ type Gardener struct {
Sources []SourceConfig `yaml:"sources"`
}

var gardener Gardener

// Sources returns the list of sources that should be processed.
func Sources() []SourceConfig {
src := make([]SourceConfig, len(gardener.Sources))
copy(src, gardener.Sources)
return src
}

// StartDate returns the first date that should be processed.
func StartDate() time.Time {
return gardener.StartDate.UTC().Truncate(24 * time.Hour)
func (g *Gardener) Start() time.Time {
return g.StartDate.UTC().Truncate(24 * time.Hour)
}

// ParseConfig loads the full Config, or Exits on failure.
func ParseConfig() {
log.Println("config init")
readFile(&gardener)
readEnv(&gardener)

log.Printf("%+v\n", gardener)
}

func processError(err error) {
fmt.Println(err)
// For now don't die... os.Exit(2)
func ParseConfig(name string) (*Gardener, error) {
log.Println("Config path:", name)
g := &Gardener{}
err := readFile(name, g)
if err != nil {
return nil, err
}
return g, nil
}

var configPath = flag.String("config_path", "config.yml", "Path to the config file.")
var ErrNoConfig = errors.New("no config file given")

func readFile(cfg *Gardener) {
log.Println("Config path:", *configPath)
if *configPath == "" {
return
func readFile(name string, cfg *Gardener) error {
if name == "" {
return ErrNoConfig
}
f, err := os.Open(*configPath)
f, err := os.Open(name)
if err != nil {
processError(err)
return err
}
defer f.Close()

decoder := yaml.NewDecoder(f)
err = decoder.Decode(cfg)
if err != nil {
processError(err)
}
}

func readEnv(cfg *Gardener) {
err := envconfig.Process("", cfg)
if err != nil {
processError(err)
}
return decoder.Decode(cfg)
}
8 changes: 8 additions & 0 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,39 @@ sources:
experiment: ndt
datatype: annotation
target: tmp_ndt.annotation
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: ndt5
target: tmp_ndt.ndt5
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: ndt7
target: tmp_ndt.ndt7
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: pcap
target: tmp_ndt.pcap
daily_only: true
- bucket: archive-{{ANNOTATION_SOURCE_PROJECT}}
experiment: ndt
datatype: hopannotation1
target: tmp_ndt.hopannotation1
daily_only: true
- bucket: archive-measurement-lab
experiment: ndt
datatype: scamper1
target: tmp_ndt.scamper1
daily_only: true
- bucket: archive-measurement-lab
experiment: utilization
datatype: switch
target: tmp_utilization.switch
daily_only: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: tcpinfo
target: tmp_ndt.tcpinfo
daily_only: true
64 changes: 54 additions & 10 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,69 @@
package config_test

import (
"flag"
"log"
"reflect"
"testing"
"time"

"github.com/m-lab/etl-gardener/config"
"github.com/m-lab/go/flagx"
"github.com/m-lab/go/rtx"
)

func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
}

func TestBasic(t *testing.T) {
flag.Set("config_path", "testdata/config.yml")
flag.Parse()
rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not get args from env")

config.ParseConfig()

func TestParseConfig(t *testing.T) {
tests := []struct {
name string
file string
start string
want *config.Gardener
wantErr bool
}{
{
name: "success",
file: "testdata/config.yml",
start: "2019-03-04",
want: &config.Gardener{
StartDate: time.Date(2019, time.March, 4, 0, 1, 2, 0, time.UTC),
Tracker: config.TrackerConfig{Timeout: 5 * time.Hour},
Monitor: config.MonitorConfig{PollingInterval: 5 * time.Minute},
Sources: []config.SourceConfig{
{Bucket: "archive-measurement-lab", Experiment: "ndt", Datatype: "tcpinfo", Filter: ".*T??:??:00.*Z", Target: "ndt.tcpinfo", DailyOnly: false},
{Bucket: "archive-measurement-lab", Experiment: "ndt", Datatype: "ndt5", Filter: ".*T??:??:00.*Z", Target: "ndt.ndt5", DailyOnly: false},
{Bucket: "archive-measurement-lab", Experiment: "ndt", Datatype: "pcap", Filter: "", Target: "ndt.pcap", DailyOnly: true},
},
},
},
{
name: "error-empty-file-name",
file: "",
wantErr: true,
},
{
name: "error-file-does-not-exist",
file: "this-file-does-not-exist",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := config.ParseConfig(tt.file)
if (err != nil) != tt.wantErr {
t.Errorf("ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantErr {
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseConfig() = %v, want %v", got, tt.want)
}
if got.Start().Format("2006-01-02") != tt.start {
t.Errorf("Gardener.Start() wrong date; got %q, want %q", got.Start().Format("2006-01-02"), tt.start)
}
})
}
}
9 changes: 8 additions & 1 deletion config/testdata/config.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
---
start_date: 2019-03-04T00:01:02Z
tracker:
timeout: 5h
monitor:
Expand All @@ -12,7 +13,13 @@ sources:
target: ndt.tcpinfo
- bucket: archive-measurement-lab
experiment: ndt
datatype: ndt5
datatype: ndt5
filter: .*T??:??:00.*Z
start: 2019-08-01
target: ndt.ndt5
- bucket: archive-measurement-lab
experiment: ndt
datatype: pcap
start: 2019-08-01
target: ndt.pcap
daily_only: true

0 comments on commit a3cb042

Please sign in to comment.