Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile-taskqueue-manager
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ COPY --from=frontend-builder /app/frontend/dist/spa/ ./frontend/
# Expose the service port
EXPOSE 8080

ENV WEB_STATIC_DIR '/app/frontend/'
ENV WEB_STATIC_DIR='/app/frontend/'
# Command to run the service
ENTRYPOINT ["/app/taskqueue-manager"]
17 changes: 11 additions & 6 deletions taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Server) initHandler() http.Handler {
mux.Handle("POST /api/pending-queues/{queue_name}/toggle-status", http.HandlerFunc(s.togglePendingQueueStatus))
mux.Handle("POST /api/dead-queues/{queue_name}/requeue-all", http.HandlerFunc(s.requeueAllDeadJobs))
mux.Handle("DELETE /api/dead-queues/{queue_name}/delete-all", http.HandlerFunc(s.deleteAllDeadJobs))
mux.Handle("GET /api/metrics/jobs/processed", http.HandlerFunc(s.jobProcessedMetrics))
mux.Handle("GET /api/metrics-range", http.HandlerFunc(s.metricsRange))
mux.Handle("GET /", http.FileServer(http.Dir(s.webStaticDir)))

handler := cors.AllowAll().Handler(mux)
Expand All @@ -116,9 +116,8 @@ func (s *Server) withLog(h http.Handler) http.Handler {
})
}

