Skip to content

Commit

Permalink
Clean up interfaces, rename client ==> peer
Browse files Browse the repository at this point in the history
  - add logging support
  • Loading branch information
jcelliott committed Aug 11, 2014
1 parent 1b46b14 commit a2d989e
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 226 deletions.
95 changes: 45 additions & 50 deletions broker.go
@@ -1,62 +1,50 @@
package turnpike

func spliceSubscribers(subs []Subscriber, i int) []Subscriber {
if i == len(subs)-1 {
return subs[:i]
}
return append(subs[:i], subs[i+1:]...)
}

type ErrorHandler interface {
// SendError sends an ERROR message to the client.
SendError(*Error)
}

type Publisher interface {
ErrorHandler

// SendPublished sends acknowledgement that the Event has
// been successfully published.
SendPublished(*Published)
}

// A Subscriber can subscribe to messages on a Topic URI.
type Subscriber interface {
ErrorHandler

// SendEvent sends a Published Event to the client.
SendEvent(*Event)
// SendUnsubscribed sends an acknowledgement that the
// client has been unsubscribed from messages on the Topic.
SendUnsubscribed(*Unsubscribed)
// SendSubscribed sends an acknowledgement that the
// client has been subscribed to messages on the Topic.
SendSubscribed(*Subscribed)
}

// type Publisher interface {
// ErrorHandler
//
// // SendPublished sends acknowledgement that the Event has
// // been successfully published.
// SendPublished(*Published)
// }
//
// // A Subscriber can subscribe to messages on a Topic URI.
// type Subscriber interface {
// ErrorHandler
//
// // SendEvent sends a Published Event to the client.
// SendEvent(*Event)
// // SendUnsubscribed sends an acknowledgement that the
// // client has been unsubscribed from messages on the Topic.
// SendUnsubscribed(*Unsubscribed)
// // SendSubscribed sends an acknowledgement that the
// // client has been subscribed to messages on the Topic.
// SendSubscribed(*Subscribed)
// }
//
type Broker interface {
// Publishes a message to all Subscribers.
Publish(Publisher, *Publish)
Publish(Sender, *Publish)
// Subscribes to messages on a URI.
Subscribe(Subscriber, *Subscribe)
Subscribe(Sender, *Subscribe)
// Unsubscribes from messages on a URI.
Unsubscribe(Subscriber, *Unsubscribe)
Unsubscribe(Sender, *Unsubscribe)
}

// A super simple broker that matches URIs to Subscribers.
type DefaultBroker struct {
routes map[URI]map[ID]Subscriber
routes map[URI]map[ID]Sender
subscriptions map[ID]URI
}

func NewDefaultBroker() *DefaultBroker {
return &DefaultBroker{
routes: make(map[URI]map[ID]Subscriber),
routes: make(map[URI]map[ID]Sender),
subscriptions: make(map[ID]URI),
}
}

