Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait for informer caches to sync before running controllers #2299

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/2299-skriss
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
refactoring: wait for all informer caches to sync before running controllers
65 changes: 42 additions & 23 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().PodVolumeBackups(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.config.backupSyncPeriod,
s.namespace,
s.config.defaultBackupLocation,
Expand Down Expand Up @@ -586,10 +585,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logLevel,
newPluginManager,
backupTracker,
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.config.defaultBackupLocation,
s.config.defaultBackupTTL,
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
Expand Down Expand Up @@ -621,9 +620,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests(),
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests().Lister(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
)

return controllerRunInfo{
Expand All @@ -638,13 +637,13 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.sharedInformerFactory.Velero().V1().DeleteBackupRequests(),
s.veleroClient.VeleroV1(), // deleteBackupRequestClient
s.veleroClient.VeleroV1(), // backupClient
s.sharedInformerFactory.Velero().V1().Restores(),
s.sharedInformerFactory.Velero().V1().Restores().Lister(),
s.veleroClient.VeleroV1(), // restoreClient
backupTracker,
s.resticManager,
s.sharedInformerFactory.Velero().V1().PodVolumeBackups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().PodVolumeBackups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
newPluginManager,
s.metrics,
)
Expand All @@ -656,7 +655,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}

restoreControllerRunInfo := func() controllerRunInfo {

restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
Expand All @@ -675,9 +673,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
restorer,
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
s.logger,
s.logLevel,
newPluginManager,
Expand All @@ -697,7 +695,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger,
s.sharedInformerFactory.Velero().V1().ResticRepositories(),
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.resticManager,
s.config.defaultResticMaintenanceFrequency,
)
Expand All @@ -712,9 +710,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
downloadRequestController := controller.NewDownloadRequestController(
s.veleroClient.VeleroV1(),
s.sharedInformerFactory.Velero().V1().DownloadRequests(),
s.sharedInformerFactory.Velero().V1().Restores(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().Restores().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
newPluginManager,
s.logger,
)
Expand Down Expand Up @@ -771,18 +769,39 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}

// Instantiate the enabled controllers. This needs to be done *before*
// the shared informer factory is started, because the controller
// constructors add event handlers to various informers, which should
// be done before the informers are running.
controllers := make([]controllerRunInfo, 0, len(enabledControllers))
for _, newController := range enabledControllers {
controllerRunInfo := newController()
controllers = append(controllers, newController())
}

// start the informers & and wait for the caches to sync
s.sharedInformerFactory.Start(ctx.Done())
s.logger.Info("Waiting for informer caches to sync")
cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done())
s.logger.Info("Done waiting for informer caches to sync")

for informer, synced := range cacheSyncResults {
if !synced {
return errors.Errorf("cache was not synced for informer %v", informer)
}
s.logger.WithField("informer", informer).Info("Informer cache synced")
}

// now that the informer caches have all synced, we can start running the controllers
for i := range controllers {
controllerRunInfo := controllers[i]

wg.Add(1)
go func() {
controllerRunInfo.controller.Run(ctx, controllerRunInfo.numWorkers)
wg.Done()
}()
}

// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())

s.logger.Info("Server started successfully")

<-ctx.Done()
Expand Down
25 changes: 10 additions & 15 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
Expand All @@ -57,34 +57,34 @@ type backupController struct {
*genericController

backupper pkgbackup.Backupper
lister listers.BackupLister
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
backupLocationLister listers.BackupStorageLocationLister
backupLocationLister velerov1listers.BackupStorageLocationLister
defaultBackupLocation string
defaultBackupTTL time.Duration
snapshotLocationLister listers.VolumeSnapshotLocationLister
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
formatFlag logging.Format
}

