Skip to content

Commit

Permalink
update connector
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Sep 29, 2020
1 parent 76a459a commit 7b0bc17
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 19 deletions.
2 changes: 1 addition & 1 deletion api/controller/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func createPayloadForTemplateCheck(r RuleController, conf *configs.Config, msgID
return messages
}

payload := action.BuildPayload(
payload := action.CreatePayload(
conf,
messagesQuerier,
"dingding",
Expand Down
19 changes: 10 additions & 9 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/mylxsw/adanos-alert/pkg/connector"
Expand Down Expand Up @@ -78,15 +80,14 @@ func main() {
RecoveryAfter: c.String("recovery-after"),
}

return connector.Send(
adanosServers,
c.String("adanos-token"),
createMessageMeta(c.StringSlice("meta")),
c.StringSlice("tag"),
c.String("origin"),
ctl,
message,
)
msg := connector.NewMessage(message).
WithTags(c.StringSlice("tag")...).
WithOrigin(c.String("origin")).
WithMetas(createMessageMeta(c.StringSlice("meta"))).
WithCtl(ctl)

ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
return connector.NewConnector(c.String("adanos-token"), adanosServers...).Send(ctx, msg)
},
}

Expand Down
4 changes: 2 additions & 2 deletions internal/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func (q *QueueAction) Handle(rule repository.Rule, trigger repository.Trigger, g
})
}

// BuildPayload 创建一个 Payload
func BuildPayload(conf *configs.Config, messageQuerier MessageQuerier, action string, rule repository.Rule, trigger repository.Trigger, grp repository.MessageGroup) *Payload {
// CreatePayload 创建一个 Payload
func CreatePayload(conf *configs.Config, messageQuerier MessageQuerier, action string, rule repository.Rule, trigger repository.Trigger, grp repository.MessageGroup) *Payload {
payload := &Payload{
Action: action,
Rule: rule,
Expand Down
2 changes: 1 addition & 1 deletion internal/action/dingding.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d DingdingAction) Handle(rule repository.Rule, trigger repository.Trigger,
return fmt.Errorf("query robot for id=%s failed: %v", meta.RobotID, err)
}

payload := BuildPayload(conf, CreateRepositoryMessageQuerier(msgRepo), "dingding", rule, trigger, grp)
payload := CreatePayload(conf, CreateRepositoryMessageQuerier(msgRepo), "dingding", rule, trigger, grp)
ruleTemplateContent, err := template.Parse(d.manager, rule.Template, payload)
if err != nil {
ruleTemplateContent = fmt.Sprintf("<rule> template parse failed: %s", err)
Expand Down
66 changes: 60 additions & 6 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,71 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/mylxsw/adanos-alert/pkg/misc"
"github.com/mylxsw/asteria/log"
"github.com/pkg/errors"
)

// Connector 是一个连接器对象,用于创建于 Adanos-alert 的连接
type Connector struct {
servers []string
token string
}

// NewConnector create a new connector
func NewConnector(token string, servers ...string) *Connector {
return &Connector{servers: servers, token: token}
}

// Send send a message to adanos server
func (conn *Connector) Send(ctx context.Context, msg *Message) error {
return Send(ctx, conn.servers, conn.token, msg.meta, msg.tags, msg.origin, msg.ctl, msg.content)
}

// Message is a adanos alert message
type Message struct {
meta map[string]interface{}
tags []string
origin string
ctl misc.MessageControl
content string
}

// NewMessage create a new Message
func NewMessage(content string) *Message {
return &Message{content: content, tags: make([]string, 0), meta: make(map[string]interface{})}
}

func (m *Message) WithTags(tags ...string) *Message {
m.tags = append(m.tags, tags...)
return m
}

func (m *Message) WithOrigin(origin string) *Message {
m.origin = origin
return m
}

func (m *Message) WithCtl(ctl misc.MessageControl) *Message {
m.ctl = ctl
return m
}

func (m *Message) WithMetas(metas map[string]interface{}) *Message {
for k, v := range metas {
m.meta[k] = v
}
return m
}

func (m *Message) WithMeta(key string, value interface{}) *Message {
m.meta[key] = value
return m
}

// Send send a message to adanos servers
func Send(servers []string, token string, meta map[string]interface{}, tags []string, origin string, ctl misc.MessageControl, message string) error {
func Send(ctx context.Context, servers []string, token string, meta map[string]interface{}, tags []string, origin string, ctl misc.MessageControl, message string) error {
commonMessage := misc.CommonMessage{
Content: message,
Meta: meta,
Expand All @@ -28,7 +84,7 @@ func Send(servers []string, token string, meta map[string]interface{}, tags []st

var err error
for _, s := range servers {
if err = sendMessageToServer(commonMessage, data, s, token); err == nil {
if err = sendMessageToServer(ctx, commonMessage, data, s, token); err == nil {
break
}

Expand All @@ -38,15 +94,13 @@ func Send(servers []string, token string, meta map[string]interface{}, tags []st
return err
}

func sendMessageToServer(commonMessage misc.CommonMessage, data []byte, adanosServer, adanosToken string) error {
func sendMessageToServer(ctx context.Context, commonMessage misc.CommonMessage, data []byte, adanosServer, adanosToken string) error {
reqURL := fmt.Sprintf("%s/api/messages/", strings.TrimRight(adanosServer, "/"))

log.WithFields(log.Fields{
"message": commonMessage,
}).Debugf("request: %v", reqURL)

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

client := &http.Client{}
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, bytes.NewReader(data))
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/connector/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package connector_test

import (
"context"
"testing"
"time"

"github.com/mylxsw/adanos-alert/pkg/connector"
"github.com/stretchr/testify/assert"
)

func TestSend(t *testing.T) {
ctx, _ := context.WithTimeout(context.TODO(), 1*time.Second)
assert.NoError(t, connector.NewConnector("", "http://localhost:19999").Send(
ctx,
connector.NewMessage("Hello, world").
WithMeta("occur_at", time.Now()).
WithMeta("user", "adanos").
WithTags("hello", "connector").
WithOrigin("connector"),
))
}

0 comments on commit 7b0bc17

Please sign in to comment.