/
brod.erl
1424 lines (1319 loc) · 56.3 KB
/
brod.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
%%%
%%% Copyright (c) 2014-2021, Klarna Bank AB (publ)
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
-module(brod).
-behaviour(application).
%% Application
-export([ start/0
, start/2
, stop/0
, stop/1
]).
%% Client API
-export([ get_partitions_count/2
, get_partitions_count_safe/2
, start_client/1
, start_client/2
, start_client/3
, start_consumer/3
, start_producer/3
, stop_client/1
]).
-export([ start_link_client/1
, start_link_client/2
, start_link_client/3
]).
%% Producer API
%% NOTE: produce_no_ack/3 is documented and spec'ed below alongside
%% produce_no_ack/5, so both arities are exported.
-export([ get_producer/3
        , produce/2
        , produce/3
        , produce/5
        , produce_cb/4
        , produce_cb/6
        , produce_sync/2
        , produce_sync/3
        , produce_sync/5
        , produce_sync_offset/5
        , produce_no_ack/3
        , produce_no_ack/5
        , sync_produce_request/1
        , sync_produce_request/2
        , sync_produce_request_offset/1
        , sync_produce_request_offset/2
        ]).
%% Transactions API
-export([ transaction/3
, txn_do/3
, txn_produce/5
, txn_produce/4
, txn_add_offsets/3
, commit/1
, abort/1
]).
%% Simple Consumer API
-export([ consume_ack/2
, consume_ack/4
, get_consumer/3
, subscribe/3
, subscribe/5
, unsubscribe/1
, unsubscribe/2
, unsubscribe/3
, unsubscribe/4
]).
%% Subscriber API
-export([ start_link_group_subscriber_v2/1
, start_link_topic_subscriber/1
]).
%% Deprecated API
-export([ start_link_group_subscriber/7
, start_link_group_subscriber/8
, start_link_topic_subscriber/5
, start_link_topic_subscriber/6
, start_link_topic_subscriber/7
]).
%% Topic APIs
-export([ create_topics/3
, create_topics/4
, delete_topics/3
, delete_topics/4
]).
%% APIs for quick metadata or message inspection and brod_cli
-export([ get_metadata/1
, get_metadata/2
, get_metadata/3
, resolve_offset/3
, resolve_offset/4
, resolve_offset/5
, resolve_offset/6
, fetch/4
, fetch/5
, fold/8
, connect_leader/4
, list_all_groups/2
, list_groups/2
, describe_groups/3
, connect_group_coordinator/3
, fetch_committed_offsets/2
, fetch_committed_offsets/3
]).
-deprecated([ {fetch, 7, next_version}
, {fetch, 8, next_version}
]).
-export([ fetch/7
, fetch/8
]).
-ifdef(build_brod_cli).
-export([main/1]).
-endif.
-export_type([ batch_input/0
, bootstrap/0
, call_ref/0
, cg/0
, cg_protocol_type/0
, client/0
, client_config/0
, client_id/0
, compression/0
, connection/0
, conn_config/0
, consumer_config/0
, endpoint/0
, error_code/0
, fetch_opts/0
, fold_acc/0
, fold_fun/1
, fold_limits/0
, fold_stop_reason/0
, fold_result/0
, group_config/0
, group_generation_id/0
, group_id/0
, group_member/0
, group_member_id/0
, hostname/0
, key/0
, msg_input/0
, msg_ts/0
, message/0
, message_set/0
, offset/0
, offset_time/0
, offsets_to_commit/0
, partition/0
, partition_assignment/0
, partition_fun/0
, partitioner/0
, portnum/0
, produce_ack_cb/0
, producer_config/0
, produce_reply/0
, produce_result/0
, received_assignments/0
, topic/0
, topic_partition/0
, value/0
, transactional_id/0
, transaction/0
, transaction_config/0
]).
-include("brod_int.hrl").
%%%_* Types ====================================================================
%% basics
-type hostname() :: kpro:hostname().
-type portnum() :: pos_integer().
-type endpoint() :: {hostname(), portnum()}.
%% A host name paired with a port number.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
%% A topic paired with one partition.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
-type offsets_to_commit() :: kpro:offsets_to_commit().
-type key() :: undefined %% no key, transformed to <<>>
| binary().
-type value() :: undefined %% no value, transformed to <<>>
| iodata() %% single value
| {msg_ts(), binary()} %% one message with timestamp
| [?KV(key(), value())] %% backward compatible
| [?TKV(msg_ts(), key(), value())] %% backward compatible
| kpro:msg_input() %% one magic v2 message
| kpro:batch_input(). %% maybe nested batch
-type transactional_id() :: brod_transaction:transactional_id().
-type transaction() :: brod_transaction:transaction().
-type transaction_config() :: brod_transaction:transaction_config().
-type txn_function() :: brod_transaction_processor:process_function().
-type txn_do_options() :: brod_transaction_processor:do_options().
-type msg_input() :: kpro:msg_input().
-type batch_input() :: [msg_input()].
%% A batch is a list of message inputs.
-type msg_ts() :: kpro:msg_ts(). %% Unix time in milliseconds
-type client_id() :: atom().
%% Atom that identifies a client process.
-type client() :: client_id() | pid().
%% A client is referred to either by its `client_id()' or by its pid.
-type client_config() :: brod_client:config().
-type bootstrap() :: [endpoint()] %% default client config
| {[endpoint()], client_config()}.
%% Bootstrap endpoints, optionally paired with an explicit client config.
-type offset_time() :: msg_ts()
| ?OFFSET_EARLIEST
| ?OFFSET_LATEST.
%% Either a message timestamp or one of the `?OFFSET_EARLIEST' /
%% `?OFFSET_LATEST' markers.
-type message() :: kpro:message(). %% A record with offset, key, value, ts_type, ts, and headers.
-type message_set() :: #kafka_message_set{}.
%% A record with topic, partition, high_wm_offset (max offset of the partition), and messages.
%%
%% See <a href="https://github.com/kafka4beam/brod/blob/master/include/brod.hrl#L26">
%% the definition</a> for more information.
-type error_code() :: kpro:error_code().
%% producers
-type produce_reply() :: #brod_produce_reply{}.
%% A record with call_ref, base_offset, and result.
%%
%% See <a href="https://github.com/kafka4beam/brod/blob/master/include/brod.hrl#L49">
%% the definition</a> for more information.
-type producer_config() :: brod_producer:config().
-type partition_fun() :: fun((topic(), pos_integer(), key(), value()) ->
{ok, partition()}).
%% Called with the topic, the partitions count, the key and the value;
%% returns the partition to produce to.
-type partitioner() :: partition_fun() | random | hash.
%% Either a custom partition function or one of the atoms `random' / `hash'.
-type produce_ack_cb() :: fun((partition(), offset()) -> _).
%% Callback taking a partition and an offset; its return value is ignored
%% by the type.
-type compression() :: no_compression | gzip | snappy.
-type call_ref() :: #brod_call_ref{}. %% A record with caller, callee, and ref.
-type produce_result() :: brod_produce_req_buffered
| brod_produce_req_acked.
%% consumers
-type consumer_config() :: [ {begin_offset, offset_time()}
| {min_bytes, non_neg_integer()}
| {max_bytes, non_neg_integer()}
| {max_wait_time, integer()}
| {sleep_timeout, integer()}
| {prefetch_count, integer()}
| {prefetch_bytes, non_neg_integer()}
| {offset_reset_policy, brod_consumer:offset_reset_policy()}
| {size_stat_window, non_neg_integer()}
| {isolation_level, brod_consumer:isolation_level()}
].
%% Consumer configuration.
%%
%% The meaning of the options is documented at {@link brod_consumer:start_link/5}.
-type connection() :: kpro:connection().
-type conn_config() :: [{atom(), term()}] | kpro:conn_config().
%% Connection configuration that will be passed to `kpro' calls.
%%
%% For more info, see the {@link kpro_connection:config()} type.
%% consumer groups
-type group_id() :: kpro:group_id().
-type group_member_id() :: binary().
-type group_member() :: {group_member_id(), #kafka_group_member_metadata{}}.
%% A member id paired with that member's metadata record.
-type group_generation_id() :: non_neg_integer().
-type group_config() :: proplists:proplist().
-type partition_assignment() :: {topic() , [partition()]}.
%% A topic paired with a list of its partitions.
-type received_assignments() :: [#brod_received_assignment{}].
-type cg() :: #brod_cg{}.
-type cg_protocol_type() :: binary().
-type fetch_opts() :: kpro:fetch_opts().
-type fold_acc() :: term().
-type fold_fun(Acc) :: fun((message(), Acc) -> {ok, Acc} | {error, any()}).
%% `fold' always returns when it reaches the high watermark offset. `fold'
%% also returns when any of the limits is hit.
-type fold_limits() :: #{ message_count => pos_integer()
, reach_offset => offset()
}.
-type fold_stop_reason() :: reached_end_of_partition
| reached_message_count_limit
| reached_target_offset
| {error, any()}.
%% OffsetToContinue: begin offset for the next fold call
-type fold_result() :: ?BROD_FOLD_RET(fold_acc(),
OffsetToContinue :: offset(),
fold_stop_reason()).
%%%_* APIs =====================================================================
%% @doc Start brod application.
-spec start() -> ok | no_return().
start() ->
  %% Boot brod together with every application it depends on. Anything
  %% other than `{ok, _}' fails the match below and crashes the caller.
  StartResult = application:ensure_all_started(brod),
  {ok, _StartedApps} = StartResult,
  ok.
%% @doc Stop brod application.
-spec stop() -> ok | {error, term()}.
stop() ->
  %% `application:stop/1' returns `{error, Reason}' (for instance
  %% `{error, {not_started, brod}}') when the application is not running;
  %% that result is handed back to the caller unchanged, hence the spec.
  application:stop(brod).
%% @doc Application behaviour callback
start(_StartType, _StartArgs) ->
  %% Both arguments are ignored; the result of brod_sup:start_link/0 is
  %% returned as is.
  brod_sup:start_link().
%% @doc Application behaviour callback
stop(_State) ->
  %% The state is ignored and nothing is done here; always returns `ok'.
  ok.
%% @equiv start_client(BootstrapEndpoints, brod_default_client)
-spec start_client([endpoint()]) -> ok | {error, any()}.
start_client(BootstrapEndpoints) ->
  %% No client id supplied: use the default one.
  ClientId = ?BROD_DEFAULT_CLIENT_ID,
  start_client(BootstrapEndpoints, ClientId).
%% @equiv start_client(BootstrapEndpoints, ClientId, [])
-spec start_client([endpoint()], client_id()) -> ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId) ->
  %% No config supplied: start with an empty option list.
  EmptyConfig = [],
  start_client(BootstrapEndpoints, ClientId, EmptyConfig).
