Skip to content

Commit

Permalink
fix: e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
thynson committed Sep 22, 2021
1 parent 73fce84 commit 07ea6af
Show file tree
Hide file tree
Showing 21 changed files with 64 additions and 58 deletions.
File renamed without changes.
5 changes: 4 additions & 1 deletion packages/kafkajs-standalone/jest.config.cjs
@@ -1,2 +1,5 @@

module.exports = {...require('../../jest.base-config.cjs')};
module.exports = {
...require('../../jest.base-config.cjs'),
setupFiles: [__dirname + '/jest-setup.cjs'],
};
2 changes: 1 addition & 1 deletion packages/kafkajs-standalone/src/base-message-producer.ts
@@ -1,4 +1,4 @@
import {Message, RecordMetadata, Sender, TopicMessages} from 'kafkajs';
import type {Message, RecordMetadata, Sender, TopicMessages} from 'kafkajs';
import {KafkaSendOption, SimpleMessageProducer} from './types.js';

export abstract class BaseKafkaJsMessageProducer implements SimpleMessageProducer {
Expand Down
6 changes: 3 additions & 3 deletions packages/kafkajs-standalone/src/create-client.ts
@@ -1,15 +1,15 @@
import {Kafka} from 'kafkajs';
import kafkajs from 'kafkajs';
import {KafkaClientOption} from './types.js';
import {consoleLogger} from '@sensejs/utility';
import {createLogOption} from './logging.js';

export function createKafkaClient(clientOption: KafkaClientOption): Kafka {
export function createKafkaClient(clientOption: KafkaClientOption): kafkajs.Kafka {
const {
connectOption: {brokers, ...kafkaConfig},
logOption = {},
logger = consoleLogger,
} = clientOption;
return new Kafka({
return new kafkajs.Kafka({
brokers: typeof brokers === 'string' ? brokers.split(',') : brokers,
...createLogOption(logger, logOption),
...kafkaConfig,
Expand Down
20 changes: 10 additions & 10 deletions packages/kafkajs-standalone/src/logging.ts
@@ -1,4 +1,4 @@
import {LogEntry, logLevel} from 'kafkajs';
import kafkajs from 'kafkajs';
import {KafkaLogOption} from './types.js';
import {Logger} from '@sensejs/utility';

Expand All @@ -8,17 +8,17 @@ export interface KafkaLogAdapterOption {
level?: KafkaLogLevel;
}

function adaptLogLevel(level: logLevel) {
function adaptLogLevel(level: kafkajs.logLevel) {
switch (level) {
case logLevel.ERROR:
case kafkajs.logLevel.ERROR:
return 'error';
case logLevel.WARN:
case kafkajs.logLevel.WARN:
return 'warn';
case logLevel.INFO:
case kafkajs.logLevel.INFO:
return 'info';
case logLevel.DEBUG:
case kafkajs.logLevel.DEBUG:
return 'debug';
case logLevel.NOTHING:
case kafkajs.logLevel.NOTHING:
default:
return '';
}
Expand All @@ -27,10 +27,10 @@ function adaptLogLevel(level: logLevel) {
export function createLogOption(logger: Logger, option: KafkaLogAdapterOption = {}): Required<KafkaLogOption> {
const {level: desiredLevel = 'INFO'} = option;
return {
level: logLevel[desiredLevel],
level: kafkajs.logLevel[desiredLevel],
logCreator: () => {
return (logEntry: LogEntry) => {
if (logLevel[desiredLevel] >= logEntry.level) {
return (logEntry: kafkajs.LogEntry) => {
if (kafkajs.logLevel[desiredLevel] >= logEntry.level) {
const {timestamp, message, ...rest} = logEntry.log;
const level = adaptLogLevel(logEntry.level);
if (level) {
Expand Down
26 changes: 9 additions & 17 deletions packages/kafkajs-standalone/src/message-consumer.ts
@@ -1,18 +1,10 @@
import {
Consumer,
ConsumerConfig,
EachBatchPayload,
Kafka,
KafkaJSError,
KafkaMessage as KafkaJsMessage,
RetryOptions,
} from 'kafkajs';
import kafkajs from 'kafkajs';
import {WorkerController} from './worker-synchronizer.js';
import {createKafkaClient} from './create-client.js';
import {KafkaBatchConsumeMessageParam, KafkaClientOption, KafkaReceivedMessage} from './types.js';

export interface KafkaFetchOption extends Exclude<ConsumerConfig, 'retry'> {
retry?: RetryOptions;
export interface KafkaFetchOption extends Exclude<kafkajs.ConsumerConfig, 'retry'> {
retry?: kafkajs.RetryOptions;
}

export interface KafkaCommitOption {
Expand Down Expand Up @@ -56,8 +48,8 @@ export interface BatchSubscribeOption {
type ConsumeOption = SimpleConsumeOption | BatchConsumeOption;

export class MessageConsumer {
private client: Kafka;
private consumer: Consumer;
private client: kafkajs.Kafka;
private consumer: kafkajs.Consumer;
private consumeOptions: Map<string, ConsumeOption> = new Map();
private crashPromise?: Promise<void>;
private runPromise?: Promise<unknown>;
Expand Down Expand Up @@ -161,7 +153,7 @@ export class MessageConsumer {
}
}

private async eachBatch(payload: EachBatchPayload) {
private async eachBatch(payload: kafkajs.EachBatchPayload) {
const {topic, partition} = payload.batch;
const consumeOption = this.consumeOptions.get(topic);
if (!consumeOption) {
Expand Down Expand Up @@ -190,7 +182,7 @@ export class MessageConsumer {
};

await this.processBatch(
async (message: KafkaJsMessage) => {
async (message: kafkajs.KafkaMessage) => {
await consumer({topic, partition, ...message});
payload.resolveOffset(message.offset);
await payload.commitOffsetsIfNecessary();
Expand All @@ -203,11 +195,11 @@ export class MessageConsumer {
}

private async processBatch(
consumer: (message: KafkaJsMessage) => Promise<void>,
consumer: (message: kafkajs.KafkaMessage) => Promise<void>,
heartbeat: () => Promise<void>,
resolveOffset: (offset: string) => void,
commitOffset: (forced?: boolean) => Promise<void>,
messages: KafkaJsMessage[],
messages: kafkajs.KafkaMessage[],
) {
const synchronizer = this.workerController.createSynchronizer(false);
try {
Expand Down
20 changes: 10 additions & 10 deletions packages/kafkajs-standalone/src/pooled-provider.ts
@@ -1,5 +1,5 @@
import {MessageProducerOption, MessageProducerProvider} from './types.js';
import {Kafka, Producer} from 'kafkajs';
import kafka from 'kafkajs';
import {SimpleKafkaJsMessageProducer} from './simple-message-producer.js';
import {KafkaJsTransactionalMessageProducer} from './transactional-message-producer.js';
import {Pool, PoolConfiguration} from 'lightning-pool';
Expand All @@ -17,13 +17,13 @@ export interface PooledMessageProducerOption extends MessageProducerOption {
}

export class PooledKafkaJsProducerProvider extends MessageProducerProvider {
private pool?: Pool<Producer>;
private client: Kafka;
private pool?: Pool<kafka.Producer>;
private client: kafka.Kafka;
/**
* This map will store each pool by
* For Transactional producer, since the transaction id must be unique,
*/
private txPoolMap: Map<string, Pool<Producer>> = new Map();
private txPoolMap: Map<string, Pool<kafka.Producer>> = new Map();
private txPoolOption: PoolConfiguration;
private allResourcesDrained = Promise.resolve();

Expand All @@ -39,14 +39,14 @@ export class PooledKafkaJsProducerProvider extends MessageProducerProvider {
idleTimeoutMillis,
maxQueue: maxWaitingClients,
};
this.pool = new Pool<Producer>(
this.pool = new Pool<kafka.Producer>(
{
create: async (): Promise<Producer> => {
create: async (): Promise<kafka.Producer> => {
const producer = this.client.producer(this.option.producerOption);
await producer.connect();
return producer;
},
destroy: async (client: Producer): Promise<void> => {
destroy: async (client: kafka.Producer): Promise<void> => {
await client.disconnect();
},
reset: async () => {},
Expand Down Expand Up @@ -83,9 +83,9 @@ export class PooledKafkaJsProducerProvider extends MessageProducerProvider {
maxInFlightRequests: 1,
idempotent: true,
});
const pool = new Pool<Producer>(
const pool = new Pool<kafka.Producer>(
{
create: async (): Promise<Producer> => {
create: async (): Promise<kafka.Producer> => {
await producer.connect();
return producer;
},
Expand All @@ -111,7 +111,7 @@ export class PooledKafkaJsProducerProvider extends MessageProducerProvider {
return this.createTxPool(transactionalId);
}

private checkReleased(): Pool<Producer> {
private checkReleased(): Pool<kafka.Producer> {
if (!this.pool) {
throw new Error('Pool has been destroyed');
}
Expand Down
2 changes: 1 addition & 1 deletion packages/kafkajs-standalone/src/simple-message-producer.ts
@@ -1,4 +1,4 @@
import {Sender} from 'kafkajs';
import type {Sender} from 'kafkajs';
import {KafkaSendOption} from './types.js';
import {BaseKafkaJsMessageProducer} from './base-message-producer.js';

Expand Down
4 changes: 2 additions & 2 deletions packages/kafkajs-standalone/src/simple-provider.ts
@@ -1,12 +1,12 @@
import {SimpleKafkaJsMessageProducer} from './simple-message-producer.js';
import {Kafka} from 'kafkajs';
import kafkajs from 'kafkajs';
import {KafkaJsTransactionalMessageProducer} from './transactional-message-producer.js';
import {MessageProducerOption, MessageProducerProvider} from './types.js';
import {createKafkaClient} from './create-client.js';

export class SimpleKafkaJsProducerProvider extends MessageProducerProvider {
private allProducerClosed: Promise<any> = Promise.resolve();
private client: Kafka;
private client: kafkajs.Kafka;

constructor(private option: MessageProducerOption) {
super();
Expand Down
@@ -1,5 +1,5 @@
import {KafkaSendOption, TransactionalMessageProducer} from './types.js';
import {Offsets, Producer, Transaction} from 'kafkajs';
import type {Offsets, Producer, Transaction} from 'kafkajs';
import {BaseKafkaJsMessageProducer} from './base-message-producer.js';

export class KafkaJsTransactionalMessageProducer
Expand Down
2 changes: 1 addition & 1 deletion packages/kafkajs-standalone/src/types.ts
@@ -1,4 +1,4 @@
import * as k from 'kafkajs';
import k from 'kafkajs';
import {LogEntry, logLevel, Message, Offsets, RecordMetadata, TopicMessages} from 'kafkajs';
import {KafkaLogAdapterOption} from './logging.js';
import {Logger} from '@sensejs/utility';
Expand Down
1 change: 0 additions & 1 deletion packages/kafkajs-standalone/tests/message-consumer.spec.ts
Expand Up @@ -20,7 +20,6 @@ MockKafka.prototype.admin = jest.fn();
MockKafka.prototype.consumer = jest.fn();

jest.mock('kafkajs', () => {
console.log('returning mocked kafkajs');
return {
// eslint-disable-next-line @typescript-eslint/naming-convention
Kafka: MockKafka,
Expand Down
File renamed without changes.
5 changes: 4 additions & 1 deletion packages/kafkajs-zstd-support/jest.config.cjs
@@ -1,2 +1,5 @@

module.exports = {...require('../../jest.base-config.cjs')};
module.exports = {
...require('../../jest.base-config.cjs'),
setupFiles: [__dirname + '/jest-setup.cjs'],
};
3 changes: 2 additions & 1 deletion packages/kafkajs-zstd-support/package.json
Expand Up @@ -38,7 +38,8 @@
"zstd-napi": "^0.0.6"
},
"devDependencies": {
"kafkajs": "^1.15.0"
"kafkajs": "^1.15.0",
"config": "^3.3.6"
},
"peerDependencies": {
"kafkajs": "^1.15.0"
Expand Down
10 changes: 5 additions & 5 deletions packages/kafkajs-zstd-support/src/index.ts
@@ -1,9 +1,9 @@
import {CompressionTypes, CompressionCodecs} from 'kafkajs';
import {Compressor, Decompressor} from 'zstd-napi';
import kafkajs from 'kafkajs';
import zn from 'zstd-napi';

const zstdKafkaCodec = () => {
const compressor = new Compressor();
const decompressor = new Decompressor();
const compressor = new zn.Compressor();
const decompressor = new zn.Decompressor();
return {
async compress(encoder: any) {
return compressor.compress(encoder.buffer);
Expand All @@ -15,4 +15,4 @@ const zstdKafkaCodec = () => {
};
};

CompressionCodecs[CompressionTypes.ZSTD] = zstdKafkaCodec;
kafkajs.CompressionCodecs[kafkajs.CompressionTypes.ZSTD] = zstdKafkaCodec;
File renamed without changes.
5 changes: 4 additions & 1 deletion packages/kafkajs/jest.config.cjs
@@ -1,2 +1,5 @@

module.exports = {...require('../../jest.base-config.cjs')};
module.exports = {
...require('../../jest.base-config.cjs'),
setupFiles: [__dirname + '/jest-setup.cjs'],
};
3 changes: 2 additions & 1 deletion packages/kafkajs/package.json
Expand Up @@ -42,7 +42,8 @@
"devDependencies": {
"@sensejs/container": "workspace:*",
"@sensejs/core": "workspace:*",
"@sensejs/testing-utility": "workspace:*"
"@sensejs/testing-utility": "workspace:*",
"config": "^3.3.6"
},
"peerDependencies": {
"@sensejs/container": "workspace:^0.9.0-alpha.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/kafkajs/tests/producer-provider.e2e-spec.ts
@@ -1,6 +1,6 @@
import {createModule, Inject, ModuleClass, ModuleRoot, uuidV1} from '@sensejs/core';
import config from 'config';
import {createPooledProducerModule, createSimpleProducerModule} from '../src/producer-provider-module';
import {createPooledProducerModule, createSimpleProducerModule} from '../src/producer-provider-module.js';
import {MessageProducerProvider} from '@sensejs/kafkajs-standalone';

test('createSimpleProducerModule', async () => {
Expand Down
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 07ea6af

Please sign in to comment.