Skip to content

Commit

Permalink
Merge ce18ddc into 2d23877
Browse files Browse the repository at this point in the history
  • Loading branch information
talal committed Jul 22, 2019
2 parents 2d23877 + ce18ddc commit 0d88775
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 21 deletions.
26 changes: 5 additions & 21 deletions pkg/audit/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package audit

import (
"encoding/json"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/sapcc/hermes/pkg/cadf"
"github.com/sapcc/hermes/pkg/rabbit"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -52,38 +53,21 @@ func sendEvents(clusterID string, config Config, events []cadf.Event) error {
defer ch.Close()

//declare a queue to hold and deliver messages to consumers
q, err := ch.QueueDeclare(
config.RabbitMQ.QueueName, // name of the queue
false, // durable: queue should survive cluster reset (or broker restart)
false, // autodelete when unused
false, // exclusive: queue only accessible by connection that declares and deleted when the connection closes
false, // noWait: the queue will assume to be declared on the server
nil, // arguments for advanced config
)
q, err := rabbit.DeclareQueue(ch, config.RabbitMQ.QueueName)
if err != nil {
eventPublishFailedCounter.With(labels).Inc()
return fmt.Errorf("RabbitMQ -- %s -- Failed to declare a queue: %s", events[0].ID, err)
}

//publish the events to an exchange on the server
for _, event := range events {
body, _ := json.Marshal(event)
err = ch.Publish(
"", // exchange: publish to default
q.Name, // routing key: same as queue name
false, // mandatory: don't publish if no queue is bound that matches the routing key
false, // immediate: don't publish if no consumer on the matched queue is ready to accept the delivery
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
err := rabbit.PublishEvent(ch, q.Name, &event)
if err != nil {
eventPublishFailedCounter.With(labels).Inc()
return fmt.Errorf("RabbitMQ -- %s -- Failed to publish the audit event: %s", event.ID, err)
}
eventPublishSuccessCounter.With(labels).Inc()
}

return err
return nil
}
65 changes: 65 additions & 0 deletions pkg/rabbit/rabbit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*******************************************************************************
*
* Copyright 2019 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You should have received a copy of the License along with this
* program. If not, you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

package rabbit

import (
"encoding/json"
"errors"

"github.com/sapcc/hermes/pkg/cadf"
"github.com/streadway/amqp"
)

// DeclareQueue is a wrapper around *amqp.Channel.QueueDeclare. It declares a
// Queue with parameters expected by Hermes' RabbitMQ deployment.
func DeclareQueue(ch *amqp.Channel, queueName string) (amqp.Queue, error) {
return ch.QueueDeclare(
queueName, // name of the queue
false, // durable: queue should survive cluster reset (or broker restart)
false, // autodelete when unused
false, // exclusive: queue only accessible by connection that declares and deleted when the connection closes
false, // noWait: the queue will assume to be declared on the server
nil, // arguments for advanced config
)
}

// PublishEvent is a wrapper around *amqp.Channel.Publish. It publishes a
// cadf.Event to the specified Queue with parameters expected by Hermes'
// RabbitMQ deployment.
// A nil pointer for event parameter will result in an error.
func PublishEvent(ch *amqp.Channel, queueName string, event *cadf.Event) error {
if event == nil {
return errors.New("rabbit: could not publish event: got a nil pointer for 'event' parameter")
}
b, err := json.Marshal(event)
if err != nil {
return err
}
return ch.Publish(
"", // exchange: publish to default
queueName, // routing key: same as queue name
false, // mandatory: don't publish if no queue is bound that matches the routing key
false, // immediate: don't publish if no consumer on the matched queue is ready to accept the delivery
amqp.Publishing{
ContentType: "text/plain",
Body: b,
},
)
}

0 comments on commit 0d88775

Please sign in to comment.