Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka shim #11

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open

Kafka shim #11

wants to merge 17 commits into from

Conversation

alecraso
Copy link

@alecraso alecraso commented Aug 30, 2018

There's a semi-simple way to override the _recorder_function for all of BlueOx. This PR aims to provide an easily configurable way to access this override to publish events to Pycernan instead of Zmq.

Todo:

  • Add changes and bump version
  • Update contrib to use pycernan first, falling back to zmq

be silently dropped.

Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or
to the specified recorder function
"""
if recorder:
if int(OVERRIDE_KAFKA_RECORDER) == 1:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not gonna lie, not a fan of magical overrides via env vars. Why not just have it come in as a recorder?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fair. I was hoping to avoid a bunch of edits throughout postal changing how it's called, but it would be simple enough to call

import blueox
from blueox.recorders import kafka

blueox.configure(recorder=kafka.send)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh huh, wait, if thats the case no go back to your original way lol

commit 6d6f53a
Author: Brandon Bickford <brandon@postmates.com>
Date:   Thu Aug 30 09:58:59 2018 -0700

    Bump version

commit b45384a
Author: Brandon Bickford <brandon@postmates.com>
Date:   Wed Aug 29 16:20:59 2018 -0700

    Ignore unknown types
if _kafka_hosts and threadLocal.kp is not None:
try:
log.debug("Sending msg")
threadLocal.kp.send('events', context_data)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually going to get really slow and backup blueox and cause an outage.
You need a non blocking way to send these. Also I do worry a little bit about the threadlocal approach, because Python and threading never play well together.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a number of kwargs that can configure this better to prevent things from backing up:

  • acks (0, 1, 'all') –
    The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

    0: Producer will not wait for any acknowledgment from the server.

    The message will immediately be added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.

    1: Wait for leader to write the record to its local log only.

    Broker will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

    all: Wait for the full set of in-sync replicas to write the record.

    This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

    If unset, defaults to acks=1.

  • retries (int) – Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max_in_flight_requests_per_connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Default: 0.

  • max_block_ms (int) – Number of milliseconds to block during send() and partitions_for(). These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. Default: 60000.

  • retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
    request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.

Copy link

@pulltab pulltab Aug 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My $.02, let's not tightly couple postal-main to Kafka. kafka-python uses a thread under the hood to send events and things can go unexpected in the presence of multiprocessing and monkey patching (which we may do some day, who knows).

Proposal:

  1. We configure a separate source / sink in cernan / cernan-events for blueox events:

    • Kafka bootstrapping is done async.
    • We can tune the blueox sources/sinks to discard (or not) in the presence of Kafka slowness as we see fit.
    • Financial events are already slated to use cernan, so we are properly entrenched already.
  2. Using pycernan forces us to use avro for blueox like we are going to do with everything else.

  3. RUST!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me. Takes the burden of async off of blueox/postal, and I was considering the avro route anyway.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: cernan, I was under the impression that we're migrating to fluentbit for log transport?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fluentbit is just replacing cernan when it comes to consuming on disk logs, no?

Now we get into a meaty discussion around whether or not blueox emits logs or events.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants