diff --git a/changelogs/unreleased/6493-allenxu404 b/changelogs/unreleased/6493-allenxu404 new file mode 100644 index 0000000000..cc61aaa86d --- /dev/null +++ b/changelogs/unreleased/6493-allenxu404 @@ -0,0 +1 @@ +Add data upload and download metrics \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 51c0a45006..ffbb5ca74a 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -224,9 +224,9 @@ func (s *nodeAgentServer) run() { s.logger.Fatalf("Failed to start metric server for node agent at [%s]: %v", s.metricsAddress, err) } }() - s.metrics = metrics.NewPodVolumeMetrics() + s.metrics = metrics.NewNodeMetrics() s.metrics.RegisterAllMetrics() - s.metrics.InitPodVolumeMetricsForNode(s.nodeName) + s.metrics.InitMetricsForNode(s.nodeName) s.markInProgressCRsFailed() @@ -260,13 +260,13 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataUploadsCancel(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, 
repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.markDataDownloadsCancel(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 2243b17dec..d41a262863 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -44,6 +44,7 @@ import ( datamover "github.com/vmware-tanzu/velero/pkg/datamover" "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/metrics" repository "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -63,10 +64,11 @@ type DataDownloadReconciler struct { repositoryEnsurer *repository.Ensurer dataPathMgr *datapath.Manager preparingTimeout time.Duration + metrics *metrics.ServerMetrics } func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, - repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler { + repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, kubeClient: kubeClient, @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), dataPathMgr: datapath.NewManager(1), preparingTimeout: preparingTimeout, + metrics: metrics, } } @@ -301,6 +304,7 @@ func (r *DataDownloadReconciler) 
OnDataDownloadCompleted(ctx context.Context, na log.WithError(err).Error("error updating data download status") } else { log.Infof("Data download is marked as %s", dd.Status.Phase) + r.metrics.RegisterDataDownloadSuccess(r.nodeName) } } @@ -343,6 +347,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { log.WithError(err).Error("error updating data download status") + } else { + r.metrics.RegisterDataDownloadCancel(r.nodeName) } } } @@ -497,6 +503,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil { log.WithError(patchErr).Error("error updating DataDownload status") + } else { + r.metrics.RegisterDataDownloadFailure(r.nodeName) } return err @@ -548,6 +556,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) log.Info("Dataupload has been cleaned up") + + r.metrics.RegisterDataDownloadFailure(r.nodeName) } func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index f70706a7ef..a0a9b645f0 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -46,6 +46,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/metrics" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" @@ -136,7 +137,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, 
needError ... if err != nil { return nil, err } - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index a20f009c9e..de54afd0dd 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -46,6 +46,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/datamover" "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" @@ -71,11 +72,12 @@ type DataUploadReconciler struct { snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager preparingTimeout time.Duration + metrics *metrics.ServerMetrics } func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, - cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler { + cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, kubeClient: kubeClient, @@ -89,6 +91,7 @@ func 
NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: datapath.NewManager(1), preparingTimeout: preparingTimeout, + metrics: metrics, } } @@ -308,8 +311,10 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { log.WithError(err).Error("error updating DataUpload status") + } else { + log.Info("Data upload completed") + r.metrics.RegisterDataUploadSuccess(r.nodeName) } - log.Info("Data upload completed") } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { @@ -360,6 +365,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { log.WithError(err).Error("error updating DataUpload status") + } else { + r.metrics.RegisterDataUploadCancel(r.nodeName) } } } @@ -518,6 +525,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil { log.WithError(patchErr).Error("error updating DataUpload status") + } else { + r.metrics.RegisterDataUploadFailure(r.nodeName) } return err @@ -580,6 +589,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov log.Info("Dataupload has been cleaned up") } + + r.metrics.RegisterDataUploadFailure(r.nodeName) } func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, diff --git a/pkg/controller/data_upload_controller_test.go 
b/pkg/controller/data_upload_controller_test.go index 761a8d66db..3e0c046065 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -46,6 +46,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/builder" "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" + "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/repository" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/uploader" @@ -193,7 +194,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci return nil, err } return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil, - testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil + testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func dataUploadBuilder() *builder.DataUploadBuilder { diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index a7ec9f59c6..25fcae80a0 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -192,7 +192,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { r := PodVolumeBackupReconciler{ Client: fakeClient, clock: testclocks.NewFakeClock(now), - metrics: metrics.NewPodVolumeMetrics(), + metrics: metrics.NewNodeMetrics(), credentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore}, nodeName: "test_node", fileSystem: fakeFS, diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 75d2e79d53..e6879f363c 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -66,6 +66,14 @@ const ( 
podVolumeOperationLatencySeconds = "pod_volume_operation_latency_seconds" podVolumeOperationLatencyGaugeSeconds = "pod_volume_operation_latency_seconds_gauge" + // data mover metrics + DataUploadSuccessTotal = "data_upload_success_total" + DataUploadFailureTotal = "data_upload_failure_total" + DataUploadCancelTotal = "data_upload_cancel_total" + DataDownloadSuccessTotal = "data_download_success_total" + DataDownloadFailureTotal = "data_download_failure_total" + DataDownloadCancelTotal = "data_download_cancel_total" + // Labels nodeMetricLabel = "node" podVolumeOperationLabel = "operation" @@ -319,7 +327,7 @@ func NewServerMetrics() *ServerMetrics { } } -func NewPodVolumeMetrics() *ServerMetrics { +func NewNodeMetrics() *ServerMetrics { return &ServerMetrics{ metrics: map[string]prometheus.Collector{ podVolumeBackupEnqueueTotal: prometheus.NewCounterVec( @@ -365,6 +373,54 @@ func NewPodVolumeMetrics() *ServerMetrics { }, []string{nodeMetricLabel, podVolumeOperationLabel, backupNameLabel, pvbNameLabel}, ), + DataUploadSuccessTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataUploadSuccessTotal, + Help: "Total number of successfully uploaded snapshots", + }, + []string{nodeMetricLabel}, + ), + DataUploadFailureTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataUploadFailureTotal, + Help: "Total number of failed snapshot uploads", + }, + []string{nodeMetricLabel}, + ), + DataUploadCancelTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataUploadCancelTotal, + Help: "Total number of canceled snapshot uploads", + }, + []string{nodeMetricLabel}, + ), + DataDownloadSuccessTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataDownloadSuccessTotal, + Help: "Total number of successfully downloaded snapshots", + }, + []string{nodeMetricLabel}, + 
), + DataDownloadFailureTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataDownloadFailureTotal, + Help: "Total number of failed snapshot downloads", + }, + []string{nodeMetricLabel}, + ), + DataDownloadCancelTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: podVolumeMetricsNamespace, + Name: DataDownloadCancelTotal, + Help: "Total number of canceled snapshot downloads", + }, + []string{nodeMetricLabel}, + ), }, } } @@ -450,13 +506,31 @@ func (m *ServerMetrics) InitSchedule(scheduleName string) { } // InitSchedule initializes counter metrics for a node. -func (m *ServerMetrics) InitPodVolumeMetricsForNode(node string) { +func (m *ServerMetrics) InitMetricsForNode(node string) { if c, ok := m.metrics[podVolumeBackupEnqueueTotal].(*prometheus.CounterVec); ok { c.WithLabelValues(node).Add(0) } if c, ok := m.metrics[podVolumeBackupDequeueTotal].(*prometheus.CounterVec); ok { c.WithLabelValues(node).Add(0) } + if c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } + if c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } + if c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } + if c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } + if c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } + if c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Add(0) + } } // RegisterPodVolumeBackupEnqueue records enqueuing of a PodVolumeBackup object. @@ -473,6 +547,48 @@ func (m *ServerMetrics) RegisterPodVolumeBackupDequeue(node string) { } } +// RegisterDataUploadSuccess records a successful snapshot upload. 
+func (m *ServerMetrics) RegisterDataUploadSuccess(node string) { + if c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + +// RegisterDataUploadFailure records a failed snapshot upload. +func (m *ServerMetrics) RegisterDataUploadFailure(node string) { + if c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + +// RegisterDataUploadCancel records a canceled snapshot upload. +func (m *ServerMetrics) RegisterDataUploadCancel(node string) { + if c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + +// RegisterDataDownloadSuccess records a successful snapshot download. +func (m *ServerMetrics) RegisterDataDownloadSuccess(node string) { + if c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + +// RegisterDataDownloadFailure records a failed snapshot download. +func (m *ServerMetrics) RegisterDataDownloadFailure(node string) { + if c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + +// RegisterDataDownloadCancel records a canceled snapshot download. +func (m *ServerMetrics) RegisterDataDownloadCancel(node string) { + if c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec); ok { + c.WithLabelValues(node).Inc() + } +} + // ObservePodVolumeOpLatency records the number of seconds a pod volume operation took. func (m *ServerMetrics) ObservePodVolumeOpLatency(node, pvbName, opName, backupName string, seconds float64) { if h, ok := m.metrics[podVolumeOperationLatencySeconds].(*prometheus.HistogramVec); ok {