Skip to content

Commit

Permalink
notifier: check msg contents in integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Zmeskal <jzmeskal@redhat.com>
  • Loading branch information
Jan Zmeskal authored and ldelossa committed Apr 27, 2021
1 parent 9e67501 commit 6d33153
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 55 deletions.
10 changes: 10 additions & 0 deletions Documentation/howto/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ localhost:8081 --- Postgres GUI (pgadmin4)
localhost:8082 --- OpenAPI Swagger Editor.
You can view ClairV4's public API here.
localhost:8087 --- RabbitMQ management GUI
Login:
username: guest
password: guest
localhost:8161 --- ActiveMQ management GUI
Login:
username: admin
password: admin
localhost:7000 --- Traefik Web UI.
Good for troubleshooting http issues.
Expand Down
6 changes: 4 additions & 2 deletions Documentation/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ Integer 0 or greater.
If `direct` is true this value will inform notifier how many notifications
to send in a single direct delivery. For example, if `direct` is set to
`true` and `rollup` is set to `5`, the notifier will deliver no more then
5 notifications in a single json payload to the broker.
5 notifications in a single json payload to the broker. Setting the value
to 0 will effectively set it to 1.

#### `$.notifier.amqp.exchange`
The AMQP Exchange to connect to.
Expand Down Expand Up @@ -442,7 +443,8 @@ Integer 0 or greater.
If `direct` is `true`, this value will limit the number of notifications
sent in a single direct delivery. For example, if `direct` is set to
`true` and `rollup` is set to `5`, the notifier will deliver no more
then 5 notifications in a single json payload to the broker.
then 5 notifications in a single json payload to the broker. Setting the value
to 0 will effectively set it to 1.

