
Apache Kafka Support #2361

Closed
mkaufmaner opened this issue Jun 6, 2019 · 23 comments

Comments

@mkaufmaner
Contributor

mkaufmaner commented Jun 6, 2019

Feature Request

Is your feature request related to a problem? Please describe.

Kafka provides a very rich feature set for horizontally scaling event-sourced frameworks. More specifically, it offers the concept of multiple consumers running in parallel while maintaining event order.

Describe the solution you'd like

Utilizing the existing @nestjs/microservices package and abstraction layer, the Kafka microservice will provide support for producing and consuming Kafka messages. A Kafka client will act as a producer and a Kafka server will act as a consumer.
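As a sketch of how this might look to the user, modeled on the existing ClientProxy API of @nestjs/microservices (the Transport.KAFKA value and the option shape are assumptions at this point in the thread, not a finalized design):

```typescript
// Hypothetical producer-side usage. ClientProxyFactory is the existing
// @nestjs/microservices factory; Transport.KAFKA and the Kafka option
// shape are assumed here for illustration.
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

const client = ClientProxyFactory.create({
  transport: Transport.KAFKA,
  options: {
    client: { brokers: ['localhost:9092'] },
  },
});

// Fire-and-forget event: the client acts as the producer, while a
// Kafka-backed microservice server on the other side consumes it.
client.emit('user.created', { userId: 42 });
```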

Teachability, Documentation, Adoption, Migration Strategy

With consideration for performance and maintainability, I recommend the node-rdkafka (https://www.npmjs.com/package/node-rdkafka) package. Its CPU usage is reported to be 5-10% lower than that of the kafka-node package (https://www.tivix.com/blog/nodejs-kafka-producers).

What is the motivation / use case for changing the behavior?

Simply to support Apache Kafka :D

@patricknazar

patricknazar commented Jun 19, 2019

I am also interested in this. Another GitHubber and I want to create a package (or packages) for this purpose. I agree with using node-rdkafka. I feel there is a use case for the @nestjs/cqrs package too, and perhaps a way to integrate kafka-streams as well.

@iangregsondev

We currently use Kafka in another project (not Nest, but pure Node.js), and I thought I would mention some issues we hit. node-rdkafka wraps the C++ driver, and I had a lot of issues with it. Too many to list, but one I remember: when I called process.exit(1) after a failure, Node.js failed to exit. rdkafka was stopping it from exiting.

Eventually, I moved over to https://github.com/SOHU-Co/kafka-node

An integration with NestJS would be great, but I would ask that it support the above library, or at the very least a kind of "provider" mechanism so that we can choose the library ourselves.
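The "provider" idea above could be sketched as a driver-agnostic interface that each client library (node-rdkafka, kafka-node, and so on) implements behind the scenes. All names below are hypothetical, and the in-memory stub is only there to show how a concrete driver would plug in:

```typescript
// Hypothetical driver-agnostic contract; a real driver would wrap
// node-rdkafka, kafka-node, or another client library.
interface KafkaMessage {
  topic: string;
  value: Buffer;
}

interface KafkaDriver {
  connect(brokers: string[]): Promise<void>;
  produce(topic: string, key: string | null, value: Buffer): Promise<void>;
  subscribe(topics: string[], onMessage: (m: KafkaMessage) => void): void;
  disconnect(): Promise<void>;
}

// In-memory stub; it skips topic filtering for brevity and simply
// fans every produced message out to all registered handlers.
class InMemoryDriver implements KafkaDriver {
  private handlers: Array<(m: KafkaMessage) => void> = [];

  async connect(_brokers: string[]): Promise<void> {}

  async produce(topic: string, _key: string | null, value: Buffer): Promise<void> {
    this.handlers.forEach((handler) => handler({ topic, value }));
  }

  subscribe(_topics: string[], onMessage: (m: KafkaMessage) => void): void {
    this.handlers.push(onMessage);
  }

  async disconnect(): Promise<void> {}
}

const driver = new InMemoryDriver();
driver.subscribe(['greetings'], (m) => console.log(m.value.toString()));
driver.produce('greetings', null, Buffer.from('hello')); // handler prints "hello"
```

The Nest integration would then depend only on KafkaDriver, leaving the concrete library as a swappable provider.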

@mkaufmaner
Contributor Author

Actually, I dislike the idea of nest having to implement both kafka-node and node-rdkafka.

What about https://kafka.js.org/? The project is a little young but gaining popularity. More importantly, the codebase is easier to integrate.

@mkaufmaner
Contributor Author

mkaufmaner commented Jun 24, 2019

@patricknazar @appsolutegeek @kamilmysliwiec

I am leaning towards only having a single consumer group for the microservices server and a single producer for the microservices client. Thoughts?

@patricknazar

The thing that interests me the most is the ability to make the ecosystem pluggable, where different parts of the system respond to events. For multiple different services to respond to the same event, I understand we need multiple consumer groups, one per service.

To be honest, the request-response pattern could be built on Kafka, but the broker doesn't contribute much other than a "central" point to talk to. There is no need to broadcast or store the messages. But if you were to, you would use a single consumer group. A single producer would make sense.

So, imho there is a requirement for a configurable consumer group for event-based messaging, and a common consumer group for request-response. As long as different services don't try to respond to requests differently, things won't get weird, as the requests will get round-robined (or partitioned if using a key) within that consumer group.

I keep getting tagged as patrickhousley (I assume you meant me :P).
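The round-robin vs. keyed behaviour described above can be illustrated with a toy partitioner. (Kafka's real default partitioner uses murmur2 hashing; the djb2-style hash here is only a stand-in.)

```typescript
// Toy partitioner: keyed messages always land on the same partition
// (preserving per-key order), unkeyed messages rotate round-robin.
function hash(key: string): number {
  let h = 5381; // djb2-style stand-in for Kafka's murmur2
  for (const ch of key) h = ((h * 33) ^ ch.charCodeAt(0)) >>> 0;
  return h;
}

function makePartitioner(numPartitions: number) {
  let counter = 0;
  return (key?: string): number =>
    key !== undefined ? hash(key) % numPartitions : counter++ % numPartitions;
}

const partition = makePartitioner(3);
// Same key -> same partition, so ordering per key is preserved.
console.log(partition('order-42') === partition('order-42')); // true
// Unkeyed messages are spread round-robin across the partitions.
console.log(partition(), partition(), partition()); // 0 1 2
```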

@massimodeluisa

> The thing that interests me the most is the ability to make the ecosystem pluggable, where different parts of the system respond to events. For multiple different services to respond to the same event, I understand we need multiple consumer groups, one per service.
>
> To be honest, the request-response pattern could be built on Kafka, but the broker doesn't contribute much other than a "central" point to talk to. There is no need to broadcast or store the messages. But if you were to, you would use a single consumer group. A single producer would make sense.
>
> So, imho there is a requirement for a configurable consumer group for event-based messaging, and a common consumer group for request-response. As long as different services don't try to respond to requests differently, things won't get weird, as the requests will get round-robined (or partitioned if using a key) within that consumer group.
>
> I keep getting tagged as patrickhousley (I assume you meant me :P).

Check this out: https://martinfowler.com/bliki/CQRS.html
(and of course, this: https://github.com/nestjs/cqrs ...and this: https://github.com/kamilmysliwiec/nest-cqrs-example)

IMO this project (NestJS) could be the best platform for implementing a CQRS system once it supports Apache Kafka!

@patricknazar

@massimodeluisa Yes, my intention is to use it for CQRS. As I mentioned earlier, perhaps we need to integrate with the nest cqrs package too. If we can use @EventPattern from microservices package to respond to kafka messages, then perhaps the cqrs module isn't needed. It has Sagas, though, which is starting to make me think of stream processing, which is why I mentioned kafka-streams.

@mkaufmaner
Contributor Author

> I keep getting tagged as patrickhousley (I assume you meant me :P).

@patricknazar It is the autofill feature after the @ that keeps messing up 😒

Hrm, I can certainly understand the correlation between using kafka-streams and CQRS; however, kafka-streams utilizes most (https://github.com/cujojs/most) and node-rdkafka (https://github.com/Blizzard/node-rdkafka), which makes the implementation rather difficult.

I am not sure how we would use the @EventPattern to respond to Kafka messages since we will most likely have to end up implementing custom decorators describing the topic and options.

@patricknazar

> Hrm, I can certainly understand the correlation between using kafka-streams and CQRS; however, kafka-streams utilizes most (https://github.com/cujojs/most) and node-rdkafka (https://github.com/Blizzard/node-rdkafka), which makes the implementation rather difficult.

Agreed. Merging/joining/etc streams would be pretty awesome though.

> I am not sure how we would use the @EventPattern to respond to Kafka messages since we will most likely have to end up implementing custom decorators describing the topic and options.

Can we add an optional options argument to the @EventPattern decorator? Those transports that need it can use it. I imagine typing might be a bit tricky though. Not sure if that would make sense or not.

@mkaufmaner
Contributor Author

The more I think about how to fit Kafka into the nest microservices package, the more opinionated the implementation becomes because of the microservices abstraction layer.

First, Kafka employs a dumb broker and smart consumers communication style. This means nest should be able to instantiate multiple consumers. If nest is going to support multiple consumers, nest should also support multiple producers. Subsequently, the instantiated consumers should be accessible by methods consuming messages.

Routing of consumed messages to class methods will use method decorators. Messages can be routed by their topic, header, or value with routes being prioritized in that order.

@KafkaTopic(consumer: KafkaConsumer, topic: string | RegExp) Routes all messages of a topic to the method.
@KafkaHeader(consumer: KafkaConsumer, key: string, value?: string | RegExp) Routes all messages with matching headers.
@KafkaValue(consumer: KafkaConsumer, value: string | RegExp) Routes all messages with matching values. This would possibly require something like Avro (https://avro.apache.org/docs/current/spec.html#preamble)

The payload passed to the message handlers will be either an EachMessagePayload (https://github.com/tulios/kafkajs/blob/master/types/index.d.ts#L580) or an EachBatchPayload (https://github.com/tulios/kafkajs/blob/master/types/index.d.ts#L586), determined by the handlerMethod.

@Controller()
class KafkaController {
    @Client(kafkaClientOptions)
    private readonly client: ClientKafka;

    @KafkaConsumer(consumerOptions, subscriptions, handlerMethod, runOptions)
    private consumer: KafkaConsumer;

    private producer: KafkaProducer;

    onModuleInit() {
        this.producer = this.client.createProducer(producerOptions);
    }

    @KafkaTopic(this.consumer, 'test.1')
    public topicHandlerString(payload: any){
        // do something
    }

    @KafkaTopic(this.consumer, /test.*/i)
    public topicHandlerRegexp(payload: any){
        // do something
    }

    @KafkaHeader(this.consumer, 'key')
    public headerHandlerWithoutValue(payload: any){
        // do something
    }

    @KafkaHeader(this.consumer, 'key', 'value')
    public headerHandlerWithValue(payload: any){
        // do something
    }

    @KafkaHeader(this.consumer, 'key', /value.*/i)
    public headerHandlerWithValueRegex(payload: any){
        // do something
    }

    @KafkaValue(this.consumer, 'eventName')
    public valueHandler(payload: any){
        // do something
    }

    @KafkaValue(this.consumer, /eventPrefix.*/i)
    public valueHandlerWithRegExp(payload: any){
        // do something
    }
}

Thoughts?

@patricknazar

I was thinking something along these lines, except I would now take a KafkaModule.forRoot(...) approach to init the client and consumers. But yes, I was starting to think that this might be better left independent of the microservices package.

I had started writing a module for this with a @KafkaProducer(producerConfig) property decorator and had plans for a similar @KafkaConsumer(consumerConfig) one. At this stage I hadn't started with any method decorators, as I still wasn't sure about the right way. I like the general idea of what you have here. Combining our thoughts:

Initializing the module. It takes the KafkaConfig and a list of user-defined @KafkaConsumer-decorated classes:

@Module({
    imports: [
        KafkaModule.forRoot({
            kafkaConfig: {
                brokers: ['localhost:9092']
                // ...
            },
            consumers: [MyConsumer],
        })
    ],
})
export class AppModule {}

Consuming messages.

@KafkaConsumer(consumerOptions, subscriptions, handlerMethod, runOptions)
export class MyConsumer {

    @KafkaTopic('test.1')
    public topicHandlerString(payload: any) {}

    @KafkaTopic(/test.*/i)
    public topicHandlerRegexp(payload: any) {}

    @KafkaHeader('key')
    public headerHandlerWithoutValue(payload: any) {}

    @KafkaHeader('key', 'value')
    public headerHandlerWithValue(payload: any) {}

    @KafkaHeader('key', /value.*/i)
    public headerHandlerWithValueRegex(payload: any) {}

    @KafkaValue('eventName')
    public valueHandler(payload: any) {}

    @KafkaValue(/eventPrefix.*/i)
    public valueHandlerWithRegExp(payload: any) {}

}

Producing messages

@Injectable()
export class MyService {
    @KafkaProducer(producerConfig)
    private producer: Producer;

    async produceMessage(value: string) {
        console.log(`Sending ${value} to kafka...`);
        await this.producer.send({
            topic: 'new-topic',
            messages: [{ key: 'message', value }]
        });
    }

    async transactionMessage() {
        const transaction = await this.producer.transaction();

        try {
            await transaction.send({ topic: 'test-topic', messages: [
                {key: 'one', value: 'one'},
                {key: 'two', value: 'two'},
            ] });

            await transaction.commit();
        } catch (e) {
            await transaction.abort();
        }
    }
}

I was trying to clean up the repo and get it ready for public eyes - the nuances and pains of a monorepo are surprisingly annoying. I'll put that on hold until we get this right or something.

@hellraisercenobit

Same here :)
The only way I found to get an effective consumer with a group stream without nest was to use kafka-node.
Is it possible to implement a consumer with the CustomTransportStrategy interface?

@patricknazar I'm really interested in your sample.

@mkaufmaner
Contributor Author

@patricknazar Haven't worked on this much, but after trying to shoehorn everything into the microservices abstraction, I believe a standalone module may be the best choice. Would really like to get a peek at your code.

@patricknazar

@hellraisercenobit @mkaufmaner
I have to admit I haven't worked on this much either. We have decided to use Kafka once we have expanded more; running a cluster is out of reach for us at the moment, sadly. If I get some time this weekend I'll see if I can get my repo resurrected and at least shaped into the general idea above.

@zonorion

Kafka support already? I want to use T______T

@hellraisercenobit

@patricknazar Out of curiosity, how do you bootstrap the app without an HTTP adapter?

@patricknazar

@hellraisercenobit

> @patricknazar Out of curiosity, how do you bootstrap the app without an HTTP adapter?

https://docs.nestjs.com/application-context

:)

@mkaufmaner
Contributor Author

Regarding the use of Avro, @kamilmysliwiec seems to be working on serializers; see #2653 and nestjs/docs.nestjs.com#406.

I will therefore wait for that to be complete before implementing Avro. In the meantime, I think simple JSON serialization will suffice.
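A minimal sketch of the simple JSON serialization mentioned here (helper names are illustrative; Kafka carries message values as raw bytes, so the round trip goes through Buffer):

```typescript
// JSON value (de)serialization as a stopgap until Avro support lands.
function serialize(value: unknown): Buffer {
  return Buffer.from(JSON.stringify(value), 'utf8');
}

function deserialize<T>(raw: Buffer): T {
  return JSON.parse(raw.toString('utf8')) as T;
}

const payload = { event: 'user.created', userId: 42 };
const wire = serialize(payload); // what would be sent as the message value
console.log(deserialize<typeof payload>(wire).userId); // 42
```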

@iangregsondev

Wow! This looks really nice. I actually just moved over to kafkajs in my own implementation inside of Nest, and I would love to use this library; it looks really promising. @mkaufmaner looking forward to Avro. Actually, schema registry support would be great too, as we currently do this via the schema registry.

I currently use this library from npm; it works, though I'm not sure about the others. When I checked a while ago, it was the only thing I could find:

https://www.npmjs.com/package/avro-schema-registry

@kamilmysliwiec
Member

Added in 6.7.0 :) We're working on the docs now
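For reference, a minimal sketch of consuming with the released transport, bootstrapped without an HTTP adapter (the topic name and group id are illustrative):

```typescript
// Minimal Kafka microservice: handlers are routed by topic name via
// @EventPattern, and the app is bootstrapped as a microservice.
import { NestFactory } from '@nestjs/core';
import { Controller, Module } from '@nestjs/common';
import { EventPattern, Payload, Transport } from '@nestjs/microservices';

@Controller()
class UserEventsController {
  @EventPattern('user.created') // illustrative topic name
  handleUserCreated(@Payload() message: any) {
    console.log(message.value);
  }
}

@Module({ controllers: [UserEventsController] })
class AppModule {}

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: { brokers: ['localhost:9092'] },
      consumer: { groupId: 'user-events' }, // illustrative group id
    },
  });
  app.listen(() => console.log('Kafka microservice is listening'));
}
bootstrap();
```

Running it requires a reachable Kafka broker at the configured address.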

@aardee

aardee commented Dec 13, 2019

Why do we need an Avro Schema Registry if we are sending JSON messages? We may even get away without having to worry about the Schema Registry altogether.

@katsanva

@aardee it is because Avro is pretty common for encoding Kafka messages. And it actually looks like a nice feature to be able to select which encoding mechanism is used.

@lock

lock bot commented Apr 15, 2020

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Apr 15, 2020