forked from elastic/logstash-forwarder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
emitter.c
66 lines (60 loc) · 2.14 KB
/
emitter.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include "emitter.h"
#include <zmq.h>
#include "insist.h"
#include <errno.h>
#include <stdint.h> /* C99 for int64_t */
#include <string.h>
#include <time.h> /* struct timespec, clock_gettime */
#ifdef __MACH__
// copied mostly from https://gist.github.com/1087739
/* OS X doesn't have clock_gettime, sigh */
#include <mach/clock.h>
#include <mach/mach.h>
typedef int clockid_t;
#define CLOCK_MONOTONIC 1
long clock_gettime(clockid_t __attribute__((unused)) which_clock, struct timespec *tp) {
clock_serv_t cclock;
mach_timespec_t mts;
host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
clock_get_time(cclock, &mts);
mach_port_deallocate(mach_task_self(), cclock);
tp->tv_sec = mts.tv_sec;
tp->tv_nsec = mts.tv_nsec;
return 0; /* success, according to clock_gettime(3) */
}
#endif
// end gist copy
/*
 * Emitter thread entry point.
 *
 * arg must point to a struct emitter_config. A ZMQ_PULL socket is created
 * on config->zmq, given a high-water mark of 100 and bound to
 * config->zmq_endpoint. Messages are then received and discarded in an
 * endless loop; after every REPORT_INTERVAL messages the average receive
 * rate (messages per second) for that window is printed to stdout.
 *
 * Every setup or receive failure is reported through insist() (declared in
 * insist.h; presumably it aborts -- confirm there). The loop has no exit,
 * so this function does not return.
 */
void *emitter(void *arg) {
  /* Number of messages received between two rate reports. */
  enum { REPORT_INTERVAL = 1000000 };

  struct emitter_config *config = arg;
  int rc;

  void *socket = zmq_socket(config->zmq, ZMQ_PULL);
  /* zmq_strerror() also understands ZeroMQ-specific errno values. */
  insist(socket != NULL, "zmq_socket() failed: %s", zmq_strerror(errno));

  /* ZeroMQ 2.x documents the ZMQ_HWM option value as uint64_t. */
  uint64_t hwm = 100;
  rc = zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
  insist(rc == 0, "zmq_setsockopt(ZMQ_HWM) failed: %s", zmq_strerror(errno));

  rc = zmq_bind(socket, config->zmq_endpoint);
  insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
         zmq_strerror(errno));

  struct timespec start;
  rc = (int)clock_gettime(CLOCK_MONOTONIC, &start);
  insist(rc == 0, "clock_gettime failed");

  for (int count = 0; ; count++) {
    zmq_msg_t message;
    rc = zmq_msg_init(&message);
    insist(rc == 0, "zmq_msg_init failed");

    if (count == REPORT_INTERVAL) {
      struct timespec now;
      rc = (int)clock_gettime(CLOCK_MONOTONIC, &now);
      insist(rc == 0, "clock_gettime failed");

      /* Subtract the fields before converting to double so precision is
       * not lost when the absolute clock values are large. */
      double elapsed = (double)(now.tv_sec - start.tv_sec)
                       + (double)(now.tv_nsec - start.tv_nsec) / 1000000000.0;
      printf("Rate: %f\n", (double)count / elapsed);

      /* Start the next window at the instant this one ended. */
      start = now;
      count = 0;
    }

    rc = zmq_recv(socket, &message, 0);
    insist(rc == 0, "zmq_recv(%s) failed (returned %d): %s",
           config->zmq_endpoint, rc, zmq_strerror(errno));
    //printf("received: %.*s\n", (int)zmq_msg_size(&message),
           //(char *)zmq_msg_data(&message));
    /* TODO(sissel): ship this out to a remote server */
    zmq_msg_close(&message);
  }
} /* emitter */