%% @doc Start a client ({@link brod_client}).
%%
%% `BootstrapEndpoints':
%% Kafka cluster endpoints, can be any of the brokers in the cluster,
%% which does not necessarily have to be the leader of any partition,
%% e.g. a load-balanced entrypoint to the remote Kafka cluster.
%%
%% `ClientId': Atom to identify the client process.
%%
%% `Config' is a proplist, possible values:
%% <ul>
%% <li>`restart_delay_seconds' (optional, default=10)
%%
%% How long to wait between attempts to restart brod_client
%% process when it crashes.</li>
%%
%% <li>`get_metadata_timeout_seconds' (optional, default=5)
%%
%% Return `{error, timeout}' from `brod_client:get_xxx' calls if
%% responses for APIs such as `metadata', `find_coordinator'
%% are not received in time.</li>
%%
%% <li>`reconnect_cool_down_seconds' (optional, default=1)
%%
%% Delay this configured number of seconds before retrying to
%% establish a new connection to the kafka partition leader.</li>
%%
%% <li>`allow_topic_auto_creation' (optional, default=true)
%%
%% By default, brod respects what is configured in the broker
%% about topic auto-creation. i.e. whether
%% `auto.create.topics.enable' is set in the broker configuration.
%% However if `allow_topic_auto_creation' is set to `false' in
%% client config, brod will avoid sending metadata requests that
%% may cause an auto-creation of the topic regardless of what
%% broker config is.</li>
%%
%% <li>`auto_start_producers' (optional, default=false)
%%
%% If true, brod client will spawn a producer automatically when
%% user is trying to call `produce' but did not call `brod:start_producer'
%% explicitly. Can be useful for applications which don't know beforehand
%% which topics they will be working with.</li>
%%
%% <li>`default_producer_config' (optional, default=[])
%%
%% Producer configuration to use when auto_start_producers is true.
%% See {@link brod_producer:start_link/4} for details about producer config</li>
%%
%% </ul>
%%
%% Connection options can be added to the same proplist. See
%% `kpro_connection.erl' in `kafka_protocol' for the details:
%%
%% <ul>
%% <li>`ssl' (optional, default=false)
%%
%% `true | false | ssl:ssl_option()'
%% `true' is translated to `[]' as `ssl:ssl_option()' i.e. all default.
%% </li>
%%
%% <li>`sasl' (optional, default=undefined)
%%
%% Credentials for SASL authentication.
%% `{mechanism(), Filename}' or `{mechanism(), UserName, Password}'
%% where mechanism can be one of the atoms `plain' (for "PLAIN"),
%% `scram_sha_256' (for "SCRAM-SHA-256") or `scram_sha_512'
%% (for "SCRAM-SHA-512").
%% `Filename' should be a file consisting of two lines: the first line
%% is the username and the second line is the password.
%% Both `UserName' and `Password' should be `string() | binary()'</li>
%%
%% <li>`connect_timeout' (optional, default=5000)
%%
%% Timeout when trying to connect to an endpoint.</li>
%%
%% <li>`request_timeout' (optional, default=240000, constraint: >= 1000)
%%
%% Timeout when waiting for a response, connection restart when timed
%% out.</li>
%%
%% <li>`query_api_versions' (optional, default=true)
%%
%% Must be set to `false' to work with kafka versions prior to 0.10.
%% When set to `true', at connection start, brod will send a query request
%% to get the broker supported API version ranges.
%% When set to `false', brod will always use the lowest supported API version
%% when sending requests to kafka.
%% Supported API version ranges can be found in:
%% `brod_kafka_apis:supported_versions/1'</li>
%%
%% <li>`extra_sock_opts' (optional, default=[])
%%
%% Extra socket options to tune socket performance.
%% e.g. `[{sndbuf, 1 bsl 20}]'.
%% <a href="http://erlang.org/doc/man/gen_tcp.html#type-option">More info
%% </a>
%% </li>
%% </ul>
%%
%% You can read more about clients in the
%% <a href="https://hexdocs.pm/brod/readme.html#clients">overview</a>.
-spec start_client([endpoint()], client_id(), client_config()) ->
        ok | {error, any()}.