// GET /api/metrics/jobs/processed
func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) {
// Parse query parameters
// GET /api/metrics-range
func (s *Server) metricsRange(w http.ResponseWriter, r *http.Request) {
query, err := getMetricsRangeQueryParam(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -127,12 +126,12 @@ func (s *Server) jobProcessedMetrics(w http.ResponseWriter, r *http.Request) {

mt, err := s.metricsBackend.QueryRangeCounterValues(
r.Context(),
taskqueue.Metric{Name: taskqueue.MetricJobProcessedCount},
taskqueue.Metric{Name: query.Name},
query.Start,
query.End,
)
if err != nil {
http.Error(w, "Failed to query pending queues "+err.Error(), http.StatusInternalServerError)
http.Error(w, "Failed to query metrics "+err.Error(), http.StatusInternalServerError)
return
}

Expand Down Expand Up @@ -528,6 +527,11 @@ func getPagination(r *http.Request) (taskqueue.Pagination, error) {
}

func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) {
name := r.URL.Query().Get("name")
if name == "" {
return MetricsQueryParam{}, errors.New("name is required")
}

startTimeStr := r.URL.Query().Get("start")

startTime := time.Now().Add(-time.Hour * 24)
Expand Down Expand Up @@ -562,6 +566,7 @@ func getMetricsRangeQueryParam(r *http.Request) (MetricsQueryParam, error) {
fmt.Println(startTime, endTime, stepInt)

return MetricsQueryParam{
Name: name,
Start: startTime,
End: endTime,
Step: time.Duration(stepInt) * time.Second,
Expand Down
5 changes: 3 additions & 2 deletions taskmanager/taskqueue-web/src/api/TaskManagerClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ class TaskManagerClient {
}
}

async fetchJobProcessedMetrics(start, end, step) {
async queryMetricsRange(metric, start, end, step) {
try {
const response = await this.api.get('/metrics/jobs/processed', {
const response = await this.api.get('/metrics-range', {
params: {
name: metric,
start: start,
end: end,
step: step,
Expand Down
74 changes: 56 additions & 18 deletions taskmanager/taskqueue-web/src/components/JobProcessingGraph.vue
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<q-icon color="blue" name="fas fa-chart-line" size="44px"/>
</q-item-section>
<q-item-section>
<div class="text-h6">Job Processed</div>
<div class="text-h6">Job Processing Metrics</div>
</q-item-section>
<q-item-section side>
<q-select
Expand All @@ -29,9 +29,20 @@
@update:model-value="updateMetrics"
/>
</q-item-section>
<q-item-section side>
<q-btn
flat
dense
color="primary"
icon="refresh"
label="Refresh"
@click="updateMetrics"
class="q-ml-auto"
/>
</q-item-section>
</q-item>
</q-card-section>
<q-card-section>
<q-card-section class="q-pa-none">
<v-chart :option="chartOptions" class="chart-container" autoresize/>
</q-card-section>
</q-card>
Expand All @@ -41,11 +52,11 @@
import {use} from "echarts/core";
import {CanvasRenderer} from "echarts/renderers";
import {LineChart} from "echarts/charts";
import {GridComponent, TitleComponent, TooltipComponent} from "echarts/components";
import {GridComponent, LegendComponent, TitleComponent, TooltipComponent} from "echarts/components";
import VChart from "vue-echarts";

// Register ECharts components
use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent]);
use([CanvasRenderer, LineChart, GridComponent, TooltipComponent, TitleComponent, LegendComponent]);

export default {
name: 'JobProcessingGraph',
Expand All @@ -58,6 +69,17 @@ export default {
tooltip: {
trigger: 'axis',
},
legend: {
data: ['Jobs Processed', 'Jobs Failed'],
textStyle: {
color: '#333', // Ensure legend text is readable
},
itemStyle: {
color: (name) => {
return name === 'Jobs Processed' ? '#42A5F5' : '#F44336';
},
},
},
xAxis: {
type: 'category',
data: [],
Expand All @@ -67,19 +89,24 @@ export default {
},
yAxis: {
type: 'value',
name: 'Jobs Processed',
name: 'Count',
},
series: [
{
name: 'Jobs Processed',
data: [],
type: 'line',
smooth: true,
areaStyle: {
color: 'rgba(66, 165, 245, 0.2)',
},
lineStyle: {
color: '#42A5F5',
},
areaStyle: {color: 'rgba(66, 165, 245, 0.2)'},
lineStyle: {color: '#42A5F5'},
},
{
name: 'Jobs Failed',
data: [],
type: 'line',
smooth: true,
areaStyle: {color: 'rgba(244, 67, 54, 0.2)'},
lineStyle: {color: '#F44336'},
},
],
},
Expand All @@ -102,17 +129,28 @@ export default {
},
methods: {
async fetchMetrics(startTime, endTime, step) {
const metricJobProcessedCount = "job_processed_count"
const metricsJobFailedCount = "job_failed_count"

this.loading = true;
try {
const {values} = await this.$taskManagerClient.fetchJobProcessedMetrics(startTime, endTime, step);
const labels = values.map(item => new Date(item.timestamp * 1000).toLocaleTimeString([], {
hour: '2-digit',
minute: '2-digit'
}));
const data = values.map(item => item.value);
const [processedResponse, failedResponse] = await Promise.all([
this.$taskManagerClient.queryMetricsRange(metricJobProcessedCount, startTime, endTime, step),
this.$taskManagerClient.queryMetricsRange(metricsJobFailedCount, startTime, endTime, step),
]);

const processedData = processedResponse.values;
const failedData = failedResponse.values;

const labels = processedData.map(item =>
new Date(item.timestamp * 1000).toLocaleTimeString([], {hour: '2-digit', minute: '2-digit'})
);
const processedValues = processedData.map(item => item.value);
const failedValues = failedData.map(item => item.value);

this.chartOptions.xAxis.data = labels;
this.chartOptions.series[0].data = data;
this.chartOptions.series[0].data = processedValues;
this.chartOptions.series[1].data = failedValues;
} catch (error) {
console.error('Error fetching metrics:', error);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</q-item>
</q-card-section>
<q-card-section>
<v-chart :option="chartOptions" class="chart-container"/>
<v-chart :option="chartOptions" class="chart-container" autoresize/>
</q-card-section>
</q-card>
</template>
Expand Down
4 changes: 1 addition & 3 deletions taskmanager/taskqueue-web/src/components/WorkerList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
<div class="row items-center">
<div class="col">
<div class="text-h6">{{ worker.workerID }}</div>
<q-badge color="green" align="left">
Active
</q-badge>
<q-badge color="green">Active</q-badge>
</div>
</div>
</q-card-section>
Expand Down
1 change: 1 addition & 0 deletions taskmanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type MetricValue struct {
}

type MetricsQueryParam struct {
Name string
Start time.Time
End time.Time
Step time.Duration
Expand Down
3 changes: 3 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro
return w.queue.Ack(ctx, job, &AckOptions{QueueName: h.queueName})
}

_ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobFailedCount}, 1, time.Now())

nackOpts := &NackOptions{
QueueName: h.queueName,
RetryAfter: h.jobOptions.BackoffFunc(job.Attempts),
Expand Down Expand Up @@ -479,6 +481,7 @@ const (
MetricPendingQueueSize = "pending_queue_size"
MetricDeadQueueSize = "dead_queue_size"
MetricJobProcessedCount = "job_processed_count"
MetricJobFailedCount = "job_failed_count"
)

type MetricRangeValue struct {
Expand Down