Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
Added a couple of new convenience methods to core, cleaned up some do…
Browse files Browse the repository at this point in the history
…cs and formatting. (#10)

* Added a couple of new convenience methods to core

core fabric methods for errors added to allow payload and headers to be submitted. Also cleaned up some of the godoc in the service package.

Signed-off-by: Dave Shanley <dshanley@vmware.com>

* Disabled some of the demo functions

Cleaner documentation and demo code is required to make these useful. demo needs to ne converted to connecting to local broker instead of cloud.

Signed-off-by: Dave Shanley <dshanley@vmware.com>

* Updated  README to include better instructions.

More examples and samples are required, however the galactic channels sample code has been updated to work against the local broker and not a non-public endpoint.

Signed-off-by: Dave Shanley <dshanley@vmware.com>
  • Loading branch information
daveshanley committed May 7, 2021
1 parent b61c1d5 commit 37278cf
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 239 deletions.
173 changes: 91 additions & 82 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,96 +132,105 @@ Ping: Woo!
Pong: Woo!
```

## Example connecting to message broker and using galactic channels
## Example connecting to a message broker and using galactic channels

If you would like to connect the bus to a broker and start streaming stuff, it's quite simple. Here is an example
that connects to `appfabric.vmware.com` and starts streaming over a local channel that is mapped to the live
sample service that it broadcasting every few hundred milliseconds on `/topic/simple-stream`
If you would like to connect the bus to a broker and start streaming stuff, you can run the local demo broker
by first building using `./build-transport.sh` and then starting the local broker (and a bunch of demo services) via `
./transport-go service`

Once running, this example will connect to the broker and starts streaming over a local channel that is mapped to the live
sample service that is broadcasting every few hundred milliseconds on `/topic/simple-stream`

```go
package main

import (
"bifrost/bridge"
"bifrost/bus"
"bifrost/model"
"encoding/json"
"fmt"
"log"
"encoding/json"
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"log"
)

func main() {
usingGalacticChannels()
}

func usingGalacticChannels() {

// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

channel := "my-stream"
cm.CreateChannel(channel)

// create done signal
var done = make(chan bool)

// listen to stream of messages coming in on channel.
h, err := b.ListenStream(channel)

if err != nil {
log.Panicf("unable to listen to channel stream, error: %e", err)
}

count := 0

// listen for five messages and then exit, send a completed signal on channel.
h.Handle(
func(msg *model.Message) {

// unmarshal the payload into a Response object (used by fabric services)
r := &model.Response{}
d := msg.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
count++
if count >=5 {
done <- true
}
},
func(err error) {
log.Panicf("error received on channel %e", err)
})

// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "appfabric.vmware.com",
WSPath: "/fabric",
UseWS: true}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
log.Panicf("unable to connect to fabric, error: %e", err)
}

// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
// running on appfabric.vmware.com
err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
if err != nil {
log.Panicf("unable to map local channel to broker destination: %e", err)
}

// wait for done signal
<-done

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(channel)
if err != nil {
log.Panicf("unable to unsubscribe, error: %e", err)
}
err = c.Disconnect()
if err != nil {
log.Panicf("unable to disconnect, error: %e", err)
}
// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

channel := "my-stream"
cm.CreateChannel(channel)

// create done signal
var done = make(chan bool)

// listen to stream of messages coming in on channel.
h, err := b.ListenStream(channel)

if err != nil {
log.Panicf("unable to listen to channel stream, error: %e", err)
}

count := 0

// listen for ten messages and then exit, send a completed signal on channel.
h.Handle(
func(msg *model.Message) {

// unmarshal the payload into a Response object (used by fabric services)
r := &model.Response{}
d := msg.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
count++
if count >=10 {
done <- true
}
},
func(err error) {
log.Panicf("error received on channel %e", err)
})

// create a broker connector config, in this case, we will connect to the demo endpoint.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "localhost:8090",
WSPath: "/fabric",
UseWS: true}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
log.Panicf("unable to connect to fabric, error: %e", err)
}

// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
// running on localhost:8090
err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
if err != nil {
log.Panicf("unable to map local channel to broker destination: %e", err)
}

