Skip to content

kostyak127/nestjs-kafka

Repository files navigation

kostyak127-nestjs-kafka

Library provides great nestjs and kafkajs integration.

Functionality:

  • send messages
  • subscribe to topics (string and regexp) from your injectable classes
  • subscribe to one topic from many classes or functions
  • io-ts, joi, zod message validation
  • skip messages rules
  • getting message payload by property
  • getting message context in function arguments

Installation

npm

npm install kostyak127-nestjs-kafka

yarn

yarn add kostyak127-nestjs-kafka

Usage

Register module

import { KafkaModule } from 'kostyak127-nestjs-kafka';
import { KafkaModuleOption } from '@app/modules/kafka/interfaces';

// put your options here (it is kafkajs options)
let options: KafkaModuleOption;

@Module({
  imports: [KafkaModule.register(options)],
})
export class AppModule {}

Sending messages

import { Injectable } from '@nestjs/common';
import { KafkaClient } from 'kostyak127-nestjs-kafka';

@Injectable()
export class SendMessageService {
  public constructor(private readonly kafkaClient: KafkaClient) {}
  public async sendMessage(message: { foo: 'bar' }, topic: string) {
    await this.kafkaClient.send({
      messages: [{ key: 'myKey', value: JSON.stringify({ message }],
      topic: topic,
    });
  }
}

Subscripe to topic handler

@Injectable()
export class MessageHandlerService {
  @SubscribeToTopic('TOPIC')
  async onMessage() {
    console.log('message handled');
  }
  @SubscribeToTopic(/.*_customers/)
  async onMessage() {
    console.log('regexp message handled');
  }
}

Message pattern and payload

import { Injectable } from '@nestjs/common';
import { KafkaPayload, SubscribeToTopic } from 'kostyak127-nestjs-kafka';

// specify your path to message payload by messagePattern property
// @KafkaPayload returns payload from message
//
// message from kafka : 
//  {
//     "author": "Author",
//     "message": {
//     "object": { "foo": "bar" }
//  }
// }
// messagePattern:  'message.object'
// @KafkaPayload() messagePayload: { foo: 'bar' }
//
@Injectable()
export class MessageHandlerService {
  @SubscribeToTopic('TOPIC', { messagePattern: 'message.object' })
  async onMessage(@KafkaPayload() messagePayload: { foo: 'bar' }) {
    // logs { foo: 'bar' }
    console.log(messagePayload);
  }
}

Validation

import { Injectable } from '@nestjs/common';
import {
  IoTsValidator,
  JoiValidator,
  SubscribeToTopic,
  ZodValidator,
} from 'kostyak127-nestjs-kafka';

// use your favourite validation library. specify it with schema

@Injectable()
export class MessageHandlerService {
  // export const ZodUser = z.object({
  //    userId: z.number(),
  //    name: z.string(),
  //});
  @SubscribeToTopic('zod_validation', { messagePattern: 'message.object' })
  async validateZod(@ZodValidator(ZodUser) user: ZodTypeOf<typeof ZodUser>) {}


  // export const JoiUser = joi.object({
  //    userId: joi.number(),
  //    name: joi.string(),
  //});
  @SubscribeToTopic('joi_validation', { messagePattern: 'message.object' })
  async validateJoi(
    @JoiValidator(JoiUser) user: joi.extractType<typeof JoiUser>,
  ) {}


  // export const IoTsUser = t.type({
  //    userId: t.number,
  //    name: t.string,
  //});
  @SubscribeToTopic('io-ts_validation', { messagePattern: 'message.object' })
  async validateIoTs(
    @IoTsValidator(IoTsUser) user: IoTsTypeOf<typeof IoTsUser>,
  ) {}
}

Skip messages

import { Injectable } from '@nestjs/common';
import { SkipMessage, SubscribeToTopic } from 'kostyak127-nestjs-kafka';

// skip your messages by some value in message
// here is example how to skip message { author: 'THIS_AUTHOR', data: { foo: 'bar' } }

// you can access nested properties using '.' as wildcard
// use field: 'options.author' for message { message: { author: 'THIS_AUTHOR'} }
@Injectable()
export class MessageHandlerService {
  @SkipMessage([{field: 'author', rule: (author) => author === 'THIS_AUTHOR'}])
  @SubscribeToTopic('TOPIC')
  async onMessage() {
    console.log('messages from author would be skipped by this handler');
  }
}

Kafka context

import { Injectable } from '@nestjs/common';
import { KafkaPayload, SubscribeToTopic, KafkaContext } from 'kostyak127-nestjs-kafka';
import { KafkaMessageContext } from 'kostyak127-nestjs-kafka/src/kafka/kafka.types';

@Injectable()
export class MessageHandlerService {
  @SubscribeToTopic('TOPIC', { messagePattern: 'message.object' })
  async onMessage(@KafkaContext() context: KafkaMessageContext) {
    /// full info message info like topic, offset and other
    console.log(context)
  }
}

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published