Skip to content

Commit

Permalink
Change vars to consts
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Jun 11, 2016
1 parent cfad114 commit 36ce6a1
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 73 deletions.
44 changes: 24 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sleuth

import (
"net/http"
"strconv"
"sync"
"time"

Expand All @@ -15,7 +16,7 @@ import (

type listener struct {
*sync.Mutex
handles map[int64]chan *http.Response
handles map[string]chan *http.Response
}

type notifier struct {
Expand All @@ -42,8 +43,8 @@ type Client struct {
services map[string]*workers // map[service-type]service-workers
}

func (c *Client) add(gid, name, node, service, version string) error {
if gid != group {
func (c *Client) add(group, name, node, service, version string) error {
if group != c.group {
c.log.Debug("sleuth: no group header for %s, client-only", name)
return nil
}
Expand All @@ -65,7 +66,7 @@ func (c *Client) add(gid, name, node, service, version string) error {
if c.additions.notify != nil {
c.additions.notify <- struct{}{}
}
c.log.Info("sleuth: add %s/%s %s to %s", service, version, name, group)
c.log.Info("sleuth: add %s/%s %s to %s", service, version, name, c.group)
return nil
}

Expand Down Expand Up @@ -93,8 +94,8 @@ func (c *Client) block(services ...string) bool {

// Close leaves the sleuth network and stops the Gyre node.
func (c *Client) Close() error {
c.log.Info("%s leaving %s...", c.node.Name(), group)
if err := c.node.Leave(group); err != nil {
c.log.Info("%s leaving %s...", c.node.Name(), c.group)
if err := c.node.Leave(c.group); err != nil {
return newError(errLeave, err.Error())
}
if err := c.node.Stop(); err != nil {
Expand All @@ -107,12 +108,12 @@ func (c *Client) Close() error {
func (c *Client) dispatch(payload []byte) error {
// Returned responses (RECV command) and outstanding requests (REPL command)
// have these headers, respectively: SLEUTH-V0RECV and SLEUTH-V0REPL
groupLength := len(group)
groupLength := len(c.group)
dispatchLength := 4
headerLength := groupLength + dispatchLength
// If the message header does not match the group, bail.
if len(payload) < headerLength || string(payload[0:groupLength]) != group {
return newError(errDispatchHeader, "bad header")
if len(payload) < headerLength || string(payload[0:groupLength]) != c.group {
return newError(errDispatchHeader, "bad dispatch header")
}
action := string(payload[groupLength : groupLength+dispatchLength])
switch action {
Expand All @@ -121,7 +122,7 @@ func (c *Client) dispatch(payload []byte) error {
case repl:
return c.reply(payload[headerLength:])
default:
return newError(errDispatchAction, "bad action: %s", action)
return newError(errDispatchAction, "bad dispatch action: %s", action)
}
}

Expand All @@ -133,7 +134,8 @@ func (c *Client) dispatch(payload []byte) error {
// sleuth://foo-service/bar?baz=qux
func (c *Client) Do(req *http.Request) (*http.Response, error) {
to := req.URL.Host
handle := c.handle
// Handles are hexadecimal strings that are incremented by one.
handle := strconv.FormatInt(c.handle, 16)
c.handle++
if req.URL.Scheme != scheme {
err := newError(errScheme,
Expand All @@ -145,8 +147,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return nil, newError(errUnknownService, "%s is an unknown service", to)
}
p := services.next()
receiver := c.node.UUID()
payload, err := marshalRequest(receiver, handle, req)
payload, err := marshalReq(c.group, c.node.UUID(), handle, req)
if err != nil {
return nil, err.(*Error).escalate(errRequest)
}
Expand Down Expand Up @@ -182,15 +183,15 @@ func (c *Client) has(services ...string) bool {
return available == total
}

func (c *Client) listen(handle int64, listener chan *http.Response) {
func (c *Client) listen(handle string, listener chan *http.Response) {
c.listener.Lock()
defer c.listener.Unlock()
c.listener.handles[handle] = listener
go c.timeout(handle)
}

func (c *Client) receive(payload []byte) error {
handle, res, err := unmarshalResponse(payload)
handle, res, err := unmarshalRes(payload)
if err != nil {
return err.(*Error).escalate(errRECV)
}
Expand All @@ -212,20 +213,22 @@ func (c *Client) remove(name string) {
delete(c.services, service)
}
delete(c.directory, name)
c.log.Info("sleuth: remove %s (%s) from %s", service, name, group)
c.log.Info("sleuth: remove %s:%s", service, name)
return
}
c.log.Info("sleuth: unable to remove %s", name)
}

func (c *Client) reply(payload []byte) error {
dest, req, err := unmarshalRequest(payload)
dest, req, err := unmarshalReq(payload)
if err != nil {
return err.(*Error).escalate(errREPL)
}
c.handler.ServeHTTP(newWriter(c.node, dest), req)
return nil
}

func (c *Client) timeout(handle int64) {
func (c *Client) timeout(handle string) {
<-time.After(c.Timeout)
c.listener.Lock()
defer c.listener.Unlock()
Expand All @@ -242,13 +245,14 @@ func (c *Client) WaitFor(services ...string) {
}
}

func newClient(node *gyre.Gyre, out *logger.Logger) *Client {
func newClient(group string, node *gyre.Gyre, out *logger.Logger) *Client {
return &Client{
additions: &notifier{Mutex: new(sync.Mutex)},
directory: make(map[string]string),
group: group,
listener: &listener{
new(sync.Mutex),
make(map[int64]chan *http.Response)},
make(map[string]chan *http.Response)},
log: out,
node: node,
Timeout: time.Millisecond * 500,
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
// optional, but Interface is particularly important to guarantee all peers
// reside on the same subnet.
type Config struct {
group string

// Handler is the HTTP handler for a service made available via sleuth.
Handler http.Handler `json:"-"`

Expand Down Expand Up @@ -54,6 +56,9 @@ func initConfig(config *Config) *Config {
if config == nil {
config = new(Config)
}
if len(config.group) == 0 {
config.group = group
}
if len(config.LogLevel) == 0 {
config.LogLevel = "listen"
}
Expand Down
27 changes: 15 additions & 12 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ import (
)

type destination struct {
handle int64
group string
handle string
node string
}

type request struct {
Body []byte `json:"body,omitempty"`
Handle int64 `json:"handle"`
Header map[string][]string `json:"header"`
Method string `json:"method"`
Receiver string `json:"receiver"`
URL string `json:"url"`
Body []byte `json:"body,omitempty"`
Destination string `json:"destination"`
Group string `json:"group"`
Handle string `json:"handle"`
Header map[string][]string `json:"header"`
Method string `json:"method"`
URL string `json:"url"`
}

func marshalRequest(receiver string, handle int64, in *http.Request) ([]byte, error) {
out := new(request)
func marshalReq(group, dest, handle string, in *http.Request) ([]byte, error) {
out := &request{Group: group}
if in.Body != nil {
if body, err := ioutil.ReadAll(in.Body); err == nil {
out.Body = body
Expand All @@ -38,7 +40,7 @@ func marshalRequest(receiver string, handle int64, in *http.Request) ([]byte, er
in.URL.Scheme = ""
in.URL.Host = ""
out.URL = in.URL.String()
out.Receiver = receiver
out.Destination = dest
out.Handle = handle
marshalled, err := json.Marshal(out)
if err != nil {
Expand All @@ -47,7 +49,7 @@ func marshalRequest(receiver string, handle int64, in *http.Request) ([]byte, er
return append([]byte(group+repl), zip(marshalled)...), nil
}

func unmarshalRequest(payload []byte) (*destination, *http.Request, error) {
func unmarshalReq(payload []byte) (*destination, *http.Request, error) {
unzipped, err := unzip(payload)
if err != nil {
return nil, nil, err.(*Error).escalate(errReqUnmarshal)
Expand All @@ -62,7 +64,8 @@ func unmarshalRequest(payload []byte) (*destination, *http.Request, error) {
}
out.Header = http.Header(in.Header)
dest := new(destination)
dest.group = in.Group
dest.handle = in.Handle
dest.node = in.Receiver
dest.node = in.Destination
return dest, out, nil
}
8 changes: 4 additions & 4 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type response struct {
Body []byte `json:"body"`
Code int `json:"code"`
Handle int64 `json:"handle"`
Handle string `json:"handle"`
Header http.Header `json:"header"`
}

Expand All @@ -24,14 +24,14 @@ type body struct {

func (*body) Close() error { return nil }

func marshalResponse(res *response) []byte {
func marshalRes(group string, res *response) []byte {
// This will never fail to marshal, so error can be ignored.
marshalled, _ := json.Marshal(res)
return append([]byte(group+recv), zip(marshalled)...)
}

func unmarshalResponse(payload []byte) (int64, *http.Response, error) {
var handle int64 = -1
func unmarshalRes(payload []byte) (string, *http.Response, error) {
var handle string
var res *http.Response
unzipped, err := unzip(payload)
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions sleuth.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/zeromq/gyre"
)

var (
const (
group = "SLEUTH-v0"
port = 5670
recv = "RECV"
Expand All @@ -27,6 +27,7 @@ var (

type connection struct {
adapter string
group string
handler http.Handler
name string
node string
Expand Down Expand Up @@ -63,7 +64,7 @@ func listen(client *Client) {
}
}

func newNode(log *logger.Logger, conn *connection) (*gyre.Gyre, error) {
func newNode(conn *connection, log *logger.Logger) (*gyre.Gyre, error) {
node, err := gyre.New()
if err != nil {
return nil, newError(errInitialize, err.Error())
Expand All @@ -80,7 +81,7 @@ func newNode(log *logger.Logger, conn *connection) (*gyre.Gyre, error) {
if conn.server {
errors := [...]int{
errGroupHeader, errNodeHeader, errServiceHeader, errVersionHeader}
values := [...]string{group, node.UUID(), conn.name, conn.version}
values := [...]string{conn.group, node.UUID(), conn.name, conn.version}
for i, header := range [...]string{"group", "node", "type", "version"} {
if err := node.SetHeader(header, values[i]); err != nil {
return nil, newError(errors[i], err.Error())
Expand Down Expand Up @@ -114,7 +115,7 @@ func New(config *Config) (*Client, error) {
// Use the same log level as the instantiator of the client. Because log level
// is guaranteed to be correct in initConfig, errors can be ignored.
log, _ := logger.New(config.logLevel)
conn := new(connection)
conn := &connection{group: config.group}
if conn.server = config.Handler != nil; conn.server {
conn.handler = config.Handler
conn.name = config.Service
Expand All @@ -133,11 +134,11 @@ func New(config *Config) (*Client, error) {
if conn.version = config.Version; len(conn.version) == 0 {
conn.version = "unknown"
}
node, err := newNode(log, conn)
node, err := newNode(conn, log)
if err != nil {
return nil, err.(*Error).escalate(errNew)
}
client := newClient(node, log)
client := newClient(config.group, node, log)
client.handler = conn.handler
go listen(client)
return client, nil
Expand Down

0 comments on commit 36ce6a1

Please sign in to comment.