Skip to content

Commit

Permalink
implement mutex in bebmock requests
Browse files Browse the repository at this point in the history
  • Loading branch information
friedrichwilken committed Dec 4, 2021
1 parent 5c9cce1 commit 24252ad
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 83 deletions.
Expand Up @@ -83,14 +83,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() {

AfterEach(func() {
// detailed request logs
logf.Log.V(1).Info("beb requests", "number", len(beb.Requests))
logf.Log.V(1).Info("beb requests", "number", beb.Requests.Len())

i := 0
for req, payloadObject := range beb.Requests {
reqDescription := fmt.Sprintf("method: %q, url: %q, payload object: %+v", req.Method, req.RequestURI, payloadObject)
fmt.Printf("request[%d]: %s\n", i, reqDescription)
i++
}

beb.Requests.ReadEach(
func(req *http.Request, payload interface{}) {
reqDescription := fmt.Sprintf("method: %q, url: %q, payload object: %+v", req.Method, req.RequestURI, payload)
fmt.Printf("request[%d]: %s\n", i, reqDescription)
i++
})

// print all subscriptions in the namespace for debugging purposes
if err := printSubscriptions(namespaceName); err != nil {
Expand Down Expand Up @@ -237,17 +239,17 @@ var _ = Describe("Subscription Reconciliation Tests", func() {

By("Creating a BEB Subscription")
var bebSubscription bebtypes.Subscription
Eventually(func() bool {
for r, payloadObject := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) {
bebSubscription = payloadObject.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}

containsCorrectSubscription := func(request *http.Request, payload interface{}) bool {
if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) {
bebSubscription = payload.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}
return false
}).Should(BeTrue())
}
Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue())
})
})

Expand Down Expand Up @@ -413,17 +415,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() {

By("Creating a BEB Subscription")
var bebSubscription bebtypes.Subscription
Eventually(func() bool {
for r, payloadObject := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) {
bebSubscription = payloadObject.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}
contiansCorrectSubscription := func(request *http.Request, payload interface{}) bool {
if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) {
bebSubscription = payload.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}
return false
}).Should(BeTrue())
}
Eventually(beb.Requests.CheckIfAny(contiansCorrectSubscription)).Should(BeTrue())

By("Updating APIRule")
apiRule := &apigatewayv1alpha1.APIRule{
Expand Down Expand Up @@ -792,17 +793,17 @@ var _ = Describe("Subscription Reconciliation Tests", func() {

By("Creating a BEB Subscription")
var bebSubscription bebtypes.Subscription
Eventually(func() bool {
for r, payloadObject := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) {
bebSubscription = payloadObject.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}

containsCorrectSubscription := func(request *http.Request, payload interface{}) bool {
if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) {
bebSubscription = payload.(bebtypes.Subscription)
receivedSubscriptionName := bebSubscription.Name
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}
return false
}).Should(BeTrue())
}
Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue())
})

