Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Kafka support #25

Closed
wants to merge 3 commits into from
Closed

Initial Kafka support #25

wants to merge 3 commits into from

Conversation

hartungstenio
Copy link
Collaborator

@hartungstenio hartungstenio commented Aug 3, 2021

How to use:

class ProtobufSchemaMessageTranslator(KafkaMessageTranslator):
    def __init__(self, message_type):
        self.deserializer = ProtobufDeserializer(message_type)

    def translate(self, message):
        translated = super().translate(message)

        ctx = SerializationContext(translated["metadata"]["topic"], "value")
        translated["content"] = self.deserializer(translated["content"], ctx)

        return translated

routes = (
    KafkaRoute(
        "my_topic",
        provider_options,
        handler=Handler(),
        message_translator=ProtobufSchemaMessageTranslator(my_schema),
    ),
)

manager = LoaferManager(routes)
manager.run()

Comment on lines +10 to +18
producer_arguments = inspect.getfullargspec(AIOKafkaProducer.__init__)
self._producer_options = {
name: value for name, value in client_options.items() if name in producer_arguments.kwonlyargs
}

consumer_arguments = inspect.getfullargspec(AIOKafkaConsumer.__init__)
self._consumer_options = {
name: value for name, value in client_options.items() if name in consumer_arguments.kwonlyargs
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O Provider só aceita uma configuração, e uso ela para produtores e consumidores.
A maioria dos parâmetros é compartilhada entre eles. Essa parada do inspect é para eu conseguir separar as poucas diferenças

self._consumer_options = {
name: value for name, value in client_options.items() if name in consumer_arguments.kwonlyargs
}
self._consumer_options.update({"enable_auto_commit": False})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Estou fazendo o commit manualmente quando consigo puxar mensagens, então não quero o auto commit.


logger.debug(f"publishing message to {topic_name}. receive_count={receive_count}")

async with self.get_producer() as producer:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O Kafka (e coisas parecidas) não possuem o conceito de confirmar uma mensagem. Eu consigo confirmar até onde eu já li.

Então a ideia aqui é que sempre que o handler ou o error_handler retornarem False, eu republico a mensagem novamente no Kafka.

Comment on lines +44 to +46
receive_count = header_map.setdefault("ApproximateReceiveCount", b"1").decode()
receive_count = int(receive_count) + 1
header_map["ApproximateReceiveCount"] = str(receive_count).encode()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metadado de quantas tentativas já foram.

def __init__(
self,
topic_name: str,
retry_topic: Union[None, str, Callable[[str, int], str]] = None,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Não gostei muito dessa lógica, mas ainda não pensei em algo melhor.

A ideia aqui é: Como sempre republico a mensagem, quero aqui falar em qual tópico vou republicar. Então isso é uma função que recebe o tópico original, e a quantidade de tentativas.

Assim consigo criar uma "DLQ" no Kafka. Algo como:

lambda topic, tries: topic if tries < 5 else f"dead__{topic}"

@hartungstenio hartungstenio marked this pull request as ready for review August 5, 2021 18:11
@hartungstenio hartungstenio marked this pull request as draft October 12, 2021 22:53
@hartungstenio hartungstenio deleted the kafka branch July 14, 2023 21:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant