Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data upload/download metrics #6493

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/6493-allenxu404
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add data upload and download metrics
8 changes: 4 additions & 4 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (s *nodeAgentServer) run() {
s.logger.Fatalf("Failed to start metric server for node agent at [%s]: %v", s.metricsAddress, err)
}
}()
s.metrics = metrics.NewPodVolumeMetrics()
s.metrics = metrics.NewNodeMetrics()
s.metrics.RegisterAllMetrics()
s.metrics.InitPodVolumeMetricsForNode(s.nodeName)
s.metrics.InitMetricsForNode(s.nodeName)

s.markInProgressCRsFailed()

Expand Down Expand Up @@ -260,13 +260,13 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
Expand Down
12 changes: 11 additions & 1 deletion pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
datamover "github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
repository "github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
Expand All @@ -63,10 +64,11 @@ type DataDownloadReconciler struct {
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler {
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
metrics: metrics,
}
}

Expand Down Expand Up @@ -301,6 +304,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
log.WithError(err).Error("error updating data download status")
} else {
log.Infof("Data download is marked as %s", dd.Status.Phase)
r.metrics.RegisterDataDownloadSuccess(r.nodeName)
}
}

Expand Down Expand Up @@ -343,6 +347,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
r.metrics.RegisterDataDownloadCancel(r.nodeName)
}
}
}
Expand Down Expand Up @@ -497,6 +503,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v

if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
} else {
r.metrics.RegisterDataDownloadFailure(r.nodeName)
}

return err
Expand Down Expand Up @@ -548,6 +556,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))

log.Info("Dataupload has been cleaned up")

r.metrics.RegisterDataDownloadFailure(r.nodeName)
}

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"

Expand Down Expand Up @@ -136,7 +137,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
if err != nil {
return nil, err
}
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
Expand All @@ -71,11 +72,12 @@ type DataUploadReconciler struct {
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler {
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -89,6 +91,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
metrics: metrics,
}
}

Expand Down Expand Up @@ -308,8 +311,10 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp

if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
} else {
log.Info("Data upload completed")
r.metrics.RegisterDataUploadSuccess(r.nodeName)
}
log.Info("Data upload completed")
}

func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
Expand Down Expand Up @@ -360,6 +365,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
} else {
r.metrics.RegisterDataUploadCancel(r.nodeName)
}
}
}
Expand Down Expand Up @@ -518,6 +525,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
} else {
r.metrics.RegisterDataUploadFailure(r.nodeName)
}

return err
Expand Down Expand Up @@ -580,6 +589,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov

log.Info("Dataupload has been cleaned up")
}

r.metrics.RegisterDataUploadFailure(r.nodeName)
}

func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
Expand Down Expand Up @@ -193,7 +194,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/pod_volume_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
r := PodVolumeBackupReconciler{
Client: fakeClient,
clock: testclocks.NewFakeClock(now),
metrics: metrics.NewPodVolumeMetrics(),
metrics: metrics.NewNodeMetrics(),
credentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore},
nodeName: "test_node",
fileSystem: fakeFS,
Expand Down
120 changes: 118 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ const (
podVolumeOperationLatencySeconds = "pod_volume_operation_latency_seconds"
podVolumeOperationLatencyGaugeSeconds = "pod_volume_operation_latency_seconds_gauge"

// data mover metrics
DataUploadSuccessTotal = "data_upload_success_total"
DataUploadFailureTotal = "data_upload_failure_total"
DataUploadCancelTotal = "data_upload_cancel_total"
DataDownloadSuccessTotal = "data_download_success_total"
DataDownloadFailureTotal = "data_download_failure_total"
DataDownloadCancelTotal = "data_download_cancel_total"

// Labels
nodeMetricLabel = "node"
podVolumeOperationLabel = "operation"
Expand Down Expand Up @@ -319,7 +327,7 @@ func NewServerMetrics() *ServerMetrics {
}
}

func NewPodVolumeMetrics() *ServerMetrics {
func NewNodeMetrics() *ServerMetrics {
return &ServerMetrics{
metrics: map[string]prometheus.Collector{
podVolumeBackupEnqueueTotal: prometheus.NewCounterVec(
Expand Down Expand Up @@ -365,6 +373,54 @@ func NewPodVolumeMetrics() *ServerMetrics {
},
[]string{nodeMetricLabel, podVolumeOperationLabel, backupNameLabel, pvbNameLabel},
),
DataUploadSuccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadSuccessTotal,
Help: "Total number of successful uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataUploadFailureTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadFailureTotal,
Help: "Total number of failed uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataUploadCancelTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadCancelTotal,
Help: "Total number of canceled uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadSuccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadSuccessTotal,
Help: "Total number of successful downloaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadFailureTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadFailureTotal,
Help: "Total number of failed downloaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadCancelTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadCancelTotal,
Help: "Total number of canceled downloaded snapshots",
},
[]string{nodeMetricLabel},
),
},
}
}
Expand Down Expand Up @@ -450,13 +506,31 @@ func (m *ServerMetrics) InitSchedule(scheduleName string) {
}

// InitSchedule initializes counter metrics for a node.
func (m *ServerMetrics) InitPodVolumeMetricsForNode(node string) {
func (m *ServerMetrics) InitMetricsForNode(node string) {
if c, ok := m.metrics[podVolumeBackupEnqueueTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[podVolumeBackupDequeueTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
if c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec); ok {
c.WithLabelValues(node).Add(0)
}
}

// RegisterPodVolumeBackupEnqueue records enqueuing of a PodVolumeBackup object.
Expand All @@ -473,6 +547,48 @@ func (m *ServerMetrics) RegisterPodVolumeBackupDequeue(node string) {
}
}

// RegisterDataUploadSuccess records one successfully uploaded snapshot for
// the given node.
func (m *ServerMetrics) RegisterDataUploadSuccess(node string) {
	c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataUploadFailure records one failed snapshot upload for the
// given node.
func (m *ServerMetrics) RegisterDataUploadFailure(node string) {
	c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataUploadCancel records one canceled snapshot upload for the
// given node.
func (m *ServerMetrics) RegisterDataUploadCancel(node string) {
	c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadSuccess records one successfully downloaded snapshot
// for the given node.
func (m *ServerMetrics) RegisterDataDownloadSuccess(node string) {
	c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadFailure records one failed snapshot download for the
// given node.
func (m *ServerMetrics) RegisterDataDownloadFailure(node string) {
	c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadCancel records one canceled snapshot download for the
// given node.
func (m *ServerMetrics) RegisterDataDownloadCancel(node string) {
	c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// ObservePodVolumeOpLatency records the number of seconds a pod volume operation took.
func (m *ServerMetrics) ObservePodVolumeOpLatency(node, pvbName, opName, backupName string, seconds float64) {
if h, ok := m.metrics[podVolumeOperationLatencySeconds].(*prometheus.HistogramVec); ok {
Expand Down