feat: integrate data-streams into kafee producer (#52)
Also includes the correct otel spans
btkostner authored Oct 6, 2023
1 parent cae7bce commit ffdd5da
Showing 12 changed files with 237 additions and 97 deletions.
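Taken together, these changes thread Datadog data-streams tracking and an OpenTelemetry publish span through the produce path. A rough sketch of what a caller sees after this commit (the producer module, topic, and message fields below are illustrative and not part of the diff):

# Hypothetical producer module whose `use Kafee.Producer` options are configured elsewhere.
MyApp.OrderProducer.produce([
  %Kafee.Producer.Message{topic: "orders", partition: 0, key: "order-123", value: "{}"}
])

# The generated produce/1 now pipes through
#   Kafee.Producer.normalize/2 |> validate_batch!/1 |> annotate_batch/1 |> produce/2
# so each message gains the Datadog data-streams header before the backend
# call is wrapped in an "orders publish" OpenTelemetry span.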
66 changes: 64 additions & 2 deletions lib/kafee/producer.ex
@@ -113,8 +113,12 @@ defmodule Kafee.Producer do
"""

require OpenTelemetry.Tracer, as: Tracer

alias Kafee.Producer.{Config, Message, ValidationError}

@data_streams_propagator_key Datadog.DataStreams.Propagator.propagation_key()

@doc false
defmacro __using__(module_opts \\ []) do
quote do
@@ -174,6 +178,7 @@ defmodule Kafee.Producer do
messages
|> Kafee.Producer.normalize(__MODULE__)
|> Kafee.Producer.validate_batch!()
|> Kafee.Producer.annotate_batch()
|> Kafee.Producer.produce(__MODULE__)
end
end
@@ -271,19 +276,76 @@ defmodule Kafee.Producer do
message
end

@doc """
Annotates a list of messages with tracking. See `annotate/1` for more
information.
"""
@spec annotate_batch([Message.t()]) :: [Message.t()]
def annotate_batch(messages) do
Enum.map(messages, &annotate/1)
end

@doc """
Annotates a message with tracking. Currently this only integrates the
`Datadog.DataStreams.Integrations.Kafka` module.
"""
@spec annotate(Message.t()) :: Message.t()
def annotate(%Message{} = message) do
already_includes_header? =
Enum.find(message.headers, fn {key, _} ->
key == @data_streams_propagator_key
end)

if already_includes_header? do
message
else
Datadog.DataStreams.Integrations.Kafka.trace_produce(message)
end
end

@doc """
Produces a list of messages depending on the configuration set
in the producer.
## Examples
iex> produce([%Kafee.Producer.Message{}], MyProducer)
iex> produce([%Kafee.Producer.Message{topic: "test"}], MyProducer)
:ok
"""
@spec produce([Message.t()], atom) :: :ok | {:error, term()}
def produce(messages, producer) do
config = Config.get(producer)
config.producer_backend.produce(config, messages)
{span_name, span_attributes} = otel_values(messages, config)

Tracer.with_span span_name, span_attributes do
config.producer_backend.produce(config, messages)
end
end

# These values come from the official opentelemetry specification about messaging
# and Kafka handling. For more information, view this link:
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
@spec otel_values([Message.t()], Config.t()) :: {String.t(), map()}
defp otel_values([message | _] = messages, config) do
# Ideally the topic will be validated above before producing, but
# we want to be doubly safe.
span_name = if is_nil(message.topic), do: "publish", else: message.topic <> " publish"

{span_name,
%{
kind: :client,
attributes: %{
"messaging.batch.message_count": length(messages),
"messaging.destination.kind": "topic",
"messaging.destination.name": message.topic,
"messaging.operation": "publish",
"messaging.system": "kafka",
"network.transport": "tcp",
"peer.service": "kafka",
"server.address": config.hostname,
"server.socket.port": config.port
}
}}
end
end
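As a rough illustration of the new annotate/1 behaviour above (the message below is hypothetical), annotation is idempotent: the Datadog propagation header is only added when it is missing.

message = %Kafee.Producer.Message{topic: "orders", partition: 0, headers: []}
annotated = Kafee.Producer.annotate(message)

# The propagation header is present after the first call, so a second
# call finds it and returns the message unchanged.
propagation_key = Datadog.DataStreams.Propagator.propagation_key()
Enum.any?(annotated.headers, fn {key, _value} -> key == propagation_key end)
#=> true
Kafee.Producer.annotate(annotated) == annotated
#=> true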
24 changes: 15 additions & 9 deletions lib/kafee/producer/async_worker.ex
@@ -26,6 +26,8 @@ defmodule Kafee.Producer.AsyncWorker do

require Logger

alias Datadog.DataStreams.Integrations.Kafka, as: DDKafka

# The max request size Kafka can handle by default is 1mb.
# We shrink it by 8kb as an extra precaution for data.
@default_max_request_size 1_040_384
@@ -192,16 +194,17 @@ defmodule Kafee.Producer.AsyncWorker do
# We sent messages to Kafka successfully, so we pull them from the message queue
# and try sending more messages
@doc false
def handle_info({task_ref, {:ok, messages_sent}}, %{send_task: %{ref: task_ref}} = state) do
def handle_info({task_ref, {:ok, messages_sent, offset}}, %{send_task: %{ref: task_ref}} = state) do
Logger.debug("Successfully sent messages to Kafka")

if is_integer(offset), do: DDKafka.track_produce(state.topic, state.partition, offset)
{_sent_messages, remaining_messages} = :queue.split(messages_sent, state.queue)
emit_queue_telemetry(state, :queue.len(remaining_messages))

Process.send_after(self(), :send, state.send_throttle_time)
{:noreply, %{state | queue: remaining_messages}}
end

@doc false
def handle_info({_task_ref, {:error, {:producer_down, :noproc}}}, state) do
Logger.debug("The brod producer process is currently down. Waiting for it to come back online")
Process.send_after(self(), :send, 10_000)
@@ -318,7 +321,8 @@ defmodule Kafee.Producer.AsyncWorker do
# make sure we get an ack back from it and send all remaining messages.
def terminate(reason, %{send_task: %{ref: ref}} = state) do
receive do
{^ref, {:ok, sent_message_count}} ->
{^ref, {:ok, sent_message_count, offset}} ->
if is_integer(offset), do: DDKafka.track_produce(state.topic, state.partition, offset)
{_sent_messages, remaining_messages} = :queue.split(sent_message_count, state.queue)
emit_queue_telemetry(state, :queue.len(remaining_messages))
terminate(reason, %{state | queue: remaining_messages, send_task: nil})
@@ -335,13 +339,15 @@
@spec terminate_send(t()) :: :ok
defp terminate_send(state) do
case send_messages(state) do
{:ok, 0} ->
{:ok, 0, offset} ->
Logger.info("Successfully sent all remaining messages to Kafka before termination")
if is_integer(offset), do: DDKafka.track_produce(state.topic, state.partition, offset)
emit_queue_telemetry(state, 0)
:ok

{:ok, sent_message_count} ->
{:ok, sent_message_count, offset} ->
Logger.debug("Successfully sent #{sent_message_count} messages to Kafka before termination")
if is_integer(offset), do: DDKafka.track_produce(state.topic, state.partition, offset)
{_sent_messages, remaining_messages} = :queue.split(sent_message_count, state.queue)
emit_queue_telemetry(state, :queue.len(remaining_messages))
terminate_send(%{state | queue: remaining_messages})
@@ -378,13 +384,13 @@ defmodule Kafee.Producer.AsyncWorker do
})
end

@spec send_messages(t()) :: {:ok, sent_count :: pos_integer()} | term()
@spec send_messages(t()) :: {:ok, sent_count :: pos_integer(), offset :: integer() | nil} | term()
defp send_messages(state) do
messages = build_message_batch(state.queue, state.max_request_size)
messages_length = length(messages)

if messages_length == 0 do
{:ok, 0}
{:ok, 0, nil}
else
Logger.debug("Sending #{messages_length} messages to Kafka")

@@ -398,8 +404,8 @@
fn ->
with {:ok, call_ref} <-
:brod.produce(state.brod_client_id, state.topic, state.partition, :undefined, messages),
:ok <- :brod.sync_produce_request(call_ref, state.send_timeout) do
{{:ok, messages_length}, %{}}
{:ok, offset} <- :brod.sync_produce_request_offset(call_ref, state.send_timeout) do
{{:ok, messages_length, offset}, %{}}
else
res -> {res, %{}}
end
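The worker now calls :brod.sync_produce_request_offset/2 instead of :brod.sync_produce_request/2 so the acked partition offset can be reported to Datadog data streams. A condensed sketch of that path (the client id, topic, partition, timeout, and messages variable are hypothetical):

with {:ok, call_ref} <- :brod.produce(:my_client, "orders", 0, :undefined, messages),
     {:ok, offset} <- :brod.sync_produce_request_offset(call_ref, 5_000) do
  # Report the acked offset so Datadog can measure pipeline latency and lag.
  Datadog.DataStreams.Integrations.Kafka.track_produce("orders", 0, offset)
  {:ok, length(messages), offset}
end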
8 changes: 7 additions & 1 deletion lib/kafee/producer/sync_backend.ex
@@ -7,6 +7,7 @@ defmodule Kafee.Producer.SyncBackend do

@behaviour Kafee.Producer.Backend

alias Datadog.DataStreams.Integrations.Kafka, as: DDKafka
alias Kafee.Producer.Config

@doc """
@@ -50,11 +51,16 @@ defmodule Kafee.Producer.SyncBackend do
Calls the `:brod.produce_sync/5` function.
"""
@impl Kafee.Producer.Backend
@dialyzer {:no_match, produce: 2}
def produce(%Config{} = config, messages) do
for message <- messages do
:telemetry.span([:kafee, :produce], %{topic: message.topic, partition: message.partition}, fn ->
# We pattern match here because it will cause `:telemetry.span/3` to measure exceptions
:ok = :brod.produce_sync(config.brod_client_id, message.topic, message.partition, message.key, message)
{:ok, offset} =
:brod.produce_sync_offset(config.brod_client_id, message.topic, message.partition, message.key, message)

if is_integer(offset), do: DDKafka.track_produce(message.topic, message.partition, offset)

{:ok, %{}}
end)
end
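The sync backend still wraps each send in :telemetry.span([:kafee, :produce], ...), so produces can be observed independently of the new Datadog reporting. A minimal sketch of attaching to the start event (the handler id and log line are illustrative):

:telemetry.attach(
  "log-kafee-produce",
  [:kafee, :produce, :start],
  fn _event, _measurements, %{topic: topic, partition: partition}, _config ->
    IO.puts("producing to #{topic}/#{partition}")
  end,
  nil
)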
2 changes: 2 additions & 0 deletions mix.exs
@@ -41,7 +41,9 @@ defmodule Kafee.MixProject do
defp deps do
[
{:brod, "~> 3.17.0"},
{:data_streams, ">= 1.2.0"},
{:jason, ">= 1.0.0"},
{:opentelemetry_api, ">= 1.0.0"},
{:telemetry, ">= 1.0.0"},

# Dev & Test dependencies
12 changes: 12 additions & 0 deletions mix.lock
@@ -2,8 +2,10 @@
"benchee": {:hex, :benchee, "1.1.0", "f3a43817209a92a1fade36ef36b86e1052627fd8934a8b937ac9ab3a76c43062", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}], "hexpm", "7da57d545003165a012b587077f6ba90b89210fd88074ce3c60ce239eb5e6d93"},
"brod": {:hex, :brod, "3.17.0", "437daa5204a2175a3f6d01ee31152ca881539ca90acdf123d69835577f6133b1", [:rebar3], [{:kafka_protocol, "4.1.3", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.9", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "1bf5eb9d1bad1140f97b9d0c5a819ceb30414231cb7f5ad5d5e18201cfaf09f4"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"castore": {:hex, :castore, "1.0.4", "ff4d0fb2e6411c0479b1d965a814ea6d00e51eb2f58697446e9c41a97d940b28", [:mix], [], "hexpm", "9418c1b8144e11656f0be99943db4caf04612e3eaecefb5dae9a2a87565584f8"},
"crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"data_streams": {:hex, :data_streams, "1.2.2", "19288b5e0b4f54cf522543ce1be92b663760ba0d321545aeb77c9f34217102e1", [:mix], [{:finch, ">= 0.1.0", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:msgpax, "~> 2.3.1", [hex: :msgpax, repo: "hexpm", optional: false]}, {:opentelemetry_api, ">= 1.0.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:protobuf, ">= 0.10.0", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "cd13a10a8ccd39d41321fb4467ba04e73eb96e883be52affff327421bd965983"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"},
@@ -14,13 +16,23 @@
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
"faker": {:hex, :faker, "0.17.0", "671019d0652f63aefd8723b72167ecdb284baf7d47ad3a82a15e9b8a6df5d1fa", [:mix], [], "hexpm", "a7d4ad84a93fd25c5f5303510753789fc2433ff241bf3b4144d3f6f291658a6a"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"kafka_protocol": {:hex, :kafka_protocol, "4.1.3", "362d85a898d4148a43dbabb10a30bb2d6ff32ba0097eb06981d11b34e2e0a9cd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "28cf73001270d972524dd0fad4a59074f4441219f9cf237ad808a2ac1ec97487"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"msgpax": {:hex, :msgpax, "2.3.1", "28e17c4abb4c57da742e75de62abd9d01c76f1da0b103334de3fb1199610b3d9", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "17c8bf2fc2327b74e4bc6633dd520ffa10ea07b0a2f8ab1932db99044e116df5"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.2", "693f47b0d8c76da2095fe858204cfd6350c27fe85d00e4b763deecc9588cf27a", [:mix, :rebar3], [{:opentelemetry_semantic_conventions, "~> 0.2", [hex: :opentelemetry_semantic_conventions, repo: "hexpm", optional: false]}], "hexpm", "dc77b9a00f137a858e60a852f14007bb66eda1ffbeb6c05d5fe6c9e678b05e9d"},
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "0.2.0", "b67fe459c2938fcab341cb0951c44860c62347c005ace1b50f8402576f241435", [:mix, :rebar3], [], "hexpm", "d61fa1f5639ee8668d74b527e6806e0503efc55a42db7b5f39939d84c07d6895"},
"patch": {:hex, :patch, "0.12.0", "2da8967d382bade20344a3e89d618bfba563b12d4ac93955468e830777f816b0", [:mix], [], "hexpm", "ffd0e9a7f2ad5054f37af84067ee88b1ad337308a1cb227e181e3967127b0235"},
"protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"},
"snappyer": {:hex, :snappyer, "1.2.9", "9cc58470798648ce34c662ca0aa6daae31367667714c9a543384430a3586e5d3", [:rebar3], [], "hexpm", "18d00ca218ae613416e6eecafe1078db86342a66f86277bd45c95f05bf1c8b29"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},