By("Deleting the Subscription")
Expand All @@ -812,17 +813,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() {
getSubscription(ctx, givenSubscription).Should(reconcilertesting.IsAnEmptySubscription())

By("Deleting the BEB Subscription")
Eventually(func() bool {
for r := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionDelete(r) {
receivedSubscriptionName := reconcilertesting.GetRestAPIObject(r.URL)
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}

containsCorrectSubscription := func(request *http.Request, payload interface{}) bool {
if reconcilertesting.IsBEBSubscriptionDelete(request) {
receivedSubscriptionName := reconcilertesting.GetRestAPIObject(request.URL)
// ensure the correct subscription was created
return nameMapper.MapSubscriptionName(givenSubscription) == receivedSubscriptionName
}
return false
}).Should(BeTrue())

}
Eventually(beb.Requests.CheckIfAny(containsCorrectSubscription)).Should(BeTrue())
By("Removing the APIRule")
Expect(apiRuleCreated.GetDeletionTimestamp).NotTo(BeNil())

Expand Down Expand Up @@ -872,18 +872,16 @@ var _ = Describe("Subscription Reconciliation Tests", func() {

Context("Check Kyma Subscription ready", func() {
By("Checking BEB mock server creation requests to contain Subscription creation request", func() {
Eventually(func() bool {
for r, payload := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) {
bebSubscription, ok := payload.(bebtypes.Subscription)
if ok {
return nameMapper.MapSubscriptionName(kymaSubscription) == bebSubscription.Name
}
return false
containsSubscription := func(request *http.Request, payload interface{}) bool {
if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) {
bebSubscription, ok := payload.(bebtypes.Subscription)
if ok {
return nameMapper.MapSubscriptionName(kymaSubscription) == bebSubscription.Name
}
}
return false
}).Should(BeTrue())
}
Eventually(beb.Requests.CheckIfAny(containsSubscription)).Should(BeTrue())
})

By("Checking Kyma Subscription ready condition to be true", func() {
Expand Down Expand Up @@ -1092,15 +1090,14 @@ func ensureSubscriberSvcCreated(ctx context.Context, svc *v1.Service) {

// getBEBSubscriptionCreationRequests filters the http requests made against BEB and returns the BEB Subscriptions
func getBEBSubscriptionCreationRequests(bebSubscriptions []bebtypes.Subscription) AsyncAssertion {

return Eventually(func() []bebtypes.Subscription {

for r, payloadObject := range beb.Requests {
if reconcilertesting.IsBEBSubscriptionCreate(r, *beb.BEBConfig) {
bebSubscription := payloadObject.(bebtypes.Subscription)
bebSubscriptions = append(bebSubscriptions, bebSubscription)
}
}
beb.Requests.ReadEach(
func(request *http.Request, payload interface{}) {
if reconcilertesting.IsBEBSubscriptionCreate(request, *beb.BEBConfig) {
bebSubscription := payload.(bebtypes.Subscription)
bebSubscriptions = append(bebSubscriptions, bebSubscription)
}
})
return bebSubscriptions
}, bigTimeOut, bigPollingInterval)
}
Expand Down Expand Up @@ -1363,26 +1360,27 @@ func createSubscriptionObjectsAndWaitForReadiness(ctx context.Context, givenSubs
// countBEBRequests returns how many requests for a given subscription are sent for each HTTP method
func countBEBRequests(subscriptionName string) (countGet int, countPost int, countDelete int) {
countGet, countPost, countDelete = 0, 0, 0
for req, v := range beb.Requests {
switch method := req.Method; method {
case http.MethodGet:
if strings.Contains(req.URL.Path, subscriptionName) {
countGet++
}
case http.MethodPost:
subscription, ok := v.(bebtypes.Subscription)
if ok && len(subscription.Events) > 0 {
for _, event := range subscription.Events {
if event.Type == reconcilertesting.OrderCreatedEventType && subscription.Name == subscriptionName {
countPost++
beb.Requests.ReadEach(
func(request *http.Request, payload interface{}) {
switch method := request.Method; method {
case http.MethodGet:
if strings.Contains(request.URL.Path, subscriptionName) {
countGet++
}
case http.MethodPost:
subscription, ok := payload.(bebtypes.Subscription)
if ok && len(subscription.Events) > 0 {
for _, event := range subscription.Events {
if event.Type == reconcilertesting.OrderCreatedEventType && subscription.Name == subscriptionName {
countPost++
}
}
}
case http.MethodDelete:
if strings.Contains(request.URL.Path, subscriptionName) {
countDelete++
}
}
case http.MethodDelete:
if strings.Contains(req.URL.Path, subscriptionName) {
countDelete++
}
}
}
})
return countGet, countPost, countDelete
}
16 changes: 11 additions & 5 deletions components/eventing-controller/testing/bebmock.go
Expand Up @@ -30,7 +30,7 @@ const (

// BEBMock implements a programmable mock for BEB
type BEBMock struct {
Requests map[*http.Request]interface{}
Requests *SafeRequests
Subscriptions *SafeSubscriptions
TokenURL string
MessagingURL string
Expand All @@ -46,7 +46,7 @@ type BEBMock struct {
func NewBEBMock(bebConfig *config.Config) *BEBMock {
logger := logf.Log.WithName("beb mock")
return &BEBMock{
nil, NewSafeSubscriptions(), "", "", bebConfig,
NewSafeRequests(), NewSafeSubscriptions(), "", "", bebConfig,
logger,
nil, nil, nil, nil, nil,
}
Expand All @@ -57,7 +57,9 @@ type response func(w http.ResponseWriter)

func (m *BEBMock) Reset() {
m.log.Info("Initializing requests")
m.Requests = make(map[*http.Request]interface{})
//TODO clean when done
//m.Requests = make(map[*http.Request]interface{})
m.Requests = NewSafeRequests()
m.Subscriptions = NewSafeSubscriptions()
m.AuthResponse = nil
m.GetResponse = nil
Expand All @@ -74,7 +76,9 @@ func (m *BEBMock) Start() string {
defer GinkgoRecover()

// store request
m.Requests[r] = nil
//TODO clean when done
//m.Requests[r] = nil
m.Requests.StoreRequest(r)

description := ""
reqBytes, err := httputil.DumpRequest(r, true)
Expand Down Expand Up @@ -112,7 +116,9 @@ func (m *BEBMock) Start() string {
case http.MethodPost:
var subscription bebtypes.Subscription
_ = json.NewDecoder(r.Body).Decode(&subscription)
m.Requests[r] = subscription
//TODO clean
//m.Requests[r] = subscription
m.Requests.PutSubscription(r, subscription)
key := r.URL.Path + "/" + subscription.Name
m.Subscriptions.PutSubscription(key, &subscription)
if m.CreateResponse != nil {
Expand Down
64 changes: 64 additions & 0 deletions components/eventing-controller/testing/saferequests.go
@@ -0,0 +1,64 @@
package testing

import (
"github.com/kyma-project/kyma/components/eventing-controller/pkg/ems/api/events/types"
"net/http"
"sync"
)

// SafeRequests encapsulates Requests to provide mutual exclusion.
type SafeRequests struct {
sync.RWMutex
requests map[*http.Request]interface{}
}

// NewSafeRequests returns a new SafeRequests.
func NewSafeRequests() *SafeRequests {
return &SafeRequests{
sync.RWMutex{},
make(map[*http.Request]interface{}),
}
}

// StoreRequest adds a request to requests and sets it's corresponding subscription to nil.
func (r *SafeRequests) StoreRequest(request *http.Request) {
r.Lock()
defer r.Unlock()
r.requests[request] = nil
}

// PutSubscription sets a the subscription of a request.
func (r *SafeRequests) PutSubscription(request *http.Request, subscription types.Subscription) {
r.Lock()
defer r.Unlock()
r.requests[request] = subscription
}

// Len returns the length of requests.
func (r *SafeRequests) Len() int {
r.RLock()
defer r.RUnlock()
return len(r.requests)
}

// CheckIfAny iterates over requests and checks if a given func f is true at any iteration's request and payload.
// CheckIfAny is only read-save; f must be a read-only operation.
func (r *SafeRequests) CheckIfAny(f func(request *http.Request, payload interface{}) bool) bool {
r.RLock()
defer r.RUnlock()
for req, payload := range r.requests {
if f(req, payload) {
return true
}
}
return false
}

// ReadEach iterates over requests and executes a given func f with the iteration's request and payload.
func (r *SafeRequests) ReadEach(f func(request *http.Request, payload interface{})) {
r.RLock()
defer r.RUnlock()
for req, payload := range r.requests {
f(req, payload)
}
}
3 changes: 2 additions & 1 deletion components/eventing-controller/testing/safesubscriptions.go
Expand Up @@ -16,7 +16,8 @@ type SafeSubscriptions struct {
// NewSafeSubscriptions returns a new SafeSubscriptions.
func NewSafeSubscriptions() *SafeSubscriptions {
return &SafeSubscriptions{
sync.RWMutex{}, make(map[string]*bebtypes.Subscription),
sync.RWMutex{},
make(map[string]*bebtypes.Subscription),
}
}

Expand Down

0 comments on commit 24252ad

Please sign in to comment.