diff --git a/components/eventing-controller/controllers/subscription/beb/reconciler_test.go b/components/eventing-controller/controllers/subscription/beb/reconciler_test.go index 7d223759bbd0..920f5770bae4 100644 --- a/components/eventing-controller/controllers/subscription/beb/reconciler_test.go +++ b/components/eventing-controller/controllers/subscription/beb/reconciler_test.go @@ -83,14 +83,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() { AfterEach(func() { // detailed request logs - logf.Log.V(1).Info("beb requests", "number", len(beb.Requests)) + logf.Log.V(1).Info("beb requests", "number", beb.Requests.Len()) i := 0 - for req, payloadObject := range beb.Requests { - reqDescription := fmt.Sprintf("method: %q, url: %q, payload object: %+v", req.Method, req.RequestURI, payloadObject) - fmt.Printf("request[%d]: %s\n", i, reqDescription) - i++ - } + + beb.Requests.ReadEach( + func(req *http.Request, payload interface{}) { + reqDescription := fmt.Sprintf("method: %q, url: %q, payload object: %+v", req.Method, req.RequestURI, payload) + fmt.Printf("request[%d]: %s\n", i, reqDescription) + i++ + }) // print all subscriptions in the namespace for debugging purposes if err := printSubscriptions(namespaceName); err != nil { @@ -237,17 +239,17 @@ var _ = Describe("Subscription Reconciliation Tests", func() { By("Creating a BEB Subscription") var bebSubscription bebtypes.Subscription - Eventually(func() bool { - for r, payloadObject := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) { - bebSubscription = payloadObject.(bebtypes.Subscription) - receivedSubscriptionName := bebSubscription.Name - // ensure the correct subscription was created - return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName - } + + containsCorrectSubscription := func(request *http.Request, payload interface{}) bool { + if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) { + bebSubscription = payload.(bebtypes.Subscription) + receivedSubscriptionName := bebSubscription.Name + // ensure the correct subscription was created + return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName } return false - }).Should(BeTrue()) + } + Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue()) }) }) @@ -413,17 +415,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() { By("Creating a BEB Subscription") var bebSubscription bebtypes.Subscription - Eventually(func() bool { - for r, payloadObject := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) { - bebSubscription = payloadObject.(bebtypes.Subscription) - receivedSubscriptionName := bebSubscription.Name - // ensure the correct subscription was created - return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName - } + contiansCorrectSubscription := func(request *http.Request, payload interface{}) bool { + if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) { + bebSubscription = payload.(bebtypes.Subscription) + receivedSubscriptionName := bebSubscription.Name + // ensure the correct subscription was created + return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName } return false - }).Should(BeTrue()) + } + Eventually(beb.Requests.CheckIfAny(contiansCorrectSubscription)).Should(BeTrue()) By("Updating APIRule") apiRule := &apigatewayv1alpha1.APIRule{ @@ -792,17 +793,17 @@ var _ = Describe("Subscription Reconciliation Tests", func() { By("Creating a BEB Subscription") var bebSubscription bebtypes.Subscription - Eventually(func() bool { - for r, payloadObject := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) { - bebSubscription = payloadObject.(bebtypes.Subscription) - receivedSubscriptionName := bebSubscription.Name - // ensure the correct subscription was created - return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName - } + + containsCorrectSubscription := func(request *http.Request, payload interface{}) bool { + if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) { + bebSubscription = payload.(bebtypes.Subscription) + receivedSubscriptionName := bebSubscription.Name + // ensure the correct subscription was created + return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName } return false - }).Should(BeTrue()) + } + Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue()) }) By("Deleting the Subscription") @@ -812,17 +813,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() { getSubscription(ctx, givenSubscription).Should(reconcilertesting.IsAnEmptySubscription()) By("Deleting the BEB Subscription") - Eventually(func() bool { - for r := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionDelete(r) { - receivedSubscriptionName := reconcilertesting.GetRestAPIObject(r.URL) - // ensure the correct subscription was created - return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName - } + + containsCorrectSubscription := func(request *http.Request, payload interface{}) bool { + if reconcilertesting.IsBEBSubscriptionDelete(request) { + receivedSubscriptionName := reconcilertesting.GetRestAPIObject(request.URL) + // ensure the correct subscription was created + return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName } return false - }).Should(BeTrue()) - + } + Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue()) By("Removing the APIRule") Expect(apiRuleCreated.GetDeletionTimestamp).NotTo(BeNil()) @@ -872,18 +872,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() { Context("Check Kyma Subscription ready", func() { By("Checking BEB mock server creation requests to contain Subscription creation request", func() { - Eventually(func() bool { - for r, payload := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) { - bebSubscription, ok := payload.(bebtypes.Subscription) - if ok { - return nameMapper.MapSubscriptionName(kymaSubscription) == bebSubscription.Name - } - return false + containsSubscription := func(request *http.Request, payload interface{}) bool { + if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) { + bebSubscription, ok := payload.(bebtypes.Subscription) + if ok { + return nameMapper.MapSubscriptionName(kymaSubscription) == bebSubscription.Name } } return false - }).Should(BeTrue()) + } + Eventually(beb.Requests.CheckIfAny(containsSubscription)).Should(BeTrue()) }) By("Checking Kyma Subscription ready condition to be true", func() { @@ -1092,15 +1090,14 @@ func ensureSubscriberSvcCreated(ctx context.Context, svc *v1.Service) { // getBEBSubscriptionCreationRequests filters the http requests made against BEB and returns the BEB Subscriptions func getBEBSubscriptionCreationRequests(bebSubscriptions []bebtypes.Subscription) AsyncAssertion { - return Eventually(func() []bebtypes.Subscription { - - for r, payloadObject := range beb.Requests { - if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) { - bebSubscription := payloadObject.(bebtypes.Subscription) - bebSubscriptions = append(bebSubscriptions, bebSubscription) - } - } + beb.Requests.ReadEach( + func(request *http.Request, payload interface{}) { + if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) { + bebSubscription := payload.(bebtypes.Subscription) + bebSubscriptions = append(bebSubscriptions, bebSubscription) + } + }) return bebSubscriptions }, bigTimeOut, bigPollingInterval) } @@ -1363,26 +1360,27 @@ func createSubscriptionObjectsAndWaitForReadiness(ctx context.Context, givenSubs // countBEBRequests returns how many requests for a given subscription are sent for each HTTP method func countBEBRequests(subscriptionName string) (countGet int, countPost int, countDelete int) { countGet, countPost, countDelete = 0, 0, 0 - for req, v := range beb.Requests { - switch method := req.Method; method { - case http.MethodGet: - if strings.Contains(req.URL.Path, subscriptionName) { - countGet++ - } - case http.MethodPost: - subscription, ok := v.(bebtypes.Subscription) - if ok && len(subscription.Events) > 0 { - for _, event := range subscription.Events { - if event.Type == reconcilertesting.OrderCreatedEventType && subscription.Name == subscriptionName { - countPost++ + beb.Requests.ReadEach( + func(request *http.Request, payload interface{}) { + switch method := request.Method; method { + case http.MethodGet: + if strings.Contains(request.URL.Path, subscriptionName) { + countGet++ + } + case http.MethodPost: + subscription, ok := payload.(bebtypes.Subscription) + if ok && len(subscription.Events) > 0 { + for _, event := range subscription.Events { + if event.Type == reconcilertesting.OrderCreatedEventType && subscription.Name == subscriptionName { + countPost++ + } } } + case http.MethodDelete: + if strings.Contains(request.URL.Path, subscriptionName) { + countDelete++ + } } - case http.MethodDelete: - if strings.Contains(req.URL.Path, subscriptionName) { - countDelete++ - } - } - } + }) return countGet, countPost, countDelete } diff --git a/components/eventing-controller/testing/bebmock.go b/components/eventing-controller/testing/bebmock.go index a755a3052672..be3f47153db1 100644 --- a/components/eventing-controller/testing/bebmock.go +++ b/components/eventing-controller/testing/bebmock.go @@ -30,7 +30,7 @@ const ( // BEBMock implements a programmable mock for BEB type BEBMock struct { - Requests map[*http.Request]interface{} + Requests *SafeRequests Subscriptions *SafeSubscriptions TokenURL string MessagingURL string @@ -46,7 +46,7 @@ type BEBMock struct { func NewBEBMock(bebConfig *config.Config) *BEBMock { logger := logf.Log.WithName("beb mock") return &BEBMock{ - nil, NewSafeSubscriptions(), "", "", bebConfig, + NewSafeRequests(), NewSafeSubscriptions(), "", "", bebConfig, logger, nil, nil, nil, nil, nil, } @@ -57,7 +57,9 @@ type response func(w http.ResponseWriter) func (m *BEBMock) Reset() { m.log.Info("Initializing requests") - m.Requests = make(map[*http.Request]interface{}) + //TODO clean when done + //m.Requests = make(map[*http.Request]interface{}) + m.Requests = NewSafeRequests() m.Subscriptions = NewSafeSubscriptions() m.AuthResponse = nil m.GetResponse = nil @@ -74,7 +76,9 @@ func (m *BEBMock) Start() string { defer GinkgoRecover() // store request - m.Requests[r] = nil + //TODO clean when done + //m.Requests[r] = nil + m.Requests.StoreRequest(r) description := "" reqBytes, err := httputil.DumpRequest(r, true) @@ -112,7 +116,9 @@ func (m *BEBMock) Start() string { case http.MethodPost: var subscription bebtypes.Subscription _ = json.NewDecoder(r.Body).Decode(&subscription) - m.Requests[r] = subscription + //TODO clean + //m.Requests[r] = subscription + m.Requests.PutSubscription(r, subscription) key := r.URL.Path + "/" + subscription.Name m.Subscriptions.PutSubscription(key, &subscription) if m.CreateResponse != nil { diff --git a/components/eventing-controller/testing/saferequests.go b/components/eventing-controller/testing/saferequests.go new file mode 100644 index 000000000000..eb6826dbbff9 --- /dev/null +++ b/components/eventing-controller/testing/saferequests.go @@ -0,0 +1,64 @@ +package testing + +import ( + "github.com/kyma-project/kyma/components/eventing-controller/pkg/ems/api/events/types" + "net/http" + "sync" +) + +// SafeRequests encapsulates Requests to provide mutual exclusion. +type SafeRequests struct { + sync.RWMutex + requests map[*http.Request]interface{} +} + +// NewSafeRequests returns a new SafeRequests. +func NewSafeRequests() *SafeRequests { + return &SafeRequests{ + sync.RWMutex{}, + make(map[*http.Request]interface{}), + } +} + +// StoreRequest adds a request to requests and sets it's corresponding subscription to nil. +func (r *SafeRequests) StoreRequest(request *http.Request) { + r.Lock() + defer r.Unlock() + r.requests[request] = nil +} + +// PutSubscription sets a the subscription of a request. +func (r *SafeRequests) PutSubscription(request *http.Request, subscription types.Subscription) { + r.Lock() + defer r.Unlock() + r.requests[request] = subscription +} + +// Len returns the length of requests. +func (r *SafeRequests) Len() int { + r.RLock() + defer r.RUnlock() + return len(r.requests) +} + +// CheckIfAny iterates over requests and checks if a given func f is true at any iteration's request and payload. +// CheckIfAny is only read-save; f must be a read-only operation. +func (r *SafeRequests) CheckIfAny(f func(request *http.Request, payload interface{}) bool) bool { + r.RLock() + defer r.RUnlock() + for req, payload := range r.requests { + if f(req, payload) { + return true + } + } + return false +} + +// ReadEach iterates over requests and executes a given func f with the iteration's request and payload. +func (r *SafeRequests) ReadEach(f func(request *http.Request, payload interface{})) { + r.RLock() + defer r.RUnlock() + for req, payload := range r.requests { + f(req, payload) + } +} diff --git a/components/eventing-controller/testing/safesubscriptions.go b/components/eventing-controller/testing/safesubscriptions.go index e3d46e65ce79..a9ca2e3efc4e 100644 --- a/components/eventing-controller/testing/safesubscriptions.go +++ b/components/eventing-controller/testing/safesubscriptions.go @@ -16,7 +16,8 @@ type SafeSubscriptions struct { // NewSafeSubscriptions returns a new SafeSubscriptions. func NewSafeSubscriptions() *SafeSubscriptions { return &SafeSubscriptions{ - sync.RWMutex{}, make(map[string]*bebtypes.Subscription), + sync.RWMutex{}, + make(map[string]*bebtypes.Subscription), } }