-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish.c
67 lines (55 loc) · 1.91 KB
/
publish.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char **argv) {
rd_kafka_t *rk; /* Kafka producer handle */
rd_kafka_conf_t *conf; /* Kafka configuration */
char *brokers; /* Kafka broker list */
char *topic; /* Kafka topic to produce to */
char *message; /* Message to produce */
rd_kafka_topic_t *rkt; /* Topic handle */
rd_kafka_resp_err_t err; /* Error code */
/* Check arguments */
if (argc != 4) {
fprintf(stderr, "Usage: %s <broker list> <topic> <message>\n", argv[0]);
exit(1);
}
brokers = argv[1];
topic = argv[2];
message = argv[3];
/* Create Kafka configuration */
conf = rd_kafka_conf_new();
/* Set the broker list */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Failed to set broker list\n");
exit(1);
}
/* Create Kafka producer */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (rk == NULL) {
fprintf(stderr, "Failed to create Kafka producer\n");
exit(1);
}
/* Create Kafka topic */
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (rkt == NULL) {
fprintf(stderr, "Failed to create Kafka topic\n");
rd_kafka_destroy(rk);
exit(1);
}
/* Produce message */
err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(err));
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
exit(1);
}
/* Wait for messages to be delivered */
rd_kafka_flush(rk, 10000);
/* Destroy Kafka topic and producer */
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 0;
}