Skip to content

Commit

Permalink
Remove MessageEntry and add functionality to PubSubRouter
Browse files Browse the repository at this point in the history
Fixed issue where messages would've been stored although no WRITE access
was allowed.
  • Loading branch information
bogh committed May 20, 2016
1 parent ce27737 commit ef1d00f
Show file tree
Hide file tree
Showing 17 changed files with 175 additions and 245 deletions.
11 changes: 11 additions & 0 deletions gcm/mocks_server_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gcm

import (
gomock "github.com/golang/mock/gomock"
guble "github.com/smancke/guble/guble"
server "github.com/smancke/guble/server"
auth "github.com/smancke/guble/server/auth"
store "github.com/smancke/guble/store"
Expand Down Expand Up @@ -42,6 +43,16 @@ func (_mr *_MockPubSubSourceRecorder) AccessManager() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager")
}

func (_m *MockPubSubSource) HandleMessage(_param0 *guble.Message) error {
ret := _m.ctrl.Call(_m, "HandleMessage", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockPubSubSourceRecorder) HandleMessage(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HandleMessage", arg0)
}

func (_m *MockPubSubSource) KVStore() (store.KVStore, error) {
ret := _m.ctrl.Call(_m, "KVStore")
ret0, _ := ret[0].(store.KVStore)
Expand Down
21 changes: 6 additions & 15 deletions gubled/gubled.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,16 @@ var CreateMessageStore = func(args Args) store.MessageStore {

var CreateModules = func(
router server.PubSubSource,
messageEntry server.MessageSink,
args Args) []interface{} {
modules := make([]interface{}, 0, 2)

if wsHandler, err := server.NewWSHandler(router, messageEntry, "/stream/"); err != nil {
if wsHandler, err := server.NewWSHandler(router, "/stream/"); err != nil {
guble.Err("Error loading WSHandler module: %s", err)
} else {
modules = append(modules, wsHandler)
}

modules = append(modules, server.NewRestMessageApi(messageEntry, "/api/"))
modules = append(modules, server.NewRestMessageApi(router, "/api/"))

if args.GcmEnable {
if args.GcmApiKey == "" {
Expand Down Expand Up @@ -139,18 +138,10 @@ func StartupService(args Args) *server.Service {
kvStore := CreateKVStore(args)

router := server.NewPubSubRouter(accessManager, messageStore, kvStore)
messageEntry := server.NewMessageEntry(router, messageStore)

service := server.NewService(
args.Listen,
kvStore,
messageStore,
messageEntry,
router,
accessManager,
)

for _, module := range CreateModules(router, messageEntry, args) {

service := server.NewService(args.Listen, router)

for _, module := range CreateModules(router, args) {
service.Register(module)
}

Expand Down
8 changes: 3 additions & 5 deletions gubled/gubled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,13 @@ func TestGcmOnlyStartedIfEnabled(t *testing.T) {

amMock := NewMockAccessManager(mockCtrl)
msMock := NewMockMessageStore(mockCtrl)
msinkMock := NewMockMessageSink(mockCtrl)

routerMock.EXPECT().KVStore().Return(store.NewMemoryKVStore(), nil)
routerMock.EXPECT().AccessManager().Return(amMock, nil).MaxTimes(2)
routerMock.EXPECT().MessageStore().Return(msMock, nil).MaxTimes(2)

a.True(containsGcmModule(CreateModules(routerMock, msinkMock, Args{GcmEnable: true, GcmApiKey: "xyz"})))
a.False(containsGcmModule(CreateModules(routerMock, msinkMock, Args{GcmEnable: false})))
a.True(containsGcmModule(CreateModules(routerMock, Args{GcmEnable: true, GcmApiKey: "xyz"})))
a.False(containsGcmModule(CreateModules(routerMock, Args{GcmEnable: false})))
}

func containsGcmModule(modules []interface{}) bool {
Expand All @@ -169,7 +168,6 @@ func TestPanicOnMissingGcmApiKey(t *testing.T) {
routerMock := NewMockPubSubSource(mockCtrl)
amMock := NewMockAccessManager(mockCtrl)
msMock := NewMockMessageStore(mockCtrl)
msinkMock := NewMockMessageSink(mockCtrl)

routerMock.EXPECT().AccessManager().Return(amMock, nil)
routerMock.EXPECT().MessageStore().Return(msMock, nil)
Expand All @@ -181,7 +179,7 @@ func TestPanicOnMissingGcmApiKey(t *testing.T) {
}
}()

CreateModules(routerMock, msinkMock, Args{GcmEnable: true})
CreateModules(routerMock, Args{GcmEnable: true})
}

func TestCreateStoreBackendPanicInvalidBackend(t *testing.T) {
Expand Down
43 changes: 11 additions & 32 deletions gubled/mocks_server_gen_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/smancke/guble/server (interfaces: PubSubSource,MessageSink)
// Source: github.com/smancke/guble/server (interfaces: PubSubSource)

package gubled

Expand Down Expand Up @@ -43,6 +43,16 @@ func (_mr *_MockPubSubSourceRecorder) AccessManager() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager")
}

func (_m *MockPubSubSource) HandleMessage(_param0 *guble.Message) error {
ret := _m.ctrl.Call(_m, "HandleMessage", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockPubSubSourceRecorder) HandleMessage(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HandleMessage", arg0)
}

func (_m *MockPubSubSource) KVStore() (store.KVStore, error) {
ret := _m.ctrl.Call(_m, "KVStore")
ret0, _ := ret[0].(store.KVStore)
Expand Down Expand Up @@ -83,34 +93,3 @@ func (_m *MockPubSubSource) Unsubscribe(_param0 *server.Route) {
func (_mr *_MockPubSubSourceRecorder) Unsubscribe(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Unsubscribe", arg0)
}

// Mock of MessageSink interface
type MockMessageSink struct {
ctrl *gomock.Controller
recorder *_MockMessageSinkRecorder
}

// Recorder for MockMessageSink (not exported)
type _MockMessageSinkRecorder struct {
mock *MockMessageSink
}

func NewMockMessageSink(ctrl *gomock.Controller) *MockMessageSink {
mock := &MockMessageSink{ctrl: ctrl}
mock.recorder = &_MockMessageSinkRecorder{mock}
return mock
}

func (_m *MockMessageSink) EXPECT() *_MockMessageSinkRecorder {
return _m.recorder
}

func (_m *MockMessageSink) HandleMessage(_param0 *guble.Message) error {
ret := _m.ctrl.Call(_m, "HandleMessage", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockMessageSinkRecorder) HandleMessage(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HandleMessage", arg0)
}
4 changes: 2 additions & 2 deletions scripts/generate_mocks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ MOCKGEN=$GOPATH/bin/mockgen
$MOCKGEN -self_package server -package server \
-destination server/mocks_server_gen_test.go \
github.com/smancke/guble/server \
PubSubSource,MessageSink,WSConnection,Startable,Stopable,Endpoint
PubSubSource,WSConnection,Startable,Stopable,Endpoint
replace "server/mocks_server_gen_test.go" "server \"github.com\/smancke\/guble\/server\"" "server\."

$MOCKGEN -self_package server -package server \
Expand Down Expand Up @@ -61,7 +61,7 @@ $MOCKGEN -self_package gcm -package gcm \
$MOCKGEN -package gubled \
-destination gubled/mocks_server_gen_test.go \
github.com/smancke/guble/server \
PubSubSource,MessageSink
PubSubSource

$MOCKGEN -self_package gubled -package gubled \
-destination gubled/mocks_auth_gen_test.go \
Expand Down
4 changes: 0 additions & 4 deletions server/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ type PubSubSource interface {

Subscribe(r *Route) (*Route, error)
Unsubscribe(r *Route)
}

// MessageSink interface allows for sending/pushing messages
type MessageSink interface {
HandleMessage(message *guble.Message) error
}

Expand Down
40 changes: 0 additions & 40 deletions server/message_entry.go

This file was deleted.

42 changes: 0 additions & 42 deletions server/message_entry_test.go

This file was deleted.

43 changes: 11 additions & 32 deletions server/mocks_server_gen_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/smancke/guble/server (interfaces: PubSubSource,MessageSink,WSConnection,Startable,Stopable,Endpoint)
// Source: github.com/smancke/guble/server (interfaces: PubSubSource,WSConnection,Startable,Stopable,Endpoint)

package server

Expand Down Expand Up @@ -44,6 +44,16 @@ func (_mr *_MockPubSubSourceRecorder) AccessManager() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager")
}

func (_m *MockPubSubSource) HandleMessage(_param0 *guble.Message) error {
ret := _m.ctrl.Call(_m, "HandleMessage", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockPubSubSourceRecorder) HandleMessage(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HandleMessage", arg0)
}

func (_m *MockPubSubSource) KVStore() (store.KVStore, error) {
ret := _m.ctrl.Call(_m, "KVStore")
ret0, _ := ret[0].(store.KVStore)
Expand Down Expand Up @@ -85,37 +95,6 @@ func (_mr *_MockPubSubSourceRecorder) Unsubscribe(arg0 interface{}) *gomock.Call
return _mr.mock.ctrl.RecordCall(_mr.mock, "Unsubscribe", arg0)
}

// Mock of MessageSink interface
type MockMessageSink struct {
ctrl *gomock.Controller
recorder *_MockMessageSinkRecorder
}

// Recorder for MockMessageSink (not exported)
type _MockMessageSinkRecorder struct {
mock *MockMessageSink
}

func NewMockMessageSink(ctrl *gomock.Controller) *MockMessageSink {
mock := &MockMessageSink{ctrl: ctrl}
mock.recorder = &_MockMessageSinkRecorder{mock}
return mock
}

func (_m *MockMessageSink) EXPECT() *_MockMessageSinkRecorder {
return _m.recorder
}

func (_m *MockMessageSink) HandleMessage(_param0 *guble.Message) error {
ret := _m.ctrl.Call(_m, "HandleMessage", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockMessageSinkRecorder) HandleMessage(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HandleMessage", arg0)
}

// Mock of WSConnection interface
type MockWSConnection struct {
ctrl *gomock.Controller
Expand Down
8 changes: 4 additions & 4 deletions server/rest_message_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
const X_HEADER_PREFIX = "x-guble-"

type RestMessageApi struct {
MessageSink
PubSubSource
mux http.Handler
prefix string
}

func NewRestMessageApi(messageSink MessageSink, prefix string) *RestMessageApi {
func NewRestMessageApi(router PubSubSource, prefix string) *RestMessageApi {
mux := httprouter.New()
api := &RestMessageApi{messageSink, mux, prefix}
api := &RestMessageApi{router, mux, prefix}

p := removeTrailingSlash(prefix)
mux.POST(p+"/message/*topic", api.PostMessage)
Expand Down Expand Up @@ -54,7 +54,7 @@ func (api *RestMessageApi) PostMessage(w http.ResponseWriter, r *http.Request, p
HeaderJSON: headersToJson(r.Header),
}

api.MessageSink.HandleMessage(msg)
api.HandleMessage(msg)
}

// returns a query parameter
Expand Down
6 changes: 3 additions & 3 deletions server/rest_message_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestPostMessage(t *testing.T) {
a := assert.New(t)

// given: a rest api with a message sink
messageSink := NewMockMessageSink(ctrl)
api := NewRestMessageApi(messageSink, "/api")
routerMock := NewMockPubSubSource(ctrl)
api := NewRestMessageApi(routerMock, "/api")

url, _ := url.Parse("http://localhost/api/message/my/topic?userId=marvin&messageId=42")
// and a http context
Expand All @@ -38,7 +38,7 @@ func TestPostMessage(t *testing.T) {
}

// then i expect
messageSink.EXPECT().HandleMessage(gomock.Any()).Do(func(msg *guble.Message) {
routerMock.EXPECT().HandleMessage(gomock.Any()).Do(func(msg *guble.Message) {
a.Equal(testBytes, msg.Body)
a.Equal("{}", msg.HeaderJSON)
a.Equal("/my/topic", string(msg.Path))
Expand Down
Loading

0 comments on commit ef1d00f

Please sign in to comment.