Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions go/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# Go code for RabbitMQ tutorials

Here you can find Go code examples from
[RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html).

Here you can find Go code examples from [RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html).


## Requirements

To run this code you need [Go RabbitMQ client](https://github.com/streadway/amqp).
To run this code you need [Go RabbitMQ client](https://github.com/streadway/amqp):

go get github.com/streadway/amqp


## Code

Expand Down Expand Up @@ -38,6 +42,7 @@ Code examples are executed via `go run`:

[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html)

TBD
go run rpc_server.go
go run rpc_client.go 10

To learn more, see [Go RabbitMQ client](https://github.com/streadway/amqp).
33 changes: 15 additions & 18 deletions go/emit_log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"github.com/streadway/amqp"
"fmt"
"log"
"os"
"fmt"

"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
Expand All @@ -24,13 +25,13 @@ func main() {
defer ch.Close()

err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

Expand All @@ -41,24 +42,20 @@ func main() {
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
ContentType: "text/plain",
Body: []byte(body),
})

failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

os.Exit(0)
log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
var body string
if (len(args) < 1) || os.Args[1] == "" {
if (len(args) < 2) || os.Args[1] == "" {
body = "hello"
} else {
body = os.Args[1]

}

return body
}
}
32 changes: 13 additions & 19 deletions go/emit_log_direct.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package main

import (
"github.com/streadway/amqp"
"fmt"
"log"
"os"
"fmt"
"strings"

"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
Expand All @@ -30,7 +31,7 @@ func main() {
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
Expand All @@ -39,40 +40,33 @@ func main() {
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
ContentType: "text/plain",
Body: []byte(body),
})

failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

os.Exit(0)
log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[2] == "" {
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")

s = strings.Join(args[2:], " ")
}

return s
}

func severityFrom(args []string) string {
var s string

if (len(args) < 1) || os.Args[1] == "" {
if (len(args) < 2) || os.Args[1] == "" {
s = "info"
} else {
s = os.Args[1]

}

return s
}
}
42 changes: 18 additions & 24 deletions go/emit_log_topic.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package main

import (
"github.com/streadway/amqp"
"fmt"
"log"
"os"
"fmt"
"strings"

"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
Expand All @@ -27,52 +28,45 @@ func main() {
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
"logs_topic", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
ContentType: "text/plain",
Body: []byte(body),
})

failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

os.Exit(0)
log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[2] == "" {
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")

s = strings.Join(args[2:], " ")
}

return s
}

func severityFrom(args []string) string {
var s string

if (len(args) < 1) || os.Args[1] == "" {
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]

}

return s
}
}
20 changes: 8 additions & 12 deletions go/new_task.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"github.com/streadway/amqp"
"log"
"os"
"fmt"
"log"
"strings"

"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
Expand All @@ -30,24 +30,20 @@ func main() {
"task_queue", // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

os.Exit(0)
}

func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[2] == "" {
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")

}

return s
}
27 changes: 15 additions & 12 deletions go/receive.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"github.com/streadway/amqp"
"log"
"os"
"fmt"
"log"

"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
Expand All @@ -21,34 +21,37 @@ func main() {

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // noWait
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

done := make(chan bool)
forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
done <- true
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-done
log.Printf("Done")

os.Exit(0)
<-forever
}
Loading