Skip to content

Commit

Permalink
Fix creating sync subscription for the same method and path (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Jul 2, 2018
1 parent 24c3c35 commit 60b4238
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 228 deletions.
52 changes: 27 additions & 25 deletions internal/cache/cors_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,54 @@ import (
"go.uber.org/zap"
)

func TestCORSCacheModifiedEvents(t *testing.T) {
ccache := newCORSCache(zap.NewNop())
func TestCORSCacheModified(t *testing.T) {
t.Run("added", func(t *testing.T) {
ccache := newCORSCache(zap.NewNop())

ccache.Modified("GET%2Ftest", []byte(`{
"corsId":"GET%2Ftest",
"space": "default",
"method": "GET",
"path": "/test",
"allowedOrigins": ["*"]}`))
ccache.Modified("GET%2Ftest", []byte(`{
"corsId":"GET%2Ftest",
"space": "default",
"method": "GET",
"path": "/test",
"allowedOrigins": ["*"]}`))

value, _ := ccache.endpoints["GET"].Resolve("/test")
config := value.(cors.CORS)
assert.Equal(t, cors.ID("GET%2Ftest"), config.ID)
assert.Equal(t, "default", config.Space)
}
value, _ := ccache.endpoints["GET"].Resolve("/test")
config := value.(cors.CORS)
assert.Equal(t, cors.ID("GET%2Ftest"), config.ID)
assert.Equal(t, "default", config.Space)
})

func TestCORSCacheModifiedEventsWrongPayload(t *testing.T) {
ccache := newCORSCache(zap.NewNop())
t.Run("wrong payload", func(t *testing.T) {
ccache := newCORSCache(zap.NewNop())

ccache.Modified("GET%2Ftest", []byte(`not json`))
ccache.Modified("GET%2Ftest", []byte(`not json`))

assert.Nil(t, ccache.endpoints["GET"])
}
assert.Nil(t, ccache.endpoints["GET"])
})

func TestCORSCacheModifiedHTTPSubscriptionDeleted(t *testing.T) {
ccache := newCORSCache(zap.NewNop())
t.Run("deleted", func(t *testing.T) {
ccache := newCORSCache(zap.NewNop())

ccache.Modified("GET%2Ftest", []byte(`{
ccache.Modified("GET%2Ftest", []byte(`{
"corsId":"GET%2Ftest",
"space": "default",
"method": "GET",
"path": "/test",
"allowedOrigins": ["*"]}`))
ccache.Modified("GET%2Ftest1", []byte(`{
ccache.Modified("GET%2Ftest1", []byte(`{
"corsId":"GET%2Ftest1",
"space": "default",
"method": "GET",
"path": "/test1",
"allowedOrigins": ["*"]}`))
ccache.Deleted("GET%2Ftest1", []byte(`{
ccache.Deleted("GET%2Ftest1", []byte(`{
"corsId":"GET%2Ftest1",
"space": "default",
"method": "GET",
"path": "/test1",
"allowedOrigins": ["*"]}`))

value, _ := ccache.endpoints["GET"].Resolve("/test1")
assert.Nil(t, value)
value, _ := ccache.endpoints["GET"].Resolve("/test1")
assert.Nil(t, value)
})
}
78 changes: 40 additions & 38 deletions internal/cache/eventtype_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,44 @@ import (
)

func TestEventTypeCacheModified(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`{"name":"user.created","space":"default"}`))
typesCache.Modified("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))

name1 := eventpkg.TypeName("user.created")
name2 := eventpkg.TypeName("user.deleted")
assert.Equal(t,
&eventpkg.Type{Name: name1, Space: "default"},
typesCache.cache[libkv.EventTypeKey{Space: "default", Name: name1}])
assert.Equal(t,
&eventpkg.Type{Name: name2, Space: "default"},
typesCache.cache[libkv.EventTypeKey{Space: "default", Name: name2}])
}

func TestEventTypeCacheModified_WrongPayload(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`not json`))

assert.Equal(t, map[libkv.EventTypeKey]*eventpkg.Type{}, typesCache.cache)
}

func TestEventTypeCacheModifiedDeleted(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`{"name":"user.created","space":"default"}`))
typesCache.Modified("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))
typesCache.Deleted("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))

name := eventpkg.TypeName("user.created")
expected := map[libkv.EventTypeKey]*eventpkg.Type{
libkv.EventTypeKey{Space: "default", Name: name}: &eventpkg.Type{
Name: name,
Space: "default",
},
}
assert.Equal(t, expected, typesCache.cache)
t.Run("added", func(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`{"name":"user.created","space":"default"}`))
typesCache.Modified("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))

name1 := eventpkg.TypeName("user.created")
name2 := eventpkg.TypeName("user.deleted")
assert.Equal(t,
&eventpkg.Type{Name: name1, Space: "default"},
typesCache.cache[libkv.EventTypeKey{Space: "default", Name: name1}])
assert.Equal(t,
&eventpkg.Type{Name: name2, Space: "default"},
typesCache.cache[libkv.EventTypeKey{Space: "default", Name: name2}])
})

t.Run("wrong payload", func(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`not json`))

assert.Equal(t, map[libkv.EventTypeKey]*eventpkg.Type{}, typesCache.cache)
})

t.Run("deleted", func(t *testing.T) {
typesCache := newEventTypeCache(zap.NewNop())

typesCache.Modified("default/user.created", []byte(`{"name":"user.created","space":"default"}`))
typesCache.Modified("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))
typesCache.Deleted("default/user.deleted", []byte(`{"name":"user.deleted","space":"default"}`))

name := eventpkg.TypeName("user.created")
expected := map[libkv.EventTypeKey]*eventpkg.Type{
libkv.EventTypeKey{Space: "default", Name: name}: &eventpkg.Type{
Name: name,
Space: "default",
},
}
assert.Equal(t, expected, typesCache.cache)
})
}
92 changes: 47 additions & 45 deletions internal/cache/function_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,59 @@ import (
)

func TestFunctionCacheModified(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())
t.Run("added", func(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified(
"default/testfunc1",
[]byte(`{"functionId":"testfunc1","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)
fcache.Modified(
"default/testfunc2",
[]byte(`{"functionId":"testfunc2","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)
fcache.Modified(
"default/testfunc1",
[]byte(`{"functionId":"testfunc1","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)
fcache.Modified(
"default/testfunc2",
[]byte(`{"functionId":"testfunc2","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)

id1 := function.ID("testfunc1")
id2 := function.ID("testfunc2")
assert.Equal(
t,
&function.Function{ID: id1, Space: "default", ProviderType: http.Type, Provider: &http.HTTP{URL: "http://e.io"}},
fcache.cache[libkv.FunctionKey{Space: "default", ID: id1}],
)
assert.Equal(
t,
&function.Function{ID: id2, Space: "default", ProviderType: http.Type, Provider: &http.HTTP{URL: "http://e.io"}},
fcache.cache[libkv.FunctionKey{Space: "default", ID: id2}],
)
}
id1 := function.ID("testfunc1")
id2 := function.ID("testfunc2")
assert.Equal(
t,
&function.Function{ID: id1, Space: "default", ProviderType: http.Type, Provider: &http.HTTP{URL: "http://e.io"}},
fcache.cache[libkv.FunctionKey{Space: "default", ID: id1}],
)
assert.Equal(
t,
&function.Function{ID: id2, Space: "default", ProviderType: http.Type, Provider: &http.HTTP{URL: "http://e.io"}},
fcache.cache[libkv.FunctionKey{Space: "default", ID: id2}],
)
})

func TestFunctionCacheModified_WrongPayload(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())
t.Run("wrong payload", func(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified("default/testfunc1", []byte(`not json`))
fcache.Modified("default/testfunc1", []byte(`not json`))

assert.Equal(t, map[libkv.FunctionKey]*function.Function{}, fcache.cache)
}
assert.Equal(t, map[libkv.FunctionKey]*function.Function{}, fcache.cache)
})

func TestFunctionCacheModifiedDeleted(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())
t.Run("deleted", func(t *testing.T) {
fcache := newFunctionCache(zap.NewNop())

fcache.Modified(
"default/testfunc1",
[]byte(`{"functionId":"testfunc1","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)
fcache.Modified("default/testfunc2", []byte(`{"functionId":"testfunc2"}`))
fcache.Deleted("default/testfunc2", []byte(`{"functionId":"testfunc2"}`))
fcache.Modified(
"default/testfunc1",
[]byte(`{"functionId":"testfunc1","space":"default","type":"http","provider":{"url":"http://e.io"}}`),
)
fcache.Modified("default/testfunc2", []byte(`{"functionId":"testfunc2"}`))
fcache.Deleted("default/testfunc2", []byte(`{"functionId":"testfunc2"}`))

fid := function.ID("testfunc1")
expected := map[libkv.FunctionKey]*function.Function{
libkv.FunctionKey{Space: "default", ID: fid}: &function.Function{
ID: fid,
Space: "default",
ProviderType: http.Type,
Provider: &http.HTTP{URL: "http://e.io"},
},
}
assert.Equal(t, expected, fcache.cache)
fid := function.ID("testfunc1")
expected := map[libkv.FunctionKey]*function.Function{
libkv.FunctionKey{Space: "default", ID: fid}: &function.Function{
ID: fid,
Space: "default",
ProviderType: http.Type,
Provider: &http.HTTP{URL: "http://e.io"},
},
}
assert.Equal(t, expected, fcache.cache)
})
}
54 changes: 31 additions & 23 deletions internal/cache/subscription_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ import (

type subscriptionCache struct {
sync.RWMutex
// eventToFunctions maps method path and event type to function key (space + function ID)
eventToFunctions map[string]map[string]map[eventpkg.TypeName][]libkv.FunctionKey
// endpoints maps HTTP method to internal/pathtree. Tree struct which is used for resolving HTTP requests paths.
endpoints map[string]*pathtree.Node
log *zap.Logger
// async maps method, path and event type to function key (space + function ID) (async subscriptions)
async map[string]map[string]map[eventpkg.TypeName][]libkv.FunctionKey
// sync maps method and event type to internal/pathtree (sync subscriptions)
sync map[string]map[eventpkg.TypeName]*pathtree.Node
log *zap.Logger
}

func newSubscriptionCache(log *zap.Logger) *subscriptionCache {
return &subscriptionCache{
eventToFunctions: map[string]map[string]map[eventpkg.TypeName][]libkv.FunctionKey{},
endpoints: map[string]*pathtree.Node{},
log: log,
async: map[string]map[string]map[eventpkg.TypeName][]libkv.FunctionKey{},
sync: map[string]map[eventpkg.TypeName]*pathtree.Node{},
log: log,
}
}

Expand All @@ -44,24 +44,25 @@ func (c *subscriptionCache) Modified(k string, v []byte) {
key := libkv.FunctionKey{Space: s.Space, ID: s.FunctionID}

if s.Type == subscription.TypeSync {
root := c.endpoints[s.Method]
c.ensureSyncMethod(s.Method)
root := c.sync[s.Method][s.EventType]
if root == nil {
root = pathtree.NewNode()
c.endpoints[s.Method] = root
c.sync[s.Method][s.EventType] = root
}
err := root.AddRoute(s.Path, libkv.FunctionKey{Space: s.Space, ID: s.FunctionID})
if err != nil {
c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method))
c.log.Error("Could not add path to the tree.", zap.Error(err), zap.String("path", s.Path), zap.String("method", s.Method), zap.String("eventType", string(s.EventType)))
}
} else {
c.createMethodPath(s.Method, s.Path)
ids, exists := c.eventToFunctions[s.Method][s.Path][s.EventType]
c.ensureAsyncMethodPath(s.Method, s.Path)
ids, exists := c.async[s.Method][s.Path][s.EventType]
if exists {
ids = append(ids, key)
} else {
ids = []libkv.FunctionKey{key}
}
c.eventToFunctions[s.Method][s.Path][s.EventType] = ids
c.async[s.Method][s.Path][s.EventType] = ids
}
}

Expand All @@ -83,20 +84,27 @@ func (c *subscriptionCache) Deleted(k string, v []byte) {
}
}

func (c *subscriptionCache) createMethodPath(method, path string) {
_, exists := c.eventToFunctions[method]
func (c *subscriptionCache) ensureAsyncMethodPath(method, path string) {
_, exists := c.async[method]
if !exists {
c.eventToFunctions[method] = map[string]map[eventpkg.TypeName][]libkv.FunctionKey{}
c.async[method] = map[string]map[eventpkg.TypeName][]libkv.FunctionKey{}
}

_, exists = c.eventToFunctions[method][path]
_, exists = c.async[method][path]
if !exists {
c.eventToFunctions[method][path] = map[eventpkg.TypeName][]libkv.FunctionKey{}
c.async[method][path] = map[eventpkg.TypeName][]libkv.FunctionKey{}
}
}

func (c *subscriptionCache) ensureSyncMethod(method string) {
_, exists := c.sync[method]
if !exists {
c.sync[method] = map[eventpkg.TypeName]*pathtree.Node{}
}
}

func (c *subscriptionCache) deleteEndpoint(sub subscription.Subscription) {
root := c.endpoints[sub.Method]
root := c.sync[sub.Method][sub.EventType]
if root == nil {
return
}
Expand All @@ -107,7 +115,7 @@ func (c *subscriptionCache) deleteEndpoint(sub subscription.Subscription) {
}

func (c *subscriptionCache) deleteSubscription(sub subscription.Subscription) {
ids, exists := c.eventToFunctions[sub.Method][sub.Path][sub.EventType]
ids, exists := c.async[sub.Method][sub.Path][sub.EventType]
if exists {
for i, id := range ids {
key := libkv.FunctionKey{Space: sub.Space, ID: sub.FunctionID}
Expand All @@ -116,10 +124,10 @@ func (c *subscriptionCache) deleteSubscription(sub subscription.Subscription) {
break
}
}
c.eventToFunctions[sub.Method][sub.Path][sub.EventType] = ids
c.async[sub.Method][sub.Path][sub.EventType] = ids

if len(ids) == 0 {
delete(c.eventToFunctions[sub.Method][sub.Path], sub.EventType)
delete(c.async[sub.Method][sub.Path], sub.EventType)
}
}
}
Loading

0 comments on commit 60b4238

Please sign in to comment.