forked from Syncano/rabbitmq_exporter
-
Notifications
You must be signed in to change notification settings - Fork 194
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added a docker test environment for testing the exporter against real rabbitmqs.
- Loading branch information
Showing
5 changed files
with
241 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,4 @@ dependencies-stamp | |
rabbitmq_exporter | ||
rabbitmq_exporter.exe | ||
.tarballs/ | ||
coverage.out |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// +build integration | ||
|
||
package main | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"regexp" | ||
|
||
"github.com/kbudde/rabbitmq_exporter/testenv" | ||
) | ||
|
||
func TestQueueCount(t *testing.T) { | ||
env := testenv.NewEnvironment(t, testenv.RabbitMQ3Latest) | ||
defer env.CleanUp() // do not panic or exit fatally or the container will stay up | ||
|
||
var exporterURL = fmt.Sprintf("http://localhost:%s/metrics", defaultConfig.PublishPort) | ||
var rabbitManagementURL = env.ManagementURL() | ||
os.Setenv("RABBIT_URL", rabbitManagementURL) | ||
defer os.Unsetenv("RABBIT_URL") | ||
|
||
go main() | ||
time.Sleep(2 * time.Second) | ||
t.Run("Ensure there are no queues", func(t *testing.T) { | ||
body := testenv.GetOrDie(exporterURL, 5*time.Second) | ||
|
||
r := regexp.MustCompile("rabbitmq_queuesTotal 0") | ||
if s := r.FindString(body); s == "" { | ||
t.Fatalf("QueueCount not found in body: %v", body) | ||
} | ||
}) | ||
|
||
t.Run("Add one queue and check again", func(t *testing.T) { | ||
env.Rabbit.DeclareQueue("QueueForCheckCount", false) | ||
|
||
body := testenv.GetOrDie(exporterURL, 5*time.Second) | ||
|
||
r := regexp.MustCompile("rabbitmq_queuesTotal 1") | ||
if s := r.FindString(body); s == "" { | ||
t.Logf("body: %s", body) | ||
t.Fatalf("QueueCount not found ") | ||
} | ||
}) | ||
// not implemented | ||
// t.Run("Add message with timestamp", func(t *testing.T) { | ||
// queue := "timestamp" | ||
// env.Rabbit.DeclareQueue(queue, true) | ||
// timestamp := time.Now() | ||
// env.Rabbit.SendMessageToQ("Test timestamp", queue, ×tamp) | ||
|
||
// // log.Println(timestamp.Unix()) | ||
// body := testenv.GetOrDie(exporterURL, 5*time.Second) | ||
|
||
// r := regexp.MustCompile("rabbitmq_queue_headmessage_time{queue=\"QueueForCheckCount\",vhost=\"/\"} 123456") | ||
// if s := r.FindString(body); s == "" { | ||
// t.Logf("body: %s", body) | ||
// log.Println(rabbitManagementURL + "/api/queues") | ||
// // time.Sleep(30 * time.Minute) | ||
// // t.Fatalf("QueueCount not found ") | ||
// } | ||
// }) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package testenv | ||
|
||
import ( | ||
"log" | ||
|
||
"time" | ||
|
||
"github.com/streadway/amqp" | ||
) | ||
|
||
type rabbit struct { | ||
conn *amqp.Connection | ||
channel *amqp.Channel | ||
} | ||
|
||
func (r *rabbit) connect(url string) { | ||
conn, err := amqp.Dial(url) | ||
if err != nil { | ||
log.Fatalf("Failed to connect to RabbitMQ:%s", err) | ||
} | ||
r.conn = conn | ||
|
||
ch, err := conn.Channel() | ||
if err != nil { | ||
log.Fatalf("Failed to open a channel: %s", err) | ||
} | ||
r.channel = ch | ||
} | ||
|
||
func (r *rabbit) DeclareQueue(name string, durable bool) { | ||
_, err := r.channel.QueueDeclare( | ||
name, // name | ||
durable, // durable | ||
false, // delete when unused | ||
false, // exclusive | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
log.Fatalf("Failed to declare a queue: %s", err) | ||
} | ||
} | ||
|
||
func (r *rabbit) SendMessageToQ(body string, routingKey string, timestamp *time.Time) { | ||
pub := amqp.Publishing{ | ||
ContentType: "text/plain", | ||
Body: []byte(body), | ||
} | ||
if timestamp != nil { | ||
pub.Timestamp = *timestamp | ||
} | ||
err := r.channel.Publish( | ||
"", // exchange | ||
routingKey, // routing key | ||
false, // mandatory | ||
false, // immediate | ||
pub) | ||
if err != nil { | ||
log.Fatalf("Failed to publish a message:%s . Error:%s", body, err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// Package testenv provides a rabbitmq test environment in docker for a full set of integration tests. | ||
// Some usefull helper functions for rabbitmq interaction are included as well | ||
package testenv | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"testing" | ||
"time" | ||
|
||
"os" | ||
|
||
"log" | ||
|
||
"gopkg.in/ory-am/dockertest.v3" | ||
) | ||
|
||
//list of docker tags with rabbitmq versions | ||
const ( | ||
RabbitMQ3_5 = "3.5-management" | ||
RabbitMQ3Latest = "3-management-alpine" | ||
) | ||
|
||
// MaxWait is time before the docker setup will fail with timeout | ||
var MaxWait = 20 * time.Second | ||
|
||
//TestEnvironment contains all necessars | ||
type TestEnvironment struct { | ||
t *testing.T | ||
docker *dockertest.Pool | ||
resource *dockertest.Resource | ||
Rabbit rabbit | ||
} | ||
|
||
//NewEnvironment sets up a new environment. It will nlog fatal if something goes wrong | ||
func NewEnvironment(t *testing.T, dockerTag string) TestEnvironment { | ||
tenv := TestEnvironment{t: t} | ||
|
||
pool, err := dockertest.NewPool("") | ||
if err != nil { | ||
log.Fatalf("Could not connect to docker: %s", err) | ||
} | ||
tenv.docker = pool | ||
tenv.docker.MaxWait = MaxWait | ||
|
||
// pulls an image, creates a container based on it and runs it | ||
resource, err := tenv.docker.Run("rabbitmq", dockerTag, []string{}) | ||
if err != nil { | ||
log.Fatalf("Could not start resource: %s", err) | ||
} | ||
tenv.resource = resource | ||
|
||
checkManagementWebsite := func() error { | ||
_, err := GetURL(tenv.ManagementURL(), 5*time.Second) | ||
return err | ||
} | ||
|
||
// exponential backoff-retry, because the application in the container might not be ready to accept connections yet | ||
if err := tenv.docker.Retry(checkManagementWebsite); err != nil { | ||
perr := tenv.docker.Purge(resource) | ||
log.Fatalf("Could not connect to docker: %s; Purge Error: %s", err, perr) | ||
} | ||
|
||
r := rabbit{} | ||
r.connect(tenv.AmqpURL(true)) | ||
tenv.Rabbit = r | ||
return tenv | ||
} | ||
|
||
// CleanUp removes the container. If not called the container will run forever | ||
func (tenv *TestEnvironment) CleanUp() { | ||
if err := tenv.docker.Purge(tenv.resource); err != nil { | ||
fmt.Fprintf(os.Stderr, "Could not purge resource: %s", err) | ||
} | ||
} | ||
|
||
//ManagementURL returns the full http url including username/password to the management api in the docker environment. | ||
// e.g. http://guest:guest@localhost:15672 | ||
func (tenv *TestEnvironment) ManagementURL() string { | ||
return fmt.Sprintf("http://guest:guest@localhost:%s", tenv.resource.GetPort("15672/tcp")) | ||
} | ||
|
||
//AmqpURL returns the url to the rabbitmq server | ||
// e.g. amqp://localhost:5672 | ||
func (tenv *TestEnvironment) AmqpURL(withCred bool) string { | ||
return fmt.Sprintf("amqp://localhost:%s", tenv.resource.GetPort("5672/tcp")) | ||
} | ||
|
||
// GetURL fetches the url. Will fail after timeout. | ||
func GetURL(url string, timeout time.Duration) (string, error) { | ||
maxTime := time.Duration(timeout) | ||
client := http.Client{ | ||
Timeout: maxTime, | ||
} | ||
resp, err := client.Get(url) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
body, err := ioutil.ReadAll(resp.Body) | ||
resp.Body.Close() | ||
|
||
return string(body), err | ||
} | ||
|
||
func GetOrDie(url string, timeout time.Duration) string { | ||
body, err := GetURL(url, timeout) | ||
if err != nil { | ||
log.Fatalf("Failed to get url in time: %s", err) | ||
} | ||
return body | ||
} |