// wait for done signal
<-done

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(channel)
if err != nil {
log.Panicf("unable to unsubscribe, error: %e", err)
}
err = c.Disconnect()
if err != nil {
log.Panicf("unable to disconnect, error: %e", err)
}
}
```

Expand Down
68 changes: 57 additions & 11 deletions service/fabric_core.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2020 VMware, Inc.
// Copyright 2019-2021 VMware, Inc.
// SPDX-License-Identifier: BSD-2-Clause

package service
Expand All @@ -10,31 +10,44 @@ import (
"github.com/vmware/transport-go/model"
)

// Interface providing base functionality to fabric services.
// FabricServiceCore is the interface providing base functionality to fabric services.
type FabricServiceCore interface {
// Returns the EventBus instance.
// Bus Returns the EventBus instance.
Bus() bus.EventBus
// Uses the "responsePayload" and "request" params to build and send model.Response object

// SendResponse Uses the "responsePayload" and "request" params to build and send model.Response object
// on the service channel.
SendResponse(request *model.Request, responsePayload interface{})
// Same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be

// SendResponseWithHeaders is the same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be
// set as HTTP response headers. Great for custom mime-types, binary stuff and more.
SendResponseWithHeaders(request *model.Request, responsePayload interface{}, headers map[string]string)
// Builds an error model.Response object and sends it on the service channel as
// response to the "request" param.

// SendErrorResponse builds an error model.Response object and sends it on the service channel as response to the "request" param.
SendErrorResponse(request *model.Request, responseErrorCode int, responseErrorMessage string)

// SendErrorResponseWithPayload is the same as SendErrorResponse, but adds a payload
SendErrorResponseWithPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{})
// Handles unknown/unsupported request.

// SendErrorResponseWithHeaders is the same as SendErrorResponse, but adds headers as well.
SendErrorResponseWithHeaders(request *model.Request, responseErrorCode int, responseErrorMessage string, headers map[string]string)

// SendErrorResponseWithHeadersAndPayload is the same as SendErrorResponseWithPayload, but adds headers as well.
SendErrorResponseWithHeadersAndPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string)

// HandleUnknownRequest handles unknown/unsupported/un-implemented requests,
HandleUnknownRequest(request *model.Request)
// Make a new RestService call.

// RestServiceRequest will make a new RestService call.
RestServiceRequest(restRequest *RestServiceRequest,
successHandler model.ResponseHandlerFunction, errorHandler model.ResponseHandlerFunction)
// Set global headers for a given fabric service (each service has its own set of global headers).

// SetHeaders Set global headers for a given fabric service (each service has its own set of global headers).
// The headers will be applied to all requests made by this instance's RestServiceRequest method.
// Global header values can be overridden per request via the RestServiceRequest.Headers property.
SetHeaders(headers map[string]string)

// Automatically ready to go map with json headers.
// GenerateJSONHeaders Automatically ready to go map with json headers.
GenerateJSONHeaders() map[string]string
}

Expand Down Expand Up @@ -90,6 +103,39 @@ func (core *fabricCore) SendErrorResponseWithPayload(
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) SendErrorResponseWithHeaders(
request *model.Request,
responseErrorCode int, responseErrorMessage string, headers map[string]string) {

response := &model.Response{
Id: request.Id,
Destination: core.channelName,
Headers: headers,
Error: true,
ErrorCode: responseErrorCode,
ErrorMessage: responseErrorMessage,
BrokerDestination: request.BrokerDestination,
}
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) SendErrorResponseWithHeadersAndPayload(
request *model.Request,
responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) {

response := &model.Response{
Id: request.Id,
Destination: core.channelName,
Payload: payload,
Headers: headers,
Error: true,
ErrorCode: responseErrorCode,
ErrorMessage: responseErrorMessage,
BrokerDestination: request.BrokerDestination,
}
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) HandleUnknownRequest(request *model.Request) {
errorMsg := fmt.Sprintf("unsupported request for \"%s\": %s", core.channelName, request.Request)
core.SendErrorResponse(request, 403, errorMsg)
Expand Down
36 changes: 35 additions & 1 deletion service/fabric_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,46 @@ func TestFabricCore_SendMethods(t *testing.T) {
assert.Equal(t, response.ErrorMessage, "test-error")

wg.Add(1)
core.HandleUnknownRequest(&req)

h = make(map[string]string)
h["chicken"] = "nugget"
core.SendErrorResponseWithHeaders(&req, 422, "test-header-error", h)
wg.Wait()

assert.Equal(t, count, 4)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.Equal(t, response.Headers["chicken"], "nugget")
assert.Nil(t, response.Payload)
assert.True(t, response.Error)
assert.Equal(t, response.ErrorCode, 422)
assert.Equal(t, response.ErrorMessage, "test-header-error")

wg.Add(1)

h = make(map[string]string)
h["potato"] = "dog"
core.SendErrorResponseWithHeadersAndPayload(&req, 500, "test-header-payload-error", "oh my!", h)
wg.Wait()

assert.Equal(t, count, 5)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.Equal(t, "dog", response.Headers["potato"])
assert.Equal(t, "oh my!", response.Payload.(string))
assert.True(t, response.Error)
assert.Equal(t, response.ErrorCode, 500)
assert.Equal(t, response.ErrorMessage, "test-header-payload-error")

wg.Add(1)
core.HandleUnknownRequest(&req)
wg.Wait()

assert.Equal(t, count, 6)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.True(t, response.Error)
assert.Equal(t, 403, response.ErrorCode)
Expand Down

0 comments on commit 37278cf

Please sign in to comment.