start_client(BootstrapEndpoints, ClientId, Config) ->
  StartResult = brod_sup:start_client(BootstrapEndpoints, ClientId, Config),
  case StartResult of
    {error, {already_started, _ExistingPid}} ->
      %% A client already running under this id counts as success.
      ok;
    {error, _Reason} = Error ->
      Error;
    ok ->
      ok
  end.
%% @equiv start_link_client(BootstrapEndpoints, brod_default_client)
-spec start_link_client([endpoint()]) -> {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints) ->
  %% No client id supplied: use the default one.
  ClientId = ?BROD_DEFAULT_CLIENT_ID,
  start_link_client(BootstrapEndpoints, ClientId).
%% @equiv start_link_client(BootstrapEndpoints, ClientId, [])
-spec start_link_client([endpoint()], client_id()) ->
        {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId) ->
  %% No config supplied: start with an empty option list.
  EmptyConfig = [],
  start_link_client(BootstrapEndpoints, ClientId, EmptyConfig).
%% @doc Start a client by calling `brod_client:start_link/3' directly.
%%
%% Takes the same three arguments as {@link start_client/3} and hands them
%% unchanged to `brod_client:start_link/3', whose result is returned as is.
-spec start_link_client([endpoint()], client_id(), client_config()) ->
        {ok, pid()} | {error, any()}.
