Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Added a couple of new convenience methods to core, cleaned up some docs and formatting. #10

Merged
merged 3 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 91 additions & 82 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,96 +132,105 @@ Ping: Woo!
Pong: Woo!
```

## Example connecting to message broker and using galactic channels
## Example connecting to a message broker and using galactic channels

If you would like to connect the bus to a broker and start streaming stuff, it's quite simple. Here is an example
that connects to `appfabric.vmware.com` and starts streaming over a local channel that is mapped to the live
sample service that it broadcasting every few hundred milliseconds on `/topic/simple-stream`
If you would like to connect the bus to a broker and start streaming stuff, you can run the local demo broker
by first building using `./build-transport.sh` and then starting the local broker (and a bunch of demo services) via `
./transport-go service`

Once running, this example will connect to the broker and starts streaming over a local channel that is mapped to the live
sample service that is broadcasting every few hundred milliseconds on `/topic/simple-stream`

```go
package main

import (
"bifrost/bridge"
"bifrost/bus"
"bifrost/model"
"encoding/json"
"fmt"
"log"
"encoding/json"
"fmt"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"log"
)

func main() {
usingGalacticChannels()
}

func usingGalacticChannels() {

// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

channel := "my-stream"
cm.CreateChannel(channel)

// create done signal
var done = make(chan bool)

// listen to stream of messages coming in on channel.
h, err := b.ListenStream(channel)

if err != nil {
log.Panicf("unable to listen to channel stream, error: %e", err)
}

count := 0

// listen for five messages and then exit, send a completed signal on channel.
h.Handle(
func(msg *model.Message) {

// unmarshal the payload into a Response object (used by fabric services)
r := &model.Response{}
d := msg.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
count++
if count >=5 {
done <- true
}
},
func(err error) {
log.Panicf("error received on channel %e", err)
})

// create a broker connector config, in this case, we will connect to the application fabric demo endpoint.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "appfabric.vmware.com",
WSPath: "/fabric",
UseWS: true}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
log.Panicf("unable to connect to fabric, error: %e", err)
}

// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
// running on appfabric.vmware.com
err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
if err != nil {
log.Panicf("unable to map local channel to broker destination: %e", err)
}

// wait for done signal
<-done

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(channel)
if err != nil {
log.Panicf("unable to unsubscribe, error: %e", err)
}
err = c.Disconnect()
if err != nil {
log.Panicf("unable to disconnect, error: %e", err)
}
// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

channel := "my-stream"
cm.CreateChannel(channel)

// create done signal
var done = make(chan bool)

// listen to stream of messages coming in on channel.
h, err := b.ListenStream(channel)

if err != nil {
log.Panicf("unable to listen to channel stream, error: %e", err)
}

count := 0

// listen for ten messages and then exit, send a completed signal on channel.
h.Handle(
func(msg *model.Message) {

// unmarshal the payload into a Response object (used by fabric services)
r := &model.Response{}
d := msg.Payload.([]byte)
json.Unmarshal(d, &r)
fmt.Printf("Stream Ticked: %s\n", r.Payload.(string))
count++
if count >=10 {
done <- true
}
},
func(err error) {
log.Panicf("error received on channel %e", err)
})

// create a broker connector config, in this case, we will connect to the demo endpoint.
config := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "localhost:8090",
WSPath: "/fabric",
UseWS: true}

// connect to broker.
c, err := b.ConnectBroker(config)
if err != nil {
log.Panicf("unable to connect to fabric, error: %e", err)
}

// mark our local channel as galactic and map it to our connection and the /topic/simple-stream service
// running on localhost:8090
err = cm.MarkChannelAsGalactic(channel, "/topic/simple-stream", c)
if err != nil {
log.Panicf("unable to map local channel to broker destination: %e", err)
}

