Skip to content

Commit

Permalink
feat(microservices): add consumer getter to kafka context
Browse files Browse the repository at this point in the history
Provide access to native consumer of kafkajs using @ctx() decorator.
  • Loading branch information
davidschuette committed Mar 7, 2022
1 parent c229a75 commit b91803c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 21 deletions.
8 changes: 4 additions & 4 deletions packages/microservices/ctx-host/kafka.context.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { KafkaMessage } from '../external/kafka.interface';
import { Consumer, KafkaMessage } from '../external/kafka.interface';
import { BaseRpcContext } from './base-rpc.context';

type KafkaContextArgs = [
message: KafkaMessage,
partition: number,
topic: string,
commitOffset: () => Promise<void>,
consumer: Consumer,
];

export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
Expand Down Expand Up @@ -37,7 +37,7 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
/**
* Commit the offset of this message.
*/
commitOffset() {
return this.args[3]();
getConsumer() {
return this.args[3];
}
}
9 changes: 1 addition & 8 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
rawMessage,
payload.partition,
payload.topic,
() =>
this.consumer.commitOffsets([
{
offset: (parseInt(payload.message.offset, 10) + 1).toString(),
partition: payload.partition,
topic: payload.topic,
},
]),
this.consumer,
]);
// if the correlation id or reply topic is not set
// then this is an event (events could still have correlation id)
Expand Down
15 changes: 6 additions & 9 deletions packages/microservices/test/ctx-host/kafka.context.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { KafkaContext } from '../../ctx-host';
import { KafkaMessage } from '../../external/kafka.interface';
import { Consumer, KafkaMessage } from '../../external/kafka.interface';

describe('KafkaContext', () => {
const testFunc = sinon.spy();
const args = ['test', { test: true }, undefined, testFunc];
const args = ['test', { test: true }, undefined, { test: 'consumer' }];
let context: KafkaContext;

beforeEach(() => {
context = new KafkaContext(
args as [KafkaMessage, number, string, () => Promise<void>],
args as [KafkaMessage, number, string, Consumer],
);
});
describe('getTopic', () => {
Expand All @@ -28,10 +26,9 @@ describe('KafkaContext', () => {
expect(context.getMessage()).to.be.eql(args[0]);
});
});
describe('commitOffset', () => {
it('should be called once', () => {
context.commitOffset();
expect(testFunc.called).to.be.true;
describe('getConsumer', () => {
it('should return consumer instance', () => {
expect(context.getConsumer()).to.deep.eq({ test: 'consumer' });
});
});
});

0 comments on commit b91803c

Please sign in to comment.