Skip to content

Commit

Permalink
WIP wait for informer caches to sync before running controllers
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <krisss@vmware.com>
  • Loading branch information
skriss committed Feb 25, 2020
1 parent 31dca0e commit 2604ef4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 40 deletions.
41 changes: 31 additions & 10 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,10 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logLevel,
newPluginManager,
backupTracker,
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.config.defaultBackupLocation,
s.config.defaultBackupTTL,
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
Expand Down Expand Up @@ -656,7 +656,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}

restoreControllerRunInfo := func() controllerRunInfo {

restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
Expand All @@ -675,9 +674,9 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.veleroClient.VeleroV1(),
s.veleroClient.VeleroV1(),
restorer,
s.sharedInformerFactory.Velero().V1().Backups(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations(),
s.sharedInformerFactory.Velero().V1().Backups().Lister(),
s.sharedInformerFactory.Velero().V1().BackupStorageLocations().Lister(),
s.sharedInformerFactory.Velero().V1().VolumeSnapshotLocations().Lister(),
s.logger,
s.logLevel,
newPluginManager,
Expand Down Expand Up @@ -771,18 +770,40 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}
}

// Instantiate the enabled controllers. This needs to be done *before*
// the shared informer factory is started, because the controller
// constructors add event handlers to various informers, which should
// be done before the informers are running.
controllers := make([]controllerRunInfo, 0, len(enabledControllers))
for _, newController := range enabledControllers {
controllerRunInfo := newController()
controllers = append(controllers, newController())
}

// start the informers & and wait for the caches to sync
go s.sharedInformerFactory.Start(ctx.Done())

s.logger.Info("Waiting for informer caches to sync")
cacheSyncResults := s.sharedInformerFactory.WaitForCacheSync(ctx.Done())
s.logger.Info("Done waiting for informer caches to sync")

for informer, synced := range cacheSyncResults {
if !synced {
return errors.Errorf("cache was not synced for informer %v", informer)
}
s.logger.WithField("informer", informer).Info("Informer cache synced")
}

// now that the informer caches have all synced, we can start running the controllers
for i := range controllers {
controllerRunInfo := controllers[i]

wg.Add(1)
go func() {
controllerRunInfo.controller.Run(ctx, controllerRunInfo.numWorkers)
wg.Done()
}()
}

// SHARED INFORMERS HAVE TO BE STARTED AFTER ALL CONTROLLERS
go s.sharedInformerFactory.Start(ctx.Done())

s.logger.Info("Server started successfully")

<-ctx.Done()
Expand Down
21 changes: 8 additions & 13 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
pkgbackup "github.com/vmware-tanzu/velero/pkg/backup"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
Expand All @@ -57,16 +57,16 @@ type backupController struct {
*genericController

backupper pkgbackup.Backupper
lister listers.BackupLister
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
backupLocationLister listers.BackupStorageLocationLister
backupLocationLister velerov1listers.BackupStorageLocationLister
defaultBackupLocation string
defaultBackupTTL time.Duration
snapshotLocationLister listers.VolumeSnapshotLocationLister
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
newBackupStore func(*velerov1api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
Expand All @@ -81,10 +81,10 @@ func NewBackupController(
backupLogLevel logrus.Level,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupTracker BackupTracker,
backupLocationInformer informers.BackupStorageLocationInformer,
backupLocationLister velerov1listers.BackupStorageLocationLister,
defaultBackupLocation string,
defaultBackupTTL time.Duration,
volumeSnapshotLocationInformer informers.VolumeSnapshotLocationInformer,
volumeSnapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
Expand All @@ -98,10 +98,10 @@ func NewBackupController(
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
backupLocationLister: backupLocationInformer.Lister(),
backupLocationLister: backupLocationLister,
defaultBackupLocation: defaultBackupLocation,
defaultBackupTTL: defaultBackupTTL,
snapshotLocationLister: volumeSnapshotLocationInformer.Lister(),
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
Expand All @@ -110,11 +110,6 @@ func NewBackupController(
}

c.syncHandler = c.processBackup
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
backupLocationInformer.Informer().HasSynced,
volumeSnapshotLocationInformer.Informer().HasSynced,
)
c.resyncFunc = c.resync
c.resyncPeriod = time.Minute

Expand Down
29 changes: 12 additions & 17 deletions pkg/controller/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
Expand Down Expand Up @@ -78,10 +78,10 @@ type restoreController struct {
restoreClient velerov1client.RestoresGetter
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter
restorer pkgrestore.Restorer
backupLister listers.BackupLister
restoreLister listers.RestoreLister
backupLocationLister listers.BackupStorageLocationLister
snapshotLocationLister listers.VolumeSnapshotLocationLister
backupLister velerov1listers.BackupLister
restoreLister velerov1listers.RestoreLister
backupLocationLister velerov1listers.BackupStorageLocationLister
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
restoreLogLevel logrus.Level
defaultBackupLocation string
metrics *metrics.ServerMetrics
Expand All @@ -97,9 +97,9 @@ func NewRestoreController(
restoreClient velerov1client.RestoresGetter,
podVolumeBackupClient velerov1client.PodVolumeBackupsGetter,
restorer pkgrestore.Restorer,
backupInformer informers.BackupInformer,
backupLocationInformer informers.BackupStorageLocationInformer,
snapshotLocationInformer informers.VolumeSnapshotLocationInformer,
backupLister velerov1listers.BackupLister,
backupLocationLister velerov1listers.BackupStorageLocationLister,
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister,
logger logrus.FieldLogger,
restoreLogLevel logrus.Level,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
Expand All @@ -113,10 +113,10 @@ func NewRestoreController(
restoreClient: restoreClient,
podVolumeBackupClient: podVolumeBackupClient,
restorer: restorer,
backupLister: backupInformer.Lister(),
backupLister: backupLister,
restoreLister: restoreInformer.Lister(),
backupLocationLister: backupLocationInformer.Lister(),
snapshotLocationLister: snapshotLocationInformer.Lister(),
backupLocationLister: backupLocationLister,
snapshotLocationLister: snapshotLocationLister,
restoreLogLevel: restoreLogLevel,
defaultBackupLocation: defaultBackupLocation,
metrics: metrics,
Expand All @@ -129,12 +129,6 @@ func NewRestoreController(
}

c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
restoreInformer.Informer().HasSynced,
backupLocationInformer.Informer().HasSynced,
snapshotLocationInformer.Informer().HasSynced,
)
c.resyncFunc = c.resync
c.resyncPeriod = time.Minute

Expand Down Expand Up @@ -445,6 +439,7 @@ func (c *restoreController) runValidatedRestore(restore *api.Restore, info backu
defer closeAndRemoveFile(backupFile, c.logger)

opts := restic.NewPodVolumeBackupListOptions(restore.Spec.BackupName)

podVolumeBackupList, err := c.podVolumeBackupClient.PodVolumeBackups(c.namespace).List(opts)
if err != nil {
return errors.WithStack(err)
Expand Down

0 comments on commit 2604ef4

Please sign in to comment.