// wait for done signal
<-done

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(channel)
if err != nil {
log.Panicf("unable to unsubscribe, error: %e", err)
}
err = c.Disconnect()
if err != nil {
log.Panicf("unable to disconnect, error: %e", err)
}
}
```

Expand Down
68 changes: 57 additions & 11 deletions service/fabric_core.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2020 VMware, Inc.
// Copyright 2019-2021 VMware, Inc.
// SPDX-License-Identifier: BSD-2-Clause

package service
Expand All @@ -10,31 +10,44 @@ import (
"github.com/vmware/transport-go/model"
)

// Interface providing base functionality to fabric services.
// FabricServiceCore is the interface providing base functionality to fabric services.
type FabricServiceCore interface {
// Returns the EventBus instance.
// Bus Returns the EventBus instance.
Bus() bus.EventBus
// Uses the "responsePayload" and "request" params to build and send model.Response object

// SendResponse Uses the "responsePayload" and "request" params to build and send model.Response object
// on the service channel.
SendResponse(request *model.Request, responsePayload interface{})
// Same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be

// SendResponseWithHeaders is the same as SendResponse, but include headers. Useful for HTTP REST interfaces - these headers will be
// set as HTTP response headers. Great for custom mime-types, binary stuff and more.
SendResponseWithHeaders(request *model.Request, responsePayload interface{}, headers map[string]string)
// Builds an error model.Response object and sends it on the service channel as
// response to the "request" param.

// SendErrorResponse builds an error model.Response object and sends it on the service channel as response to the "request" param.
SendErrorResponse(request *model.Request, responseErrorCode int, responseErrorMessage string)

// SendErrorResponseWithPayload is the same as SendErrorResponse, but adds a payload
SendErrorResponseWithPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{})
// Handles unknown/unsupported request.

// SendErrorResponseWithHeaders is the same as SendErrorResponse, but adds headers as well.
SendErrorResponseWithHeaders(request *model.Request, responseErrorCode int, responseErrorMessage string, headers map[string]string)

// SendErrorResponseWithHeadersAndPayload is the same as SendErrorResponseWithPayload, but adds headers as well.
SendErrorResponseWithHeadersAndPayload(request *model.Request, responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string)

// HandleUnknownRequest handles unknown/unsupported/un-implemented requests,
HandleUnknownRequest(request *model.Request)
// Make a new RestService call.

// RestServiceRequest will make a new RestService call.
RestServiceRequest(restRequest *RestServiceRequest,
successHandler model.ResponseHandlerFunction, errorHandler model.ResponseHandlerFunction)
// Set global headers for a given fabric service (each service has its own set of global headers).

// SetHeaders Set global headers for a given fabric service (each service has its own set of global headers).
// The headers will be applied to all requests made by this instance's RestServiceRequest method.
// Global header values can be overridden per request via the RestServiceRequest.Headers property.
SetHeaders(headers map[string]string)

// Automatically ready to go map with json headers.
// GenerateJSONHeaders Automatically ready to go map with json headers.
GenerateJSONHeaders() map[string]string
}

Expand Down Expand Up @@ -90,6 +103,39 @@ func (core *fabricCore) SendErrorResponseWithPayload(
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) SendErrorResponseWithHeaders(
request *model.Request,
responseErrorCode int, responseErrorMessage string, headers map[string]string) {

response := &model.Response{
Id: request.Id,
Destination: core.channelName,
Headers: headers,
Error: true,
ErrorCode: responseErrorCode,
ErrorMessage: responseErrorMessage,
BrokerDestination: request.BrokerDestination,
}
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) SendErrorResponseWithHeadersAndPayload(
request *model.Request,
responseErrorCode int, responseErrorMessage string, payload interface{}, headers map[string]string) {

response := &model.Response{
Id: request.Id,
Destination: core.channelName,
Payload: payload,
Headers: headers,
Error: true,
ErrorCode: responseErrorCode,
ErrorMessage: responseErrorMessage,
BrokerDestination: request.BrokerDestination,
}
core.bus.SendResponseMessage(core.channelName, response, request.Id)
}

func (core *fabricCore) HandleUnknownRequest(request *model.Request) {
errorMsg := fmt.Sprintf("unsupported request for \"%s\": %s", core.channelName, request.Request)
core.SendErrorResponse(request, 403, errorMsg)
Expand Down
36 changes: 35 additions & 1 deletion service/fabric_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,46 @@ func TestFabricCore_SendMethods(t *testing.T) {
assert.Equal(t, response.ErrorMessage, "test-error")

wg.Add(1)
core.HandleUnknownRequest(&req)

h = make(map[string]string)
h["chicken"] = "nugget"
core.SendErrorResponseWithHeaders(&req, 422, "test-header-error", h)
wg.Wait()

assert.Equal(t, count, 4)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.Equal(t, response.Headers["chicken"], "nugget")
assert.Nil(t, response.Payload)
assert.True(t, response.Error)
assert.Equal(t, response.ErrorCode, 422)
assert.Equal(t, response.ErrorMessage, "test-header-error")

wg.Add(1)

h = make(map[string]string)
h["potato"] = "dog"
core.SendErrorResponseWithHeadersAndPayload(&req, 500, "test-header-payload-error", "oh my!", h)
wg.Wait()

assert.Equal(t, count, 5)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.Equal(t, "dog", response.Headers["potato"])
assert.Equal(t, "oh my!", response.Payload.(string))
assert.True(t, response.Error)
assert.Equal(t, response.ErrorCode, 500)
assert.Equal(t, response.ErrorMessage, "test-header-payload-error")

wg.Add(1)
core.HandleUnknownRequest(&req)
wg.Wait()

assert.Equal(t, count, 6)
response = lastMessage.Payload.(*model.Response)

assert.Equal(t, response.Id, req.Id)
assert.True(t, response.Error)
assert.Equal(t, 403, response.ErrorCode)
Expand Down
Loading