Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/redis/go-redis/v9 v9.7.0
github.com/rs/cors v1.11.1
github.com/shirou/gopsutil/v4 v4.24.12
golang.org/x/sync v0.10.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ebitengine/purego v0.8.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/sys v0.28.0 // indirect
)
32 changes: 32 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,48 @@ github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5m
github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4=
github.com/shirou/gopsutil/v4 v4.24.12/go.mod h1:DCtMPAad2XceTeIAbGyVfycbYQNBGk2P8cvDi7/VN9o=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Binary file modified images/home.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions redis/heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type HeartbeatData struct {
HeartbeatAt time.Time `redis:"heartbeat_at"`
Queues []byte `redis:"queues"`
PID int `redis:"pid"`
MemoryUsage float64 `redis:"memory_usage"`
CPUUsage float64 `redis:"cpu_usage"`
}

func NewHeartBeater(rc redis.UniversalClient, opts ...OptFunc) *Heartbeater {
Expand Down Expand Up @@ -47,6 +49,8 @@ func (s *Heartbeater) SendHeartbeat(ctx context.Context, data taskqueue.Heartbea
HeartbeatAt: data.HeartbeatAt,
Queues: queuesData,
PID: data.PID,
MemoryUsage: data.MemoryUsage,
CPUUsage: data.CPUUsage,
}

_, err = s.client.TxPipelined(ctx, func(p redis.Pipeliner) error {
Expand Down Expand Up @@ -107,6 +111,8 @@ func (s *Heartbeater) LastHeartbeats(ctx context.Context) ([]taskqueue.Heartbeat
HeartbeatAt: hb.HeartbeatAt,
Queues: queues,
PID: hb.PID,
MemoryUsage: hb.MemoryUsage,
CPUUsage: hb.CPUUsage,
})
}

Expand Down
6 changes: 0 additions & 6 deletions redis/jobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,14 @@ func TestStoreLastHeartbeats(t *testing.T) {
{
Name: "queue_1",
Concurrency: 10,
MaxAttempts: 10,
Timeout: time.Minute * 10,
},
{
Name: "queue_2",
Concurrency: 10,
MaxAttempts: 10,
Timeout: time.Minute,
},
{
Name: "queue_2",
Concurrency: 10,
MaxAttempts: 10,
Timeout: time.Second * 30,
},
},
PID: 12,
Expand Down
52 changes: 34 additions & 18 deletions redis/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package redis

import (
"context"
"sort"
"strconv"
"strings"
"time"

"github.com/oshankkumar/taskqueue-go"
Expand Down Expand Up @@ -82,7 +84,7 @@ func (m *MetricsBackend) QueryRangeCounterValues(ctx context.Context, mt taskque

func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (taskqueue.MetricValue, error) {
metricName, labels := mt.Name, mt.Labels
key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)

result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{
Key: key,
Expand Down Expand Up @@ -121,7 +123,7 @@ func (m *MetricsBackend) GaugeValue(ctx context.Context, mt taskqueue.Metric) (t

func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, value float64, ts time.Time) error {
metricName, labels := mt.Name, mt.Labels
key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)
score := ts.Unix()

return m.client.ZAdd(ctx, key, redis.Z{
Expand All @@ -132,7 +134,7 @@ func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, v

func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue.Metric, start, end time.Time) (taskqueue.MetricRangeValue, error) {
metricName, labels := mt.Name, mt.Labels
key := redisKeyGaugeMetrics(m.namespace, metricName, labels)
key := redisZSetKeyGaugeMetrics(m.namespace, metricName, labels)

result, err := m.client.ZRangeArgsWithScores(ctx, redis.ZRangeArgs{
Key: key,
Expand Down Expand Up @@ -170,26 +172,40 @@ func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue
return gaugeRange, nil
}

func redisKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string {
key := ns + ":gauge:" + metricName
for k, v := range labels {
key += ":" + k + ":" + v
}
return key
func redisZSetKeyGaugeMetrics(ns string, metricName string, labels map[string]string) string {
return redisKeyPrefixGaugeMetrics(ns, metricName, labels) + ":values"
}

func redisHashKeyCounterMetrics(ns string, metricName string, labels map[string]string) string {
key := ns + ":counter:" + metricName
for k, v := range labels {
key += ":" + k + ":" + v
}
return key + ":values"
return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":values"
}

func redisZSetKeyCounterMetrics(ns string, metricName string, labels map[string]string) string {
key := ns + ":counter:" + metricName
for k, v := range labels {
key += ":" + k + ":" + v
return redisKeyPrefixCounterMetrics(ns, metricName, labels) + ":timestamps"
}

func redisKeyPrefixCounterMetrics(ns string, metricName string, labels map[string]string) string {
return ns + ":counter:" + metricName + ":" + joinLabels(labels, ":")
}

func redisKeyPrefixGaugeMetrics(ns string, metricName string, labels map[string]string) string {
return ns + ":gauge:" + metricName + ":" + joinLabels(labels, ":")
}

func sortedMapKeys(labels map[string]string) []string {
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}

func joinLabels(labels map[string]string, sep string) string {
keys := sortedMapKeys(labels)
tokens := make([]string, 0, len(keys)*2)
for _, k := range keys {
tokens = append(tokens, k, labels[k])
}
return key + ":timestamps"
return strings.Join(tokens, sep)
}
4 changes: 2 additions & 2 deletions taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) {
queues = append(queues, QueuesConfig{
QueueName: q.Name,
Concurrency: q.Concurrency,
MaxAttempts: q.MaxAttempts,
Timeout: q.Timeout,
})
}

Expand All @@ -217,6 +215,8 @@ func (s *Server) listActiveWorkers(w http.ResponseWriter, r *http.Request) {
HeartbeatAt: hb.HeartbeatAt,
Queues: queues,
PID: hb.PID,
MemoryUsage: hb.MemoryUsage,
CPUUsage: hb.CPUUsage,
})
}

Expand Down
19 changes: 18 additions & 1 deletion taskmanager/taskqueue-web/src/components/WorkerList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</q-card-section>

<q-card-section>
<q-item>
<q-item class="stats">
<q-item-section>
<p class="text-caption">
<strong>Started At:</strong>
Expand All @@ -69,6 +69,18 @@
{{ worker.pid }}
</p>
</q-item-section>
<q-item-section>
<p class="text-caption">
<strong>Memory Usage:</strong>
{{ worker.memoryUsage.toFixed(2) }}%
</p>
</q-item-section>
<q-item-section>
<p class="text-caption">
<strong>CPU Usage:</strong>
{{ worker.cpuUsage.toFixed(2) }}%
</p>
</q-item-section>
</q-item>
</q-card-section>

Expand Down Expand Up @@ -119,4 +131,9 @@ export default {
max-height: 400px;
overflow-y: auto;
}

.stats p {
font-size: 14px;
margin: 5px 0;
}
</style>
2 changes: 1 addition & 1 deletion taskmanager/taskqueue-web/src/pages/Home.vue
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
</div>

<div class="row q-col-gutter-sm q-pa-sm">
<div class="col-lg-6 col-md-6 col-sm-12 col-xs-12 q-pa-sm">
<div class="col-lg-12 col-md-12 col-sm-12 col-xs-12 q-pa-sm">
<worker-list :workers="activeWorkers" @refresh="fetchActiveWorkers"/>
</div>
</div>
Expand Down
8 changes: 4 additions & 4 deletions taskmanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ type ListActiveWorkersResponse struct {
}

type QueuesConfig struct {
QueueName string `json:"queueName"`
Concurrency int `json:"concurrency"`
MaxAttempts int `json:"maxAttempts"`
Timeout time.Duration `json:"timeout"`
QueueName string `json:"queueName"`
Concurrency int `json:"concurrency"`
}

type ActiveWorker struct {
Expand All @@ -60,6 +58,8 @@ type ActiveWorker struct {
HeartbeatAt time.Time `json:"heartbeatAt"`
Queues []QueuesConfig `json:"queues"`
PID int `json:"pid"`
MemoryUsage float64 `json:"memoryUsage"`
CPUUsage float64 `json:"cpuUsage"`
}

type TogglePendingQueueStatusResponse struct {
Expand Down
54 changes: 32 additions & 22 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package taskqueue
import (
"context"
"errors"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"log/slog"
"math"
"os"
Expand Down Expand Up @@ -313,45 +315,53 @@ func (w *Worker) startHeartBeat(ctx context.Context) {
queues = append(queues, HeartbeatQueueData{
Name: h.queueName,
Concurrency: h.jobOptions.Concurrency,
MaxAttempts: h.jobOptions.MaxAttempts,
Timeout: h.jobOptions.Timeout,
})
}

w.internalLogger.Info("starting heartbeat loop")

if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{
WorkerID: w.id,
StartedAt: w.startedAt,
HeartbeatAt: time.Now(),
Queues: queues,
PID: pid,
}); err != nil {
w.errorHandler(err)
sendHearBeat := func() {
var memUsed float64
memStat, err := mem.VirtualMemoryWithContext(ctx)
if err == nil {
memUsed = memStat.UsedPercent
}

var cpuUsed float64
cpuStat, err := cpu.PercentWithContext(ctx, 0, false)
if err == nil {
cpuUsed = cpuStat[0]
}

if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{
WorkerID: w.id,
StartedAt: w.startedAt,
HeartbeatAt: time.Now(),
Queues: queues,
PID: pid,
MemoryUsage: memUsed,
CPUUsage: cpuUsed,
}); err != nil {
w.errorHandler(err)
}
}

sendHearBeat()

heartBeatTicker := time.NewTicker(time.Second * 10)
defer heartBeatTicker.Stop()

for {
select {
case <-ctx.Done():
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*5)
defer cancel()
if err := w.heartBeater.RemoveHeartbeat(ctx, w.id); err != nil {
w.errorHandler(err)
}
cancel()
return
case <-heartBeatTicker.C:
if err := w.heartBeater.SendHeartbeat(ctx, HeartbeatData{
WorkerID: w.id,
StartedAt: w.startedAt,
HeartbeatAt: time.Now(),
Queues: queues,
PID: pid,
}); err != nil {
w.errorHandler(err)
}
sendHearBeat()
}
}
}
Expand Down Expand Up @@ -459,8 +469,6 @@ func (w *Worker) monitorQueues(ctx context.Context) {
type HeartbeatQueueData struct {
Name string
Concurrency int
MaxAttempts int
Timeout time.Duration
}

type HeartbeatData struct {
Expand All @@ -469,6 +477,8 @@ type HeartbeatData struct {
HeartbeatAt time.Time
Queues []HeartbeatQueueData
PID int
MemoryUsage float64
CPUUsage float64
}

type HeartBeater interface {
Expand Down