Skip to content

Commit

Permalink
feat: webnotfiers to send topic information
Browse files Browse the repository at this point in the history
- in previous version, web notifier topics are only used for resolving/building
URLs which was forcing subscribers to support path matching topic names
(challenge: topic name is private and dynamic based on usecases)

- changes: web notifiers will send message containing uniquer ID, Topic
Name and Raw JSON message to given webnotfier endpoint.

- This fixed issue in JS worker where it wasn't able to find topic of
incoming messages.

- closes hyperledger-archives#1323

Signed-off-by: sudesh.shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Feb 28, 2020
1 parent 4e3a6d6 commit 06bef57
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 59 deletions.
5 changes: 2 additions & 3 deletions cmd/aries-js-worker/src/worker-impl-rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ const wsnotifier = class {
constructor(url, postMsg) {
this.socket = new WebSocket(url);
this.socket.addEventListener('message', function (event) {
// TODO REST agents are not currently revealing topic information on incoming messages,
// Once REST supports this feature, topic value will be dynamic. [Issue #1323]
postMsg(newResponse(Math.random().toString(36).slice(2), JSON.parse(event.data), "", "all"));
const incoming = JSON.parse(event.data)
postMsg(newResponse(incoming.id, incoming.message, "", incoming.topic));
});
}
stop(){
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/webnotifier/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ func (n *HTTPNotifier) Notify(topic string, message []byte) error {
return fmt.Errorf(emptyMessageErrMsg)
}

topicMsg, err := prepareTopicMessage(topic, message)
if err != nil {
return fmt.Errorf(failedToCreateErrMsg, err)
}

var allErrs error

for _, webhookURL := range n.urls {
err := notifyWH(fmt.Sprintf("%s/%s", webhookURL, topic), message)
err := notifyWH(webhookURL, topicMsg)
allErrs = appendError(allErrs, err)
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/controller/webnotifier/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ func TestNotifyCorrectJSON(t *testing.T) {
t.Fatal(err)
}

err := notifyWH(fmt.Sprintf("http://%s%s", clientHost, topicWithLeadingSlash), getTestBasicMessageJSON())
msg, err := prepareTopicMessage("test-topic", getTestBasicMessageJSON())
require.NoError(t, err)

err = notifyWH(fmt.Sprintf("http://%s%s", clientHost, topicWithLeadingSlash), msg)
require.NoError(t, err)
}

Expand Down Expand Up @@ -191,12 +194,9 @@ func TestNotifyEmptyMessage(t *testing.T) {
func TestNotifyMultipleErrors(t *testing.T) {
testNotifier := NewHTTPNotifier([]string{"badURL1", "badURL2"})

err := testNotifier.Notify("someTopic", []byte(`someMessage`))
err := testNotifier.Notify("someTopic", []byte(`{}`))

require.Contains(t, err.Error(), `failed to post notification to badURL1/someTopic: `+
`Post "badURL1/someTopic": unsupported protocol scheme ""`)
require.Contains(t, err.Error(), `failed to post notification to badURL2/someTopic: `+
`Post "badURL2/someTopic": unsupported protocol scheme ""`)
require.Contains(t, err.Error(), `unsupported protocol scheme`)
}

func TestWebhookNotificationClient500Response(t *testing.T) {
Expand Down Expand Up @@ -251,18 +251,24 @@ func listenAndStopAfterReceivingNotification(addr string) error {
m.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
response, err := ioutil.ReadAll(req.Body)
if err == nil {
var receivedMessage msg
var receivedMessage struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message msg `json:"message"`
}
err = json.Unmarshal(response, &receivedMessage)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
}

expectedTestBasicMessage := msg{
ConnectionID: "SomeConnectionID",
MessageID: "SomeMessageId",
Content: "SomeContent",
State: "SomeState",
}
if receivedMessage != expectedTestBasicMessage {

if receivedMessage.Message != expectedTestBasicMessage {
resp.WriteHeader(http.StatusBadRequest)
}
} else {
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/webnotifier/webnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ SPDX-License-Identifier: Apache-2.0
package webnotifier

import (
"encoding/json"
"fmt"
"time"

"github.com/google/uuid"

"github.com/hyperledger/aries-framework-go/pkg/common/log"
"github.com/hyperledger/aries-framework-go/pkg/controller/command"
"github.com/hyperledger/aries-framework-go/pkg/controller/rest"
Expand All @@ -19,6 +22,7 @@ const (
notificationSendTimeout = 10 * time.Second
emptyTopicErrMsg = "cannot notify with an empty topic"
emptyMessageErrMsg = "cannot notify with an empty message"
failedToCreateErrMsg = "failed to create topic message : %w"
)

var logger = log.New("aries-framework/webnotifier")
Expand Down Expand Up @@ -67,3 +71,17 @@ func appendError(errToAppendTo, err error) error {

return fmt.Errorf("%v;%v", errToAppendTo, err)
}

func prepareTopicMessage(topic string, message []byte) ([]byte, error) {
topicMsg := struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
}{
ID: uuid.New().String(),
Topic: topic,
Message: message,
}

return json.Marshal(topicMsg)
}
9 changes: 7 additions & 2 deletions pkg/controller/webnotifier/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,23 @@ func (n *WSNotifier) Notify(topic string, message []byte) error {
copy(conns, n.conns)
n.connsLock.RUnlock()

topicMsg, err := prepareTopicMessage(topic, message)
if err != nil {
return fmt.Errorf(failedToCreateErrMsg, err)
}

var allErrs error

for _, conn := range conns {
// TODO parent ctx should be an argument to Notify https://github.com/hyperledger/aries-framework-go/issues/1355
err := notifyWS(context.Background(), conn, topic, message)
err := notifyWS(context.Background(), conn, topicMsg)
allErrs = appendError(allErrs, err)
}

return nil
}

func notifyWS(parent context.Context, conn *websocket.Conn, _ string, message []byte) error {
func notifyWS(parent context.Context, conn *websocket.Conn, message []byte) error {
ctx, cancel := context.WithTimeout(parent, notificationSendTimeout)
defer cancel()

Expand Down
30 changes: 27 additions & 3 deletions pkg/controller/webnotifier/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package webnotifier

import (
"context"
"encoding/json"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -95,7 +96,8 @@ func TestNotifyWS(t *testing.T) {
expTopic = "example"
)

payloads := []string{"payload1", "payload2", "payload3", "payload4", "payload5"}
payloads := []string{`{"msg":"payload1"}`, `{"msg":"payload2"}`, `{"msg":"payload3"}`,
`{"msg":"payload4"}`, `{"msg":"payload5"}`}

n := NewWSNotifier(path)
clientHost := randomURL()
Expand All @@ -120,10 +122,21 @@ func TestNotifyWS(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
msgType, payload, err := conn.Read(ctx)
cancel()
require.NoError(t, err)

var topic struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
}
err = json.Unmarshal(payload, &topic)
require.NoError(t, err)

b, err := topic.Message.MarshalJSON()
require.NoError(t, err)

require.Equal(t, websocket.MessageText, msgType)
require.Equal(t, []byte(expPayload), payload)
require.Equal(t, []byte(expPayload), b)
}

err := conn.Close(websocket.StatusNormalClosure, "")
Expand All @@ -148,10 +161,21 @@ func TestNotifyWS(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
msgType, payload, err := conn.Read(ctx)
cancel()
require.NoError(t, err)

var topic struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message json.RawMessage `json:"message"`
}
err = json.Unmarshal(payload, &topic)
require.NoError(t, err)

b, err := topic.Message.MarshalJSON()
require.NoError(t, err)

require.Equal(t, websocket.MessageText, msgType)
require.Equal(t, []byte(expPayload), payload)
require.Equal(t, []byte(expPayload), b)
}

err := conn.Close(websocket.StatusNormalClosure, "")
Expand Down
20 changes: 13 additions & 7 deletions test/bdd/pkg/didexchange/didexchange_controller_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,25 @@ func (a *ControllerSteps) pullWebhookEvents(agentID, state string) (string, erro
return "", fmt.Errorf("unable to find webhook URL for agent [%s]", agentID)
}

var incoming struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message didexcmd.ConnectionMsg `json:"message"`
}

// try to pull recently pushed topics from webhook
for i := 0; i < pullTopicsAttemptsBeforeFail; i++ {
var connectionMsg didexcmd.ConnectionMsg

err := sendHTTP(http.MethodGet, webhookURL+checkForTopics, nil, &connectionMsg)
err := sendHTTP(http.MethodGet, webhookURL+checkForTopics, nil, &incoming)
if err != nil {
return "", fmt.Errorf("failed pull topics from webhook, cause : %s", err)
}

if strings.EqualFold(state, connectionMsg.State) {
logger.Debugf("Able to find webhook topic with expected state[%s] for agent[%s] and connection[%s]",
connectionMsg.State, agentID, connectionMsg.ConnectionID)
return connectionMsg.ConnectionID, nil
if incoming.Topic == "connections" {
if strings.EqualFold(state, incoming.Message.State) {
logger.Debugf("Able to find webhook topic with expected state[%s] for agent[%s] and connection[%s]",
incoming.Message.State, agentID, incoming.Message.ConnectionID)
return incoming.Message.ConnectionID, nil
}
}

time.Sleep(pullTopicsWaitInMilliSec * time.Millisecond)
Expand Down
12 changes: 8 additions & 4 deletions test/bdd/pkg/messaging/messaging_controller_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,21 @@ func (d *ControllerSteps) pullMsgFromWebhook(agentID string) (*service.DIDCommMs
return nil, fmt.Errorf("unable to find webhook URL for agent [%s]", agentID)
}

msg := service.DIDCommMsgMap{}
var incoming struct {
ID string `json:"id"`
Topic string `json:"topic"`
Message service.DIDCommMsgMap `json:"message"`
}

// try to pull recently pushed topics from webhook
for i := 0; i < pullTopicsAttemptsBeforeFail; i++ {
err := sendHTTP(http.MethodGet, webhookURL+checkForTopics, nil, &msg)
err := sendHTTP(http.MethodGet, webhookURL+checkForTopics, nil, &incoming)
if err != nil {
return nil, fmt.Errorf("failed pull topics from webhook, cause : %w", err)
}

if len(msg) > 0 {
return &msg, nil
if len(incoming.Message) > 0 {
return &incoming.Message, nil
}

time.Sleep(pullTopicsWaitInMilliSec * time.Millisecond)
Expand Down
37 changes: 6 additions & 31 deletions test/bdd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -17,19 +16,15 @@ import (
"github.com/gorilla/mux"

"github.com/hyperledger/aries-framework-go/pkg/common/log"
"github.com/hyperledger/aries-framework-go/pkg/controller/command/didexchange"
)

var logger = log.New("aries-framework/webhook")

const (
addressPattern = ":%s"
connectionsPath = "/connections"
checkTopicsPath = "/checktopics"
genericInvitePath = "/generic-invite"
basicMsgPath = "/basic-message"
topicsSize = 50
topicTimeout = 100 * time.Millisecond
addressPattern = ":%s"
checkTopicsPath = "/checktopics"
topicsSize = 50
topicTimeout = 100 * time.Millisecond
)

var topics = make(chan []byte, topicsSize) //nolint:gochecknoglobals
Expand All @@ -40,14 +35,7 @@ func connections(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
}

connMsg := didexchange.ConnectionMsg{}

err = json.Unmarshal(msg, &connMsg)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}

logger.Infof("received state transition event :: connID=%s state=%s", connMsg.ConnectionID, connMsg.State)
logger.Infof("received topic message: %s", string(msg))

topics <- msg
}
Expand All @@ -64,27 +52,14 @@ func checkTopics(w http.ResponseWriter, r *http.Request) {
}
}

func messages(w http.ResponseWriter, r *http.Request) {
msg, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}

logger.Infof("received generic msg event")

topics <- msg
}

func main() {
port := os.Getenv("WEBHOOK_PORT")
if port == "" {
panic("port to be passed as ENV variable")
}

router := mux.NewRouter().StrictSlash(true)
router.HandleFunc(connectionsPath, connections).Methods(http.MethodPost)
router.HandleFunc("/", connections).Methods(http.MethodPost)
router.HandleFunc(checkTopicsPath, checkTopics).Methods(http.MethodGet)
router.HandleFunc(genericInvitePath, messages).Methods(http.MethodPost)
router.HandleFunc(basicMsgPath, messages).Methods(http.MethodPost)
logger.Fatalf("webhook server start error %s", http.ListenAndServe(fmt.Sprintf(addressPattern, port), router))
}

0 comments on commit 06bef57

Please sign in to comment.