Skip to content

Commit

Permalink
Merge branch 'master' into feature/pn-27-reliable-receiving-in-gcm
Browse files Browse the repository at this point in the history
  • Loading branch information
bogh committed Jun 23, 2016
2 parents fe0921c + c71c62e commit 849f6d2
Show file tree
Hide file tree
Showing 67 changed files with 1,706 additions and 535 deletions.
13 changes: 6 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package client

import (
"github.com/smancke/guble/protocol"

log "github.com/Sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/smancke/guble/protocol"

"net/http"
"sync"
"time"
)

var logger = log.WithFields(log.Fields{
"app": "guble-cli",
"module": "client",
"env": "TBD"})
})

type WSConnection interface {
WriteMessage(messageType int, data []byte) error
Expand Down Expand Up @@ -185,11 +186,9 @@ func (c *client) shouldStop() bool {
}

func (c *client) handleIncomingMessage(msg []byte) {
parsed, err := protocol.ParseMessage(msg)
parsed, err := protocol.Decode(msg)
if err != nil {

logger.WithField("err", err).Error("Error on parsing of incomming message")

logger.WithField("err", err).Error("Error on parsing of incoming message")
c.errors <- clientErrorMessage(err.Error())
return
}
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"
)

var aNormalMessage = `/foo/bar,42,user01,phone01,id123,1420110000
var aNormalMessage = `/foo/bar,42,user01,phone01,id123,1420110000,0
Hello World`

Expand Down
6 changes: 3 additions & 3 deletions gcm/benchmarks/benchmarking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ func (params *benchParams) setUp() {
}()
a.NoError(errTempDir)

*config.Listen = "localhost:0"
*config.KVBackend = "memory"
*config.MSBackend = "file"
*config.HttpListen = "localhost:0"
*config.KVS = "memory"
*config.MS = "file"
*config.StoragePath = dir
*config.GCM.Enabled = true
*config.GCM.APIKey = "WILL BE OVERWRITTEN"
Expand Down
11 changes: 11 additions & 0 deletions gcm/mocks_server_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
protocol "github.com/smancke/guble/protocol"
server "github.com/smancke/guble/server"
auth "github.com/smancke/guble/server/auth"
cluster "github.com/smancke/guble/server/cluster"
store "github.com/smancke/guble/store"
)

Expand Down Expand Up @@ -43,6 +44,16 @@ func (_mr *_MockRouterRecorder) AccessManager() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "AccessManager")
}

func (_m *MockRouter) Cluster() *cluster.Cluster {
ret := _m.ctrl.Call(_m, "Cluster")
ret0, _ := ret[0].(*cluster.Cluster)
return ret0
}

func (_mr *_MockRouterRecorder) Cluster() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Cluster")
}

func (_m *MockRouter) Fetch(_param0 store.FetchRequest) error {
ret := _m.ctrl.Call(_m, "Fetch", _param0)
ret0, _ := ret[0].(error)
Expand Down
21 changes: 8 additions & 13 deletions gcm/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ func (s *subscription) subscriptionLoop() {

// fetch messages from store starting with lastID
func (s *subscription) fetch() error {
if s.lastID == 0 {
s.logger.WithField("lastID", s.lastID).Debug("Nothing to fetch")
return nil
}
// if s.lastID == 0 {
// s.logger.WithField("lastID", s.lastID).Debug("Nothing to fetch")
// return nil
// }

s.gcm.wg.Add(1)
defer func() {
Expand All @@ -202,6 +202,7 @@ func (s *subscription) fetch() error {
if err := s.gcm.router.Fetch(req); err != nil {
return err
}

for {
select {
case results := <-req.StartC:
Expand All @@ -216,15 +217,9 @@ func (s *subscription) fetch() error {
return err
}

if message, ok := message.(*protocol.Message); ok {
s.logger.WithFields(log.Fields{
"ID": msgAndID.ID,
"parsedID": message.ID,
}).Debug("Fetched message")
s.pipe(message)
} else {
s.logger.WithField("messageID", msgAndID.ID).Warn("Message is not a *protocol.Message.")
}
s.logger.WithFields(log.Fields{"ID": msgAndID.ID, "parsedID": message.ID}).Debug("Fetched message")
// Pipe message into gcm connector
s.pipe(message)
case err := <-req.ErrorC:
return err
case <-s.gcm.stopC:
Expand Down
24 changes: 14 additions & 10 deletions gcm/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/assert"
)

var fetchMessage = `/foo/bar,42,user01,phone01,id123,1420110000
var fetchMessage = `/foo/bar,42,user01,phone01,id123,1420110000,1
{"Content-Type": "text/plain", "Correlation-Id": "7sdks723ksgqn"}
Hello World`

Expand Down Expand Up @@ -64,16 +64,20 @@ func TestSub_Fetch(t *testing.T) {
close(done)
}()

go func() {
select {
case <-done:
// all good
case <-time.After(30 * time.Millisecond):
// taking too long, fail the test
a.Fail("Fetching messages and piping them took too long.")
}
}()

// start subscription fetching
go sub.fetch()

select {
case <-done:
// all good
case <-time.After(30 * time.Millisecond):
// taking too long, fail the test
a.Fail("Fetching messages and piping them took too long.")
}
err := sub.fetch()
a.NoError(err)

}

