Skip to content

Commit

Permalink
feat(kafka): update kafka framework & add test demo (#2236)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhaozheng committed Aug 16, 2022
1 parent 501772e commit 5eae117
Show file tree
Hide file tree
Showing 23 changed files with 514 additions and 99 deletions.
65 changes: 21 additions & 44 deletions packages/decorator/src/decorator/microservice/kafkaListener.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,29 @@
import { MS_CONSUMER_KEY, attachPropertyDataToClass } from '../..';

/**
* @deprecated Replaced by ConsumerSubscribeTopics
*/
export type ConsumerSubscribeTopic = {
fromBeginning?: boolean;
};
export type ConsumerSubscribeTopics = {
fromBeginning?: boolean;
};

export type ConsumerRunConfig = {
autoCommit?: boolean;
autoCommitInterval?: number | null;
autoCommitThreshold?: number | null;
eachBatchAutoResolve?: boolean;
partitionsConsumedConcurrently?: number;
};

export interface KafkaListenerOptions {
propertyKey?: string;
topic?: string;
// exchange?: string;
// /**
// * queue options
// */
// exclusive?: boolean;
// durable?: boolean;
// autoDelete?: boolean;
// messageTtl?: number;
// expires?: number;
// deadLetterExchange?: string;
// deadLetterRoutingKey?: string;
// maxLength?: number;
// maxPriority?: number;
// pattern?: string;
// /**
// * prefetch
// */
// prefetch?: number;
// /**
// * router
// */
// routingKey?: string;
// /**
// * exchange options
// */
// exchangeOptions?: {
// type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match' | string;
// durable?: boolean;
// internal?: boolean;
// autoDelete?: boolean;
// alternateExchange?: string;
// arguments?: any;
// };
// /**
// * consumeOptions
// */
// consumeOptions?: {
// consumerTag?: string;
// noLocal?: boolean;
// noAck?: boolean;
// exclusive?: boolean;
// priority?: number;
// arguments?: any;
// };

subscription?: ConsumerSubscribeTopics | ConsumerSubscribeTopic;
runConfig?: ConsumerRunConfig;
}

export function KafkaListener(
Expand Down
2 changes: 1 addition & 1 deletion packages/kafka/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ module.exports = {
testPathIgnorePatterns: ['<rootDir>/test/fixtures'],
coveragePathIgnorePatterns: ['<rootDir>/test/'],
setupFilesAfterEnv: ['./jest.setup.js'],
};
};
119 changes: 78 additions & 41 deletions packages/kafka/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,53 +50,90 @@ export class MidwayKafkaFramework extends BaseFramework<any, any, any> {
});
for (const module of subscriberModules) {
const data = listPropertyDataFromClass(MS_CONSUMER_KEY, module);
for (const methodBindListeners of data) {
for (const listenerOptions of methodBindListeners) {
await this.app.connection.subscribe({
topic: listenerOptions.topic,
// fromBeginning: true,
});
await this.app.connection.run({
autoCommit: false,
const topics = [...new Set(data.map(e => e[0].topic))];
let tempTopics = [];
topics.forEach(e => {
tempTopics.push({ topic: e });
});
tempTopics = tempTopics.map(e => {
let subscription = {};
let runConfig = {};
data.forEach(e2 => {
if (e2[0].topic === e.topic) {
if (
typeof e2[0].subscription !== 'undefined' &&
Object.keys(e2[0].subscription).length > 0
) {
subscription = e2[0].subscription;
}
if (
typeof e2[0].runConfig !== 'undefined' &&
Object.keys(e2[0].runConfig).length > 0
) {
runConfig = e2[0].runConfig;
}
}
});
e.subscription = subscription;
e.runConfig = runConfig;
return e;
});
tempTopics.forEach(async e => {
await this.app.connection.subscribe(
Object.assign(
{
topics: topics,
},
e.subscription
)
);
await this.app.connection.run(
Object.assign(e.runConfig, {
eachMessage: async ({ topic, partition, message }) => {
const ctx = {
topic: topic,
partition: partition,
message: message,
commitOffsets: (offset: any) => {
return this.app.connection.commitOffsets([
{
let propertyKey;
for (const methodBindListeners of data) {
for (const listenerOptions of methodBindListeners) {
if (topic === listenerOptions.topic) {
propertyKey = listenerOptions.propertyKey;
const ctx = {
topic: topic,
partition: partition,
offset,
},
]);
},
} as IMidwayKafkaContext;
this.app.createAnonymousContext(ctx);
const ins = await ctx.requestContext.getAsync(module);
const fn = await this.applyMiddleware(async ctx => {
return await ins[listenerOptions.propertyKey].call(
ins,
message
);
});
try {
const result = await fn(ctx);
// 返回为undefined,下面永远不会执行
if (result) {
const res = await this.app.connection.commitOffsets(
message.offset
);
return res;
message: message,
commitOffsets: (offset: any) => {
return this.app.connection.commitOffsets([
{
topic: topic,
partition: partition,
offset,
},
]);
},
} as unknown as IMidwayKafkaContext;
this.app.createAnonymousContext(ctx);
const ins = await ctx.requestContext.getAsync(module);
const fn = await this.applyMiddleware(async ctx => {
return await ins[propertyKey].call(ins, message);
});
try {
const result = await fn(ctx);
// 返回为undefined,下面永远不会执行
if (result) {
const res = await this.app.connection.commitOffsets(
message.offset
);
return res;
}
} catch (error) {
// 记录偏移量提交的异常情况
this.logger.error(error);
}
}
}
} catch (error) {
this.logger.error(error);
}
},
});
}
}
})
);
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/kafka/src/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class KafkaConsumerServer
implements IKafkaApplication
{
protected loggers: ILogger;
protected connection: Consumer = null;
public connection: Consumer = null;

constructor(options: any = {}) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "ali-demo"
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/decorator';
import { KafkaMessage } from 'kafkajs';
import { Context, Application } from '../../../../../src';

@Provide()
@Consumer(MSListenerType.KAFKA)
export class UserConsumer {

@App()
app: Application;

@Inject()
ctx: Context;

@Inject()
logger;

@KafkaListener('topic-test', {
subscription: {
fromBeginning: false,
},
runConfig: {
autoCommit: true,
}
})
async gotData(message: KafkaMessage) {
console.info('gotData info');
this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8'));
this.app.setAttr('total', this.app.getAttr<number>('total') + 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Configuration, App } from '@midwayjs/decorator';
import { ILifeCycle } from '@midwayjs/core';
import { IMidwayKafkaApplication } from '../../../../src';

@Configuration({
importConfigs: [
{
default: {
kafka: {
kafkaConfig: {
clientId: 'my-app',
brokers: [process.env.KAFKA_URL || 'localhost:9092'],
},
consumerConfig: {
groupId: 'groupId-test',
},
},
},
},
],
})
export class AutoConfiguration implements ILifeCycle {
@App()
app: IMidwayKafkaApplication;

async onReady() {
this.app.setAttr('total', 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Provide } from '@midwayjs/decorator';

@Provide()
export class UserService {
async hello(name) {
return {
name,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "ali-demo"
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/decorator';
import { KafkaMessage } from 'kafkajs';
import { Context, Application } from '../../../../../src';

@Provide()
@Consumer(MSListenerType.KAFKA)
export class UserConsumer {

@App()
app: Application;

@Inject()
ctx: Context;

@Inject()
logger;

@KafkaListener('topic-test0', {
subscription: {
fromBeginning: false,
},
runConfig: {
autoCommit: false,
}
})
async gotData(message: KafkaMessage) {
console.info('gotData info');
this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8'));
try {
// 抛出异常,当出现异常的时候,需要设置commitOffsets偏移到异常的位置,用于重新执行消费,所以这里应该出现的消费是2次,total为2
throw new Error("error");
} catch (error) {
this.ctx.commitOffsets(message.offset);
}
this.app.setAttr('total', this.app.getAttr<number>('total') + 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Configuration, App } from '@midwayjs/decorator';
import { ILifeCycle } from '@midwayjs/core';
import { IMidwayKafkaApplication } from '../../../../src';

@Configuration({
importConfigs: [
{
default: {
kafka: {
kafkaConfig: {
clientId: 'my-app',
brokers: [process.env.KAFKA_URL || 'localhost:9092'],
},
consumerConfig: {
groupId: 'groupId-test',
},
},
},
},
],
})
export class AutoConfiguration implements ILifeCycle {
@App()
app: IMidwayKafkaApplication;

async onReady() {
this.app.setAttr('total', 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Provide } from '@midwayjs/decorator';

@Provide()
export class UserService {
async hello(name) {
return {
name,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "ali-demo"
}

Loading

0 comments on commit 5eae117

Please sign in to comment.