
Fix zlib polyfill -- was "Support for kafkajs package" #1811

Closed

renatocron opened this issue Jan 17, 2023 · 4 comments
Labels: bug (Something isn't working), ecosystem (Something that relates to package or framework compatibility)

Comments

@renatocron

renatocron commented Jan 17, 2023

What version of Bun is running?

0.5.0

What platform is your computer?

Linux 5.4.0-135-generic x86_64 x86_64

What steps can reproduce the bug?

Init a Bun package and add kafkajs (tested with 2.2.3).

Trying to init a consumer results in an "invalid input" error after joining the group:

{"level":"ERROR","timestamp":"2023-01-17T10:22:40.120Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"localhost:9093","clientId":"test.js","error":"invalid input","correlationId":0,"size":3187}

What is the expected behavior?

Read messages from the topic.

The topic was already created and populated with data; compression is gzip.

I still need to test whether the producer/admin API works with Bun.

What do you see instead?

No response

Additional information

Tested script:

index.ts

KAFKA_BROKERS will probably need adjusting for your setup.

import { Kafka, Batch } from 'kafkajs';

class KafkaUtils {

    static newClient(logger: { info: any }, clientId: string) {
        if (!clientId)
            clientId = 'foobar';
        let brokers, envBrokers = process.env.KAFKA_BROKERS;

        if (!envBrokers) {
            logger.info('KAFKA_BROKERS not configured, Setting up KAFKA_BROKERS for development')
            brokers = ["172.17.0.1:9092", "172.17.0.1:9093", "172.17.0.1:9094"];
        } else {

            const parts = envBrokers.split(',');
            parts.forEach(function (v, i) {
                if (/^[a-z0-9-\.]+:\d+$/.test(v) === false) {
                    throw `Please check value for KAFKA_BROKERS "${envBrokers}" at index ${i} "${v}" looks invalid`;
                }
            })
            brokers = parts;
        }

        logger.info('kafka will connect to ' + JSON.stringify(brokers));
        let k = new Kafka({
            clientId: clientId,
            brokers: brokers
        });
        return k;
    }
}


const consumerTopic = process.env.CONSUMER_TOPIC || 'topic';
const consumerGroup = process.env.CONSUMER_GROUP || 'CONSUMER_GROUP' + Math.random();
const consumeFromStart = process.env.FROM_START ? true : false;

const logger = {
    info: function (e) {
        console.log(e)
    }
};
(async function () {
    const kafka = KafkaUtils.newClient(logger, 'test.js');

    logger.info('about to connect to kafka as consumer');
    const consumer = kafka.consumer({ groupId: String(consumerGroup) });
    await consumer.connect(); // connect before subscribe/run, per the kafkajs docs

    consumer.on('consumer.crash', (event: any) => {
        const error = event?.payload?.error
        logger.info('consumer.crash: ' + error);
        logger.info('exiting in 15 seconds....');
        setTimeout(() => {
            logger.info('exiting...');
            process.exit(1)
        }, 15000)
    });

    await consumer.subscribe({ topic: consumerTopic, fromBeginning: consumeFromStart });
    await consumer.run({
        autoCommitThreshold: 100,
        autoCommitInterval: 5000,
        eachBatch: async ({
            batch,
            resolveOffset,
            heartbeat,
            commitOffsetsIfNecessary,
            uncommittedOffsets,
            isRunning,
            isStale,
        }) => {
            logger.info(`partition=${batch.partition} batch has ${batch.messages.length} messages batch.highWatermark=${batch.highWatermark}`);

            const beforeProc = Date.now();
            await handleNewMessages(batch);
            logger.info(`processed ${batch.messages.length} messages in ${Date.now() - beforeProc} ms`);
        },
    })


})();

async function handleNewMessages(batch: Batch) {


    const parsedMessages = [];
    for (const message of batch.messages) {
        if (message.value == null) continue;
        try {
            const msgs = JSON.parse(message.value.toString());

            if (Array.isArray(msgs)) {
                parsedMessages.push(...msgs);
            }
            else {
                logger.info(`Failed to parse ${message.value.toString()}: array expected - ignoring message at offset ${message.offset} on partition=${batch.partition}`);
            }

        } catch (error) {
            logger.info(`Failed to parse ${message.value.toString()}: ${error} - ignoring message at offset ${message.offset} on partition=${batch.partition}`);
        }
    }

    console.log(parsedMessages);


}

From the Kafka broker Dockerfile:

ENV KAFKA_VERSION 3.1.0
ENV SCALA_VERSION 2.13

PS: I can send you a docker-compose for Kafka, but I think I should first try a simple setup with Redpanda under Node.js and then update this issue, since I'm building Kafka from archive.apache.org directly, not Confluent.

@renatocron renatocron added the bug Something isn't working label Jan 17, 2023
@Jarred-Sumner
Collaborator

Five possibilities come to mind:

  1. a not-yet-implemented API (unlikely; it would complain about a missing function)
  2. a bug in Bun's TCP client (possible; just fixed a macOS-specific one earlier tonight)
  3. a bug in Bun's node:net implementation (less likely than 2)
  4. some CommonJS/ESM thing (probably not)
  5. a bug in Buffer in Bun (less likely than 3)
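
A quick way to start narrowing this down (a hypothetical diagnostic sketch, not from the original thread) is to round-trip a payload through Buffer and node:net locally; if both checks pass, possibilities 2, 3, and 5 become less likely:

import net from 'net';
import type { AddressInfo } from 'net';

// Hypothetical isolation check, independent of kafkajs.
const payload = Buffer.from('hello kafka');

// Possibility 5: Buffer round-trip through base64
const viaBase64 = Buffer.from(payload.toString('base64'), 'base64');
console.log('Buffer ok:', viaBase64.equals(payload));

// Possibilities 2/3: TCP echo over node:net on an ephemeral port
const server = net.createServer((socket) => socket.pipe(socket));
server.listen(0, () => {
    const { port } = server.address() as AddressInfo;
    const client = net.connect(port, '127.0.0.1', () => client.write(payload));
    client.on('data', (data) => {
        console.log('net ok:', Buffer.from(data).equals(payload));
        client.end();
        server.close();
    });
});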

@Electroid Electroid added the ecosystem Something that relates to package or framework compatibility label Jan 17, 2023
@renatocron
Author

renatocron commented Jan 18, 2023

After testing with another topic without compression, I found the origin of the issue: it fails when decoding the GZIP stream. The code below reproduces the error message (it is similar to https://github.com/tulios/kafkajs/blob/master/src/protocol/message/compression/gzip.js) without involving kafkajs:

const { promisify } = require('util')
const zlib = require('zlib')

// Promisified gzip/unzip, the same pattern the kafkajs gzip codec uses
const gzip = promisify(zlib.gzip)
const unzip = promisify(zlib.unzip)

class Test {
    static async compress(encoder: any) {
        return await gzip(encoder)
    }

    static async decompress(buffer: any) {
        return await unzip(buffer)
    }
}

// Round-trip: compress a string, then decompress it again
const buffer = await Test.compress('abc');
const decoded = await Test.decompress(buffer);

console.log({ r: buffer, A: decoded });

When running this, the output shows the long compressed buffer, followed by:

error: invalid input
      at /bun-vfs/node_modules/zlib/index.js:63:52128

A shorter version:

import zlib from 'zlib';

zlib.gzip('abc', (err, buffer) => {
    if (err) throw 'encoder error: ' + err.message;

    zlib.unzip(buffer, (err, decoded) => {
        if (err) throw 'decode error error: ' + err.message;
        console.log({ buffer: buffer, decoded: decoded.toString() });
    });
});
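
One possible stopgap until the polyfill is fixed (an assumption on my part, not something verified in this thread) is a pure-JS zlib implementation such as the pako package, which sidesteps the polyfill entirely:

// Same round-trip via pako instead of the bundled zlib polyfill.
// Assumes `bun add pako` (and optionally @types/pako) has been run.
import pako from 'pako';

const buffer = Buffer.from(pako.gzip(Buffer.from('abc')));    // gzip-compress
const decoded = Buffer.from(pako.ungzip(buffer)).toString();  // back to 'abc'

console.log({ buffer, decoded });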

kafkajs works fine without compression.

When trying to add other compression algorithms, such as zstd, it fails on

var compressor = require('bindings')('compressor.node');

because I think Bun does not support this binding yet.
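
If the pako-based round-trip above works, it may also be possible to swap it into kafkajs, which allows registering or replacing compression codecs through CompressionCodecs. The sketch below assumes the codec shape from the gzip.js file linked earlier (a factory returning an object with compress/decompress) and is untested against Bun:

// Hypothetical override of kafkajs' GZIP codec with a pako-based one.
import { CompressionTypes, CompressionCodecs } from 'kafkajs';
import pako from 'pako';

const PakoGzipCodec = () => ({
    // kafkajs hands the codec an encoder whose `buffer` holds the uncompressed bytes
    async compress(encoder: { buffer: Buffer }) {
        return Buffer.from(pako.gzip(encoder.buffer));
    },
    async decompress(buffer: Buffer) {
        return Buffer.from(pako.ungzip(buffer));
    },
});

// Register before creating any producer/consumer so gzip batches go through pako
CompressionCodecs[CompressionTypes.GZIP] = PakoGzipCodec;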

@Jarred-Sumner
Collaborator

oh yeah, our zlib polyfill is not great. it needs to be implemented natively using zlib but we haven't done that yet.

@renatocron renatocron changed the title Support for kafkajs package Fix zlib polyfill was "Support for kafkajs package" Jan 20, 2023
@renatocron renatocron changed the title Fix zlib polyfill was "Support for kafkajs package" Fix zlib polyfill -- was "Support for kafkajs package" Jan 20, 2023
@renatocron
Author

Refer to #1511.
