Skip to content

Commit

Permalink
Merge pull request #240 from awgreene/grpc-address-bug-fix
Browse files Browse the repository at this point in the history
Bug 2026343: Address Invalid Address in GRPC Catalogs (#2499)
  • Loading branch information
openshift-merge-robot committed Jan 26, 2022
2 parents e8f3aa6 + 12e83f9 commit d795a1d
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
// Set connection status and return.
out.Status.GRPCConnectionState.LastConnectTime = now
out.Status.GRPCConnectionState.LastObservedState = source.ConnectionState.String()
out.Status.GRPCConnectionState.Address = source.Address
}

return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/fake"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
Expand Down Expand Up @@ -758,6 +759,12 @@ func TestExecutePlanDynamicResources(t *testing.T) {
}
}

func withStatus(catalogSource v1alpha1.CatalogSource, status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
copy := catalogSource.DeepCopy()
copy.Status = status
return copy
}

func TestSyncCatalogSources(t *testing.T) {
clockFake := utilclock.NewFakeClock(time.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC))
now := metav1.NewTime(clockFake.Now())
Expand Down Expand Up @@ -786,14 +793,15 @@ func TestSyncCatalogSources(t *testing.T) {
},
}
tests := []struct {
testName string
namespace string
catalogSource *v1alpha1.CatalogSource
k8sObjs []runtime.Object
configMap *corev1.ConfigMap
expectedStatus *v1alpha1.CatalogSourceStatus
expectedObjs []runtime.Object
expectedError error
testName string
namespace string
catalogSource *v1alpha1.CatalogSource
k8sObjs []runtime.Object
configMap *corev1.ConfigMap
expectedStatus *v1alpha1.CatalogSourceStatus
expectedObjs []runtime.Object
expectedError error
existingSources []sourceAddress
}{
{
testName: "CatalogSourceWithInvalidSourceType",
Expand Down Expand Up @@ -1013,6 +1021,47 @@ func TestSyncCatalogSources(t *testing.T) {
},
expectedError: nil,
},
{
testName: "GRPCConnectionStateAddressIsUpdated",
namespace: "cool-namespace",
catalogSource: withStatus(*grpcCatalog, v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
Protocol: "grpc",
ServiceName: "cool-catalog",
ServiceNamespace: "cool-namespace",
Port: "50051",
CreatedAt: now,
},
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
Address: "..svc:", // Needs to be updated to cool-catalog.cool-namespace.svc:50051
},
}),
k8sObjs: []runtime.Object{
pod(*grpcCatalog),
service(grpcCatalog.GetName(), grpcCatalog.GetNamespace()),
},
existingSources: []sourceAddress{
{
sourceKey: registry.CatalogKey{Name: "cool-catalog", Namespace: "cool-namespace"},
address: "cool-catalog.cool-namespace.svc:50051",
},
},
expectedStatus: &v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
Protocol: "grpc",
ServiceName: "cool-catalog",
ServiceNamespace: "cool-namespace",
Port: "50051",
CreatedAt: now,
},
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
Address: "cool-catalog.cool-namespace.svc:50051",
LastObservedState: "",
LastConnectTime: now,
},
},
expectedError: nil,
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
Expand All @@ -1023,7 +1072,7 @@ func TestSyncCatalogSources(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...))
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...), withSources(tt.existingSources...))
require.NoError(t, err)

