Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added stdin reading capability to amqpsend #4

Merged
merged 1 commit into from Mar 2, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.rst
Expand Up @@ -57,7 +57,7 @@ You may also pass a filename as input.

$ ./amqpsend --help::

Usage: ./amqpsend [options] exchange routingkey [msg]
Usage: ./amqpsend [options] exchange routingkey [message]
Options:
--host/-h host specify the host (default: "amqpbroker")
--port/-P port specify AMQP port (default: 5672)
Expand All @@ -71,8 +71,12 @@ $ ./amqpsend --help::
The following environment variables may also be set:
AMQP_HOST, AMQP_PORT, AMQP_VHOST, AMQP_USER, AMQP_PASSWORD, AMQP_PERSISENT
Acceptable values for AMQP_PERSISENT are '1' (No Persist) and '2' (Persist)

With no -f option and no message, message contents will be read from standard
input.

Example:
$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey "HELLO AMQP"
$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts
$ echo "HELLO AMQP" | amqpsend -h amqp.example.com -P 5672 amq.fanout mykey

47 changes: 42 additions & 5 deletions amqpsend.c
Expand Up @@ -62,6 +62,8 @@
#include <assert.h>
#include <getopt.h>

#define BUFFERSIZE 8096

// from "example_utils.c"
void die_on_error(int x, char const *context) {
if (x < 0) {
Expand Down Expand Up @@ -115,7 +117,8 @@ void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
}

void print_help(const char *program_name) {
fprintf(stderr, "Usage: %s [options] exchange routingkey [msg]\n", program_name);
fprintf(stderr, "Usage: %s [options] exchange routingkey [message]\n", program_name);
fprintf(stderr, "\n");
fprintf(stderr, "Options:\n");
fprintf(stderr, " --host/-h host specify the host (default: \"amqpbroker\")\n");
fprintf(stderr, " --port/-P port specify AMQP port (default: 5672)\n");
Expand All @@ -128,10 +131,14 @@ void print_help(const char *program_name) {
fprintf(stderr, "\n");
fprintf(stderr, "The following environment variables may also be set:\n");
fprintf(stderr, " AMQP_HOST, AMQP_PORT, AMQP_VHOST, AMQP_USER, AMQP_PASSWORD, AMQP_PERSISTENT\n");
fprintf(stderr, "Acceptable values for AMQP_PERSISENT are '1' (Not Persistent) and '2' (Persistent)\n\n");
fprintf(stderr, "Acceptable values for AMQP_PERSISENT are '1' (Not Persistent) and '2' (Persistent)\n");
fprintf(stderr, "\n");
fprintf(stderr, "With no -f option and no message, message contents will be read from standard input.\n");
fprintf(stderr, "\n");
fprintf(stderr, "Example:\n");
fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey \"HELLO AMQP\"\n");
fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts\n\n");
fprintf(stderr, "$ amqpsend -h amqp.example.com -P 5672 amq.fanout mykey -f /etc/hosts\n");
fprintf(stderr, "$ echo \"HELLO AMQP\" | amqpsend -h amqp.example.com -P 5672 amq.fanout mykey\n\n");
}

// shamelessly taken from the public domain
Expand All @@ -148,12 +155,30 @@ int load_file_into_memory(const char *filename, char **result) {
*result = (char *)malloc(size);
if(size != fread(*result, sizeof(char), size, f)) {
free(*result);
fclose(f);
return -2;
}
fclose(f);
return size;
}

int load_stdin_into_memory(char **file) {
int size = 0, read = 0;
char *buffer = (char *)malloc(sizeof(char) * BUFFERSIZE);
do {
size += (read = fread(buffer, sizeof(char), BUFFERSIZE, stdin));
if (ferror(stdin)) {
free(*file);
free(buffer);
return -1;
}
*file = (char *)realloc((void *)*file, sizeof(char) * size);
memcpy(*file + (sizeof(char) * (size - read)), buffer, read);
} while (read == BUFFERSIZE);
free(buffer);
return size;
}

int main(int argc, char **argv) {
char const *hostname = "amqpbroker"; // amqp hostname
int port = 5672; // amqp port
Expand Down Expand Up @@ -237,14 +262,26 @@ int main(int argc, char **argv) {
}
}

if ((argc-optind) < 2 || (NULL == filename && (argc-optind) < 3)) {
// if ((argc-optind) < 2 || (NULL == filename && (argc-optind) < 3)) {
if ((argc-optind) < 2) {
print_help(argv[0]);
return 1;
}
exchange = argv[optind];
routingkey = argv[optind+1];
if(NULL == filename) {
messagebody = amqp_cstring_bytes(argv[optind+2]);
if ((argc-optind) >= 3) {
messagebody = amqp_cstring_bytes(argv[optind+2]);
} else {
char *file = NULL;
int size = load_stdin_into_memory(&file);
if (size >= 0) {
messagebody = (amqp_bytes_t) {.len = size, .bytes = (void *)file};
} else {
fprintf(stderr, "Error reading from STDIN\n");
exit(size);
}
}
} else {
char *bytes = NULL;
int size = load_file_into_memory(filename, &bytes);
Expand Down