start_link_client(BootstrapEndpoints, ClientId, Config) ->
  brod_client:start_link(BootstrapEndpoints, ClientId, Config).
%% @doc Stop a client.
-spec stop_client(client()) -> ok.
stop_client(Client) when is_pid(Client) ->
  %% A pid is always stopped through brod_client.
  brod_client:stop(Client);
stop_client(Client) when is_atom(Client) ->
  %% For a client id, ask brod_sup whether it knows a client under that id.
  case brod_sup:find_client(Client) of
    [] ->
      %% Not found by brod_sup: stop it through brod_client instead.
      brod_client:stop(Client);
    [_Pid] ->
      brod_sup:stop_client(Client)
  end.
%% @doc Dynamically start a per-topic producer and register it in the client.
%%
%% You have to start a producer for each topic you want to produce messages
%% into, unless you have specified `auto_start_producers = true' when starting
%% the client (in that case you don't have to call this function at all).
%%
%% After starting the producer, you can call {@link produce/5} and friends
%% for producing messages.
%%
%% You can read more about producers in the
%% <a href="https://hexdocs.pm/brod/readme.html#producers">overview</a>.
%%
%% A client has to be already started before making this call (e.g. by calling
%% {@link start_client/3}).
%%
%% See {@link brod_producer:start_link/4} for a list of available configuration
%% options.
%%
%% Example:
%% ```
%% > brod:start_producer(my_client, <<"my_topic">>, [{max_retries, 5}]).
%% ok
%% '''
-spec start_producer(client(), topic(), producer_config()) ->
        ok | {error, any()}.
start_producer(Client, TopicName, ProducerConfig) ->
  %% Starting the producer is delegated entirely to brod_client.
  brod_client:start_producer(Client, TopicName, ProducerConfig).
%% @doc Dynamically start topic consumer(s) and register it in the client.
%%
%% A {@link brod_consumer} is started for each partition of the given topic.
%% Note that you can have only one consumer per client-topic.
%%
%% See {@link brod_consumer:start_link/5} for details about consumer config.
%%
%% You can read more about consumers in the
%% <a href="https://hexdocs.pm/brod/readme.html#consumers">overview</a>.
-spec start_consumer(client(), topic(), consumer_config()) ->
        ok | {error, any()}.
start_consumer(Client, TopicName, ConsumerConfig) ->
  %% Starting the consumer(s) is delegated entirely to brod_client.
  brod_client:start_consumer(Client, TopicName, ConsumerConfig).
%% @doc Get number of partitions for a given topic.
%%
%% The higher level producers may need the partition numbers to
%% find the partition producer pid – if the number of partitions
%% is not statically configured for them.
%% It is up to the callers how they want to distribute their data
%% (e.g. random, roundrobin or consistent-hashing) to the partitions.
%% NOTE: The partitions count is cached for 120 seconds.
-spec get_partitions_count(client(), topic()) ->
        {ok, pos_integer()} | {error, any()}.
get_partitions_count(Client, Topic) ->
  %% Plain delegation; the result of brod_client is returned unchanged.
  brod_client:get_partitions_count(Client, Topic).
%% @doc The same as `get_partitions_count(Client, Topic)'
%% but ensured not to auto-create topics in Kafka even
%% when Kafka has topic auto-creation configured.
-spec get_partitions_count_safe(client(), topic()) ->
        {ok, pos_integer()} | {error, any()}.
get_partitions_count_safe(Client, Topic) ->
  %% Plain delegation; the result of brod_client is returned unchanged.
  brod_client:get_partitions_count_safe(Client, Topic).
%% @equiv brod_client:get_consumer(Client, Topic, Partition)
-spec get_consumer(client(), topic(), partition()) ->
        {ok, pid()} | {error, Reason}
          when Reason :: client_down
                       | {client_down, any()}
                       | {consumer_down, any()}
                       | {consumer_not_found, topic()}
                       | {consumer_not_found, topic(), partition()}.
get_consumer(Client, Topic, Partition) ->
  brod_client:get_consumer(Client, Topic, Partition).
%% @equiv brod_client:get_producer(Client, Topic, Partition)
-spec get_producer(client(), topic(), partition()) ->
        {ok, pid()} | {error, Reason}
          when Reason :: client_down
                       | {client_down, any()}
                       | {producer_down, any()}
                       | {producer_not_found, topic()}
                       | {producer_not_found, topic(), partition()}.
get_producer(Client, Topic, Partition) ->
  %% Plain delegation; the result of brod_client is returned unchanged.
  brod_client:get_producer(Client, Topic, Partition).
%% @equiv produce(Pid, <<>>, Value)
-spec produce(pid(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Pid, Value) ->
  %% No key supplied: produce with an empty binary as the key.
  EmptyKey = <<>>,
  produce(Pid, EmptyKey, Value).
%% @doc Produce one or more messages.
%%
%% See {@link produce/5} for information about possible shapes
%% of `Value'.
%%
%% The pid should be a partition producer pid, NOT client pid.
%%
%% The return value is a call reference of type `call_ref()',
%% so the caller can use it to expect (match)
%% a `#brod_produce_reply{result = brod_produce_req_acked}'
%% message after the produce request has been acked by Kafka.
-spec produce(pid(), key(), value()) ->
        {ok, call_ref()} | {error, any()}.
produce(ProducerPid, Key, Value) ->
  %% The producer process does the actual work.
  brod_producer:produce(ProducerPid, Key, Value).
%% @doc Produce one or more messages.
%%
%% `Value' can have many different forms:
%% <ul>
%% <li>`binary()': Single message with key from the `Key' argument</li>
%% <li>`{brod:msg_ts(), binary()}': Single message with
%% its create-time timestamp and key from `Key'</li>
%% <li>`#{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}':
%% Single message; if this map does not have a `key'
%% field, `Key' is used instead</li>
%% <li>`[{K, V} | {T, K, V}]': A batch, where `V' could be
%% a nested list of such representation</li>
%% <li>`[#{key => K, value => V, ts => T, headers => [{_, _}]}]':
%% A batch</li>
%% </ul>
%%
%% When `Value' is a batch, the `Key' argument is only used
%% as partitioner input and all messages are written on the
%% same partition.
%%
%% `ts' field is dropped for kafka prior to version `0.10'
%% (produce API version 0, magic version 0). `headers' field
%% is dropped for kafka prior to version `0.11' (produce API
%% version 0-2, magic version 0-1).
%%
%% `Partition' may be either a concrete partition (an integer)
%% or a partitioner (see {@link partitioner()} for more info).
%%
%% A producer for the particular topic has to be already started
%% (by calling {@link start_producer/3}), unless you have specified
%% `auto_start_producers = true' when starting the client.
%%
%% This function first looks up the producer pid, then calls {@link produce/3}
%% to do the real work.
%%
%% The return value is a call reference of type {@link call_ref()}, so the caller
%% can use it to expect (match)
%% a `#brod_produce_reply{result = brod_produce_req_acked}'
%% (see the {@link produce_reply()} type) message after the
%% produce request has been acked by Kafka.
%%
%% Example:
%% ```
%% > brod:produce(my_client, <<"my_topic">>, 0, "key", <<"Hello from erlang!">>).
%% {ok,{brod_call_ref,<0.83.0>,<0.133.0>,#Ref<0.3024768151.2556690436.92841>}}
%% > flush().
%% Shell got {brod_produce_reply,
%% {brod_call_ref,<0.83.0>,<0.133.0>,
%% #Ref<0.3024768151.2556690436.92841>},
%% 12,brod_produce_req_acked}
%% '''
-spec produce(client(), topic(), partition() | partitioner(),
              key(), value()) -> {ok, call_ref()} | {error, any()}.
