diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index c70582ed168..eee1be70531 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -474,6 +474,8 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State) switch state.State { + case connectivity.Idle: + fallthrough case connectivity.Ready: o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key)) if o.namespace == state.Key.Namespace { diff --git a/pkg/controller/registry/grpc/source_test.go b/pkg/controller/registry/grpc/source_test.go index 852a4be5416..32741f511dc 100644 --- a/pkg/controller/registry/grpc/source_test.go +++ b/pkg/controller/registry/grpc/source_test.go @@ -134,6 +134,7 @@ func TestConnectionEvents(t *testing.T) { {Name: "test", Namespace: "test"}: { connectivity.Connecting, connectivity.Ready, + connectivity.Idle, }, }, }, @@ -143,10 +144,12 @@ func TestConnectionEvents(t *testing.T) { {Name: "test", Namespace: "test"}: { connectivity.Connecting, connectivity.Ready, + connectivity.Idle, }, {Name: "test2", Namespace: "test2"}: { connectivity.Connecting, connectivity.Ready, + connectivity.Idle, }, }, }, diff --git a/pkg/fakes/fake_registry_store.go b/pkg/fakes/fake_registry_store.go index 5894a426aed..74f840ea7e1 100644 --- a/pkg/fakes/fake_registry_store.go +++ b/pkg/fakes/fake_registry_store.go @@ -371,6 +371,18 @@ type FakeQuery struct { result1 []string result2 error } + SendBundlesStub func(context.Context, registry.BundleSender) error + sendBundlesMutex sync.RWMutex + sendBundlesArgsForCall []struct { + arg1 context.Context + arg2 registry.BundleSender + } + sendBundlesReturns struct { + result1 error + } + sendBundlesReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -1987,6 +1999,67 @@ func (fake *FakeQuery) ListTablesReturnsOnCall(i int, result1 []string, result2 }{result1, result2} } +func (fake *FakeQuery) SendBundles(arg1 context.Context, arg2 registry.BundleSender) error { + fake.sendBundlesMutex.Lock() + ret, specificReturn := fake.sendBundlesReturnsOnCall[len(fake.sendBundlesArgsForCall)] + fake.sendBundlesArgsForCall = append(fake.sendBundlesArgsForCall, struct { + arg1 context.Context + arg2 registry.BundleSender + }{arg1, arg2}) + fake.recordInvocation("SendBundles", []interface{}{arg1, arg2}) + fake.sendBundlesMutex.Unlock() + if fake.SendBundlesStub != nil { + return fake.SendBundlesStub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.sendBundlesReturns + return fakeReturns.result1 +} + +func (fake *FakeQuery) SendBundlesCallCount() int { + fake.sendBundlesMutex.RLock() + defer fake.sendBundlesMutex.RUnlock() + return len(fake.sendBundlesArgsForCall) +} + +func (fake *FakeQuery) SendBundlesCalls(stub func(context.Context, registry.BundleSender) error) { + fake.sendBundlesMutex.Lock() + defer fake.sendBundlesMutex.Unlock() + fake.SendBundlesStub = stub +} + +func (fake *FakeQuery) SendBundlesArgsForCall(i int) (context.Context, registry.BundleSender) { + fake.sendBundlesMutex.RLock() + defer fake.sendBundlesMutex.RUnlock() + argsForCall := fake.sendBundlesArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeQuery) SendBundlesReturns(result1 error) { + fake.sendBundlesMutex.Lock() + defer fake.sendBundlesMutex.Unlock() + fake.SendBundlesStub = nil + fake.sendBundlesReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeQuery) SendBundlesReturnsOnCall(i int, result1 error) { + fake.sendBundlesMutex.Lock() + defer fake.sendBundlesMutex.Unlock() + fake.SendBundlesStub = nil + if fake.sendBundlesReturnsOnCall == nil { + fake.sendBundlesReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendBundlesReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeQuery) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -2040,6 +2113,8 @@ func (fake *FakeQuery) Invocations() map[string][][]interface{} { defer fake.listRegistryBundlesMutex.RUnlock() fake.listTablesMutex.RLock() defer fake.listTablesMutex.RUnlock() + fake.sendBundlesMutex.RLock() + defer fake.sendBundlesMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/package-server/provider/registry.go b/pkg/package-server/provider/registry.go index 0a959b8c098..d935d84e781 100644 --- a/pkg/package-server/provider/registry.go +++ b/pkg/package-server/provider/registry.go @@ -226,6 +226,8 @@ func (p *RegistryProvider) syncSourceState(state registrygrpc.SourceState) { var err error switch state.State { + case connectivity.Idle: + fallthrough case connectivity.Ready: var client *registryClient client, err = p.registryClient(key) diff --git a/test/e2e/catalog_e2e_test.go b/test/e2e/catalog_e2e_test.go index 8121a2a6046..d857e71b3e0 100644 --- a/test/e2e/catalog_e2e_test.go +++ b/test/e2e/catalog_e2e_test.go @@ -441,9 +441,14 @@ var _ = Describe("Starting CatalogSource e2e tests", func() { Expect(err).ShouldNot(HaveOccurred()) // Check pod created - initialPods, err := c.KubernetesInterface().CoreV1().Pods(ns.GetName()).List(context.Background(), metav1.ListOptions{LabelSelector: "olm.configMapResourceVersion=" + configMap.ResourceVersion}) - Expect(err).ShouldNot(HaveOccurred()) - Expect(initialPods.Items).To(HaveLen(1)) + Eventually(func() int { + initialPods, err := c.KubernetesInterface().CoreV1().Pods(ns.GetName()).List(context.Background(), metav1.ListOptions{LabelSelector: "olm.configMapResourceVersion=" + configMap.ResourceVersion}) + if err != nil { + GinkgoWriter.Printf("warning: error listing pods: %s", err) + return -1 + } + return len(initialPods.Items) + }).Should(Equal(1)) // delete the first catalog cleanupSource() diff --git a/test/e2e/magic_catalog.go b/test/e2e/magic_catalog.go index 81d788778f4..48c5158725b 100644 --- a/test/e2e/magic_catalog.go +++ b/test/e2e/magic_catalog.go @@ -5,6 +5,7 @@ import ( "fmt" operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + "google.golang.org/grpc/connectivity" corev1 "k8s.io/api/core/v1" k8serror "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -18,7 +19,6 @@ const ( olmCatalogLabel string = "olm.catalogSource" catalogMountPath string = "/opt/olm" catalogServicePort int32 = 50051 - catalogReadyState string = "READY" ) type MagicCatalog interface { @@ -80,7 +80,7 @@ func catalogSourceIsReady(ctx context.Context, c k8scontrollerclient.Client, cs return false, err } state := cs.Status.GRPCConnectionState.LastObservedState - if state != catalogReadyState { + if state != connectivity.Ready.String() && state != connectivity.Idle.String() { return false, nil } return true, nil diff --git a/test/e2e/subscription_e2e_test.go b/test/e2e/subscription_e2e_test.go index 12366eb5460..6fc409389c1 100644 --- a/test/e2e/subscription_e2e_test.go +++ b/test/e2e/subscription_e2e_test.go @@ -16,6 +16,7 @@ import ( configv1 "github.com/openshift/api/config/v1" configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1" "github.com/stretchr/testify/require" + "google.golang.org/grpc/connectivity" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" @@ -2920,8 +2921,9 @@ func updateInternalCatalog(t GinkgoTInterface, c operatorclient.ClientInterface, _, err = fetchCatalogSourceOnStatus(crc, catalogSourceName, namespace, func(catalog *operatorsv1alpha1.CatalogSource) bool { before := fetchedInitialCatalog.Status.ConfigMapResource after := catalog.Status.ConfigMapResource + lastObservedState := catalog.Status.GRPCConnectionState.LastObservedState if after != nil && after.LastUpdateTime.After(before.LastUpdateTime.Time) && after.ResourceVersion != before.ResourceVersion && - catalog.Status.GRPCConnectionState.LastConnectTime.After(after.LastUpdateTime.Time) && catalog.Status.GRPCConnectionState.LastObservedState == "READY" { + catalog.Status.GRPCConnectionState.LastConnectTime.After(after.LastUpdateTime.Time) && (lastObservedState == connectivity.Ready.String() || lastObservedState == connectivity.Idle.String()) { fmt.Println("catalog updated") return true } diff --git a/test/e2e/util.go b/test/e2e/util.go index 4e9b56ecc29..06e1d2dce77 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -13,6 +13,7 @@ import ( . "github.com/onsi/gomega" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" @@ -337,11 +338,11 @@ func registryPodHealthy(address string) bool { } func catalogSourceRegistryPodSynced(catalog *operatorsv1alpha1.CatalogSource) bool { - registry := catalog.Status.RegistryServiceStatus + registryStatus := catalog.Status.RegistryServiceStatus connState := catalog.Status.GRPCConnectionState - if registry != nil && connState != nil && !connState.LastConnectTime.IsZero() && connState.LastObservedState == "READY" { - fmt.Printf("catalog %s pod with address %s\n", catalog.GetName(), registry.Address()) - return registryPodHealthy(registry.Address()) + if registryStatus != nil && connState != nil && !connState.LastConnectTime.IsZero() && (connState.LastObservedState == connectivity.Ready.String() || connState.LastObservedState == connectivity.Idle.String()) { + fmt.Printf("catalog %s pod with address %s\n", catalog.GetName(), registryStatus.Address()) + return registryPodHealthy(registryStatus.Address()) } state := "NO_CONNECTION" if connState != nil { @@ -616,6 +617,11 @@ func createInternalCatalogSource( Spec: operatorsv1alpha1.CatalogSourceSpec{ SourceType: "internal", ConfigMap: configMap.GetName(), + UpdateStrategy: &operatorsv1alpha1.UpdateStrategy{ + RegistryPoll: &operatorsv1alpha1.RegistryPoll{ + RawInterval: "10s", + }, + }, }, }