Skip to content

Commit

Permalink
stomp: override default behavior for "host" header
Browse files Browse the repository at this point in the history
If not provided the go-stomp package looks at the address reported by
the TCP socket's remote side. This is not useful if the server doesn't
run at a fixed address. The connect function now attempts to parse and
use the specified host name.

Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Jun 22, 2023
1 parent e2f264f commit 5b87693
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 23 deletions.
1 change: 1 addition & 0 deletions notifier/amqp/deliverer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestDeliverer(t *testing.T) {
if uri == "" {
uri = defaultRabbitMQURI
}
t.Logf("using uri: %q", uri)

conf.URIs = []string{
// give a few bogus URIs to confirm failover mechanisms are working
Expand Down
1 change: 1 addition & 0 deletions notifier/amqp/directdeliverer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestDirectDeliverer(t *testing.T) {
if uri == "" {
uri = defaultRabbitMQURI
}
t.Logf("using uri: %q", uri)
conn, err := samqp.Dial(uri)
if err != nil {
t.Fatalf("failed to connect to broker at %v: %v", uri, err)
Expand Down
9 changes: 7 additions & 2 deletions notifier/stomp/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"time"

gostomp "github.com/go-stomp/stomp/v3"
"github.com/google/uuid"
Expand Down Expand Up @@ -32,6 +33,10 @@ func New(conf *config.STOMP) (*Deliverer, error) {
}

func (d *Deliverer) load(cfg *config.STOMP) error {
d.fo.timeout = 30 * time.Second
// TODO(hank) Wire up the "host" and "timeout" config somehow -- probably
// just make the config URIs strings actual URIs and parse them out with
// query parameters.
var err error
if cfg.TLS != nil {
d.fo.tls, err = cfg.TLS.Config()
Expand All @@ -46,8 +51,8 @@ func (d *Deliverer) load(cfg *config.STOMP) error {
}
}

d.fo.uris = make([]string, len(cfg.URIs))
copy(d.fo.uris, cfg.URIs)
d.fo.addrs = make([]string, len(cfg.URIs))
copy(d.fo.addrs, cfg.URIs)
d.destination = cfg.Destination
d.rollup = cfg.Rollup
return nil
Expand Down
30 changes: 17 additions & 13 deletions notifier/stomp/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,48 @@ import (
//
// failOver is safe for concurrent usage.
type failOver struct {
tls *tls.Config
login *config.Login
uris []string
tls *tls.Config
login *config.Login
addrs []string
timeout time.Duration
}

// Dial will dial the provided URI in accordance with the provided Config.
// Dial will dial the provided address in accordance with the provided Config.
//
// Note: the STOMP protocol does not support multiplexing operations over a
// single TCP connection. A TCP connection must be made for each STOMP
// connection.
func (f *failOver) Dial(ctx context.Context, uri string) (*gostomp.Conn, error) {
func (f *failOver) Dial(ctx context.Context, addr string) (*gostomp.Conn, error) {
var opts []func(*gostomp.Conn) error
if f.login != nil {
opts = append(opts, gostomp.ConnOpt.Login(f.login.Login, f.login.Passcode))
}
if host, _, err := net.SplitHostPort(addr); err == nil {
opts = append(opts, gostomp.ConnOpt.Host(host))
}

var d interface {
DialContext(context.Context, string, string) (net.Conn, error)
} = &net.Dialer{
Timeout: 2 * time.Second,
Timeout: f.timeout,
}
if f.tls != nil {
d = &tls.Dialer{
NetDialer: d.(*net.Dialer),
Config: f.tls,
}
}
conn, err := d.DialContext(ctx, "tcp", uri)
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to broker @ %v: %w", uri, err)
return nil, fmt.Errorf("failed to connect to broker @ %v: %w", addr, err)
}

stompConn, err := gostomp.Connect(conn, opts...)
if err != nil {
if conn != nil {
conn.Close()
}
return nil, fmt.Errorf("stomp connect handshake to broker @ %v failed: %w", uri, err)
return nil, fmt.Errorf("stomp connect handshake to broker @ %v failed: %w", addr, err)
}

return stompConn, err
Expand All @@ -68,13 +72,13 @@ func (f *failOver) Dial(ctx context.Context, uri string) (*gostomp.Conn, error)
func (f *failOver) Connection(ctx context.Context) (*gostomp.Conn, error) {
ctx = zlog.ContextWithValues(ctx, "component", "notifier/stomp/failOver.Connection")

for _, uri := range f.uris {
conn, err := f.Dial(ctx, uri)
for _, addr := range f.addrs {
conn, err := f.Dial(ctx, addr)
if err != nil {
zlog.Debug(ctx).
Str("broker", uri).
Str("broker", addr).
Err(err).
Msg("failed to dial broker. attempting next")
Msg("failed to dial broker, attempting next")
continue
}
return conn, nil
Expand Down
18 changes: 10 additions & 8 deletions notifier/stomp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/quay/clair/v4/notifier"
)

func setURI(t *testing.T, cfg *config.STOMP, uri string) (dial string, opt []func(*stomp.Conn) error) {
func setURI(t *testing.T, cfg config.STOMP, uri string) (next config.STOMP, dial string, opt []func(*stomp.Conn) error) {
const (
defaultStompBrokerURI = "localhost:61613"
)
Expand All @@ -30,28 +30,30 @@ func setURI(t *testing.T, cfg *config.STOMP, uri string) (dial string, opt []fun
case uri == "":
t.Logf("using default broker URI: %q", defaultStompBrokerURI)
cfg.URIs = append(cfg.URIs, defaultStompBrokerURI)
return defaultStompBrokerURI, nil
return cfg, defaultStompBrokerURI, nil
case strings.Contains(uri, "://"): // probably a URL
u, err := url.Parse(uri)
if err != nil {
t.Logf("weird test URI: %q: %v", uri, err)
return setURI(t, cfg, "")
}
t.Logf("using broker URI: %q", u.Host)
t.Logf("using broker address: %q", u.Host)
cfg.URIs = append(cfg.URIs, u.Host)
t.Logf("using broker vhost: %q", u.Hostname())
opt = append(opt, stomp.ConnOpt.Host(u.Hostname()))
if u := u.User; u != nil {
t.Logf("using login: %q", u.String())
cfg.Login = &config.Login{
Login: u.Username(),
}
cfg.Login.Passcode, _ = u.Password()
opt = append(opt, stomp.ConnOpt.Login(u.Username(), cfg.Login.Passcode))
opt = append(opt, stomp.ConnOpt.Login(cfg.Login.Login, cfg.Login.Passcode))
}
return u.Host, opt
return cfg, u.Host, opt
default:
t.Logf("using broker URI: %q", uri)
cfg.URIs = append(cfg.URIs, uri)
return uri, nil
return cfg, uri, nil
}
}

Expand Down Expand Up @@ -125,7 +127,7 @@ func TestDeliverer(t *testing.T) {
},
}
)
dial, opt := setURI(t, &conf, os.Getenv("STOMP_CONNECTION_STRING"))
conf, dial, opt := setURI(t, conf, os.Getenv("STOMP_CONNECTION_STRING"))
opt = append(opt, stomp.ConnOpt.Logger(logAdapter{t}))

// test parallel usage
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestDirectDeliverer(t *testing.T) {
Rollup: tt.rollup,
Destination: queue,
}
dial, opt := setURI(t, &conf, os.Getenv("STOMP_CONNECTION_STRING"))
conf, dial, opt := setURI(t, conf, os.Getenv("STOMP_CONNECTION_STRING"))

noteID := uuid.New()
notes := make([]notifier.Notification, 0, tt.notes)
Expand Down

0 comments on commit 5b87693

Please sign in to comment.