Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix catsrc pod hash logic #3102

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2120,7 +2120,13 @@ func toManifest(t *testing.T, obj runtime.Object) string {
}

func pod(s v1alpha1.CatalogSource) *corev1.Pod {
pod := reconciler.Pod(&s, "registry-server", "central-opm", "central-util", s.Spec.Image, s.GetName(), s.GetLabels(), s.GetAnnotations(), 5, 10, 1001)
serviceAccount := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: s.GetNamespace(),
Name: s.GetName(),
},
}
pod := reconciler.Pod(&s, "registry-server", "central-opm", "central-util", s.Spec.Image, serviceAccount, s.GetLabels(), s.GetAnnotations(), 5, 10, 1001)
ownerutil.AddOwner(pod, &s, false, true)
return pod
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/reconciler/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *configMapCatalogSourceDecorator) Service() *corev1.Service {
}

func (s *configMapCatalogSourceDecorator) Pod(image string) *corev1.Pod {
pod := Pod(s.CatalogSource, "configmap-registry-server", "", "", image, "", s.Labels(), s.Annotations(), 5, 5, s.runAsUser)
pod := Pod(s.CatalogSource, "configmap-registry-server", "", "", image, nil, s.Labels(), s.Annotations(), 5, 5, s.runAsUser)
pod.Spec.ServiceAccountName = s.GetName() + ConfigMapServerPostfix
pod.Spec.Containers[0].Command = []string{"configmap-server", "-c", s.Spec.ConfigMap, "-n", s.GetNamespace()}
ownerutil.AddOwner(pod, s.CatalogSource, false, true)
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/registry/reconciler/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ func objectsForCatalogSource(catsrc *v1alpha1.CatalogSource) []runtime.Object {
case v1alpha1.SourceTypeGrpc:
if catsrc.Spec.Image != "" {
decorated := grpcCatalogSourceDecorator{CatalogSource: catsrc, createPodAsUser: runAsUser, opmImage: ""}
serviceAccount := decorated.ServiceAccount()
objs = clientfake.AddSimpleGeneratedNames(
decorated.Pod(catsrc.GetName()),
decorated.Pod(serviceAccount),
decorated.Service(),
decorated.ServiceAccount(),
serviceAccount,
)
}
}
Expand Down
68 changes: 43 additions & 25 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (s *grpcCatalogSourceDecorator) ServiceAccount() *corev1.ServiceAccount {
}
}

func (s *grpcCatalogSourceDecorator) Pod(saName string) *corev1.Pod {
pod := Pod(s.CatalogSource, "registry-server", s.opmImage, s.utilImage, s.Spec.Image, saName, s.Labels(), s.Annotations(), 5, 10, s.createPodAsUser)
func (s *grpcCatalogSourceDecorator) Pod(serviceAccount *corev1.ServiceAccount) *corev1.Pod {
pod := Pod(s.CatalogSource, "registry-server", s.opmImage, s.utilImage, s.Spec.Image, serviceAccount, s.Labels(), s.Annotations(), 5, 10, s.createPodAsUser)
ownerutil.AddOwner(pod, s.CatalogSource, false, true)
return pod
}
Expand Down Expand Up @@ -191,14 +191,14 @@ func (c *GrpcRegistryReconciler) currentUpdatePods(source grpcCatalogSourceDecor
return pods
}

func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(source grpcCatalogSourceDecorator, saName string) []*corev1.Pod {
func (c *GrpcRegistryReconciler) currentPodsWithCorrectImageAndSpec(source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount) []*corev1.Pod {
pods, err := c.Lister.CoreV1().PodLister().Pods(source.GetNamespace()).List(labels.SelectorFromValidatedSet(source.Labels()))
if err != nil {
logrus.WithError(err).Warn("couldn't find pod in cache")
return nil
}
found := []*corev1.Pod{}
newPod := source.Pod(saName)
newPod := source.Pod(serviceAccount)
for _, p := range pods {
if correctImages(source, p) && podHashMatch(p, newPod) {
found = append(found, p)
Expand Down Expand Up @@ -231,20 +231,26 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
sa, err := c.ensureSA(source)
// recreate the pod if no existing pod is serving the latest image or correct spec
overwritePod := overwrite || len(c.currentPodsWithCorrectImageAndSpec(source, sa.GetName())) == 0

if err != nil && !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "error ensuring service account: %s", source.GetName())
}
if err := c.ensurePod(source, sa.GetName(), overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(sa.Name).GetName())

sa, err = c.OpClient.GetServiceAccount(sa.GetNamespace(), sa.GetName())
if err != nil {
return err
}
if err := c.ensureUpdatePod(source, sa.Name); err != nil {

// recreate the pod if no existing pod is serving the latest image or correct spec
overwritePod := overwrite || len(c.currentPodsWithCorrectImageAndSpec(source, sa)) == 0

if err := c.ensurePod(source, sa, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(sa).GetName())
}
if err := c.ensureUpdatePod(source, sa); err != nil {
if _, ok := err.(UpdateNotReadyErr); ok {
return err
}
return errors.Wrapf(err, "error ensuring updated catalog source pod: %s", source.Pod(sa.Name).GetName())
return errors.Wrapf(err, "error ensuring updated catalog source pod: %s", source.Pod(sa).GetName())
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
Expand Down Expand Up @@ -279,7 +285,7 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
return true
}

func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, overwrite bool) error {
// currentLivePods refers to the currently live instances of the catalog source
currentLivePods := c.currentPods(source)
if len(currentLivePods) > 0 {
Expand All @@ -292,16 +298,17 @@ func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, sa
}
}
}
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(context.TODO(), source.Pod(saName), metav1.CreateOptions{})
desiredPod := source.Pod(serviceAccount)
_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(context.TODO(), desiredPod, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "error creating new pod: %s", source.Pod(saName).GetGenerateName())
return errors.Wrapf(err, "error creating new pod: %s", desiredPod.GetGenerateName())
}

