Skip to content

Commit

Permalink
Change queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pratishshr committed Sep 10, 2019
1 parent 93aa90f commit a5e1367
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
Binary file added deployment/deploy
Binary file not shown.
71 changes: 71 additions & 0 deletions deployment/services/mq/deployment/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package deployment

import (
"fmt"

"github.com/leapfrogtechnology/shift/deployment/services/mq"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

func getQueue() (*amqp.Connection, *amqp.Channel, *amqp.Queue) {
conn := mq.GetConnection()

ch, err := conn.Channel()

failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare("Deployment",
false, // durable bool
false, // autoDelete bool
false, // exclusive bool
false, // noWait bool
nil, // args amqp.Table
)

failOnError(err, "Failed to declare a queue")

return conn, ch, &q
}

// Publish message to queue
func Publish(message []byte) {
_, ch, q := getQueue()

defer ch.Close()

msg := amqp.Publishing{
ContentType: "application/json",
Body: message,
}

ch.Publish("", q.Name, false, false, msg)
}

// Consume listens to message from queue
func Consume(deploy func([]byte)) {
_, ch, q := getQueue()

defer ch.Close()

msgs, err := ch.Consume(
q.Name, // queue string
"", // consumer string
true, // autoAck bool
false, // exclusive bool
false, // noLocal bool
false, // noWait bool
nil, //args amqp.Table
)

failOnError(err, "Failed to register a consumer")

for msg := range msgs {
deploy(msg.Body)
}
}
2 changes: 1 addition & 1 deletion deployment/services/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func failOnError(err error, msg string) {
}

func getQueue() (*amqp.Connection, *amqp.Channel, *amqp.Queue) {
conn, err := amqp.Dial("amqp://shift:shiftdeveloper@dev-shiftmq.lftechnology.com:5672")
conn, err := amqp.Dial("amqp://shift:shiftdeveloper@localhost:5672")

failOnError(err, "Failed to connect to RabbitMQ")

Expand Down
71 changes: 71 additions & 0 deletions deployment/services/mq/trigger/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package trigger

import (
"fmt"

"github.com/leapfrogtechnology/shift/deployment/services/mq"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

func getQueue() (*amqp.Connection, *amqp.Channel, *amqp.Queue) {
conn := mq.GetConnection()

ch, err := conn.Channel()

failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare("Trigger",
false, // durable bool
false, // autoDelete bool
false, // exclusive bool
false, // noWait bool
nil, // args amqp.Table
)

failOnError(err, "Failed to declare a queue")

return conn, ch, &q
}

// Publish message to queue
func Publish(message []byte) {
_, ch, q := getQueue()

defer ch.Close()

msg := amqp.Publishing{
ContentType: "application/json",
Body: message,
}

ch.Publish("", q.Name, false, false, msg)
}

// Consume listens to message from queue
func Consume(deploy func([]byte)) {
_, ch, q := getQueue()

defer ch.Close()

msgs, err := ch.Consume(
q.Name, // queue string
"", // consumer string
true, // autoAck bool
false, // exclusive bool
false, // noLocal bool
false, // noWait bool
nil, //args amqp.Table
)

failOnError(err, "Failed to register a consumer")

for msg := range msgs {
deploy(msg.Body)
}
}

0 comments on commit a5e1367

Please sign in to comment.