Skip to content

Commit

Permalink
Historical processing only targets last year
Browse files Browse the repository at this point in the history
  • Loading branch information
cristinaleonr committed Jan 25, 2024
1 parent 9ff5c25 commit af73f2c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 11 deletions.
13 changes: 7 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ type MonitorConfig struct {

// SourceConfig holds the config that defines all data sources to be processed.
type SourceConfig struct {
Bucket string `yaml:"bucket"`
Experiment string `yaml:"experiment"`
Datatype string `yaml:"datatype"`
Filter string `yaml:"filter"`
Datasets Datasets `yaml:"target_datasets"`
DailyOnly bool `yaml:"daily_only"`
Bucket string `yaml:"bucket"`
Experiment string `yaml:"experiment"`
Datatype string `yaml:"datatype"`
Filter string `yaml:"filter"`
Datasets Datasets `yaml:"target_datasets"`
DailyOnly bool `yaml:"daily_only"`
FullHistory bool `yaml:"full_history"`
}

// Datasets contains the name of BigQuery datasets used for temporary, raw, or
Expand Down
1 change: 1 addition & 0 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
tmp: tmp_ndt
raw: raw_ndt
join: ndt
full_history: true
- bucket: archive-{{NDT_SOURCE_PROJECT}}
experiment: ndt
datatype: pcap
Expand Down
6 changes: 6 additions & 0 deletions job-service/iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (j *JobIterator) Next() (*tracker.JobWithTarget, error) {
// Copy the jobspec and set the date.
jt := &tracker.JobWithTarget{}
*jt = j.specs[j.nextIndex]

lastYear := time.Now().UTC().AddDate(-1, 0, 0)
if !jt.FullHistory && j.lastDate.Before(lastYear) {
return nil, nil
}

jt.Job.Date = j.lastDate
jt.ID = jt.Job.Key()
j.nextIndex++ // increment index for next call.
Expand Down
11 changes: 9 additions & 2 deletions job-service/job-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (svc *Service) NextJob(ctx context.Context) *tracker.JobWithTarget {
log.Println(err)
continue
}

lastYear := time.Now().UTC().AddDate(-1, 0, 0)
if !jt.FullHistory && jt.Job.Date.Before(lastYear) {
return nil
}

return svc.ifHasFiles(ctx, jt)
}
return nil
Expand Down Expand Up @@ -108,8 +114,9 @@ func NewJobService(startDate time.Time,
// TODO - handle gs:// targets
jt := tracker.JobWithTarget{
// NOTE: JobWithTarget.ID is assigned after Job.Date is set.
Job: job,
DailyOnly: s.DailyOnly,
Job: job,
DailyOnly: s.DailyOnly,
FullHistory: s.FullHistory,
}
dailySpecs = append(dailySpecs, jt)
if !s.DailyOnly {
Expand Down
7 changes: 4 additions & 3 deletions tracker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ func (j *Job) TablePartition() string {
// JobWithTarget specifies a type/date job, and a destination
// table or GCS prefix
type JobWithTarget struct {
ID Key // ID used by gardener & parsers to identify a Job's status and configuration.
Job Job
DailyOnly bool `json:"-"`
ID Key // ID used by gardener & parsers to identify a Job's status and configuration.
Job Job
DailyOnly bool `json:"-"`
FullHistory bool `json:"-"`
// TODO: enable configuration for parser to target alternate buckets.
}

Expand Down

0 comments on commit af73f2c

Please sign in to comment.