From 302de6ebc776823366020cc5bd1b91d8b158e5fb Mon Sep 17 00:00:00 2001 From: Oshank Kumar Date: Fri, 31 Jan 2025 12:15:14 +0530 Subject: [PATCH] Add Job Failure Metrics in UI --- Dockerfile-taskqueue-manager | 2 +- taskmanager/taskmanager.go | 17 +++-- .../src/api/TaskManagerClient.js | 5 +- .../src/components/JobProcessingGraph.vue | 74 ++++++++++++++----- .../src/components/QueueStatistics.vue | 2 +- .../src/components/WorkerList.vue | 4 +- taskmanager/types.go | 1 + worker.go | 3 + 8 files changed, 77 insertions(+), 31 deletions(-) diff --git a/Dockerfile-taskqueue-manager b/Dockerfile-taskqueue-manager index c00fc3a..37078c2 100644 --- a/Dockerfile-taskqueue-manager +++ b/Dockerfile-taskqueue-manager @@ -48,6 +48,6 @@ COPY --from=frontend-builder /app/frontend/dist/spa/ ./frontend/ # Expose the service port EXPOSE 8080 -ENV WEB_STATIC_DIR '/app/frontend/' +ENV WEB_STATIC_DIR='/app/frontend/' # Command to run the service ENTRYPOINT ["/app/taskqueue-manager"] diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go index c31251c..b531ff6 100644 --- a/taskmanager/taskmanager.go +++ b/taskmanager/taskmanager.go @@ -98,7 +98,7 @@ func (s *Server) initHandler() http.Handler { mux.Handle("POST /api/pending-queues/{queue_name}/toggle-status", http.HandlerFunc(s.togglePendingQueueStatus)) mux.Handle("POST /api/dead-queues/{queue_name}/requeue-all", http.HandlerFunc(s.requeueAllDeadJobs)) mux.Handle("DELETE /api/dead-queues/{queue_name}/delete-all", http.HandlerFunc(s.deleteAllDeadJobs)) - mux.Handle("GET /api/metrics/jobs/processed", http.HandlerFunc(s.jobProcessedMetrics)) + mux.Handle("GET /api/metrics-range", http.HandlerFunc(s.metricsRange)) mux.Handle("GET /", http.FileServer(http.Dir(s.webStaticDir))) handler := cors.AllowAll().Handler(mux) @@ -116,9 +116,8 @@ func (s *Server) withLog(h http.Handler) http.Handler { }) } -// GET /api/metrics/jobs/processed -func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) { - // Parse query parameters +// GET /api/metrics-range +func (s *Server) metricsRange(w http.ResponseWriter, r *http.Request) { query, err := getMetricsRangeQueryParam(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -127,12 +126,12 @@ func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) { mt, err := s.metricsBackend.QueryRangeCounterValues( r.Context(), - taskqueue.Metric{Name: taskqueue.MetricJobProcessedCount}, + taskqueue.Metric{Name: query.Name}, query.Start, query.End, ) if err != nil { - http.Error(w, "Failed to query pending queues "+err.Error(), http.StatusInternalServerError) + http.Error(w, "Failed to query metrics "+err.Error(), http.StatusInternalServerError) return } @@ -528,6 +527,11 @@ func getPagination(r *http.Request) (taskqueue.Pagination, error) { } func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) { + name := r.URL.Query().Get("name") + if name == "" { + return MetricsQueryParam{}, errors.New("name is required") + } + startTimeStr := r.URL.Query().Get("start") startTime := time.Now().Add(-time.Hour * 24) @@ -562,6 +566,7 @@ func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) { fmt.Println(startTime, endTime, stepInt) return MetricsQueryParam{ + Name: name, Start: startTime, End: endTime, Step: time.Duration(stepInt) * time.Second, diff --git a/taskmanager/taskqueue-web/src/api/TaskManagerClient.js b/taskmanager/taskqueue-web/src/api/TaskManagerClient.js index d3e58b4..5f1a225 100644 --- a/taskmanager/taskqueue-web/src/api/TaskManagerClient.js +++ b/taskmanager/taskqueue-web/src/api/TaskManagerClient.js @@ -27,10 +27,11 @@ class TaskManagerClient { } } - async fetchJobProcessedMetrics(start, end, step) { + async queryMetricsRange(metric, start, end, step) { try { - const response = await this.api.get('/metrics/jobs/processed', { + const response = await this.api.get('/metrics-range', { params: { + name: metric, start: start, end: end, step: step, diff --git a/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue b/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue index 078e884..a98de1d 100644 --- a/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue +++ b/taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue @@ -6,7 +6,7 @@ -
Job Processed
+
Job Processing Metrics
+ + + - + @@ -41,11 +52,11 @@ import {use} from "echarts/core"; import {CanvasRenderer} from "echarts/renderers"; import {LineChart} from "echarts/charts"; -import {GridComponent, TitleComponent, TooltipComponent} from "echarts/components"; +import {GridComponent, LegendComponent, TitleComponent, TooltipComponent} from "echarts/components"; import VChart from "vue-echarts"; // Register ECharts components -use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent]); +use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent, LegendComponent]); export default { name: 'JobProcessingGraph', @@ -58,6 +69,17 @@ export default { tooltip: { trigger: 'axis', }, + legend: { + data: ['Jobs Processed', 'Jobs Failed'], + textStyle: { + color: '#333', // Ensure legend text is readable + }, + itemStyle: { + color: (name) => { + return name === 'Jobs Processed' ? '#42A5F5' : '#F44336'; + }, + }, + }, xAxis: { type: 'category', data: [], @@ -67,19 +89,24 @@ export default { }, yAxis: { type: 'value', - name: 'Jobs Processed', + name: 'Count', }, series: [ { + name: 'Jobs Processed', data: [], type: 'line', smooth: true, - areaStyle: { - color: 'rgba(66, 165, 245, 0.2)', - }, - lineStyle: { - color: '#42A5F5', - }, + areaStyle: {color: 'rgba(66, 165, 245, 0.2)'}, + lineStyle: {color: '#42A5F5'}, + }, + { + name: 'Jobs Failed', + data: [], + type: 'line', + smooth: true, + areaStyle: {color: 'rgba(244, 67, 54, 0.2)'}, + lineStyle: {color: '#F44336'}, }, ], }, @@ -102,17 +129,28 @@ export default { }, methods: { async fetchMetrics(startTime, endTime, step) { + const metricJobProcessedCount = "job_processed_count" + const metricsJobFailedCount = "job_failed_count" + this.loading = true; try { - const {values} = await this.$taskManagerClient.fetchJobProcessedMetrics(startTime, endTime, step); - const labels = values.map(item => new Date(item.timestamp * 1000).toLocaleTimeString([], { - hour: '2-digit', - minute: '2-digit' - })); - const data = values.map(item => item.value); + const [processedResponse, failedResponse] = await Promise.all([ + this.$taskManagerClient.queryMetricsRange(metricJobProcessedCount, startTime, endTime, step), + this.$taskManagerClient.queryMetricsRange(metricsJobFailedCount, startTime, endTime, step), + ]); + + const processedData = processedResponse.values; + const failedData = failedResponse.values; + + const labels = processedData.map(item => + new Date(item.timestamp * 1000).toLocaleTimeString([], {hour: '2-digit', minute: '2-digit'}) + ); + const processedValues = processedData.map(item => item.value); + const failedValues = failedData.map(item => item.value); this.chartOptions.xAxis.data = labels; - this.chartOptions.series[0].data = data; + this.chartOptions.series[0].data = processedValues; + this.chartOptions.series[1].data = failedValues; } catch (error) { console.error('Error fetching metrics:', error); } finally { diff --git a/taskmanager/taskqueue-web/src/components/QueueStatistics.vue b/taskmanager/taskqueue-web/src/components/QueueStatistics.vue index 6ecb2ac..13476cb 100644 --- a/taskmanager/taskqueue-web/src/components/QueueStatistics.vue +++ b/taskmanager/taskqueue-web/src/components/QueueStatistics.vue @@ -11,7 +11,7 @@ - + diff --git a/taskmanager/taskqueue-web/src/components/WorkerList.vue b/taskmanager/taskqueue-web/src/components/WorkerList.vue index a4a7d86..1a6c7da 100644 --- a/taskmanager/taskqueue-web/src/components/WorkerList.vue +++ b/taskmanager/taskqueue-web/src/components/WorkerList.vue @@ -44,9 +44,7 @@
{{ worker.workerID }}
- - Active - + Active
diff --git a/taskmanager/types.go b/taskmanager/types.go index 3c88abf..62578ce 100644 --- a/taskmanager/types.go +++ b/taskmanager/types.go @@ -81,6 +81,7 @@ type MetricValue struct { } type MetricsQueryParam struct { + Name string Start time.Time End time.Time Step time.Duration diff --git a/worker.go b/worker.go index 029a499..091e4e5 100644 --- a/worker.go +++ b/worker.go @@ -283,6 +283,8 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro return w.queue.Ack(ctx, job, &AckOptions{QueueName: h.queueName}) } + _ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobFailedCount}, 1, time.Now()) + nackOpts := &NackOptions{ QueueName: h.queueName, RetryAfter: h.jobOptions.BackoffFunc(job.Attempts), @@ -479,6 +481,7 @@ const ( MetricPendingQueueSize = "pending_queue_size" MetricDeadQueueSize = "dead_queue_size" MetricJobProcessedCount = "job_processed_count" + MetricJobFailedCount = "job_failed_count" ) type MetricRangeValue struct {