Skip to content
Browse files

Merge pull request #4 from astephens25/amqpsend-stdin

Added stdin reading capability to amqpsend
  • Loading branch information...
2 parents d08bc0c + 81c6cdc commit 29041cce3f76430f4f9d0903718f91fcb80fafda @rmt committed Mar 2, 2012
Showing with 47 additions and 6 deletions.
  1. +5 −1 README.rst
  2. +42 −5 amqpsend.c
View
6 README.rst
@@ -57,7 +57,7 @@ You may also pass a filename as input.
$ ./amqpsend --help::
- Usage: ./amqpsend [options] exchange routingkey [msg]
+ Usage: ./amqpsend [options] exchange routingkey [message]
Options:
--host/-h host specify the host (default: "amqpbroker")
--port/-P port specify AMQP port (default: 5672)
@@ -71,8 +71,12 @@ $ ./amqpsend --help::
The following environment variables may also be set:
AMQP_HOST, AMQP_PORT, AMQP_VHOST, AMQP_USER, AMQP_PASSWORD, AMQP_PERSISENT
Acceptable values for AMQP_PERSISENT are '1' (No Persist) and '2' (Persist)
+
+ With no -f option and no message, message contents will be read from standard
+ input.
Example:
$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey "HELLO AMQP"
$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts
+ $ echo "HELLO AMQP" | amqpsend -h amqp.example.com -P 5672 amq.fanout mykey
View
47 amqpsend.c
@@ -62,6 +62,8 @@
#include <assert.h>
#include <getopt.h>
+#define BUFFERSIZE 8096
+
// from "example_utils.c"
void die_on_error(int x, char const *context) {
if (x < 0) {
@@ -115,7 +117,8 @@ void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
}
void print_help(const char *program_name) {
- fprintf(stderr, "Usage: %s [options] exchange routingkey [msg]\n", program_name);
+ fprintf(stderr, "Usage: %s [options] exchange routingkey [message]\n", program_name);
+ fprintf(stderr, "\n");
fprintf(stderr, "Options:\n");
fprintf(stderr, " --host/-h host specify the host (default: \"amqpbroker\")\n");
fprintf(stderr, " --port/-P port specify AMQP port (default: 5672)\n");
@@ -128,10 +131,14 @@ void print_help(const char *program_name) {
fprintf(stderr, "\n");
fprintf(stderr, "The following environment variables may also be set:\n");
fprintf(stderr, " AMQP_HOST, AMQP_PORT, AMQP_VHOST, AMQP_USER, AMQP_PASSWORD, AMQP_PERSISTENT\n");
- fprintf(stderr, "Acceptable values for AMQP_PERSISENT are '1' (Not Persistent) and '2' (Persistent)\n\n");
+ fprintf(stderr, "Acceptable values for AMQP_PERSISENT are '1' (Not Persistent) and '2' (Persistent)\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, "With no -f option and no message, message contents will be read from standard input.\n");
+ fprintf(stderr, "\n");
fprintf(stderr, "Example:\n");
fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey \"HELLO AMQP\"\n");
- fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts\n\n");
+ fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts\n");
+ fprintf(stderr, "$ echo \"HELLO AMQP\" | amqpsend -h amqp.example.com -P 5672 amq.fanout mykey\n\n");
}
// shamelessly taken from the public domain
@@ -148,12 +155,30 @@ int load_file_into_memory(const char *filename, char **result) {
*result = (char *)malloc(size);
if(size != fread(*result, sizeof(char), size, f)) {
free(*result);
+ fclose(f);
return -2;
}
fclose(f);
return size;
}
+int load_stdin_into_memory(char **file) {
+ int size = 0, read = 0;
+ char *buffer = (char *)malloc(sizeof(char) * BUFFERSIZE);
+ do {
+ size += (read = fread(buffer, sizeof(char), BUFFERSIZE, stdin));
+ if (ferror(stdin)) {
+ free(*file);
+ free(buffer);
+ return -1;
+ }
+ *file = (char *)realloc((void *)*file, sizeof(char) * size);
+ memcpy(*file + (sizeof(char) * (size - read)), buffer, read);
+ } while (read == BUFFERSIZE);
+ free(buffer);
+ return size;
+}
+
int main(int argc, char **argv) {
char const *hostname = "amqpbroker"; // amqp hostname
int port = 5672; // amqp port
@@ -237,14 +262,26 @@ int main(int argc, char **argv) {
}
}
- if ((argc-optind) < 2 || (NULL == filename && (argc-optind) < 3)) {
+ // if ((argc-optind) < 2 || (NULL == filename && (argc-optind) < 3)) {
+ if ((argc-optind) < 2) {
print_help(argv[0]);
return 1;
}
exchange = argv[optind];
routingkey = argv[optind+1];
if(NULL == filename) {
- messagebody = amqp_cstring_bytes(argv[optind+2]);
+ if ((argc-optind) >= 3) {
+ messagebody = amqp_cstring_bytes(argv[optind+2]);
+ } else {
+ char *file = NULL;
+ int size = load_stdin_into_memory(&file);
+ if (size >= 0) {
+ messagebody = (amqp_bytes_t) {.len = size, .bytes = (void *)file};
+ } else {
+ fprintf(stderr, "Error reading from STDIN\n");
+ exit(size);
+ }
+ }
} else {
char *bytes = NULL;
int size = load_file_into_memory(filename, &bytes);

0 comments on commit 29041cc

Please sign in to comment.
Something went wrong with that request. Please try again.