#### `$.notifier.stomp.callback`
a URL string
Expand Down
1 change: 1 addition & 0 deletions notifier/amqp/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (d *Deliverer) Deliver(ctx context.Context, nID uuid.UUID) error {
if err != nil {
return &clairerror.ErrDeliveryFailed{err}
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
Expand Down
65 changes: 65 additions & 0 deletions notifier/amqp/deliverer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package amqp

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -56,10 +58,14 @@ func TestDeliverer(t *testing.T) {
if err != nil {
t.Fatalf("failed to connect to broker at %v: %v", uri, err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
t.Fatalf("failed to obtain channel from broker %v: %v", uri, err)
}
defer ch.Close()

// this queue will autobind to the default "direct" exchange
// and the queue name may be used as the routing key.
_, err = ch.QueueDeclare(
Expand Down Expand Up @@ -96,4 +102,63 @@ func TestDeliverer(t *testing.T) {
t.Fatalf("test failed: %v", err)
}

// create consumer
consumerConn, err := samqp.Dial(uri)
if err != nil {
t.Fatalf("failed to create consumer connection: %v", err)
}
defer consumerConn.Close()

consumerCh, err := consumerConn.Channel()
if err != nil {
t.Fatalf("failed to create consumer channel: %v", err)
}
defer consumerCh.Close()

msgs, err := consumerCh.Consume(
queueAndKey,
"test",
false,
false,
false,
false,
nil,
)
if err != nil {
t.Fatalf("failed to start consuming messages: %v", err)
}

// read messages
for i := 0; i < 4; i++ {
m := <-msgs
if m.ContentType != "application/json" {
t.Errorf("msg content type mismatch: expected %s, got %s", "application/json", m.ContentType)
}
if m.AppId != "clairV4-notifier" {
t.Errorf("msg app ID mismatch: expected %s, got %s", "clairV4-notifier", m.AppId)
}
var msgBody map[string]string
if err = json.Unmarshal(m.Body, &msgBody); err != nil {
t.Errorf("cannot unmarshall msg body into map: %v", err)
}
nid, ok := msgBody["notification_id"]
if !ok {
t.Errorf("cannot find \"notification_id\" key in msg body")
}
cb, ok := msgBody["callback"]
if !ok {
t.Errorf("cannot find \"callback\" key in msg body")
}
if cb != fmt.Sprintf("%s/%s", callback, nid) {
t.Errorf("callback mismatch: expected: %s, got %s", fmt.Sprintf("%s/%s", callback, nid), cb)
}
m.Ack(false)
}

// check if msgs channel is empty
select {
case m := <-msgs:
t.Fatalf("there is still msg in msgs channel: %#v", m)
case <-time.After(1 * time.Millisecond): // no msg found, as expected
}
}
1 change: 1 addition & 0 deletions notifier/amqp/directdeliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (d *DirectDeliverer) Deliver(ctx context.Context, _ uuid.UUID) error {
if err != nil {
return &clairerror.ErrDeliveryFailed{err}
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
Expand Down
118 changes: 93 additions & 25 deletions notifier/amqp/directdeliverer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package amqp

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

"github.com/google/uuid"
"github.com/quay/clair/v4/notifier"
Expand All @@ -20,44 +22,52 @@ func TestDirectDeliverer(t *testing.T) {
integration.Skip(t)
// test start
table := []struct {
name string
rollup int
notes int
name string
rollup int
notes int
expectedMsgs int
}{
{
name: "check 0",
rollup: 0,
notes: 1,
name: "check 0",
rollup: 0,
notes: 1,
expectedMsgs: 1,
},
{
name: "check 1",
rollup: 1,
notes: 5,
name: "check 1",
rollup: 1,
notes: 5,
expectedMsgs: 5,
},
{
name: "check rollup overflow",
rollup: 10,
notes: 5,
name: "check rollup overflow",
rollup: 10,
notes: 5,
expectedMsgs: 1,
},
{
name: "check odds",
rollup: 3,
notes: 7,
name: "check odds",
rollup: 3,
notes: 7,
expectedMsgs: 3,
},
{
name: "check odds rollup",
rollup: 3,
notes: 8,
name: "check odds rollup",
rollup: 3,
notes: 8,
expectedMsgs: 3,
},
{
name: "check odds notes",
rollup: 4,
notes: 7,
name: "check odds notes",
rollup: 4,
notes: 7,
expectedMsgs: 2,
},
{
name: "check large",
rollup: 100,
notes: 1000,
name: "check large",
rollup: 100,
notes: 1000,
expectedMsgs: 10,
},
}

Expand All @@ -69,14 +79,14 @@ func TestDirectDeliverer(t *testing.T) {
if err != nil {
t.Fatalf("failed to connect to broker at %v: %v", uri, err)
}
defer conn.Close()
// our test assumes a default exchange
exchange := Exchange{
Name: "",
Type: "direct",
Durable: true,
AutoDelete: false,
}
defer conn.Close()
for _, tt := range table {
t.Run(tt.name, func(t *testing.T) {
// rabbitmq queue declare
Expand All @@ -87,6 +97,7 @@ func TestDirectDeliverer(t *testing.T) {
if err != nil {
t.Fatalf("failed to obtain channel from broker %v: %v", uri, err)
}
defer ch.Close()
// this queue will autobind to the default "direct" exchange
// and the queue name may be used as the routing key.
_, err = ch.QueueDeclare(
Expand Down Expand Up @@ -154,6 +165,63 @@ func TestDirectDeliverer(t *testing.T) {
if err := g.Wait(); err != nil {
t.Fatalf("test failed: %v", err)
}

// create consumer
consumerConn, err := samqp.Dial(uri)
if err != nil {
t.Fatalf("failed to create consumer connection: %v", err)
}
defer consumerConn.Close()

consumerCh, err := consumerConn.Channel()
if err != nil {
t.Fatalf("failed to create consumer channel: %v", err)
}
defer consumerCh.Close()

msgs, err := consumerCh.Consume(
queueAndKey,
"test",
false,
false,
false,
false,
nil,
)
if err != nil {
t.Fatalf("failed to start consuming messages: %v", err)
}

// read messages
totalExpectedMsgs := tt.expectedMsgs * 4
for i := 0; i < totalExpectedMsgs; i++ {
m := <-msgs
if m.ContentType != "application/json" {
t.Errorf("msg content type mismatch: expected %s, got %s", "application/json", m.ContentType)
}
if m.AppId != "clairV4-notifier" {
t.Errorf("msg app ID mismatch: expected %s, got %s", "clairV4-notifier", m.AppId)
}
var msgBody []notifier.Notification
if err = json.Unmarshal(m.Body, &msgBody); err != nil {
t.Errorf("cannot unmarshall msg body into slice of notifications: %v", err)
}
rollup := tt.rollup
if tt.rollup == 0 {
rollup++
}
if len(msgBody) > rollup {
t.Errorf("found more notifications in msg than expected: rollup %d, got %d", rollup, len(msgBody))
}
m.Ack(false)
}

// check if msgs channel is empty
select {
case <-msgs:
t.Fatal("there is still msg in msgs channel")
case <-time.After(1 * time.Millisecond): // no msg found, as expected
}
})
}
}

0 comments on commit 6d33153

Please sign in to comment.