produce(Client, Topic, Partition, Key, Value) when is_integer(Partition) ->
  %% Concrete partition: look up its producer and hand over the message.
  case get_producer(Client, Topic, Partition) of
    {error, _Reason} = Error -> Error;
    {ok, ProducerPid} -> produce(ProducerPid, Key, Value)
  end;
produce(Client, Topic, Partitioner, Key, Value) ->
  %% Partitioner: turn it into a fun, pick a partition from the current
  %% partitions count, then go through the concrete-partition clause.
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {error, _Reason} = Error ->
      Error;
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      produce(Client, Topic, Partition, Key, Value)
  end.
%% @doc Same as {@link produce/3}, only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka (see the {@link produce_ack_cb()} type).
-spec produce_cb(pid(), key(), value(), produce_ack_cb()) ->
        ok | {error, any()}.
produce_cb(ProducerPid, Key, Value, AckCb) ->
  %% The producer process does the actual work, including the callback.
  brod_producer:produce_cb(ProducerPid, Key, Value, AckCb).
%% @doc Same as {@link produce/5} only the ack is not delivered as a message,
%% instead, the callback is evaluated by producer worker when ack is received
%% from kafka (see the {@link produce_ack_cb()} type).
%%
%% Return the partition to caller as `{ok, Partition}' for caller
%% to correlate the callback when the 3rd arg is not a partition number.
-spec produce_cb(client(), topic(), partition() | partitioner(),
                 key(), value(), produce_ack_cb()) ->
        ok | {ok, partition()} | {error, any()}.
produce_cb(Client, Topic, Part, Key, Value, AckCb) when is_integer(Part) ->
  %% Concrete partition: look up its producer and hand over the message.
  case get_producer(Client, Topic, Part) of
    {error, _Reason} = Error -> Error;
    {ok, ProducerPid} -> produce_cb(ProducerPid, Key, Value, AckCb)
  end;
produce_cb(Client, Topic, Partitioner, Key, Value, AckCb) ->
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {error, _Reason} = CountError ->
      CountError;
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      %% On success report the chosen partition, so the caller can
      %% correlate the callback with it.
      case produce_cb(Client, Topic, Partition, Key, Value, AckCb) of
        ok -> {ok, Partition};
        {error, _} = ProduceError -> ProduceError
      end
  end.
%% @doc Send the message to partition worker without any ack.
%%
%% NOTE: This call has no back-pressure to the caller,
%% excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(pid(), key(), value()) -> ok | {error, any()}.
produce_no_ack(ProducerPid, Key, Value) ->
  %% The producer process does the actual work.
  brod_producer:produce_no_ack(ProducerPid, Key, Value).
%% @doc Find the partition worker and send message without any ack.
%%
%% NOTE: This call has no back-pressure to the caller,
%% excessive usage may cause BEAM to run out of memory.
-spec produce_no_ack(client(), topic(), partition() | partitioner(),
                     key(), value()) -> ok | {error, any()}.
produce_no_ack(Client, Topic, Part, Key, Value) when is_integer(Part) ->
  %% Concrete partition: look up its producer and hand over the message.
  case get_producer(Client, Topic, Part) of
    {error, _Reason} = Error -> Error;
    {ok, ProducerPid} -> produce_no_ack(ProducerPid, Key, Value)
  end;
produce_no_ack(Client, Topic, Partitioner, Key, Value) ->
  PartFun = brod_utils:make_part_fun(Partitioner),
  case brod_client:get_partitions_count(Client, Topic) of
    {error, _Ignored} ->
      %% error ignored
      %% NOTE(review): a failed partitions-count lookup is reported as
      %% `ok' here, while the concrete-partition clause above returns
      %% `{error, Reason}' - confirm this asymmetry is intended.
      ok;
    {ok, PartitionsCnt} ->
      {ok, Partition} = PartFun(Topic, PartitionsCnt, Key, Value),
      produce_no_ack(Client, Topic, Partition, Key, Value)
  end.
%% @equiv produce_sync(Pid, <<>>, Value)
-spec produce_sync(pid(), value()) -> ok | {error, any()}.
produce_sync(Pid, Value) ->
  %% No key supplied: produce with an empty binary as the key.
  EmptyKey = <<>>,
  produce_sync(Pid, EmptyKey, Value).
%% @doc Sync version of {@link produce/3}.
%%
%% This function will not return until the response is received from
%% Kafka. But when producer is started with `required_acks' set to 0,
%% this function will return once the messages are buffered in the
%% producer process.
-spec produce_sync(pid(), key(), value()) ->
        ok | {error, any()}.
