/
aggregation.go
2339 lines (2064 loc) · 84.5 KB
/
aggregation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package costmodel
import (
"fmt"
"math"
"net/http"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/kubecost/opencost/pkg/util/httputil"
"github.com/kubecost/opencost/pkg/util/timeutil"
"github.com/julienschmidt/httprouter"
"github.com/kubecost/opencost/pkg/cloud"
"github.com/kubecost/opencost/pkg/env"
"github.com/kubecost/opencost/pkg/errors"
"github.com/kubecost/opencost/pkg/kubecost"
"github.com/kubecost/opencost/pkg/log"
"github.com/kubecost/opencost/pkg/prom"
"github.com/kubecost/opencost/pkg/thanos"
"github.com/kubecost/opencost/pkg/util"
"github.com/kubecost/opencost/pkg/util/json"
"github.com/patrickmn/go-cache"
prometheusClient "github.com/prometheus/client_golang/api"
)
const (
// SplitTypeWeighted signals that shared costs should be shared
// proportionally, rather than evenly
SplitTypeWeighted = "weighted"
// UnallocatedSubfield indicates an allocation datum that does not have the
// chosen Aggregator; e.g. during aggregation by some label, there may be
// cost data that do not have the given label.
UnallocatedSubfield = "__unallocated__"
clusterCostsCacheMinutes = 5.0
)
// Aggregation describes aggregated cost data, containing cumulative cost and
// allocation data per resource, vectors of rate data per resource, efficiency
// data, and metadata describing the type of aggregation operation.
type Aggregation struct {
Aggregator string `json:"aggregation"`
Subfields []string `json:"subfields,omitempty"`
Environment string `json:"environment"`
Cluster string `json:"cluster,omitempty"`
Properties *kubecost.AllocationProperties `json:"-"`
Start time.Time `json:"-"`
End time.Time `json:"-"`
CPUAllocationHourlyAverage float64 `json:"cpuAllocationAverage"`
CPUAllocationVectors []*util.Vector `json:"-"`
CPUAllocationTotal float64 `json:"-"`
CPUCost float64 `json:"cpuCost"`
CPUCostVector []*util.Vector `json:"cpuCostVector,omitempty"`
CPUEfficiency float64 `json:"cpuEfficiency"`
CPURequestedVectors []*util.Vector `json:"-"`
CPUUsedVectors []*util.Vector `json:"-"`
Efficiency float64 `json:"efficiency"`
GPUAllocationHourlyAverage float64 `json:"gpuAllocationAverage"`
GPUAllocationVectors []*util.Vector `json:"-"`
GPUCost float64 `json:"gpuCost"`
GPUCostVector []*util.Vector `json:"gpuCostVector,omitempty"`
GPUAllocationTotal float64 `json:"-"`
RAMAllocationHourlyAverage float64 `json:"ramAllocationAverage"`
RAMAllocationVectors []*util.Vector `json:"-"`
RAMAllocationTotal float64 `json:"-"`
RAMCost float64 `json:"ramCost"`
RAMCostVector []*util.Vector `json:"ramCostVector,omitempty"`
RAMEfficiency float64 `json:"ramEfficiency"`
RAMRequestedVectors []*util.Vector `json:"-"`
RAMUsedVectors []*util.Vector `json:"-"`
PVAllocationHourlyAverage float64 `json:"pvAllocationAverage"`
PVAllocationVectors []*util.Vector `json:"-"`
PVAllocationTotal float64 `json:"-"`
PVCost float64 `json:"pvCost"`
PVCostVector []*util.Vector `json:"pvCostVector,omitempty"`
NetworkCost float64 `json:"networkCost"`
NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`
SharedCost float64 `json:"sharedCost"`
TotalCost float64 `json:"totalCost"`
TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
}
// TotalHours determines the amount of hours the Aggregation covers, as a
// function of the cost vectors and the resolution of those vectors' data
func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
length := 1
if length < len(a.CPUCostVector) {
length = len(a.CPUCostVector)
}
if length < len(a.RAMCostVector) {
length = len(a.RAMCostVector)
}
if length < len(a.PVCostVector) {
length = len(a.PVCostVector)
}
if length < len(a.GPUCostVector) {
length = len(a.GPUCostVector)
}
if length < len(a.NetworkCostVector) {
length = len(a.NetworkCostVector)
}
return float64(length) * resolutionHours
}
// RateCoefficient computes the coefficient by which the total cost needs to be
// multiplied in order to convert totals costs into per-rate costs.
func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
// monthly rate = (730.0)*(total cost)/(total hours)
// daily rate = (24.0)*(total cost)/(total hours)
// hourly rate = (1.0)*(total cost)/(total hours)
// default to hourly rate
coeff := 1.0
switch rateStr {
case "daily":
coeff = timeutil.HoursPerDay
case "monthly":
coeff = timeutil.HoursPerMonth
}
return coeff / a.TotalHours(resolutionHours)
}
type SharedResourceInfo struct {
ShareResources bool
SharedNamespace map[string]bool
LabelSelectors map[string]map[string]bool
}
type SharedCostInfo struct {
Name string
Cost float64
ShareType string
}
func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
// exists in a shared namespace
if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
return true
}
// has at least one shared label (OR, not AND in the case of multiple labels)
for labelName, labelValues := range s.LabelSelectors {
if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
return true
}
}
return false
}
func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
sr := &SharedResourceInfo{
ShareResources: shareResources,
SharedNamespace: make(map[string]bool),
LabelSelectors: make(map[string]map[string]bool),
}
for _, ns := range sharedNamespaces {
sr.SharedNamespace[strings.Trim(ns, " ")] = true
}
// Creating a map of label name to label value, but only if
// the cardinality matches
if len(labelNames) == len(labelValues) {
for i := range labelNames {
cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
if values, ok := sr.LabelSelectors[cleanedLname]; ok {
values[strings.Trim(labelValues[i], " ")] = true
} else {
sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
}
}
}
return sr
}
func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
totalContainerCost := 0.0
for _, costDatum := range costData {
clusterID := costDatum.ClusterID
cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
totalContainerCost += totalVectors(cpuv)
totalContainerCost += totalVectors(ramv)
totalContainerCost += totalVectors(gpuv)
for _, pv := range pvvs {
totalContainerCost += totalVectors(pv)
}
totalContainerCost += totalVectors(netv)
}
return totalContainerCost
}
func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
coefficients := make(map[string]float64)
profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
profileStart := time.Now()
var clusterCosts map[string]*ClusterCosts
var err error
fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
if data, valid := a.ClusterCostsCache.Get(key); valid {
clusterCosts = data.(map[string]*ClusterCosts)
} else {
clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
if err != nil {
return nil, err
}
}
measureTime(profileStart, profileThreshold, profileName)
for cid, costs := range clusterCosts {
if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
coefficients[cid] = 1.0
continue
}
if costs.TotalCumulative == 0 {
return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
}
totalContainerCost := 0.0
for _, costDatum := range costData {
if costDatum.ClusterID == cid {
cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
totalContainerCost += totalVectors(cpuv)
totalContainerCost += totalVectors(ramv)
totalContainerCost += totalVectors(gpuv)
for _, pv := range pvvs {
totalContainerCost += totalVectors(pv)
}
}
}
coeff := totalContainerCost / costs.TotalCumulative
coefficients[cid] = coeff
}
return coefficients, nil
}
// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
type AggregationOptions struct {
Discount float64 // percent by which to discount CPU, RAM, and GPU cost
CustomDiscount float64 // additional custom discount applied to all prices
IdleCoefficients map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
IncludeEfficiency bool // set to true to receive efficiency/usage data
IncludeTimeSeries bool // set to true to receive time series data
Rate string // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
ResolutionHours float64
SharedResourceInfo *SharedResourceInfo
SharedCosts map[string]*SharedCostInfo
FilteredContainerCount int
FilteredEnvironments map[string]int
SharedSplit string
TotalContainerCost float64
}
// Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
// clamp is required
func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
rAvg := requestsAvg
if rAvg > allocationAvg {
log.Debugf("Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
rAvg = allocationAvg
}
uAvg := usedAverage
if uAvg > allocationAvg {
log.Debugf(" Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
uAvg = allocationAvg
}
return rAvg, uAvg
}
// AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
// must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
// See AggregationOptions for optional parameters.
func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
discount := opts.Discount
customDiscount := opts.CustomDiscount
idleCoefficients := opts.IdleCoefficients
includeTimeSeries := opts.IncludeTimeSeries
includeEfficiency := opts.IncludeEfficiency
rate := opts.Rate
sr := opts.SharedResourceInfo
resolutionHours := 1.0
if opts.ResolutionHours > 0.0 {
resolutionHours = opts.ResolutionHours
}
if idleCoefficients == nil {
idleCoefficients = make(map[string]float64)
}
// aggregations collects key-value pairs of resource group-to-aggregated data
// e.g. namespace-to-data or label-value-to-data
aggregations := make(map[string]*Aggregation)
// sharedResourceCost is the running total cost of resources that should be reported
// as shared across all other resources, rather than reported as a stand-alone category
sharedResourceCost := 0.0
for _, costDatum := range costData {
idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
if !ok {
idleCoefficient = 1.0
}
if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
sharedResourceCost += totalVectors(cpuv)
sharedResourceCost += totalVectors(ramv)
sharedResourceCost += totalVectors(gpuv)
sharedResourceCost += totalVectors(netv)
for _, pv := range pvvs {
sharedResourceCost += totalVectors(pv)
}
} else {
if field == "cluster" {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
} else if field == "node" {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
} else if field == "namespace" {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
} else if field == "service" {
if len(costDatum.Services) > 0 {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
} else {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "deployment" {
if len(costDatum.Deployments) > 0 {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
} else {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "statefulset" {
if len(costDatum.Statefulsets) > 0 {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
} else {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "daemonset" {
if len(costDatum.Daemonsets) > 0 {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
} else {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "controller" {
if controller, kind, hasController := costDatum.GetController(); hasController {
key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
} else {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "label" {
found := false
if costDatum.Labels != nil {
for _, sf := range subfields {
if subfieldName, ok := costDatum.Labels[sf]; ok {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
found = true
break
}
}
}
if !found {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "annotation" {
found := false
if costDatum.Annotations != nil {
for _, sf := range subfields {
if subfieldName, ok := costDatum.Annotations[sf]; ok {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
found = true
break
}
}
}
if !found {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
}
} else if field == "pod" {
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
} else if field == "container" {
key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
}
}
}
for key, agg := range aggregations {
sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
agg.CPUCost = totalVectors(agg.CPUCostVector)
agg.RAMCost = totalVectors(agg.RAMCostVector)
agg.GPUCost = totalVectors(agg.GPUCostVector)
agg.PVCost = totalVectors(agg.PVCostVector)
agg.NetworkCost = totalVectors(agg.NetworkCostVector)
if opts.SharedSplit == SplitTypeWeighted {
d := opts.TotalContainerCost - sharedResourceCost
if d == 0 {
log.Warnf("Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
sharedCoefficient = 1.0
} else {
sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
}
}
agg.SharedCost = sharedResourceCost * sharedCoefficient
for _, v := range opts.SharedCosts {
agg.SharedCost += v.Cost * sharedCoefficient
}
if rate != "" {
rateCoeff := agg.RateCoefficient(rate, resolutionHours)
agg.CPUCost *= rateCoeff
agg.RAMCost *= rateCoeff
agg.GPUCost *= rateCoeff
agg.PVCost *= rateCoeff
agg.NetworkCost *= rateCoeff
agg.SharedCost *= rateCoeff
}
agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
// Evicted and Completed Pods can still show up here, but have 0 cost.
// Filter these by default. Any reason to keep them?
if agg.TotalCost == 0 {
delete(aggregations, key)
continue
}
// CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
// TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
// culumative?
agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
if includeEfficiency {
// Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
// returns 0% efficiency, which should be a red-flag.
//
// If non-zero numbers are available, then efficiency is defined as:
// idlePercentage = (requested - used) / allocated
// efficiency = (1.0 - idlePercentage)
//
// It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
// It is not possible to score < 0% efficiency.
agg.CPUEfficiency = 0.0
CPUIdle := 0.0
if agg.CPUAllocationHourlyAverage > 0.0 {
avgCPURequested := averageVectors(agg.CPURequestedVectors)
avgCPUUsed := averageVectors(agg.CPUUsedVectors)
// Clamp averages, log range violations
avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
agg.CPUEfficiency = 1.0 - CPUIdle
}
agg.RAMEfficiency = 0.0
RAMIdle := 0.0
if agg.RAMAllocationHourlyAverage > 0.0 {
avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
avgRAMUsed := averageVectors(agg.RAMUsedVectors)
// Clamp averages, log range violations
avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
agg.RAMEfficiency = 1.0 - RAMIdle
}
// Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
// respective total costs.
agg.Efficiency = 0.0
if (agg.CPUCost + agg.RAMCost) > 0 {
agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
}
}
// convert RAM from bytes to GiB
agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
// convert storage from bytes to GiB
agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
// remove time series data if it is not explicitly requested
if !includeTimeSeries {
agg.CPUCostVector = nil
agg.RAMCostVector = nil
agg.GPUCostVector = nil
agg.PVCostVector = nil
agg.NetworkCostVector = nil
agg.TotalCostVector = nil
} else { // otherwise compute a totalcostvector
v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
v2 := addVectors(v1, agg.GPUCostVector)
v3 := addVectors(v2, agg.PVCostVector)
v4 := addVectors(v3, agg.NetworkCostVector)
agg.TotalCostVector = v4
}
// Typesafety checks
if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
log.Warnf("CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
agg.CPUAllocationHourlyAverage = 0
}
if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
log.Warnf("CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.CPUCost = 0
}
if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
log.Warnf("CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
agg.CPUEfficiency = 0
}
if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
log.Warnf("Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
agg.Efficiency = 0
}
if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
log.Warnf("GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
agg.GPUAllocationHourlyAverage = 0
}
if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
log.Warnf("GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.GPUCost = 0
}
if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
log.Warnf("RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
agg.RAMAllocationHourlyAverage = 0
}
if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
log.Warnf("RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.RAMCost = 0
}
if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
log.Warnf("RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
agg.RAMEfficiency = 0
}
if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
log.Warnf("PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
agg.PVAllocationHourlyAverage = 0
}
if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
log.Warnf("PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.PVCost = 0
}
if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
log.Warnf("NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.NetworkCost = 0
}
if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
log.Warnf("SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.SharedCost = 0
}
if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
log.Warnf("TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
agg.TotalCost = 0
}
}
return aggregations
}
func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
// add new entry to aggregation results if a new key is encountered
if _, ok := aggregations[key]; !ok {
agg := &Aggregation{
Aggregator: field,
Environment: key,
}
if len(subfields) > 0 {
agg.Subfields = subfields
}
if includeProperties {
props := &kubecost.AllocationProperties{}
props.Cluster = costDatum.ClusterID
props.Node = costDatum.NodeName
if controller, kind, hasController := costDatum.GetController(); hasController {
props.Controller = controller
props.ControllerKind = kind
}
props.Labels = costDatum.Labels
props.Annotations = costDatum.Annotations
props.Namespace = costDatum.Namespace
props.Pod = costDatum.PodName
props.Services = costDatum.Services
props.Container = costDatum.Name
agg.Properties = props
}
aggregations[key] = agg
}
mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
}
func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
for _, pvcd := range costDatum.PVCData {
aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
}
cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
for _, vectorList := range pvvs {
aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
}
}
// Returns the blended discounts applied to the node as a result of global discounts and reserved instance
// discounts
func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
if costDatum.NodeData == nil {
return discount, discount
}
if costDatum.NodeData.IsSpot() {
return 0, 0
}
reserved := costDatum.NodeData.Reserved
// blended discounts
blendedCPUDiscount := discount
blendedRAMDiscount := discount
if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
reservedCPUDiscount := 0.0
if cpuCost == 0 {
log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
} else {
reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
}
reservedRAMDiscount := 0.0
if ramCost == 0 {
log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
} else {
reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
}
// AWS passes the # of reserved CPU and RAM as -1 to represent "All"
if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
blendedCPUDiscount = reservedCPUDiscount
blendedRAMDiscount = reservedRAMDiscount
} else {
nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
if ierr == nil && ferr == nil {
nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
nonReservedCPU := nodeCPU - reserved.ReservedCPU
nonReservedRAM := nodeRAMGB - reservedRAMGB
if nonReservedCPU == 0 {
blendedCPUDiscount = reservedCPUDiscount
} else {
if nodeCPU == 0 {
log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
} else {
blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
}
}
if nonReservedRAM == 0 {
blendedRAMDiscount = reservedRAMDiscount
} else {
if nodeRAMGB == 0 {
log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
} else {
blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
}
}
}
}
}
return blendedCPUDiscount, blendedRAMDiscount
}
func parseVectorPricing(cfg *cloud.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
usesCustom := false
cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
usesCustom = true
if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
cpuCost = 0
}
}
ramCost, err := strconv.ParseFloat(ramCostStr, 64)
if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
usesCustom = true
if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
ramCost = 0
}
}
gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
gpuCost = 0
}
}
pvCost, err := strconv.ParseFloat(pvCostStr, 64)
if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
pvCost = 0
}
}
return cpuCost, ramCost, gpuCost, pvCost, usesCustom
}
func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
var cpuCost float64
var ramCost float64
var gpuCost float64
var pvCost float64
var usesCustom bool
// If custom pricing is enabled and can be retrieved, replace
// default cost values with custom values
customPricing, err := cp.GetConfig()
if err != nil {
log.Errorf("failed to load custom pricing: %s", err)
}
if cloud.CustomPricesEnabled(cp) && err == nil {
var cpuCostStr string
var ramCostStr string
var gpuCostStr string
var pvCostStr string
if costDatum.NodeData.IsSpot() {
cpuCostStr = customPricing.SpotCPU
ramCostStr = customPricing.SpotRAM
gpuCostStr = customPricing.SpotGPU
} else {
cpuCostStr = customPricing.CPU
ramCostStr = customPricing.RAM
gpuCostStr = customPricing.GPU
}
pvCostStr = customPricing.Storage
cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
} else if costDatum.NodeData == nil && err == nil {
cpuCostStr := customPricing.CPU
ramCostStr := customPricing.RAM
gpuCostStr := customPricing.GPU
pvCostStr := customPricing.Storage
cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
} else {
cpuCostStr := costDatum.NodeData.VCPUCost
ramCostStr := costDatum.NodeData.RAMCost
gpuCostStr := costDatum.NodeData.GPUCost
pvCostStr := costDatum.NodeData.StorageCost
cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
}
if usesCustom {
log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
}
cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
log.Debugf("Node Name: %s", costDatum.NodeName)
log.Debugf("Blended CPU Discount: %f", cpuDiscount)
log.Debugf("Blended RAM Discount: %f", ramDiscount)
// TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
rateCoeff := 1.0
if idleCoefficient == 0 {
idleCoefficient = 1.0
}
cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
for _, val := range costDatum.CPUAllocation {
cpuv = append(cpuv, &util.Vector{
Timestamp: math.Round(val.Timestamp/10) * 10,
Value: (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
})
}
ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
for _, val := range costDatum.RAMAllocation {
ramv = append(ramv, &util.Vector{
Timestamp: math.Round(val.Timestamp/10) * 10,
Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
})
}
gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
for _, val := range costDatum.GPUReq {
gpuv = append(gpuv, &util.Vector{
Timestamp: math.Round(val.Timestamp/10) * 10,
Value: (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
})
}
pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
for _, pvcData := range costDatum.PVCData {
pvv := make([]*util.Vector, 0, len(pvcData.Values))
if pvcData.Volume != nil {
cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
// override with custom pricing if enabled
if cloud.CustomPricesEnabled(cp) {
cost = pvCost
}
for _, val := range pvcData.Values {
pvv = append(pvv, &util.Vector{
Timestamp: math.Round(val.Timestamp/10) * 10,
Value: ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
})
}
pvvs = append(pvvs, pvv)
}
}
netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
for _, val := range costDatum.NetworkData {
netv = append(netv, &util.Vector{
Timestamp: math.Round(val.Timestamp/10) * 10,
Value: val.Value,
})
}
return cpuv, ramv, gpuv, pvvs, netv
}
func averageVectors(vectors []*util.Vector) float64 {
if len(vectors) == 0 {
return 0.0
}
return totalVectors(vectors) / float64(len(vectors))
}
func totalVectors(vectors []*util.Vector) float64 {
total := 0.0
for _, vector := range vectors {
total += vector.Value
}
return total
}
// addVectors adds two slices of Vectors. Vector timestamps are rounded to the
// nearest ten seconds to allow matching of Vectors within a delta allowance.
// Matching Vectors are summed, while unmatched Vectors are passed through.
// e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
sumOp := func(result *util.Vector, x *float64, y *float64) bool {
if x != nil && y != nil {
result.Value = *x + *y
} else if y != nil {
result.Value = *y
} else if x != nil {
result.Value = *x
}
return true
}
return util.ApplyVectorOp(xvs, yvs, sumOp)
}
// minCostDataLength sets the minimum number of time series data required to
// cache both raw and aggregated cost data
const minCostDataLength = 2
// EmptyDataError describes an error caused by empty cost data for some
// defined interval
type EmptyDataError struct {
err error
window kubecost.Window
}
// Error implements the error interface
func (ede *EmptyDataError) Error() string {
err := fmt.Sprintf("empty data for range: %s", ede.window)
if ede.err != nil {
err += fmt.Sprintf(": %s", ede.err)
}
return err
}
func costDataTimeSeriesLength(costData map[string]*CostData) int {
l := 0
for _, cd := range costData {
if l < len(cd.RAMAllocation) {
l = len(cd.RAMAllocation)
}
if l < len(cd.CPUAllocation) {
l = len(cd.CPUAllocation)
}
}
return l
}
// ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
// then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
// lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
// e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
// e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
scaled := map[string]*CostData{}
for key, datum := range data {
datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
for _, pvcDatum := range datum.PVCData {
pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
}
scaled[key] = datum
}
return scaled
}
func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
// if scaling to a lower resolution, compress the hourly data for maximum accuracy
if resolutionHours > 1.0 {
return compressVectorSeries(vs, resolutionHours)
}
// if scaling to a higher resolution, simply scale each value down by the fraction of an hour
for _, v := range vs {
v.Value *= resolutionHours
}
return vs
}
func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
if len(vs) == 0 {
return vs
}
compressed := []*util.Vector{}
threshold := float64(60 * 60 * resolutionHours)
var acc *util.Vector
for i, v := range vs {
if acc == nil {
// start a new accumulation from current datum
acc = &util.Vector{
Value: vs[i].Value,
Timestamp: vs[i].Timestamp,
}
continue
}
if v.Timestamp-acc.Timestamp < threshold {
// v should be accumulated in current datum
acc.Value += v.Value
} else {
// v falls outside current datum's threshold; append and start a new one
compressed = append(compressed, acc)
acc = &util.Vector{