diff --git a/go/README.md b/go/README.md index 17454213..c74f282f 100644 --- a/go/README.md +++ b/go/README.md @@ -1,11 +1,15 @@ # Go code for RabbitMQ tutorials -Here you can find Go code examples from -[RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html). + +Here you can find Go code examples from [RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html). + ## Requirements -To run this code you need [Go RabbitMQ client](https://github.com/streadway/amqp). +To run this code you need [Go RabbitMQ client](https://github.com/streadway/amqp): + + go get github.com/streadway/amqp + ## Code @@ -38,6 +42,7 @@ Code examples are executed via `go run`: [Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html) - TBD + go run rpc_server.go + go run rpc_client.go 10 To learn more, see [Go RabbitMQ client](https://github.com/streadway/amqp). diff --git a/go/emit_log.go b/go/emit_log.go index 288b18ad..c0cffa52 100644 --- a/go/emit_log.go +++ b/go/emit_log.go @@ -1,10 +1,11 @@ package main import ( - "github.com/streadway/amqp" + "fmt" "log" "os" - "fmt" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -24,13 +25,13 @@ func main() { defer ch.Close() err = ch.ExchangeDeclare( - "logs", // name - "fanout", // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments + "logs", // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) failOnError(err, "Failed to declare an exchange") @@ -41,24 +42,20 @@ func main() { false, // mandatory false, // immediate amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + ContentType: "text/plain", + Body: []byte(body), }) - failOnError(err, "Failed to publish a message") - log.Printf(" [x] Sent %s", body) - os.Exit(0) + log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var body string - if (len(args) < 1) || os.Args[1] == "" { + if (len(args) < 2) || os.Args[1] == "" { body = "hello" } else { body = os.Args[1] - } - return body -} \ No newline at end of file +} diff --git a/go/emit_log_direct.go b/go/emit_log_direct.go index 636bce8c..bcc6c856 100644 --- a/go/emit_log_direct.go +++ b/go/emit_log_direct.go @@ -1,11 +1,12 @@ package main import ( - "github.com/streadway/amqp" + "fmt" "log" "os" - "fmt" "strings" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -30,7 +31,7 @@ func main() { true, // durable false, // auto-deleted false, // internal - false, // noWait + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") @@ -39,40 +40,33 @@ func main() { err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + ContentType: "text/plain", + Body: []byte(body), }) - failOnError(err, "Failed to publish a message") - log.Printf(" [x] Sent %s", body) - os.Exit(0) + log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string - if (len(args) < 2) || os.Args[2] == "" { + if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { - s = strings.Join(args[1:], " ") - + s = strings.Join(args[2:], " ") } - return s } func severityFrom(args []string) string { var s string - - if (len(args) < 1) || os.Args[1] == "" { + if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] - } - return s -} \ No newline at end of file +} diff --git a/go/emit_log_topic.go b/go/emit_log_topic.go index 6cfaa887..1bee2543 100644 --- a/go/emit_log_topic.go +++ b/go/emit_log_topic.go @@ -1,11 +1,12 @@ package main import ( - "github.com/streadway/amqp" + "fmt" "log" "os" - "fmt" "strings" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -27,52 +28,45 @@ func main() { err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( - "logs_topic", // exchange + "logs_topic", // exchange severityFrom(os.Args), // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + ContentType: "text/plain", + Body: []byte(body), }) - failOnError(err, "Failed to publish a message") - log.Printf(" [x] Sent %s", body) - os.Exit(0) + log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string - if (len(args) < 2) || os.Args[2] == "" { + if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { - s = strings.Join(args[1:], " ") - + s = strings.Join(args[2:], " ") } - return s } func severityFrom(args []string) string { var s string - - if (len(args) < 1) || os.Args[1] == "" { + if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] - } - return s -} \ No newline at end of file +} diff --git a/go/new_task.go b/go/new_task.go index 7eb8d087..104c86e8 100644 --- a/go/new_task.go +++ b/go/new_task.go @@ -1,11 +1,11 @@ package main import ( - "github.com/streadway/amqp" - "log" - "os" "fmt" + "log" "strings" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -30,24 +30,20 @@ func main() { "task_queue", // routing key false, // mandatory false, - amqp.Publishing { - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: []byte(body), + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(body), }) failOnError(err, "Failed to publish a message") - - os.Exit(0) } func bodyFrom(args []string) string { var s string - if (len(args) < 2) || os.Args[2] == "" { + if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") - } - return s } diff --git a/go/receive.go b/go/receive.go index db8bbce3..51d4f7e8 100644 --- a/go/receive.go +++ b/go/receive.go @@ -1,10 +1,10 @@ package main import ( - "github.com/streadway/amqp" - "log" - "os" "fmt" + "log" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -21,7 +21,6 @@ func main() { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") - defer ch.Close() q, err := ch.QueueDeclare( @@ -29,26 +28,30 @@ func main() { false, // durable false, // delete when usused false, // exclusive - false, // noWait + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") - msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) failOnError(err, "Failed to register a consumer") - done := make(chan bool) + forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) - done <- true } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") - <-done - log.Printf("Done") - - os.Exit(0) + <-forever } diff --git a/go/receive_logs.go b/go/receive_logs.go index 78748397..15158a01 100644 --- a/go/receive_logs.go +++ b/go/receive_logs.go @@ -1,10 +1,10 @@ package main import ( - "github.com/streadway/amqp" - "log" - "os" "fmt" + "log" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -27,11 +27,12 @@ func main() { "", // name false, // durable false, // delete when usused - false, // exclusive - false, // noWait + true, // exclusive + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") + err = ch.QueueBind( q.Name, // queue name "", // routing key @@ -40,20 +41,25 @@ func main() { nil) failOnError(err, "Failed to bind a queue") - msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") - done := make(chan bool) + forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) - done <- true } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") - <-done - log.Printf("Done") - - os.Exit(0) + <-forever } diff --git a/go/receive_logs_direct.go b/go/receive_logs_direct.go index ff657e3d..10b60e7f 100644 --- a/go/receive_logs_direct.go +++ b/go/receive_logs_direct.go @@ -1,10 +1,11 @@ package main import ( - "github.com/streadway/amqp" + "fmt" "log" "os" - "fmt" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -29,16 +30,17 @@ func main() { true, // durable false, // auto-deleted false, // internal - false, // noWait + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") + q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused - false, // exclusive - false, // noWait + true, // exclusive + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") @@ -50,24 +52,33 @@ func main() { s, // routing key "logs_direct", // exchange false, - nil) + nil) failOnError(err, "Failed to bind a queue") } - msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") done := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) + done <- true } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") + <-done log.Printf("Done") - - os.Exit(0) } diff --git a/go/receive_logs_topic.go b/go/receive_logs_topic.go index 7e70ae6c..ec96fddd 100644 --- a/go/receive_logs_topic.go +++ b/go/receive_logs_topic.go @@ -1,10 +1,11 @@ package main import ( - "github.com/streadway/amqp" + "fmt" "log" "os" - "fmt" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -29,16 +30,17 @@ func main() { true, // durable false, // auto-deleted false, // internal - false, // noWait + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") + q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused - false, // exclusive - false, // noWait + true, // exclusive + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") @@ -46,28 +48,33 @@ func main() { for _, s := range os.Args { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( - q.Name, // queue name - s, // routing key + q.Name, // queue name + s, // routing key "logs_topic", // exchange false, - nil) + nil) failOnError(err, "Failed to bind a queue") } - msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") - done := make(chan bool) + forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) - done <- true } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") - <-done - log.Printf("Done") - - os.Exit(0) + <-forever } diff --git a/go/rpc_client.go b/go/rpc_client.go new file mode 100644 index 00000000..b41371a8 --- /dev/null +++ b/go/rpc_client.go @@ -0,0 +1,112 @@ +package main + +import ( + "fmt" + "log" + "math/rand" + "os" + "strconv" + "strings" + "time" + + "github.com/streadway/amqp" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + panic(fmt.Sprintf("%s: %s", msg, err)) + } +} + +func randomString(l int) string { + bytes := make([]byte, l) + for i := 0; i < l; i++ { + bytes[i] = byte(randInt(65, 90)) + } + return string(bytes) +} + +func randInt(min int, max int) int { + return min + rand.Intn(max-min) +} + +func fibonacciRPC(n int) (res int, err error) { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when usused + true, // exclusive + false, // noWait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + corrId := randomString(32) + + err = ch.Publish( + "", // exchange + "rpc_queue", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: corrId, + ReplyTo: q.Name, + Body: []byte(strconv.Itoa(n)), + }) + failOnError(err, "Failed to publish a message") + + for d := range msgs { + if corrId == d.CorrelationId { + res, err = strconv.Atoi(string(d.Body)) + failOnError(err, "Failed to convert body to integer") + break + } + } + + return +} + +func main() { + rand.Seed(time.Now().UTC().UnixNano()) + + n := bodyFrom(os.Args) + + log.Printf(" [x] Requesting fib(%d)", n) + res, err := fibonacciRPC(n) + failOnError(err, "Failed to handle RPC request") + + log.Printf(" [.] Got %d", res) +} + +func bodyFrom(args []string) int { + var s string + if (len(args) < 2) || os.Args[1] == "" { + s = "30" + } else { + s = strings.Join(args[1:], " ") + } + n, err := strconv.Atoi(s) + failOnError(err, "Failed to convert arg to integer") + return n +} diff --git a/go/rpc_server.go b/go/rpc_server.go new file mode 100644 index 00000000..ac91e1c4 --- /dev/null +++ b/go/rpc_server.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "log" + "strconv" + + "github.com/streadway/amqp" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + panic(fmt.Sprintf("%s: %s", msg, err)) + } +} + +func fib(n int) int { + if n == 0 { + return 0 + } else if n == 1 { + return 1 + } else { + return fib(n-1) + fib(n-2) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "rpc_queue", // name + false, // durable + false, // delete when usused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.Qos( + 1, // prefetch count + 0, // prefetch size + false, // global + ) + failOnError(err, "Failed to set QoS") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + forever := make(chan bool) + + go func() { + for d := range msgs { + n, err := strconv.Atoi(string(d.Body)) + failOnError(err, "Failed to convert body to integer") + + log.Printf(" [.] fib(%d)", n) + response := fib(n) + + err = ch.Publish( + "", // exchange + d.ReplyTo, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: d.CorrelationId, + Body: []byte(strconv.Itoa(response)), + }) + failOnError(err, "Failed to publish a message") + + d.Ack(false) + } + }() + + log.Printf(" [*] Awaiting RPC requests") + <-forever +} diff --git a/go/send.go b/go/send.go index 0a9a3e66..c192861a 100644 --- a/go/send.go +++ b/go/send.go @@ -1,10 +1,10 @@ package main import ( - "github.com/streadway/amqp" - "log" - "os" "fmt" + "log" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -21,7 +21,6 @@ func main() { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") - defer ch.Close() q, err := ch.QueueDeclare( @@ -29,22 +28,20 @@ func main() { false, // durable false, // delete when usused false, // exclusive - false, // noWait + false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := "hello" err = ch.Publish( - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + ContentType: "text/plain", + Body: []byte(body), }) failOnError(err, "Failed to publish a message") - - os.Exit(0) } diff --git a/go/worker.go b/go/worker.go index c86ba565..2f58d465 100644 --- a/go/worker.go +++ b/go/worker.go @@ -1,10 +1,10 @@ package main import ( - "github.com/streadway/amqp" - "log" - "os" "fmt" + "log" + + "github.com/streadway/amqp" ) func failOnError(err error, msg string) { @@ -21,37 +21,45 @@ func main() { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") - defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name - true, // durable - false, // delete when unused - false, // exclusive - false, // noWait - nil, // arguments + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) failOnError(err, "Failed to declare a queue") - ch.Qos(3, 0, false) + err = ch.Qos( + 3, // prefetch count + 0, // prefetch size + false, // global + ) + failOnError(err, "Failed to set QoS") - msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil) + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) failOnError(err, "Failed to register a consumer") - done := make(chan bool) + forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) d.Ack(false) - done <- true } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") - <-done - log.Printf("Done") - - os.Exit(0) + <-forever }