return nil
}

// ensureUpdatePod checks that for the same catalog source version the same container imageID is running
func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorator, saName string) error {
func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount) error {
if !source.Poll() {
return nil
}
Expand All @@ -311,7 +318,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat

if source.Update() && len(currentUpdatePods) == 0 {
logrus.WithField("CatalogSource", source.GetName()).Debugf("catalog update required at %s", time.Now().String())
pod, err := c.createUpdatePod(source, saName)
pod, err := c.createUpdatePod(source, serviceAccount)
if err != nil {
return errors.Wrapf(err, "creating update catalog source pod")
}
Expand Down Expand Up @@ -419,14 +426,14 @@ func HashServiceSpec(spec corev1.ServiceSpec) string {
}

// createUpdatePod is an internal method that creates a pod using the latest catalog source.
func (c *GrpcRegistryReconciler) createUpdatePod(source grpcCatalogSourceDecorator, saName string) (*corev1.Pod, error) {
func (c *GrpcRegistryReconciler) createUpdatePod(source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount) (*corev1.Pod, error) {
// remove label from pod to ensure service does not accidentally route traffic to the pod
p := source.Pod(saName)
p := source.Pod(serviceAccount)
p = swapLabels(p, "", source.Name)

pod, err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(context.TODO(), p, metav1.CreateOptions{})
if err != nil {
logrus.WithField("pod", source.Pod(saName).GetName()).Warn("couldn't create new catalogsource pod")
logrus.WithField("pod", source.Pod(serviceAccount).GetName()).Warn("couldn't create new catalogsource pod")
return nil, err
}

Expand Down Expand Up @@ -476,18 +483,29 @@ func (c *GrpcRegistryReconciler) removePods(pods []*corev1.Pod, namespace string
}

// CheckRegistryServer returns true if the given CatalogSource is considered healthy; false otherwise.
func (c *GrpcRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (healthy bool, err error) {
func (c *GrpcRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (bool, error) {
source := grpcCatalogSourceDecorator{CatalogSource: catalogSource, createPodAsUser: c.createPodAsUser, opmImage: c.opmImage, utilImage: c.utilImage}

// The CheckRegistryServer function is called by the CatalogSoruce controller before the registry resources are created,
// returning a IsNotFound error will cause the controller to exit and never create the resources, so we should
// only return an error if it is something other than a NotFound error.
serviceAccount := source.ServiceAccount()
serviceAccount, err := c.OpClient.GetServiceAccount(serviceAccount.GetNamespace(), serviceAccount.GetName())
if err != nil {
if !apierrors.IsNotFound(err) {
return false, err
}
return false, nil
}

// Check on registry resources
// TODO: add gRPC health check
if len(c.currentPodsWithCorrectImageAndSpec(source, source.ServiceAccount().GetName())) < 1 ||
if len(c.currentPodsWithCorrectImageAndSpec(source, serviceAccount)) < 1 ||
c.currentService(source) == nil || c.currentServiceAccount(source) == nil {
healthy = false
return
return false, nil
}

healthy = true
return
return true, nil
}

// promoteCatalog swaps the labels on the update pod so that the update pod is now reachable by the catalog service.
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/registry/reconciler/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {

// Check for resource existence
decorated := grpcCatalogSourceDecorator{CatalogSource: tt.in.catsrc, createPodAsUser: runAsUser}
pod := decorated.Pod(tt.in.catsrc.GetName())
service := decorated.Service()
sa := decorated.ServiceAccount()
pod := decorated.Pod(sa)
service := decorated.Service()
listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{CatalogSourceLabelKey: tt.in.catsrc.GetName()}).String()}
outPods, podErr := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).List(context.TODO(), listOptions)
outService, serviceErr := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(context.TODO(), service.GetName(), metav1.GetOptions{})
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestRegistryPodPriorityClass(t *testing.T) {

// Check for resource existence
decorated := grpcCatalogSourceDecorator{CatalogSource: tt.in.catsrc, createPodAsUser: runAsUser}
pod := decorated.Pod(tt.in.catsrc.GetName())
pod := decorated.Pod(serviceAccount(tt.in.catsrc.Namespace, tt.in.catsrc.Name))
listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{CatalogSourceLabelKey: tt.in.catsrc.GetName()}).String()}
outPods, podErr := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).List(context.TODO(), listOptions)
require.NoError(t, podErr)
Expand Down
14 changes: 13 additions & 1 deletion pkg/controller/registry/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewRegistryReconcilerFactory(lister operatorlister.OperatorLister, opClient
}
}

func Pod(source *operatorsv1alpha1.CatalogSource, name, opmImg, utilImage, img, saName string, labels, annotations map[string]string, readinessDelay, livenessDelay int32, runAsUser int64) *corev1.Pod {
func Pod(source *operatorsv1alpha1.CatalogSource, name, opmImg, utilImage, img string, serviceAccount *corev1.ServiceAccount, labels, annotations map[string]string, readinessDelay, livenessDelay int32, runAsUser int64) *corev1.Pod {
// make a copy of the labels and annotations to avoid mutating the input parameters
podLabels := make(map[string]string)
podAnnotations := make(map[string]string)
Expand All @@ -129,6 +129,15 @@ func Pod(source *operatorsv1alpha1.CatalogSource, name, opmImg, utilImage, img,
podAnnotations[key] = value
}

// Default case for nil serviceAccount
var saName string
var saImagePullSecrets []corev1.LocalObjectReference
// If the serviceAccount is not nil, set the fields that should appear on the pod
if serviceAccount != nil {
saName = serviceAccount.GetName()
saImagePullSecrets = serviceAccount.ImagePullSecrets
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: source.GetName() + "-",
Expand Down Expand Up @@ -192,6 +201,9 @@ func Pod(source *operatorsv1alpha1.CatalogSource, name, opmImg, utilImage, img,
"kubernetes.io/os": "linux",
},
ServiceAccountName: saName,
// If this field is not set, there that the is a chance that pod will be created without the imagePullSecret
// defined by the serviceAccount
ImagePullSecrets: saImagePullSecrets,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line change is more or less the purpose for the entire PR.

},
}

Expand Down
73 changes: 66 additions & 7 deletions pkg/controller/registry/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,22 @@ func TestPodMemoryTarget(t *testing.T) {
}

for _, testCase := range testCases {
pod := Pod(testCase.input, "name", "opmImage", "utilImage", "image", "service-account", map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
pod := Pod(testCase.input, "name", "opmImage", "utilImage", "image", serviceAccount("", "service-account"), map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
if diff := cmp.Diff(pod, testCase.expected); diff != "" {
t.Errorf("got incorrect pod: %v", diff)
}
}
}

func serviceAccount(namespace, name string) *corev1.ServiceAccount {
return &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
}
}

func TestPodExtractContent(t *testing.T) {
awgreene marked this conversation as resolved.
Show resolved Hide resolved
var testCases = []struct {
name string
Expand Down Expand Up @@ -362,13 +371,63 @@ func TestPodExtractContent(t *testing.T) {
}

for _, testCase := range testCases {
pod := Pod(testCase.input, "name", "opmImage", "utilImage", "image", "service-account", map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
pod := Pod(testCase.input, "name", "opmImage", "utilImage", "image", serviceAccount("", "service-account"), map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
if diff := cmp.Diff(pod, testCase.expected); diff != "" {
t.Errorf("got incorrect pod: %v", diff)
}
}
}

func TestPodServiceAccountImagePullSecrets(t *testing.T) {
var testCases = []struct {
name string
catalogSource *v1alpha1.CatalogSource
serviceAccount *corev1.ServiceAccount
}{
{
name: "ServiceAccount has no imagePullSecret",
serviceAccount: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "",
Name: "service-account",
},
},
},
{
name: "ServiceAccount has one imagePullSecret",
serviceAccount: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "",
Name: "service-account",
},
ImagePullSecrets: []corev1.LocalObjectReference{{Name: "foo"}},
},
},
}

catalogSource := &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "testns",
},
Spec: v1alpha1.CatalogSourceSpec{
GrpcPodConfig: &v1alpha1.GrpcPodConfig{
ExtractContent: &v1alpha1.ExtractContentConfig{
CacheDir: "/tmp/cache",
CatalogDir: "/catalog",
},
},
},
}

for _, testCase := range testCases {
pod := Pod(catalogSource, "name", "opmImage", "utilImage", "image", testCase.serviceAccount, map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
if diff := cmp.Diff(testCase.serviceAccount.ImagePullSecrets, pod.Spec.ImagePullSecrets); diff != "" {
t.Errorf("got incorrect pod: %v", diff)
}
}
}

func TestPodNodeSelector(t *testing.T) {
catsrc := &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -380,7 +439,7 @@ func TestPodNodeSelector(t *testing.T) {
key := "kubernetes.io/os"
value := "linux"

gotCatSrcPod := Pod(catsrc, "hello", "utilImage", "opmImage", "busybox", "", map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
gotCatSrcPod := Pod(catsrc, "hello", "utilImage", "opmImage", "busybox", serviceAccount("", "service-account"), map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
gotCatSrcPodSelector := gotCatSrcPod.Spec.NodeSelector

if gotCatSrcPodSelector[key] != value {
Expand Down Expand Up @@ -428,7 +487,7 @@ func TestPullPolicy(t *testing.T) {
}

for _, tt := range table {
p := Pod(source, "catalog", "opmImage", "utilImage", tt.image, "", nil, nil, int32(0), int32(0), int64(workloadUserID))
p := Pod(source, "catalog", "opmImage", "utilImage", tt.image, serviceAccount("", "service-account"), nil, nil, int32(0), int32(0), int64(workloadUserID))
policy := p.Spec.Containers[0].ImagePullPolicy
if policy != tt.policy {
t.Fatalf("expected pull policy %s for image %s", tt.policy, tt.image)
Expand Down Expand Up @@ -540,7 +599,7 @@ func TestPodContainerSecurityContext(t *testing.T) {
},
}
for _, testcase := range testcases {
outputPod := Pod(testcase.inputCatsrc, "hello", "utilImage", "opmImage", "busybox", "", map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
outputPod := Pod(testcase.inputCatsrc, "hello", "utilImage", "opmImage", "busybox", serviceAccount("", "service-account"), map[string]string{}, map[string]string{}, int32(0), int32(0), int64(workloadUserID))
if testcase.expectedSecurityContext != nil {
require.Equal(t, testcase.expectedSecurityContext, outputPod.Spec.SecurityContext)
}
Expand Down Expand Up @@ -570,7 +629,7 @@ func TestPodAvoidsConcurrentWrite(t *testing.T) {
"annotation": "somethingelse",
}

gotPod := Pod(catsrc, "hello", "opmImage", "utilImage", "busybox", "", labels, annotations, int32(0), int32(0), int64(workloadUserID))
gotPod := Pod(catsrc, "hello", "opmImage", "utilImage", "busybox", serviceAccount("", "service-account"), labels, annotations, int32(0), int32(0), int64(workloadUserID))

// check labels and annotations point to different addresses between parameters and what's in the pod
require.NotEqual(t, &labels, &gotPod.Labels)
Expand Down Expand Up @@ -799,7 +858,7 @@ func TestPodSchedulingOverrides(t *testing.T) {
}

for _, testCase := range testCases {
pod := Pod(testCase.catalogSource, "hello", "opmImage", "utilImage", "busybox", "", map[string]string{}, testCase.annotations, int32(0), int32(0), int64(workloadUserID))
pod := Pod(testCase.catalogSource, "hello", "opmImage", "utilImage", "busybox", serviceAccount("", "service-account"), map[string]string{}, testCase.annotations, int32(0), int32(0), int64(workloadUserID))
require.Equal(t, testCase.expectedNodeSelectors, pod.Spec.NodeSelector)
require.Equal(t, testCase.expectedPriorityClassName, pod.Spec.PriorityClassName)
require.Equal(t, testCase.expectedTolerations, pod.Spec.Tolerations)
Expand Down