produce_sync(Pid, Key, Value) ->
  case produce(Pid, Key, Value) of
    {error, _Reason} = Error ->
      Error;
    {ok, CallRef} ->
      %% Block until the request identified by CallRef is acked.
      sync_produce_request(CallRef)
  end.
%% @doc Sync version of {@link produce/5}.
%%
%% This function will not return until a response is received from kafka,
%% however if producer is started with `required_acks' set to 0, this function
%% will return once the messages are buffered in the producer process.
-spec produce_sync(client(), topic(), partition() | partitioner(),
                   key(), value()) -> ok | {error, any()}.
produce_sync(Client, Topic, Partition, Key, Value) ->
  Result = produce_sync_offset(Client, Topic, Partition, Key, Value),
  case Result of
    %% The offset is not part of this function's contract; drop it.
    {ok, _Offset} -> ok;
    _NotOk -> Result
  end.
%% @doc Version of {@link produce_sync/5} that returns the offset assigned by Kafka.
%%
%% If producer is started with `required_acks' set to 0, the offset will be
%% `?BROD_PRODUCE_UNKNOWN_OFFSET'.
-spec produce_sync_offset(client(), topic(), partition() | partitioner(),
                          key(), value()) -> {ok, offset()} | {error, any()}.
produce_sync_offset(Client, Topic, Partition, Key, Value) ->
  case produce(Client, Topic, Partition, Key, Value) of
    {error, _Reason} = Error ->
      Error;
    {ok, CallRef} ->
      %% Wait for the request identified by CallRef and return its offset.
      sync_produce_request_offset(CallRef)
  end.
%% @equiv sync_produce_request(CallRef, infinity)
-spec sync_produce_request(call_ref()) ->
        ok | {error, Reason :: any()}.
sync_produce_request(CallRef) ->
  %% No timeout supplied: wait without a time limit.
  Timeout = infinity,
  sync_produce_request(CallRef, Timeout).
%% @doc Block until a previously sent produce request is acked by kafka,
%% or until the given timeout elapses.
%%
%% This way, you can turn asynchronous requests, made by {@link produce/5}
%% and friends, into synchronous ones.
%%
%% Example:
%% ```
%% {ok, CallRef} = brod:produce(
%%   brod_client_1, <<"my_topic">>, 0, <<"some-key">>, <<"some-value">>
%% ). % returns immediately
%% % the following call waits and returns after the ack is received or timed out
%% brod:sync_produce_request(CallRef, 5_000).
%% '''
-spec sync_produce_request(call_ref(), timeout()) ->
        ok | {error, Reason :: any()}.
sync_produce_request(CallRef, Timeout) ->
  Result = sync_produce_request_offset(CallRef, Timeout),
  case Result of
    %% The offset is not part of this function's contract; drop it.
    {ok, _Offset} -> ok;
    _NotOk -> Result
  end.
%% @equiv sync_produce_request_offset(CallRef, infinity)
-spec sync_produce_request_offset(call_ref()) ->
        {ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef) ->
  %% No timeout supplied: wait without a time limit.
  Timeout = infinity,
  sync_produce_request_offset(CallRef, Timeout).
%% @doc As {@link sync_produce_request/2}, but also returning assigned offset.
%%
%% See {@link produce_sync_offset/5}.
-spec sync_produce_request_offset(call_ref(), timeout()) ->
        {ok, offset()} | {error, Reason :: any()}.
sync_produce_request_offset(CallRef, Timeout) ->
  %% The waiting itself is implemented by brod_producer; its result is
  %% returned unchanged.
  brod_producer:sync_produce_request(CallRef, Timeout).
%% @doc Subscribe a process to the data stream of one topic-partition.
%%
%% Prerequisites: a client must already be running (see
%% {@link start_client/3}; one client can serve multiple topics), and a
%% consumer for this topic and partition must have been started as well
%% (see {@link start_consumer/3}).
%%
%% `Options' may extend the consumer config for this subscription, see
%% {@link brod_consumer:subscribe/3} for details.
%%
%% Returns `{ok, ConsumerPid}' on success. The caller may want to monitor
%% `ConsumerPid' and re-subscribe should it crash. When `{error, Reason}'
%% is returned, the caller should perhaps retry later.
%%
%% After a successful subscription the subscriber process should expect
%% messages matching:
%% `{ConsumerPid, #kafka_message_set{}}' and
%% `{ConsumerPid, #kafka_fetch_error{}}'.
%%
%% Use `-include_lib("brod/include/brod.hrl")' to access the records.
%%
%% On receiving `#kafka_fetch_error{}' the subscriber should re-subscribe
%% itself to resume the data stream.
%%
%% Back-pressure: every message delivered to a subscriber has to be acked
%% with {@link consume_ack/4} once it is processed. When too many messages
%% are left un-acked, the consumer stops fetching new ones so that the
%% subscriber does not get overwhelmed.
%%
%% A consumer accepts only one subscriber process. To read from different
%% positions (or at different paces) you have to create separate consumers
%% (and thus also separate clients).
-spec subscribe(client(), pid(), topic(), partition(),
                consumer_config()) -> {ok, pid()} | {error, any()}.
subscribe(Client, SubscriberPid, Topic, Partition, Options) ->
  Lookup = brod_client:get_consumer(Client, Topic, Partition),
  case Lookup of
    {error, Reason} ->
      %% No consumer could be resolved: report the lookup failure.
      {error, Reason};
    {ok, ConsumerPid} ->
      SubscribeResult = subscribe(ConsumerPid, SubscriberPid, Options),
      case SubscribeResult of
        ok -> {ok, ConsumerPid};
        %% Anything other than 'ok' is handed back to the caller untouched.
        _NotOk -> SubscribeResult
      end
  end.
%% @doc Subscribe to a data stream from the given consumer.
%%
%% All three arguments are passed unchanged to
%% {@link brod_consumer:subscribe/3}, whose result is returned as-is.
%%
%% See {@link subscribe/5} for more information.
-spec subscribe(pid(), pid(), consumer_config()) -> ok | {error, any()}.
subscribe(ConsumerPid, SubscriberPid, Options) ->
brod_consumer:subscribe(ConsumerPid, SubscriberPid, Options).
%% @doc Unsubscribe the current subscriber.
%%
%% Assuming the subscriber is `self()', i.e. this is the same as calling
%% `unsubscribe(Client, Topic, Partition, self())'.
-spec unsubscribe(client(), topic(), partition()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition) ->
unsubscribe(Client, Topic, Partition, self()).
%% @doc Unsubscribe `SubscriberPid' from the consumer of the given
%% topic-partition.
%%
%% The consumer pid is looked up through the client. If the lookup does not
%% yield `{ok, ConsumerPid}', the lookup result is returned unchanged.
-spec unsubscribe(client(), topic(), partition(), pid()) -> ok | {error, any()}.
unsubscribe(Client, Topic, Partition, SubscriberPid) ->
  Lookup = brod_client:get_consumer(Client, Topic, Partition),
  case Lookup of
    {ok, ConsumerPid} ->
      unsubscribe(ConsumerPid, SubscriberPid);
    _NoConsumer ->
      Lookup
  end.
%% @doc Unsubscribe the current subscriber.
%%
%% Assuming the subscriber is `self()', i.e. this is the same as calling
%% `unsubscribe(ConsumerPid, self())'.
-spec unsubscribe(pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid) ->
unsubscribe(ConsumerPid, self()).
%% @doc Unsubscribe the current subscriber.
%%
%% Asks the consumer `ConsumerPid' to unsubscribe `SubscriberPid' by
%% delegating to {@link brod_consumer:unsubscribe/2}.
-spec unsubscribe(pid(), pid()) -> ok | {error, any()}.
unsubscribe(ConsumerPid, SubscriberPid) ->
%% NOTE(review): presumably `SubscriberPid' has to be the process that is
%% currently subscribed to the consumer; confirm in brod_consumer.
brod_consumer:unsubscribe(ConsumerPid, SubscriberPid).
%% @doc Acknowledge that one or more messages have been processed.
%%
%% {@link brod_consumer} sends message-sets to the subscriber process and
%% keeps the delivered messages in a 'pending' queue.
%% The subscriber may ack any offset it has received; acknowledging a
%% greater offset implicitly acknowledges all messages before it.
%% For example, if messages `[1, 2, 3, 4]' have been sent (as one or more
%% message-sets) to the subscriber, acknowledging offset `3' indicates that
%% the first three messages were processed successfully, leaving only
%% message `4' pending.
%%
%% The 'pending' queue is size-limited (see the `prefetch_count' consumer
%% config), which provides a back-pressure mechanism: with too many
%% messages pending ack, the consumer stops fetching new ones so the
%% subscriber won't get overwhelmed.
%%
%% Note that the acknowledged offset is not range-checked: if offsets
%% `[M, N]' are pending, acknowledging with `Offset > N' removes all of
%% them from the pending queue, while acknowledging with `Offset < M' has
%% no effect.
%%
%% Use this function only with plain partition subscribers (i.e., when you
%% call {@link subscribe/5} manually). Behaviours such as
%% {@link brod_topic_subscriber} have their own way of acking messages.
-spec consume_ack(client(), topic(), partition(), offset()) ->
        ok | {error, any()}.
consume_ack(Client, Topic, Partition, Offset) ->
  case brod_client:get_consumer(Client, Topic, Partition) of
    {error, _Reason} = LookupError ->
      %% No consumer for this topic-partition: hand the error back.
      LookupError;
    {ok, ConsumerPid} ->
      consume_ack(ConsumerPid, Offset)
  end.
%% @equiv brod_consumer:ack(ConsumerPid, Offset)
%% @doc Acknowledge `Offset' directly on the consumer process `ConsumerPid',
%% without looking the consumer up through a client first.
%%
%% See {@link consume_ack/4} for more information.
-spec consume_ack(pid(), offset()) -> ok | {error, any()}.
consume_ack(ConsumerPid, Offset) ->
brod_consumer:ack(ConsumerPid, Offset).
%% @doc Thin wrapper which forwards all seven arguments, unchanged and in
%% the same order, to {@link brod_group_subscriber:start_link/7}.
%% @see brod_group_subscriber:start_link/7
-spec start_link_group_subscriber(
client(), group_id(), [topic()],
group_config(), consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, CbModule, CbInitArg) ->
brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, CbModule, CbInitArg).
%% @doc Start group_subscriber_v2.
%%
%% `Config' is passed as-is to {@link brod_group_subscriber_v2:start_link/1};
%% see that function for the accepted configuration.
%% @see brod_group_subscriber_v2:start_link/1
-spec start_link_group_subscriber_v2(
brod_group_subscriber_v2:subscriber_config()
) -> {ok, pid()} | {error, any()}.
start_link_group_subscriber_v2(Config) ->
brod_group_subscriber_v2:start_link(Config).
%% @doc Like {@link start_link_group_subscriber/7}, with an additional
%% `MessageType' argument (`message' or `message_set').
%%
%% All eight arguments are forwarded, unchanged and in the same order, to
%% {@link brod_group_subscriber:start_link/8}.
%% @see brod_group_subscriber:start_link/8
-spec start_link_group_subscriber(
client(), group_id(), [topic()], group_config(),
consumer_config(), message | message_set,
module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_group_subscriber(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, MessageType,
CbModule, CbInitArg) ->
%% NOTE(review): `MessageType' presumably selects whether the callback
%% module receives single messages or whole message sets; confirm in
%% brod_group_subscriber.
brod_group_subscriber:start_link(Client, GroupId, Topics, GroupConfig,
ConsumerConfig, MessageType,
CbModule, CbInitArg).
%% @equiv start_link_topic_subscriber(Client, Topic, 'all', ConsumerConfig,
%% CbModule, CbInitArg)
%% @doc Convenience variant of {@link start_link_topic_subscriber/6} which
%% passes the atom `all' in place of an explicit partition list.
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, ConsumerConfig,
CbModule, CbInitArg) ->
start_link_topic_subscriber(Client, Topic, all, ConsumerConfig,
CbModule, CbInitArg).
%% @equiv start_link_topic_subscriber(Client, Topic, Partitions,
%% ConsumerConfig, message,
%% CbModule, CbInitArg)
%% @doc Convenience variant of {@link start_link_topic_subscriber/7} which
%% fixes the message type argument to `message'.
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), all | [partition()],
consumer_config(), module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, CbModule, CbInitArg) ->
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, message, CbModule, CbInitArg).
%% @doc Thin wrapper which forwards all seven arguments, unchanged and in
%% the same order, to {@link brod_topic_subscriber:start_link/7}.
%%
%% `Partitions' is either the atom `all' or a list of partitions, and
%% `MessageType' is either `message' or `message_set'.
%% @see brod_topic_subscriber:start_link/7
%% @deprecated Please use {@link start_link_topic_subscriber/1} instead
-spec start_link_topic_subscriber(
client(), topic(), all | [partition()],
consumer_config(), message | message_set,
module(), term()) ->
{ok, pid()} | {error, any()}.
start_link_topic_subscriber(Client, Topic, Partitions,
ConsumerConfig, MessageType, CbModule, CbInitArg) ->
brod_topic_subscriber:start_link(Client, Topic, Partitions,
ConsumerConfig, MessageType,
CbModule, CbInitArg).
%% @doc Start a topic subscriber from a single configuration term.
%%
%% `Config' is passed as-is to {@link brod_topic_subscriber:start_link/1};
%% see that function for the accepted configuration.
%% @see brod_topic_subscriber:start_link/1
-spec start_link_topic_subscriber(
brod_topic_subscriber:topic_subscriber_config()
) -> {ok, pid()} | {error, any()}.
start_link_topic_subscriber(Config) ->
brod_topic_subscriber:start_link(Config).
%% @equiv create_topics(Hosts, TopicConfigs, RequestConfigs, [])
%% @doc Create topic(s) in kafka, delegating to `brod_utils:create_topics/3'.
%%
%% `RequestConfigs' is a map which may carry a `timeout' key, as declared
%% in the spec.
-spec create_topics([endpoint()], [topic_config()], #{timeout => kpro:int32()}) ->
ok | {error, any()}.
create_topics(Hosts, TopicConfigs, RequestConfigs) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs).
%% @doc Create topic(s) in kafka.
%%
%% `TopicConfigs' is a list of topic configurations.
%% A topic configuration is a map (or tuple list for backward compatibility)
%% with the following keys (all of them are required):
%% <ul>