Skip to content

Commit

Permalink
Fixed Custom Cost window and step calculations. (#2671)
Browse files Browse the repository at this point in the history
Signed-off-by: Nik Willwerth <nwillwerth@kubecost.com>
  • Loading branch information
nik-kc committed Mar 27, 2024
1 parent e59eaff commit 20b024f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 80 deletions.
86 changes: 86 additions & 0 deletions pkg/customcost/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,95 @@ package customcost

import (
"context"
"fmt"
"time"

"github.com/opencost/opencost/core/pkg/opencost"
"github.com/opencost/opencost/core/pkg/util/timeutil"
"github.com/opencost/opencost/pkg/env"
)

type Querier interface {
QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error)
QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error)
}

func GetCustomCostWindowAccumulation(window opencost.Window, accumulate opencost.AccumulateOption) (opencost.Window, opencost.AccumulateOption, error) {
var err error
if accumulate == opencost.AccumulateOptionNone {
accumulate, err = getCustomCostAccumulateOption(window, nil)
if err != nil {
return opencost.Window{}, opencost.AccumulateOptionNone, fmt.Errorf("failed to determine custom cost accumulation option: %v", err)
}
}
window, err = window.GetAccumulateWindow(accumulate)
if err != nil {
return opencost.Window{}, opencost.AccumulateOptionNone, fmt.Errorf("failed to determine custom cost accumulation option: %v", err)
}

return window, accumulate, nil
}

// getCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
func getCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
if window.IsOpen() || window.IsNegative() {
return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
}

if len(from) == 0 {
from = allSteppedAccumulateOptions
}

hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
oldestHourly := time.Now().Add(-1 * hourlySteps)

// Use hourly if...
// (1) hourly is an option;
// (2) we have hourly store coverage; and
// (3) the window duration is less than the hourly break point.
if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
return opencost.AccumulateOptionHour, nil
}

dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
oldestDaily := time.Now().Add(-1 * dailySteps)
// Use daily if...
// (1) daily is an option
// It is acceptable to query a range for which we only have partial data
if hasDaily(from) {
return opencost.AccumulateOptionDay, nil
}

if oldestDaily.After(*window.Start()) {
return opencost.AccumulateOptionDay, fmt.Errorf("data store does not have coverage for %v", window)
}

return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
}

var allSteppedAccumulateOptions = []opencost.AccumulateOption{
opencost.AccumulateOptionHour,
opencost.AccumulateOptionDay,
}

func hasHourly(opts []opencost.AccumulateOption) bool {
for _, opt := range opts {
if opt == opencost.AccumulateOptionHour {
return true
}
}

return false
}

func hasDaily(opts []opencost.AccumulateOption) bool {
for _, opt := range opts {
if opt == opencost.AccumulateOptionDay {
return true
}
}

return false
}
95 changes: 16 additions & 79 deletions pkg/customcost/repositoryquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/opencost/opencost/core/pkg/opencost"
"github.com/opencost/opencost/core/pkg/util/timeutil"
"github.com/opencost/opencost/pkg/env"
)

type RepositoryQuerier struct {
Expand All @@ -28,9 +27,15 @@ func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dail
}

func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
window := opencost.NewClosedWindow(request.Start, request.End)
window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
if err != nil {
return nil, fmt.Errorf("error getting custom cost total window accumulation: %w", err)
}

repo := rq.dailyRepo
step := timeutil.Day
if request.Accumulate == opencost.AccumulateOptionHour {
if accumulate == opencost.AccumulateOptionHour {
repo = rq.hourlyRepo
step = time.Hour
}
Expand All @@ -45,10 +50,9 @@ func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRe
return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
}

requestWindow := opencost.NewClosedWindow(request.Start, request.End)
ccs := NewCustomCostSet(requestWindow)
queryStart := request.Start
for queryStart.Before(request.End) {
ccs := NewCustomCostSet(window)
queryStart := *window.Start()
for queryStart.Before(*window.End()) {
queryEnd := queryStart.Add(step)

for _, domain := range domains {
Expand Down Expand Up @@ -78,81 +82,14 @@ func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRe
return NewCostResponse(ccs), nil
}

var allSteppedAccumulateOptions = []opencost.AccumulateOption{
opencost.AccumulateOptionHour,
opencost.AccumulateOptionDay,
}

func hasHourly(opts []opencost.AccumulateOption) bool {
for _, opt := range opts {
if opt == opencost.AccumulateOptionHour {
return true
}
}

return false
}

func hasDaily(opts []opencost.AccumulateOption) bool {
for _, opt := range opts {
if opt == opencost.AccumulateOptionDay {
return true
}
}

return false
}

// GetCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
func GetCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
if window.IsOpen() || window.IsNegative() {
return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
}

if len(from) == 0 {
from = allSteppedAccumulateOptions
}

hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
oldestHourly := time.Now().Add(-1 * hourlySteps)

// Use hourly if...
// (1) hourly is an option;
// (2) we have hourly store coverage; and
// (3) the window duration is less than the hourly break point.
if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
return opencost.AccumulateOptionHour, nil
}

dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
oldestDaily := time.Now().Add(-1 * dailySteps)
// Use daily if...
// (1) daily is an option
// It is acceptable to query a range for which we only have partial data
if hasDaily(from) {
return opencost.AccumulateOptionDay, nil
}

if oldestDaily.After(*window.Start()) {
return opencost.AccumulateOptionNone, fmt.Errorf("data store does not have coverage for %v", window)
}

return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
}

func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
var err error
if request.Accumulate == opencost.AccumulateOptionNone {
request.Accumulate, err = GetCustomCostAccumulateOption(window, nil)
if err != nil {
return nil, fmt.Errorf("error determining accumulation option: %v", err)
}
window := opencost.NewClosedWindow(request.Start, request.End)
window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
if err != nil {
return nil, fmt.Errorf("error getting custom cost timeseries window accumulation: %w", err)
}

windows, err := window.GetAccumulateWindows(request.Accumulate)
windows, err := window.GetAccumulateWindows(accumulate)
if err != nil {
return nil, fmt.Errorf("error getting timeseries windows: %w", err)
}
Expand All @@ -172,7 +109,7 @@ func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTi
End: *window.End(),
AggregateBy: request.AggregateBy,
Filter: request.Filter,
Accumulate: request.Accumulate,
Accumulate: accumulate,
})
}(i, w, totals)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/customcost/repositoryquerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGetCustomCostAccumulateOption(t *testing.T) {
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
got, err := GetCustomCostAccumulateOption(tt.window, tt.from)
got, err := getCustomCostAccumulateOption(tt.window, tt.from)
if (err != nil) != tt.wantErr {
t.Errorf("GetAccumulateOption() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down

0 comments on commit 20b024f

Please sign in to comment.