func dummyGCMResponse() *gcm.Response {
Expand Down
14 changes: 6 additions & 8 deletions guble-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ var (
Exit = kingpin.Flag("exit", "Exit after sending the commands").Short('x').Bool()
Commands = kingpin.Arg("commands", "The commands to send after startup").Strings()
Verbose = kingpin.Flag("verbose", "Display verbose server communication").Short('v').Bool()
URL = kingpin.Flag("url", "The websocket url to connect (ws://localhost:8080/stream/)").
Default("ws://localhost:8080/stream/").
String()
User = kingpin.Flag("user", "The user name to connect with (guble-cli)").Default("guble-cli").String()
Log = kingpin.Flag("log", "Log level").
Default(log.ErrorLevel.String()).
Envar("GUBLE_LOG").
Enum(logLevels()...)
URL = kingpin.Flag("url", "The websocket url to connect to").Default("ws://localhost:8080/stream/").String()
User = kingpin.Flag("user", "The user name to connect with (guble-cli)").Default("guble-cli").String()
Log = kingpin.Flag("log", "Log level").
Default(log.ErrorLevel.String()).
Envar("GUBLE_LOG").
Enum(logLevels()...)

logger = log.WithField("app", "guble-cli")
)
Expand Down
6 changes: 3 additions & 3 deletions gubled/benchmarking_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func Benchmark_E2E_Fetch_HelloWorld_Messages(b *testing.B) {
dir, _ := ioutil.TempDir("", "guble_benchmarking_fetch_test")
defer os.RemoveAll(dir)

*config.Listen = "localhost:0"
*config.KVBackend = "memory"
*config.MSBackend = "file"
*config.HttpListen = "localhost:0"
*config.KVS = "memory"
*config.MS = "file"
*config.StoragePath = dir
service := StartService()
defer service.Stop()
Expand Down
6 changes: 3 additions & 3 deletions gubled/benchmarking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func TestThroughput(t *testing.T) {
dir, _ := ioutil.TempDir("", "guble_benchmarking_test")
defer os.RemoveAll(dir)

*config.Listen = "localhost:0"
*config.KVBackend = "memory"
*config.MSBackend = "file"
*config.HttpListen = "localhost:0"
*config.KVS = "memory"
*config.MS = "file"
*config.StoragePath = dir

service := StartService()
Expand Down
88 changes: 42 additions & 46 deletions gubled/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,43 @@ package config
// Config package contains the public config vars required in guble

import (
"runtime"
"strconv"

log "github.com/Sirupsen/logrus"

"gopkg.in/alecthomas/kingpin.v2"

"net"
"runtime"
"strconv"
)

const (
healthEndpointPrefix = "/_health"
defaultHttp = ":8080"
defaultHealthEndpoint = "/_health"
defaultStoragePath = "/var/lib/guble"
defaultNodePort = "10000"
defaultKVBackend = "file"
defaultMSBackend = "file"
)

var (
// Listen the address to bind the HTTP server to
Listen = kingpin.Flag("listen", "[Host:]Port the address to listen on (:8080)").
Default(":8080").
Short('l').
Envar("GUBLE_LISTEN").
String()
// StoragePath where to save the file system store
StoragePath = kingpin.Flag("storage-path", "The path for storing messages and key value data if 'file' is enabled (/var/lib/guble)").
Short('p').
Envar("GUBLE_STORAGE_PATH").
String()
// KVBackend sets the key-value storage format to use
KVBackend = kingpin.Flag("kv-backend", "The storage backend for the key value store to use: file|memory (file)").
Default("file").
Envar("GUBLE_KV_BACKEND").
String()
// MSBackend sets the message store format to use
MSBackend = kingpin.Flag("ms-backend", "The message storage backend : file|memory (file)").
Default("file").
Envar("GUBLE_MS_BACKEND").
String()
// Health sets the health endpoint to bind in the HTTP server
Health = kingpin.Flag("health", `The health endpoint (default: /_health; value for disabling it: "")`).
Default(healthEndpointPrefix).
Envar("GUBLE_HEALTH_ENDPOINT").
String()
HttpListen = kingpin.Flag("http", `The address to for the HTTP server to listen on (format: "[Host]:Port" ; default: ":8080")`).
Default(defaultHttp).Envar("GUBLE_HTTP").String()
StoragePath = kingpin.Flag("storage-path", "The path for storing messages and key-value data if 'file' is enabled (default: /var/lib/guble)").
Default(defaultStoragePath).Envar("GUBLE_STORAGE_PATH").String()
KVS = kingpin.Flag("kvs", "The storage backend for the key-value store to use: file|memory (file)").
Default(defaultKVBackend).Envar("GUBLE_KVS").String()
MS = kingpin.Flag("ms", "The message storage backend : file|memory (file)").
Default(defaultMSBackend).Envar("GUBLE_MS").String()
HealthEndpoint = kingpin.Flag("health-endpoint", `The health endpoint to be used by the HTTP server (default: /_health; value for disabling it: "")`).
Default(defaultHealthEndpoint).Envar("GUBLE_HEALTH_ENDPOINT").String()

Metrics = struct {
Enabled *bool
Endpoint *string
}{
// Enable metrics collection
Enabled: kingpin.Flag("metrics", "Enable metrics").Envar("GUBLE_METRICS_ENABLED").Bool(),

// Endpoint sets the metrics endpoint to bind in the HTTP server and return metrics data
Endpoint: kingpin.Flag("metrics-endpoint", "The metrics endpoint (disabled by default; a possible value for enabling it: /_metrics )").
Envar("GUBLE_METRICS_ENDPOINT").
String(),
Enabled: kingpin.Flag("metrics", "Enable collection of metrics").
Envar("GUBLE_METRICS").Bool(),
Endpoint: kingpin.Flag("metrics-endpoint", "The metrics endpoint to be used by the HTTP server (disabled by default; a possible value for enabling it: /_metrics )").
Envar("GUBLE_METRICS_ENDPOINT").String(),
}

// GCM settings related to activating the GCM connector module
Expand All @@ -62,16 +48,26 @@ var (
APIKey *string
Workers *int
}{
Enabled: kingpin.Flag("gcm-enabled", "Enable the Google Cloud Messaging Connector").
Envar("GUBLE_GCM_ENABLED").
Bool(),
Enabled: kingpin.Flag("gcm", "Enable the Google Cloud Messaging Connector").
Envar("GUBLE_GCM").Bool(),
APIKey: kingpin.Flag("gcm-api-key", "The Google API Key for Google Cloud Messaging").
Envar("GUBLE_GCM_API_KEY").
String(),
Envar("GUBLE_GCM_API_KEY").String(),
Workers: kingpin.Flag("gcm-workers", "The number of workers handling traffic with Google Cloud Messaging (default: GOMAXPROCS)").
Default(strconv.Itoa(runtime.GOMAXPROCS(0))).
Envar("GUBLE_GCM_WORKERS").
Int(),
Default(strconv.Itoa(runtime.GOMAXPROCS(0))).Envar("GUBLE_GCM_WORKERS").Int(),
}

// Cluster settings related to activating the cluster module
Cluster = struct {
NodeID *int
NodePort *int
Remotes *[]*net.TCPAddr
}{
NodeID: kingpin.Flag("node-id", "This guble node's own ID (used in cluster mode): a strictly positive integer number which must be unique in cluster").
Envar("GUBLE_NODE_ID").Int(),
NodePort: kingpin.Flag("node-port", "This guble node's own local port (used in cluster mode): a strictly positive integer number").
Default(defaultNodePort).Envar("GUBLE_NODE_PORT").Int(),
Remotes: kingpin.Arg("tcplist", `The list of TCP addresses of some other guble nodes (used in cluster mode; format: "IP:port")`).
TCPList(),
}

// Log level
Expand Down
Loading

0 comments on commit 849f6d2

Please sign in to comment.