Skip to content

Commit

Permalink
update protos, complete tests for eventforwarders, refactor app and w…
Browse files Browse the repository at this point in the history
…orker, events standardization 🤘🏼
  • Loading branch information
felipejfc committed Jul 10, 2017
1 parent df8238c commit 60695b0
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 178 deletions.
29 changes: 17 additions & 12 deletions api/api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,27 @@ import (
pgmocks "github.com/topfreegames/extensions/pg/mocks"
redismocks "github.com/topfreegames/extensions/redis/mocks"
"github.com/topfreegames/maestro/api"
eventforwardermock "github.com/topfreegames/maestro/eventforwarder/mock"
"github.com/topfreegames/maestro/login/mocks"
"github.com/topfreegames/maestro/models"
mtesting "github.com/topfreegames/maestro/testing"
)

var (
app *api.App
clientset *fake.Clientset
config *viper.Viper
hook *test.Hook
logger *logrus.Logger
mockCtrl *gomock.Controller
mockDb *pgmocks.MockDB
mockPipeline *redismocks.MockPipeliner
mockRedisClient *redismocks.MockRedisClient
mockLogin *mocks.MockLogin
mockClock *clockmocks.MockClock
allStatus = []string{
app *api.App
clientset *fake.Clientset
config *viper.Viper
hook *test.Hook
logger *logrus.Logger
mockCtrl *gomock.Controller
mockDb *pgmocks.MockDB
mockPipeline *redismocks.MockPipeliner
mockRedisClient *redismocks.MockRedisClient
mockEventForwarder1 *eventforwardermock.MockEventForwarder
mockEventForwarder2 *eventforwardermock.MockEventForwarder
mockLogin *mocks.MockLogin
mockClock *clockmocks.MockClock
allStatus = []string{
models.StatusCreating,
models.StatusReady,
models.StatusOccupied,
Expand All @@ -64,6 +67,8 @@ var _ = BeforeEach(func() {
mockCtrl = gomock.NewController(GinkgoT())
mockDb = pgmocks.NewMockDB(mockCtrl)
mockRedisClient = redismocks.NewMockRedisClient(mockCtrl)
mockEventForwarder1 = eventforwardermock.NewMockEventForwarder(mockCtrl)
mockEventForwarder2 = eventforwardermock.NewMockEventForwarder(mockCtrl)
mockPipeline = redismocks.NewMockPipeliner(mockCtrl)

config, err = mtesting.GetDefaultConfig()
Expand Down
47 changes: 2 additions & 45 deletions api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"io"
"net"
"net/http"
"runtime"
"time"

"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/topfreegames/maestro/errors"
"github.com/topfreegames/maestro/eventforwarder"
"github.com/topfreegames/maestro/extensions"
ex "github.com/topfreegames/maestro/extensions"
"github.com/topfreegames/maestro/login"
logininterfaces "github.com/topfreegames/maestro/login/interfaces"
"github.com/topfreegames/maestro/metadata"
Expand Down Expand Up @@ -198,12 +195,7 @@ func (a *App) configureApp(dbOrNil pginterfaces.DB, redisClientOrNil redisinterf
a.loadConfigurationDefaults()
a.configureLogger()

if runtime.GOOS == "linux" {
a.configureForwarders()
} else {
a.Logger.Warn("not loading any forwarder plugin because not running on linux")
}

a.configureForwarders()
err := a.configureDatabase(dbOrNil)
if err != nil {
return err
Expand Down Expand Up @@ -240,42 +232,7 @@ func (a *App) loadConfigurationDefaults() {
}

func (a *App) configureForwarders() {
forwardersConfig := a.Config.GetStringMap("forwarders")
for k, v := range forwardersConfig {
a.Logger.Infof("loading plugin: %s", k)
p, err := ex.LoadPlugin(k, "./bin")
if err != nil {
a.Logger.Errorf("error loading plugin %s: %s", k, err.Error())
continue
}
forwarderConfigMap, ok := v.(map[string]interface{})
if ok {
for kk := range forwarderConfigMap {
a.Logger.Infof("loading forwarder %s.%s", k, kk)
cfg := a.Config.Sub(fmt.Sprintf("forwarders.%s.%s", k, kk))
forwarder, err := ex.LoadForwarder(p, cfg)
if err != nil {
a.Logger.Error(err)
continue
}
a.Forwarders = append(a.Forwarders, forwarder)
}
}
}
for {
eventforwarder.ForwardEventToForwarders(a.Forwarders, "RoomReady", map[string]interface{}{
"game": "somegame",
"roomId": "rid",
"roomType": "sometype",
"host": "somehost",
"port": 100,
"metadata": map[string]string{
"aeah": "pieah",
},
})
time.Sleep(1000 * time.Millisecond)

}
a.Forwarders = eventforwarder.LoadEventForwardersFromConfig(a.Config, a.Logger)
}

func (a *App) configureKubernetesClient(kubernetesClientOrNil kubernetes.Interface) error {
Expand Down
125 changes: 123 additions & 2 deletions api/room_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ import (
"net/http/httptest"
"time"

"k8s.io/client-go/kubernetes"

"github.com/go-redis/redis"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/topfreegames/maestro/api"
"github.com/topfreegames/maestro/eventforwarder"
"github.com/topfreegames/maestro/models"
. "github.com/topfreegames/maestro/testing"
)
Expand All @@ -28,8 +32,7 @@ var _ = Describe("Room Handler", func() {
var request *http.Request
var recorder *httptest.ResponseRecorder

BeforeEach(func() {
// Record HTTP responses.
BeforeEach(func() { // Record HTTP responses.
recorder = httptest.NewRecorder()
})

Expand Down Expand Up @@ -185,6 +188,7 @@ var _ = Describe("Room Handler", func() {
pKey := "scheduler:schedulerName:ping"
lKey := "scheduler:schedulerName:last:status:occupied"
roomName := "roomName"
namespace := "schedulerName"
status := "ready"
newSKey := fmt.Sprintf("scheduler:schedulerName:status:%s", status)
allStatusKeys := []string{
Expand Down Expand Up @@ -290,6 +294,123 @@ var _ = Describe("Room Handler", func() {
Expect(obj["description"]).To(ContainSubstring("does not validate as matches"))
Expect(obj["success"]).To(Equal(false))
})

Context("with eventforwarders", func() {
// TODO map status from api to something standard
createNamespace := func(name string, clientset kubernetes.Interface) error {
return models.NewNamespace(name).Create(clientset)
}
createPod := func(name, namespace string, clientset kubernetes.Interface) error {
_, err := models.NewPod(
"game",
"img",
name,
namespace,
nil,
nil,
0,
[]*models.Port{
&models.Port{
ContainerPort: 1234,
Name: "port1",
Protocol: "UDP",
}},
nil,
nil,
).Create(clientset)
return err
}

var app *api.App
game := "somegame"
BeforeEach(func() {
createNamespace(namespace, clientset)
err := createPod(roomName, namespace, clientset)
Expect(err).NotTo(HaveOccurred())
app, err = api.NewApp("0.0.0.0", 9998, config, logger, false, "", mockDb, mockRedisClient, clientset)
Expect(err).NotTo(HaveOccurred())
app.Forwarders = []eventforwarder.EventForwarder{mockEventForwarder1, mockEventForwarder2}
})
It("should forward event to eventforwarders", func() {
reader := JSONFor(JSON{
"status": status,
"timestamp": time.Now().Unix(),
})
request, _ = http.NewRequest("PUT", url, reader)

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().HMSet(rKey, map[string]interface{}{
"lastPing": time.Now().Unix(),
"status": status,
})
mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
mockPipeline.EXPECT().ZRem(lKey, roomName)
mockPipeline.EXPECT().SAdd(newSKey, rKey)
for _, key := range allStatusKeys {
mockPipeline.EXPECT().SRem(key, rKey)
}
mockPipeline.EXPECT().Exec()
mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", namespace).
Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = ""
scheduler.Game = game
})
mockEventForwarder1.EXPECT().Forward(status, gomock.Any())
mockEventForwarder2.EXPECT().Forward(status, gomock.Any())

app.Router.ServeHTTP(recorder, request)
Expect(recorder.Code).To(Equal(200))
Expect(recorder.Body.String()).To(Equal(`{"success": true}`))
})
It("should forward event to eventforwarders with metadata", func() {
reader := JSONFor(JSON{
"status": status,
"timestamp": time.Now().Unix(),
"metadata": map[string]string{
"type": "sometype",
},
})
request, _ = http.NewRequest("PUT", url, reader)

mockRedisClient.EXPECT().TxPipeline().Return(mockPipeline)
mockPipeline.EXPECT().HMSet(rKey, map[string]interface{}{
"lastPing": time.Now().Unix(),
"status": status,
})
mockPipeline.EXPECT().ZAdd(pKey, gomock.Any())
mockPipeline.EXPECT().ZRem(lKey, roomName)
mockPipeline.EXPECT().SAdd(newSKey, rKey)
for _, key := range allStatusKeys {
mockPipeline.EXPECT().SRem(key, rKey)
}
mockPipeline.EXPECT().Exec()
mockDb.EXPECT().Query(gomock.Any(), "SELECT * FROM schedulers WHERE name = ?", namespace).
Do(func(scheduler *models.Scheduler, query string, modifier string) {
scheduler.YAML = ""
scheduler.Game = game
})
mockEventForwarder1.EXPECT().Forward(status, gomock.Any()).Do(
func(status string, infos map[string]interface{}) {
Expect(infos["game"]).To(Equal(game))
Expect(infos["roomId"]).To(Equal(roomName))
Expect(infos["metadata"]).To(BeEquivalentTo(map[string]string{
"type": "sometype",
}))
})
mockEventForwarder2.EXPECT().Forward(status, gomock.Any()).Do(
func(status string, infos map[string]interface{}) {
Expect(infos["game"]).To(Equal(game))
Expect(infos["roomId"]).To(Equal(roomName))
Expect(infos["metadata"]).To(BeEquivalentTo(map[string]string{
"type": "sometype",
}))
})

app.Router.ServeHTTP(recorder, request)
Expect(recorder.Code).To(Equal(200))
Expect(recorder.Body.String()).To(Equal(`{"success": true}`))
})
})
})

Context("when redis is down", func() {
Expand Down
57 changes: 57 additions & 0 deletions eventforwarder/eventforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,64 @@

package eventforwarder

import (
"fmt"
"plugin"
"runtime"

"github.com/Sirupsen/logrus"
"github.com/spf13/viper"
ex "github.com/topfreegames/maestro/extensions"
)

// EventForwarder interface
type EventForwarder interface {
Forward(event string, infos map[string]interface{}) (int32, error)
}

// LoadEventForwardersFromConfig returns a slice of configured eventforwarders
func LoadEventForwardersFromConfig(config *viper.Viper, logger logrus.FieldLogger) []EventForwarder {
forwarders := []EventForwarder{}
if runtime.GOOS == "linux" {
forwardersConfig := config.GetStringMap("forwarders")
if len(forwardersConfig) > 0 {
for k, v := range forwardersConfig {
logger.Infof("loading plugin: %s", k)
p, err := ex.LoadPlugin(k, "./bin")
if err != nil {
logger.Errorf("error loading plugin %s: %s", k, err.Error())
continue
}
forwarderConfigMap, ok := v.(map[string]interface{})
if ok {
for kk := range forwarderConfigMap {
logger.Infof("loading forwarder %s.%s", k, kk)
cfg := config.Sub(fmt.Sprintf("forwarders.%s.%s", k, kk))
forwarder, err := LoadForwarder(p, cfg)
if err != nil {
logger.Error(err)
continue
}
forwarders = append(forwarders, forwarder)
}
}
}
}
} else {
logger.Warn("not loading any forwarder plugin because not running on linux")
}
return forwarders
}

// LoadForwarder loads a forwarder from a plugin
func LoadForwarder(p *plugin.Plugin, config *viper.Viper) (EventForwarder, error) {
f, err := p.Lookup("NewForwarder")
if err != nil {
return nil, err
}
ff, ok := f.(func(*viper.Viper) (EventForwarder, error))
if !ok {
return nil, err
}
return ff(config)
}
16 changes: 0 additions & 16 deletions extensions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,9 @@ package extensions
import (
"fmt"
"plugin"

"github.com/spf13/viper"
"github.com/topfreegames/maestro/eventforwarder"
)

// LoadPlugin loads a plugin
func LoadPlugin(pluginName string, path string) (*plugin.Plugin, error) {
return plugin.Open(fmt.Sprintf("%s/%s.so", path, pluginName))
}

// LoadForwarder loads a forwarder from a plugin
func LoadForwarder(p *plugin.Plugin, config *viper.Viper) (eventforwarder.EventForwarder, error) {
f, err := p.Lookup("NewForwarder")
if err != nil {
return nil, err
}
ff, ok := f.(func(*viper.Viper) (eventforwarder.EventForwarder, error))
if !ok {
return nil, err
}
return ff(config)
}
11 changes: 11 additions & 0 deletions models/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ import (
"k8s.io/client-go/pkg/api/v1"
)

const (
// RoomReady string representation
RoomReady = "ready"
// RoomOccupied string representation
RoomOccupied = "occupied"
// RoomTerminating string representation
RoomTerminating = "terminating"
// RoomTerminated string representation
RoomTerminated = "terminated"
)

// Room is the struct that defines a room in maestro
type Room struct {
ID string
Expand Down
Binary file removed plugins/grpc/example_server/server
Binary file not shown.
2 changes: 1 addition & 1 deletion plugins/grpc/example_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type server struct{}

func (*server) SendRoomStatus(ctx context.Context, roomStatus *pb.RoomStatus) (*pb.Response, error) {
fmt.Println("Received msg", roomStatus.GetRoom(), roomStatus.GetStatusType(), roomStatus.GetMetadata())
fmt.Println("Received msg", roomStatus.GetRoom(), roomStatus.GetStatusType())
return &pb.Response{
Message: "Hi!",
Code: 200,
Expand Down
Loading

0 comments on commit 60695b0

Please sign in to comment.