Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cdb/db_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,16 @@ func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID str
)
for _, info := range data.Info {
for _, key := range info.Keys {
if len(key.Value) > 255 {
value = key.Value[:255]
} else {
value = key.Value
switch v := key.Value.(type) {
case string:
value = v
case float64:
value = fmt.Sprintf("%f", v)
default:
continue
}
if len(value) > 255 {
value = value[:255]
}
resCount++
args = append(args, svcID, nodeID, info.Rid, key.Key, data.Topology, value)
Expand Down
1 change: 1 addition & 0 deletions cmd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func setDefaultWorkerConfig(name string) {

viper.SetDefault(section+".runners", 1)
viper.SetDefault(section+".log.request.level", "none")
viper.SetDefault(section+".directories.uploads", "/oc3/uploads")
}

func setDefaultFeederConfig() {
Expand Down
13 changes: 8 additions & 5 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (

type (
workerT struct {
db *sql.DB
redis *redis.Client
section string
runners int
queues []string
db *sql.DB
redis *redis.Client
section string
runners int
queues []string
uploadDir string
}
)

Expand All @@ -43,6 +44,7 @@ func newWorker(section string, runners int, queues []string) (*workerT, error) {
for _, q := range queues {
t.queues = append(t.queues, cachekeys.QueuePrefix+q)
}
t.uploadDir = viper.GetString(t.section + ".directories.uploads")
return t, nil
}

Expand Down Expand Up @@ -82,6 +84,7 @@ func (t *workerT) run() error {
Runners: t.runners,
SubSystem: t.Section(),
ODB: odb,
UploadDir: t.uploadDir,
}
return w.Run()
}
3 changes: 1 addition & 2 deletions feeder/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,7 @@ components:
properties:
key:
type: string
value:
type: string
value: {}

InstanceStatus:
type: object
Expand Down
54 changes: 27 additions & 27 deletions feeder/codegen_server_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions feeder/codegen_type_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions worker/base_job_uploader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package worker

type (
JobUpload struct {
UploadDir string
}
)

func (j *JobUpload) SetUploadDir(s string) {
j.UploadDir = s
}
82 changes: 82 additions & 0 deletions worker/job_feed_instance_resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"encoding/json"
"fmt"
"log/slog"
"path/filepath"
"strconv"

"github.com/go-graphite/go-whisper"
"github.com/go-redis/redis/v8"

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/feeder"
"github.com/opensvc/oc3/timeseries"
"github.com/opensvc/oc3/util/logkey"
)

Expand All @@ -18,6 +22,7 @@ type (
JobBase
JobRedis
JobDB
JobUpload

// idX is the id of the posted instance config with the expected pattern: <objectName>@<nodeID>@<clusterID>
idX string
Expand All @@ -38,6 +43,10 @@ type (
}
)

var (
ErrResInfoValue = fmt.Errorf("invalid resource info value")
)

func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFeedInstanceResourceInfo {
idX := fmt.Sprintf("%s@%s@%s", objectName, nodeID, clusterID)
return &jobFeedInstanceResourceInfo{
Expand All @@ -64,6 +73,7 @@ func (j *jobFeedInstanceResourceInfo) Operations() []operation {
{name: "dbNow", do: j.dbNow},
{name: "updateDB", do: j.updateDB},
{name: "purgeDB", do: j.purgeDB},
{name: "updateWSP", do: j.updateWSP, blocking: false},
{name: "pushFromTableChanges", do: j.pushFromTableChanges},
}
}
Expand Down Expand Up @@ -113,3 +123,75 @@ func (j *jobFeedInstanceResourceInfo) purgeDB(ctx context.Context) (err error) {

return nil
}

// updateWSP updates Whisper files with new data points for instance resource information.
// Returns an error on failure.
// filename: <UploadDir>/stats/nodes/<nodeID>/services/<objectID>/resources/<rid>/info/<key>.wsp
func (j *jobFeedInstanceResourceInfo) updateWSP(ctx context.Context) (err error) {
if j.objectID == "" {
return fmt.Errorf("updateWSP: objectID is empty")
}
timestamp := int(j.now.Unix())
baseDir := filepath.Join(j.UploadDir, "stats", "nodes", j.nodeID, "services", j.objectID)

var okKeys []string
var badKeys []string
for _, info := range j.data.Info {
rid := info.Rid
for _, v := range info.Keys {
value, err := j.valueToFloat64(v.Value)
if err != nil {
continue
}
fName := filepath.Join(baseDir, "resources", rid, "info", v.Key+".wsp")

if err := timeseries.Update(fName, value, timestamp, timeseries.DefaultRetentions, whisper.Average, 0.0); err != nil {
badKeys = append(badKeys, v.Key)
} else {
okKeys = append(okKeys, v.Key)
}
}
if len(okKeys) > 0 {
j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys))
}
}
if len(badKeys) > 0 {
return fmt.Errorf("jobFeedInstanceResourceInfo: updateWSP failed for keys %v", badKeys)
}
return nil
}

// valueToFloat64 converts an arbitrary value to a float64, returning an error if the conversion is not possible.
func (j *jobFeedInstanceResourceInfo) valueToFloat64(i any) (float64, error) {
switch n := i.(type) {
case string:
// most common values are strings, so start with that
return strconv.ParseFloat(n, 64)
case int:
return float64(n), nil
case float64:
return n, nil
case float32:
return float64(n), nil
case int8:
return float64(n), nil
case int16:
return float64(n), nil
case int32:
return float64(n), nil
case int64:
return float64(n), nil
case uint:
return float64(n), nil
case uint8:
return float64(n), nil
case uint16:
return float64(n), nil
case uint32:
return float64(n), nil
case uint64:
return float64(n), nil
default:
return 0, ErrResInfoValue
}
}
10 changes: 10 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
Runners int

SubSystem string
UploadDir string
}

EventPublisher interface {
Expand All @@ -48,6 +49,10 @@ type (
SetEv(ev EventPublisher)
}

UploadDirSetter interface {
SetUploadDir(s string)
}

JobRunner interface {
Operationer

Expand Down Expand Up @@ -193,6 +198,11 @@ func (w *Worker) runJob(unqueuedJob []string) error {
if a, ok := j.(EvSetter); ok {
a.SetEv(w.Ev)
}

if a, ok := j.(UploadDirSetter); ok {
a.SetUploadDir(w.UploadDir)
}

status := jobStatusOk
err := RunJob(ctx, j)
duration := time.Since(begin)
Expand Down
Loading