func (br *DefaultBroker) Publish(pub Publisher, msg *Publish) {
func (br *DefaultBroker) Publish(pub Sender, msg *Publish) {
pubId := NewID()
evtTemplate := Event{
Publication: pubId,
Expand All @@ -67,38 +55,38 @@ func (br *DefaultBroker) Publish(pub Publisher, msg *Publish) {
// shallow-copy the template
event := evtTemplate
event.Subscription = id
sub.SendEvent(&event)
sub.Send(&event)
// TODO: publisher should not receive event, even if subscribed.
// see: https://github.com/tavendo/WAMP/blob/master/spec/basic.md#event-1
}

// only send published message if acknowledge is present and set to true
if doPub, _ := msg.Options["acknowledge"].(bool); doPub {
pub.SendPublished(&Published{Request: msg.Request, Publication: pubId})
pub.Send(&Published{Request: msg.Request, Publication: pubId})
}
}

func (br *DefaultBroker) Subscribe(sub Subscriber, msg *Subscribe) {
func (br *DefaultBroker) Subscribe(sub Sender, msg *Subscribe) {
if _, ok := br.routes[msg.Topic]; !ok {
br.routes[msg.Topic] = make(map[ID]Subscriber)
br.routes[msg.Topic] = make(map[ID]Sender)
}
id := NewID()
br.routes[msg.Topic][id] = sub

br.subscriptions[id] = msg.Topic

sub.SendSubscribed(&Subscribed{Request: msg.Request, Subscription: id})
sub.Send(&Subscribed{Request: msg.Request, Subscription: id})
}

func (br *DefaultBroker) Unsubscribe(sub Subscriber, msg *Unsubscribe) {
func (br *DefaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
topic, ok := br.subscriptions[msg.Subscription]
if !ok {
err := &Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_SUBSCRIPTION,
}
sub.SendError(err)
sub.Send(err)
return
}

Expand All @@ -108,19 +96,26 @@ func (br *DefaultBroker) Unsubscribe(sub Subscriber, msg *Unsubscribe) {
Request: msg.Request,
Error: URI("wamp.error.internal_error"),
}
sub.SendError(err)
sub.Send(err)
} else if _, ok := r[msg.Subscription]; !ok {
err := &Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: URI("wamp.error.internal_error"),
}
sub.SendError(err)
sub.Send(err)
} else {
delete(r, msg.Subscription)
if len(r) == 0 {
delete(br.routes, topic)
}
sub.SendUnsubscribed(&Unsubscribed{Request: msg.Request})
sub.Send(&Unsubscribed{Request: msg.Request})
}
}

func spliceSubscribers(subs []Sender, i int) []Sender {
if i == len(subs)-1 {
return subs[:i]
}
return append(subs[:i], subs[i+1:]...)
}
17 changes: 0 additions & 17 deletions client.go

This file was deleted.

72 changes: 36 additions & 36 deletions dealer.go
@@ -1,34 +1,34 @@
package turnpike

type Callee interface {
ErrorHandler
// Acknowledge that the endpoint was succesfully registered
SendRegistered(*Registered)
// Acknowledge that the endpoint was succesfully unregistered
SendUnregistered(*Unregistered)
// Dealer requests fulfillment of a procedure call
SendInvocation(*Invocation)
}

type Caller interface {
ErrorHandler
// Dealer sends the returned result from the procedure call
SendResult(*Result)
}
// type Callee interface {
// ErrorHandler
// // Acknowledge that the endpoint was succesfully registered
// SendRegistered(*Registered)
// // Acknowledge that the endpoint was succesfully unregistered
// SendUnregistered(*Unregistered)
// // Dealer requests fulfillment of a procedure call
// SendInvocation(*Invocation)
// }
//
// type Caller interface {
// ErrorHandler
// // Dealer sends the returned result from the procedure call
// SendResult(*Result)
// }

type Dealer interface {
// Register a procedure on an endpoint
Register(Callee, *Register)
Register(Sender, *Register)
// Unregister a procedure on an endpoint
Unregister(Callee, *Unregister)
Unregister(Sender, *Unregister)
// Call a procedure on an endpoint
Call(Caller, *Call)
Call(Sender, *Call)
// Return the result of a procedure call
Yield(Callee, *Yield)
Yield(Sender, *Yield)
}

type RemoteProcedure struct {
Endpoint Callee
Endpoint Sender
Procedure URI
}

Expand All @@ -40,7 +40,7 @@ type DefaultDealer struct {
// multiple callees for the same procedure
registrations map[URI]ID
// keep track of call IDs so we can send the response to the caller
calls map[ID]Caller
calls map[ID]Sender
// link the invocation ID to the call ID
invocations map[ID]ID
}
Expand All @@ -49,14 +49,14 @@ func NewDefaultDealer() *DefaultDealer {
return &DefaultDealer{
procedures: make(map[ID]RemoteProcedure),
registrations: make(map[URI]ID),
calls: make(map[ID]Caller),
calls: make(map[ID]Sender),
invocations: make(map[ID]ID),
}
}

func (d *DefaultDealer) Register(callee Callee, msg *Register) {
func (d *DefaultDealer) Register(callee Sender, msg *Register) {
if _, ok := d.registrations[msg.Procedure]; ok {
callee.SendError(&Error{
callee.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_PROCEDURE_ALREADY_EXISTS,
Expand All @@ -66,40 +66,40 @@ func (d *DefaultDealer) Register(callee Callee, msg *Register) {
reg := NewID()
d.procedures[reg] = RemoteProcedure{callee, msg.Procedure}
d.registrations[msg.Procedure] = reg
callee.SendRegistered(&Registered{
callee.Send(&Registered{
Request: msg.Request,
Registration: reg,
})
}

func (d *DefaultDealer) Unregister(callee Callee, msg *Unregister) {
func (d *DefaultDealer) Unregister(callee Sender, msg *Unregister) {
if procedure, ok := d.procedures[msg.Registration]; !ok {
// the registration doesn't exist
callee.SendError(&Error{
callee.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_REGISTRATION,
})
} else {
delete(d.registrations, procedure.Procedure)
delete(d.procedures, msg.Registration)
callee.SendUnregistered(&Unregistered{
callee.Send(&Unregistered{
Request: msg.Request,
})
}
}

func (d *DefaultDealer) Call(caller Caller, msg *Call) {
func (d *DefaultDealer) Call(caller Sender, msg *Call) {
if reg, ok := d.registrations[msg.Procedure]; !ok {
caller.SendError(&Error{
caller.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: WAMP_ERROR_NO_SUCH_PROCEDURE,
})
} else {
if rproc, ok := d.procedures[reg]; !ok {
// found a registration id, but doesn't match any remote procedure
caller.SendError(&Error{
caller.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Expand All @@ -110,7 +110,7 @@ func (d *DefaultDealer) Call(caller Caller, msg *Call) {
d.calls[msg.Request] = caller
invocationID := NewID()
d.invocations[invocationID] = msg.Request
rproc.Endpoint.SendInvocation(&Invocation{
rproc.Endpoint.Send(&Invocation{
Request: invocationID,
Registration: reg,
Arguments: msg.Arguments,
Expand All @@ -120,9 +120,9 @@ func (d *DefaultDealer) Call(caller Caller, msg *Call) {
}
}

func (d *DefaultDealer) Yield(callee Callee, msg *Yield) {
func (d *DefaultDealer) Yield(callee Sender, msg *Yield) {
if callID, ok := d.invocations[msg.Request]; !ok {
callee.SendError(&Error{
callee.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Expand All @@ -131,15 +131,15 @@ func (d *DefaultDealer) Yield(callee Callee, msg *Yield) {
} else {
if caller, ok := d.calls[callID]; !ok {
// found the invocation id, but doesn't match any call id
callee.SendError(&Error{
callee.Send(&Error{
Type: msg.MessageType(),
Request: msg.Request,
// TODO: what should this error be?
Error: URI("wamp.error.no_such_call"),
})
} else {
// return the result to the caller
caller.SendResult(&Result{
caller.Send(&Result{
Request: callID,
Arguments: msg.Arguments,
ArgumentsKw: msg.ArgumentsKw,
Expand Down
5 changes: 1 addition & 4 deletions dealer_test.go
Expand Up @@ -9,10 +9,7 @@ type TestCallee struct {
received Message
}

func (c *TestCallee) SendError(msg *Error) { c.received = msg }
func (c *TestCallee) SendRegistered(msg *Registered) { c.received = msg }
func (c *TestCallee) SendUnregistered(msg *Unregistered) { c.received = msg }
func (c *TestCallee) SendInvocation(msg *Invocation) { c.received = msg }
func (c *TestCallee) Send(msg Message) error { c.received = msg; return nil }

func TestRegister(t *testing.T) {
Convey("Registering a procedure", t, func() {
Expand Down
33 changes: 33 additions & 0 deletions log.go
@@ -0,0 +1,33 @@
package turnpike

import (
"errors"
"fmt"
glog "log"
"os"
)

var (
logFlags = glog.Ldate | glog.Ltime | glog.Lshortfile
log *glog.Logger
)

// setup logger for package, writes to /dev/null by default
func init() {
if devNull, err := os.Create(os.DevNull); err != nil {
panic("could not create logger: " + err.Error())
} else {
log = glog.New(devNull, "", 0)
}
}

// change log output to stderr
func Debug() {
log = glog.New(os.Stderr, "", logFlags)
}

func logErr(v ...interface{}) error {
err := errors.New(fmt.Sprintln(v...))
log.Println(err)
return err
}

0 comments on commit a2d989e

Please sign in to comment.