Skip to content

Commit

Permalink
[chore] Add JSON tags to job metrics (#2114)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina committed Nov 17, 2023
1 parent d334b30 commit 39a603d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 36 deletions.
53 changes: 32 additions & 21 deletions pkg/sources/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type JobProgressHook interface {
// If the job supports it, the reference can also be used to cancel running via
// CancelRun.
type JobProgressRef struct {
JobID JobID
SourceID SourceID
SourceName string
JobID JobID `json:"job_id"`
SourceID SourceID `json:"source_id"`
SourceName string `json:"source_name"`
jobProgress *JobProgress
}

Expand Down Expand Up @@ -120,28 +120,28 @@ type JobProgress struct {

// JobProgressMetrics tracks the metrics of a job.
type JobProgressMetrics struct {
StartTime time.Time
EndTime time.Time
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
// Total number of units found by the Source.
TotalUnits uint64
TotalUnits uint64 `json:"total_units,omitempty"`
// Total number of units that have finished chunking.
FinishedUnits uint64
FinishedUnits uint64 `json:"finished_units,omitempty"`
// Total number of chunks produced. This metric updates before the
// chunk is sent on the output channel.
TotalChunks uint64
TotalChunks uint64 `json:"total_chunks,omitempty"`
// All errors encountered.
Errors []error
Errors []error `json:"errors,omitempty"`
// Set to true if the source supports enumeration and has finished
// enumerating. If the source does not support enumeration, this field
// is always false.
DoneEnumerating bool
DoneEnumerating bool `json:"done_enumerating,omitempty"`

// Progress information reported by the source.
SourcePercent int64
SourceMessage string
SourceEncodedResumeInfo string
SourceSectionsCompleted int32
SourceSectionsRemaining int32
SourcePercent int64 `json:"source_percent,omitempty"`
SourceMessage string `json:"source_message,omitempty"`
SourceEncodedResumeInfo string `json:"source_encoded_resume_info,omitempty"`
SourceSectionsCompleted int32 `json:"source_sections_completed,omitempty"`
SourceSectionsRemaining int32 `json:"source_sections_remaining,omitempty"`
}

// WithHooks adds hooks to be called when an event triggers.
Expand Down Expand Up @@ -189,14 +189,14 @@ func (jp *JobProgress) executeHooks(todo func(hook JobProgressHook)) {
// without the JobProgressRef parameter.
func (jp *JobProgress) Start(start time.Time) {
jp.metricsLock.Lock()
jp.metrics.StartTime = start
jp.metrics.StartTime = &start
jp.metricsLock.Unlock()

jp.executeHooks(func(hook JobProgressHook) { hook.Start(jp.Ref(), start) })
}
func (jp *JobProgress) End(end time.Time) {
jp.metricsLock.Lock()
jp.metrics.EndTime = end
jp.metrics.EndTime = &end
jp.metricsLock.Unlock()

jp.executeHooks(func(hook JobProgressHook) { hook.End(jp.Ref(), end) })
Expand Down Expand Up @@ -248,6 +248,17 @@ func (jp *JobProgress) Snapshot() JobProgressMetrics {
defer jp.metricsLock.Unlock()

metrics := jp.metrics

// Make a copy of the fields to make them read only.
if jp.metrics.StartTime != nil {
startTime := *jp.metrics.StartTime
metrics.StartTime = &startTime
}
if jp.metrics.EndTime != nil {
endTime := *jp.metrics.EndTime
metrics.EndTime = &endTime
}

metrics.Errors = make([]error, len(metrics.Errors))
copy(metrics.Errors, jp.metrics.Errors)

Expand Down Expand Up @@ -343,13 +354,13 @@ func (m JobProgressMetrics) PercentComplete() int {
// has been running. If it hasn't started yet, 0 is returned. If it has
// finished, the total time is returned.
func (m JobProgressMetrics) ElapsedTime() time.Duration {
if m.StartTime.IsZero() {
if m.StartTime == nil {
return 0
}
if m.EndTime.IsZero() {
return time.Since(m.StartTime)
if m.EndTime == nil {
return time.Since(*m.StartTime)
}
return m.EndTime.Sub(m.StartTime)
return m.EndTime.Sub(*m.StartTime)
}

// ErrorsFor returns all the errors for the given SourceUnit. If there are no
Expand Down
26 changes: 13 additions & 13 deletions pkg/sources/job_progress_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (u *UnitHook) StartUnitChunking(ref JobProgressRef, unit SourceUnit, start
u.metrics.Add(id, &UnitMetrics{
Unit: unit,
Parent: ref,
StartTime: start,
StartTime: &start,
})
}

Expand All @@ -73,7 +73,7 @@ func (u *UnitHook) EndUnitChunking(ref JobProgressRef, unit SourceUnit, end time
if !ok {
return
}
metrics.EndTime = end
metrics.EndTime = &end
}

func (u *UnitHook) ReportChunk(ref JobProgressRef, unit SourceUnit, chunk *Chunk) {
Expand Down Expand Up @@ -168,17 +168,17 @@ func (u *UnitHook) UnitMetrics() []UnitMetrics {
}

type UnitMetrics struct {
Unit SourceUnit
Parent JobProgressRef
Unit SourceUnit `json:"unit,omitempty"`
Parent JobProgressRef `json:"parent,omitempty"`
// Start and end time for chunking this unit.
StartTime time.Time
EndTime time.Time
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
// Total number of chunks produced from this unit.
TotalChunks uint64
TotalChunks uint64 `json:"total_chunks,omitempty"`
// Total number of bytes produced from this unit.
TotalBytes uint64
TotalBytes uint64 `json:"total_bytes,omitempty"`
// All errors encountered by this unit.
Errors []error
Errors []error `json:"errors,omitempty"`
// Flag to mark that these metrics were intentionally evicted from
// the cache.
handled bool
Expand All @@ -192,13 +192,13 @@ func (u UnitMetrics) IsFinished() bool {
// has been running. If it hasn't started yet, 0 is returned. If it has
// finished, the total time is returned.
func (u UnitMetrics) ElapsedTime() time.Duration {
if u.StartTime.IsZero() {
if u.StartTime == nil {
return 0
}
if u.EndTime.IsZero() {
return time.Since(u.StartTime)
if u.EndTime == nil {
return time.Since(*u.StartTime)
}
return u.EndTime.Sub(u.StartTime)
return u.EndTime.Sub(*u.StartTime)
}

// NoopHook implements JobProgressHook by doing nothing. This is useful for
Expand Down
6 changes: 4 additions & 2 deletions pkg/sources/job_progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@ func TestJobProgressElapsedTime(t *testing.T) {
metrics := JobProgressMetrics{}
assert.Equal(t, time.Duration(0), metrics.ElapsedTime())

metrics.StartTime = time.Date(2022, time.March, 30, 0, 0, 0, 0, time.UTC)
startTime := time.Date(2022, time.March, 30, 0, 0, 0, 0, time.UTC)
metrics.StartTime = &startTime
assert.Greater(t, metrics.ElapsedTime(), time.Duration(0))

metrics.EndTime = metrics.StartTime.Add(1 * time.Hour)
endTime := metrics.StartTime.Add(1 * time.Hour)
metrics.EndTime = &endTime
assert.Equal(t, metrics.ElapsedTime(), 1*time.Hour)
}

Expand Down

0 comments on commit 39a603d

Please sign in to comment.