// Run sync
Expand All @@ -1040,6 +1089,13 @@ func TestSyncCatalogSources(t *testing.T) {
require.NotEmpty(t, updated)

if tt.expectedStatus != nil {
if tt.expectedStatus.GRPCConnectionState != nil {
updated.Status.GRPCConnectionState.LastConnectTime = now
// Ignore LastObservedState difference if an expected LastObservedState is no provided
if tt.expectedStatus.GRPCConnectionState.LastObservedState == "" {
updated.Status.GRPCConnectionState.LastObservedState = ""
}
}
require.NotEmpty(t, updated.Status)
require.Equal(t, *tt.expectedStatus, updated.Status)

Expand Down Expand Up @@ -1384,6 +1440,7 @@ type fakeOperatorConfig struct {
resolver resolver.StepResolver
recorder record.EventRecorder
reconciler reconciler.RegistryReconcilerFactory
sources []sourceAddress
}

// fakeOperatorOption applies an option to the given fake operator configuration.
Expand All @@ -1395,6 +1452,12 @@ func withResolver(res resolver.StepResolver) fakeOperatorOption {
}
}

func withSources(sources ...sourceAddress) fakeOperatorOption {
return func(config *fakeOperatorConfig) {
config.sources = sources
}
}

func withReconciler(rec reconciler.RegistryReconcilerFactory) fakeOperatorOption {
return func(config *fakeOperatorConfig) {
config.reconciler = rec
Expand Down Expand Up @@ -1431,6 +1494,11 @@ func withFakeClientOptions(options ...clientfake.Option) fakeOperatorOption {
}
}

type sourceAddress struct {
address string
sourceKey registry.CatalogKey
}

// NewFakeOperator creates a new operator using fake clients.
func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, fakeOptions ...fakeOperatorOption) (*Operator, error) {
// Apply options to default config
Expand Down Expand Up @@ -1548,6 +1616,9 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,

op.RunInformers(ctx)
op.sources.Start(ctx)
for _, source := range config.sources {
op.sources.Add(source.sourceKey, source.address)
}

if ok := cache.WaitForCacheSync(ctx.Done(), op.HasSynced); !ok {
return nil, fmt.Errorf("failed to wait for caches to sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
source := grpcCatalogSourceDecorator{catalogSource}

// if service status is nil, we force create every object to ensure they're created the first time
overwrite := source.Status.RegistryServiceStatus == nil
overwrite := source.Status.RegistryServiceStatus == nil || !isRegistryServiceStatusValid(&source)

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
sa, err := c.ensureSA(source)
Expand All @@ -216,17 +216,33 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca

if overwritePod {
now := c.now()
service := source.Service()
catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
CreatedAt: now,
Protocol: "grpc",
ServiceName: source.Service().GetName(),
ServiceName: service.GetName(),
ServiceNamespace: source.GetNamespace(),
Port: fmt.Sprintf("%d", source.Service().Spec.Ports[0].Port),
Port: getPort(service),
}
}
return nil
}

func getPort(service *corev1.Service) string {
return fmt.Sprintf("%d", service.Spec.Ports[0].Port)
}

func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
service := source.Service()
if source.Status.RegistryServiceStatus.ServiceName != service.GetName() ||
source.Status.RegistryServiceStatus.ServiceNamespace != service.GetNamespace() ||
source.Status.RegistryServiceStatus.Port != getPort(service) ||
source.Status.RegistryServiceStatus.Protocol != "grpc" {
return false
}
return true
}

func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
// currentLivePods refers to the currently live instances of the catalog source
currentLivePods := c.currentPods(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func grpcCatalogSourceWithSecret(secretNames []string) *v1alpha1.CatalogSource {
},
}
}
func grpcCatalogSourceWithStatus(status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
catsrc := validGrpcCatalogSource("image", "")
catsrc.Status = status
return catsrc
}

func grpcCatalogSourceWithAnnotations(annotations map[string]string) *v1alpha1.CatalogSource {
catsrc := validGrpcCatalogSource("image", "")
Expand Down Expand Up @@ -284,6 +289,29 @@ func TestGrpcRegistryReconciler(t *testing.T) {
},
},
},
{
testName: "Grpc/ExistingRegistry/UpdateInvalidRegistryServiceStatus",
in: in{
cluster: cluster{
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("image", "")),
},
catsrc: grpcCatalogSourceWithStatus(v1alpha1.CatalogSourceStatus{
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
CreatedAt: now(),
Protocol: "grpc",
},
}),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
CreatedAt: now(),
Protocol: "grpc",
ServiceName: "img-catalog",
ServiceNamespace: testNamespace,
Port: "50051",
},
},
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d795a1d

Please sign in to comment.