diff --git a/Dockerfile-taskqueue-manager b/Dockerfile-taskqueue-manager
index c00fc3a..37078c2 100644
--- a/Dockerfile-taskqueue-manager
+++ b/Dockerfile-taskqueue-manager
@@ -48,6 +48,6 @@ COPY --from=frontend-builder /app/frontend/dist/spa/ ./frontend/
# Expose the service port
EXPOSE 8080
-ENV WEB_STATIC_DIR '/app/frontend/'
+ENV WEB_STATIC_DIR='/app/frontend/'
# Command to run the service
ENTRYPOINT ["/app/taskqueue-manager"]
diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go
index c31251c..b531ff6 100644
--- a/taskmanager/taskmanager.go
+++ b/taskmanager/taskmanager.go
@@ -98,7 +98,7 @@ func (s *Server) initHandler() http.Handler {
mux.Handle("POST /api/pending-queues/{queue_name}/toggle-status", http.HandlerFunc(s.togglePendingQueueStatus))
mux.Handle("POST /api/dead-queues/{queue_name}/requeue-all", http.HandlerFunc(s.requeueAllDeadJobs))
mux.Handle("DELETE /api/dead-queues/{queue_name}/delete-all", http.HandlerFunc(s.deleteAllDeadJobs))
- mux.Handle("GET /api/metrics/jobs/processed", http.HandlerFunc(s.jobProcessedMetrics))
+ mux.Handle("GET /api/metrics-range", http.HandlerFunc(s.metricsRange))
mux.Handle("GET /", http.FileServer(http.Dir(s.webStaticDir)))
handler := cors.AllowAll().Handler(mux)
@@ -116,9 +116,8 @@ func (s *Server) withLog(h http.Handler) http.Handler {
})
}
-// GET /api/metrics/jobs/processed
-func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) {
- // Parse query parameters
+// GET /api/metrics-range
+func (s *Server) metricsRange(w http.ResponseWriter, r *http.Request) {
query, err := getMetricsRangeQueryParam(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -127,12 +126,12 @@ func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) {
mt, err := s.metricsBackend.QueryRangeCounterValues(
r.Context(),
- taskqueue.Metric{Name: taskqueue.MetricJobProcessedCount},
+ taskqueue.Metric{Name: query.Name},
query.Start,
query.End,
)
if err != nil {
- http.Error(w, "Failed to query pending queues "+err.Error(), http.StatusInternalServerError)
+ http.Error(w, "Failed to query metrics "+err.Error(), http.StatusInternalServerError)
return
}
@@ -528,6 +527,11 @@ func getPagination(r *http.Request) (taskqueue.Pagination, error) {
}
func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) {
+ name := r.URL.Query().Get("name")
+ if name == "" {
+ return MetricsQueryParam{}, errors.New("name is required")
+ }
+
startTimeStr := r.URL.Query().Get("start")
startTime := time.Now().Add(-time.Hour * 24)
@@ -562,6 +566,7 @@ func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) {
fmt.Println(startTime, endTime, stepInt)
return MetricsQueryParam{
+ Name: name,
Start: startTime,
End: endTime,
Step: time.Duration(stepInt) * time.Second,
diff --git a/taskmanager/taskqueue-web/src/api/TaskManagerClient.js b/taskmanager/taskqueue-web/src/api/TaskManagerClient.js
index d3e58b4..5f1a225 100644
--- a/taskmanager/taskqueue-web/src/api/TaskManagerClient.js
+++ b/taskmanager/taskqueue-web/src/api/TaskManagerClient.js
@@ -27,10 +27,11 @@ class TaskManagerClient {
}
}
- async fetchJobProcessedMetrics(start, end, step) {
+ async queryMetricsRange(metric, start, end, step) {
try {
- const response = await this.api.get('/metrics/jobs/processed', {
+ const response = await this.api.get('/metrics-range', {
params: {
+ name: metric,
start: start,
end: end,
step: step,
diff --git a/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue b/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue
index 078e884..a98de1d 100644
--- a/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue
+++ b/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue
@@ -6,7 +6,7 @@
- Job Processed
+ Job Processing Metrics
+
+
+
-
+
@@ -41,11 +52,11 @@
import {use} from "echarts/core";
import {CanvasRenderer} from "echarts/renderers";
import {LineChart} from "echarts/charts";
-import {GridComponent, TitleComponent, TooltipComponent} from "echarts/components";
+import {GridComponent, LegendComponent, TitleComponent, TooltipComponent} from "echarts/components";
import VChart from "vue-echarts";
// Register ECharts components
-use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent]);
+use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent, LegendComponent]);
export default {
name: 'JobProcessingGraph',
@@ -58,6 +69,17 @@ export default {
tooltip: {
trigger: 'axis',
},
+ legend: {
+ data: ['Jobs Processed', 'Jobs Failed'],
+ textStyle: {
+ color: '#333', // Ensure legend text is readable
+ },
+ itemStyle: {
+ color: (name) => {
+ return name === 'Jobs Processed' ? '#42A5F5' : '#F44336';
+ },
+ },
+ },
xAxis: {
type: 'category',
data: [],
@@ -67,19 +89,24 @@ export default {
},
yAxis: {
type: 'value',
- name: 'Jobs Processed',
+ name: 'Count',
},
series: [
{
+ name: 'Jobs Processed',
data: [],
type: 'line',
smooth: true,
- areaStyle: {
- color: 'rgba(66, 165, 245, 0.2)',
- },
- lineStyle: {
- color: '#42A5F5',
- },
+ areaStyle: {color: 'rgba(66, 165, 245, 0.2)'},
+ lineStyle: {color: '#42A5F5'},
+ },
+ {
+ name: 'Jobs Failed',
+ data: [],
+ type: 'line',
+ smooth: true,
+ areaStyle: {color: 'rgba(244, 67, 54, 0.2)'},
+ lineStyle: {color: '#F44336'},
},
],
},
@@ -102,17 +129,28 @@ export default {
},
methods: {
async fetchMetrics(startTime, endTime, step) {
+ const metricJobProcessedCount = "job_processed_count"
+ const metricsJobFailedCount = "job_failed_count"
+
this.loading = true;
try {
- const {values} = await this.$taskManagerClient.fetchJobProcessedMetrics(startTime, endTime, step);
- const labels = values.map(item => new Date(item.timestamp * 1000).toLocaleTimeString([], {
- hour: '2-digit',
- minute: '2-digit'
- }));
- const data = values.map(item => item.value);
+ const [processedResponse, failedResponse] = await Promise.all([
+ this.$taskManagerClient.queryMetricsRange(metricJobProcessedCount, startTime, endTime, step),
+ this.$taskManagerClient.queryMetricsRange(metricsJobFailedCount, startTime, endTime, step),
+ ]);
+
+ const processedData = processedResponse.values;
+ const failedData = failedResponse.values;
+
+ const labels = processedData.map(item =>
+ new Date(item.timestamp * 1000).toLocaleTimeString([], {hour: '2-digit', minute: '2-digit'})
+ );
+ const processedValues = processedData.map(item => item.value);
+ const failedValues = failedData.map(item => item.value);
this.chartOptions.xAxis.data = labels;
- this.chartOptions.series[0].data = data;
+ this.chartOptions.series[0].data = processedValues;
+ this.chartOptions.series[1].data = failedValues;
} catch (error) {
console.error('Error fetching metrics:', error);
} finally {
diff --git a/taskmanager/taskqueue-web/src/components/QueueStatistics.vue b/taskmanager/taskqueue-web/src/components/QueueStatistics.vue
index 6ecb2ac..13476cb 100644
--- a/taskmanager/taskqueue-web/src/components/QueueStatistics.vue
+++ b/taskmanager/taskqueue-web/src/components/QueueStatistics.vue
@@ -11,7 +11,7 @@
-
+
diff --git a/taskmanager/taskqueue-web/src/components/WorkerList.vue b/taskmanager/taskqueue-web/src/components/WorkerList.vue
index a4a7d86..1a6c7da 100644
--- a/taskmanager/taskqueue-web/src/components/WorkerList.vue
+++ b/taskmanager/taskqueue-web/src/components/WorkerList.vue
@@ -44,9 +44,7 @@
{{ worker.workerID }}
-
- Active
-
+
Active
diff --git a/taskmanager/types.go b/taskmanager/types.go
index 3c88abf..62578ce 100644
--- a/taskmanager/types.go
+++ b/taskmanager/types.go
@@ -81,6 +81,7 @@ type MetricValue struct {
}
type MetricsQueryParam struct {
+ Name string
Start time.Time
End time.Time
Step time.Duration
diff --git a/worker.go b/worker.go
index 029a499..091e4e5 100644
--- a/worker.go
+++ b/worker.go
@@ -283,6 +283,8 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro
return w.queue.Ack(ctx, job, &AckOptions{QueueName: h.queueName})
}
+ _ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobFailedCount}, 1, time.Now())
+
nackOpts := &NackOptions{
QueueName: h.queueName,
RetryAfter: h.jobOptions.BackoffFunc(job.Attempts),
@@ -479,6 +481,7 @@ const (
MetricPendingQueueSize = "pending_queue_size"
MetricDeadQueueSize = "dead_queue_size"
MetricJobProcessedCount = "job_processed_count"
+ MetricJobFailedCount = "job_failed_count"
)
type MetricRangeValue struct {