func NewBackupController(
backupInformer informers.BackupInformer,
backupInformer velerov1informers.BackupInformer,
client velerov1client.BackupsGetter,
backupper pkgbackup.Backupper,
logger logrus.FieldLogger,
backupLogLevel logrus.Level,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupTracker BackupTracker,
backupLocationInformer informers.BackupStorageLocationInformer,
backupLocationLister velerov1listers.BackupStorageLocationLister,
defaultBackupLocation string,
defaultBackupTTL time.Duration,
volumeSnapshotLocationInformer informers.VolumeSnapshotLocationInformer,
volumeSnapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
Expand All @@ -98,10 +98,10 @@ func NewBackupController(
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupLocationLister: backupLocationInformer.Lister(),
backupLocationLister: backupLocationLister,
defaultBackupLocation: defaultBackupLocation,
defaultBackupTTL: defaultBackupTTL,
snapshotLocationLister: volumeSnapshotLocationInformer.Lister(),
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
Expand All @@ -110,11 +110,6 @@ func NewBackupController(
}

c.syncHandler = c.processBackup
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
backupLocationInformer.Informer().HasSynced,
volumeSnapshotLocationInformer.Informer().HasSynced,
)
c.resyncFunc = c.resync
c.resyncPeriod = time.Minute

Expand Down
42 changes: 17 additions & 25 deletions pkg/controller/backup_deletion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
v1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
Expand All @@ -53,15 +53,15 @@ type backupDeletionController struct {
*genericController

deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter
deleteBackupRequestLister listers.DeleteBackupRequestLister
deleteBackupRequestLister velerov1listers.DeleteBackupRequestLister
backupClient velerov1client.BackupsGetter
restoreLister listers.RestoreLister
restoreLister velerov1listers.RestoreLister
restoreClient velerov1client.RestoresGetter
backupTracker BackupTracker
resticMgr restic.RepositoryManager
podvolumeBackupLister listers.PodVolumeBackupLister
backupLocationLister listers.BackupStorageLocationLister
snapshotLocationLister listers.VolumeSnapshotLocationLister
podvolumeBackupLister velerov1listers.PodVolumeBackupLister
backupLocationLister velerov1listers.BackupStorageLocationLister
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
processRequestFunc func(*v1.DeleteBackupRequest) error
clock clock.Clock
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
Expand All @@ -72,16 +72,16 @@ type backupDeletionController struct {
// NewBackupDeletionController creates a new backup deletion controller.
func NewBackupDeletionController(
logger logrus.FieldLogger,
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
deleteBackupRequestInformer velerov1informers.DeleteBackupRequestInformer,
deleteBackupRequestClient velerov1client.DeleteBackupRequestsGetter,
backupClient velerov1client.BackupsGetter,
restoreInformer informers.RestoreInformer,
restoreLister velerov1listers.RestoreLister,
restoreClient velerov1client.RestoresGetter,
backupTracker BackupTracker,
resticMgr restic.RepositoryManager,
podvolumeBackupInformer informers.PodVolumeBackupInformer,
backupLocationInformer informers.BackupStorageLocationInformer,
snapshotLocationInformer informers.VolumeSnapshotLocationInformer,
podvolumeBackupLister velerov1listers.PodVolumeBackupLister,
backupLocationLister velerov1listers.BackupStorageLocationLister,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
metrics *metrics.ServerMetrics,
) Interface {
Expand All @@ -90,13 +90,13 @@ func NewBackupDeletionController(
deleteBackupRequestClient: deleteBackupRequestClient,
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
backupClient: backupClient,
restoreLister: restoreInformer.Lister(),
restoreLister: restoreLister,
restoreClient: restoreClient,
backupTracker: backupTracker,
resticMgr: resticMgr,
podvolumeBackupLister: podvolumeBackupInformer.Lister(),
backupLocationLister: backupLocationInformer.Lister(),
snapshotLocationLister: snapshotLocationInformer.Lister(),
podvolumeBackupLister: podvolumeBackupLister,
backupLocationLister: backupLocationLister,
snapshotLocationLister: snapshotLocationLister,
metrics: metrics,
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
Expand All @@ -107,14 +107,6 @@ func NewBackupDeletionController(
}

c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(
c.cacheSyncWaiters,
deleteBackupRequestInformer.Informer().HasSynced,
restoreInformer.Informer().HasSynced,
podvolumeBackupInformer.Informer().HasSynced,
backupLocationInformer.Informer().HasSynced,
snapshotLocationInformer.Informer().HasSynced,
)
c.processRequestFunc = c.processRequest

deleteBackupRequestInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -383,7 +375,7 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e

func volumeSnapshotterForSnapshotLocation(
namespace, snapshotLocationName string,
snapshotLocationLister listers.VolumeSnapshotLocationLister,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
pluginManager clientmgmt.Manager,
) (velero.VolumeSnapshotter, error) {
snapshotLocation, err := snapshotLocationLister.VolumeSnapshotLocations(namespace).Get(snapshotLocationName)
Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/backup_deletion_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
sharedInformers.Velero().V1().DeleteBackupRequests(),
client.VeleroV1(), // deleteBackupRequestClient
client.VeleroV1(), // backupClient
sharedInformers.Velero().V1().Restores(),
sharedInformers.Velero().V1().Restores().Lister(),
client.VeleroV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Velero().V1().PodVolumeBackups(),
sharedInformers.Velero().V1().BackupStorageLocations(),
sharedInformers.Velero().V1().VolumeSnapshotLocations(),
sharedInformers.Velero().V1().PodVolumeBackups().Lister(),
sharedInformers.Velero().V1().BackupStorageLocations().Lister(),
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
nil, // new plugin manager func
metrics.NewServerMetrics(),
).(*backupDeletionController)
Expand Down Expand Up @@ -145,13 +145,13 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio
sharedInformers.Velero().V1().DeleteBackupRequests(),
client.VeleroV1(), // deleteBackupRequestClient
client.VeleroV1(), // backupClient
sharedInformers.Velero().V1().Restores(),
sharedInformers.Velero().V1().Restores().Lister(),
client.VeleroV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Velero().V1().PodVolumeBackups(),
sharedInformers.Velero().V1().BackupStorageLocations(),
sharedInformers.Velero().V1().VolumeSnapshotLocations(),
sharedInformers.Velero().V1().PodVolumeBackups().Lister(),
sharedInformers.Velero().V1().BackupStorageLocations().Lister(),
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
metrics.NewServerMetrics(),
).(*backupDeletionController),
Expand Down Expand Up @@ -843,13 +843,13 @@ func TestBackupDeletionControllerDeleteExpiredRequests(t *testing.T) {
sharedInformers.Velero().V1().DeleteBackupRequests(),
client.VeleroV1(), // deleteBackupRequestClient
client.VeleroV1(), // backupClient
sharedInformers.Velero().V1().Restores(),
sharedInformers.Velero().V1().Restores().Lister(),
client.VeleroV1(), // restoreClient
NewBackupTracker(),
nil,
sharedInformers.Velero().V1().PodVolumeBackups(),
sharedInformers.Velero().V1().BackupStorageLocations(),
sharedInformers.Velero().V1().VolumeSnapshotLocations(),
sharedInformers.Velero().V1().PodVolumeBackups().Lister(),
sharedInformers.Velero().V1().BackupStorageLocations().Lister(),
sharedInformers.Velero().V1().VolumeSnapshotLocations().Lister(),
nil, // new plugin manager func
metrics.NewServerMetrics(),
).(*backupDeletionController)
Expand Down