Error: truncated buffer/TypeError: this.buf.utf8Slice is not a function #59

Closed
fnozarian opened this issue Jul 23, 2016 · 6 comments

Comments

@fnozarian

fnozarian commented Jul 23, 2016

When I try to read an event received from Kafka with the following schema, I first get an Error: truncated buffer exception; when I customize the JSON schema, I get a TypeError: this.buf.utf8Slice is not a function exception instead. I think something is wrong with my JSON schema. I've provided both the Avro and JSON schemas to help with debugging.

My Avro schema when producing event to Kafka:

{
    "name": "ComposedEvent",
    "type": "record",
    "fields": [

        { "name": "SearchResult",
            "type": [
                "null",
                {
                    "name":"SearchResultRec", "type": "record",
                    "fields":[
                        { "name": "query", "type": "string"},
                        { "name": "took", "type": "int" },
                        { "name": "timed_out", "type": "boolean" },
                        { "name": "hits", "type": "int"}
                    ]
                }
            ], "default" : null
        },

        { "name": "detectedDuplicate",       "type": "boolean" },
        { "name": "detectedCorruption",      "type": "boolean" },
        { "name": "firstInSession",          "type": "boolean" },
        { "name": "timestamp",               "type": "long" },
        { "name": "clientTimestamp",         "type": "long" },
        { "name": "remoteHost",              "type": "string" },
        { "name": "referer",                 "type": ["null", "string"], "default": null },
        { "name": "location",                "type": ["null", "string"], "default": null },
        { "name": "viewportPixelWidth",      "type": ["null", "int"],    "default": null },
        { "name": "viewportPixelHeight",     "type": ["null", "int"],    "default": null },
        { "name": "screenPixelWidth",        "type": ["null", "int"],    "default": null },
        { "name": "screenPixelHeight",       "type": ["null", "int"],    "default": null },
        { "name": "partyId",                 "type": ["null", "string"], "default": null },
        { "name": "sessionId",               "type": ["null", "string"], "default": null },
        { "name": "pageViewId",              "type": ["null", "string"], "default": null },
        { "name": "eventType",               "type": "string",           "default": "unknown" },
        { "name": "userAgentString",         "type": ["null", "string"], "default": null },
        { "name": "userAgentName",           "type": ["null", "string"], "default": null },
        { "name": "userAgentFamily",         "type": ["null", "string"], "default": null },
        { "name": "userAgentVendor",         "type": ["null", "string"], "default": null },
        { "name": "userAgentType",           "type": ["null", "string"], "default": null },
        { "name": "userAgentVersion",        "type": ["null", "string"], "default": null },
        { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
        { "name": "userAgentOsFamily",       "type": ["null", "string"], "default": null },
        { "name": "userAgentOsVersion",      "type": ["null", "string"], "default": null },
        { "name": "userAgentOsVendor",       "type": ["null", "string"], "default": null }
    ]
}

My JSON schema when trying to read produced events:

{
    name: 'ComposedEvent',
    type: 'record',
    fields: [

        { name: 'SearchResult',
            type: [
                'null',
                {
                    name:'SearchResultRec', type: 'record',
                    fields:[
                        { name: 'query', type: 'string'},
                        { name: 'took', type: 'int' },
                        { name: 'timed_out', type: 'boolean' },
                        { name: 'hits', type: 'int'}
                    ]
                }
            ], 'default': null
        },

        { name: 'detectedDuplicate',       type: 'boolean' },
        { name: 'detectedCorruption',      type: 'boolean' },
        { name: 'firstInSession',          type: 'boolean' },
        { name: 'timestamp',               type: 'int' },
        { name: 'clientTimestamp',         type: 'int' },
        { name: 'remoteHost',              type: 'string' },
        { name: 'referer',                 type: ['null', 'string'], 'default': null },
        { name: 'location',                type: ['null', 'string'], 'default': null },
        { name: 'viewportPixelWidth',      type: ['null', 'int'],    'default': null },
        { name: 'viewportPixelHeight',     type: ['null', 'int'],    'default': null },
        { name: 'screenPixelWidth',        type: ['null', 'int'],    'default': null },
        { name: 'screenPixelHeight',       type: ['null', 'int'],    'default': null },
        { name: 'partyId',                 type: ['null', 'string'], 'default': null },
        { name: 'sessionId',               type: ['null', 'string'], 'default': null },
        { name: 'pageViewId',              type: ['null', 'string'], 'default': null },
        { name: 'eventType',               type: 'string',           'default': 'unknown' },
        { name: 'userAgentString',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentName',           type: ['null', 'string'], 'default': null },
        { name: 'userAgentFamily',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentVendor',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentType',           type: ['null', 'string'], 'default': null },
        { name: 'userAgentVersion',        type: ['null', 'string'], 'default': null },
        { name: 'userAgentDeviceCategory', type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsFamily',       type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsVersion',      type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsVendor',       type: ['null', 'string'], 'default': null }
    ]
}
@fnozarian fnozarian changed the title TypeError: this.buf.utf8Slice is not a function Error: truncated buffer/TypeError: this.buf.utf8Slice is not a function Jul 24, 2016
@mtth
Owner

mtth commented Jul 25, 2016

Your schema looks fine. What environment are you running (i.e. node/browser version)? Could you attach the code you are using to read the message?

@fnozarian
Author

The environment is node and I was trying to read messages using the following code:

var avro = require('avsc');
var kafka = require('kafka-node');
var type = avro.parse({
    name: 'ComposedEvent',
    type: 'record',
    fields: [
        { name: 'SearchResult',
            type: [
                'null',
                {
                    name:'SearchResultRec', type: 'record',
                    fields:[
                        { name: 'query', type: 'string'},
                        { name: 'took', type: 'int' },
                        { name: 'timed_out', type: 'boolean' },
                        { name: 'hits', type: 'int'}
                    ]
                }
            ], 'default': null
        },

        { name: 'detectedDuplicate',       type: 'boolean' },
        { name: 'detectedCorruption',      type: 'boolean' },
        { name: 'firstInSession',          type: 'boolean' },
        { name: 'timestamp',               type: 'int' },
        { name: 'clientTimestamp',         type: 'int' },
        { name: 'remoteHost',              type: 'string' },
        { name: 'referer',                 type: ['null', 'string'], 'default': null },
        { name: 'location',                type: ['null', 'string'], 'default': null },
        { name: 'viewportPixelWidth',      type: ['null', 'int'],    'default': null },
        { name: 'viewportPixelHeight',     type: ['null', 'int'],    'default': null },
        { name: 'screenPixelWidth',        type: ['null', 'int'],    'default': null },
        { name: 'screenPixelHeight',       type: ['null', 'int'],    'default': null },
        { name: 'partyId',                 type: ['null', 'string'], 'default': null },
        { name: 'sessionId',               type: ['null', 'string'], 'default': null },
        { name: 'pageViewId',              type: ['null', 'string'], 'default': null },
        { name: 'eventType',               type: 'string',           'default': 'unknown' },
        { name: 'userAgentString',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentName',           type: ['null', 'string'], 'default': null },
        { name: 'userAgentFamily',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentVendor',         type: ['null', 'string'], 'default': null },
        { name: 'userAgentType',           type: ['null', 'string'], 'default': null },
        { name: 'userAgentVersion',        type: ['null', 'string'], 'default': null },
        { name: 'userAgentDeviceCategory', type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsFamily',       type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsVersion',      type: ['null', 'string'], 'default': null },
        { name: 'userAgentOsVendor',       type: ['null', 'string'], 'default': null }
    ]
});
var client = kafka.Client("hadoop2:2181","kafka-node-client");

var consumer = new kafka.Consumer(client, [{topic:'divolte',partition:2}],{autoCommit:false});
consumer.on('message', function(message){
    var decodedMessage = type.fromBuffer(message.value);
    console.log(decodedMessage);
    //console.log(message.value);
});

@mtth
Owner

mtth commented Jul 25, 2016

Thanks, I see. The problem is that kafka-node's message.value isn't the right argument for type.fromBuffer: it expects a Buffer containing exactly one encoded value, but AFAIK message.value is a string prefixed by a header (see #13).

The best way to handle this would probably be to implement some kind of schema resolution logic (retrieving the type from the header) but in your case I believe you should be able to get a quick solution working as follows:

// ...
consumer.on('message', function (message) {
  var buf = Buffer.from(message.value, 'binary'); // Read string into a buffer.
  var decodedMessage = type.fromBuffer(buf.slice(5)); // Skip prefix.
  console.log(decodedMessage);
});

You might also want to take a look at #22 for more information on Kafka messages.
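
For reference, the "schema resolution" idea mentioned above could look roughly like the sketch below. It assumes the 5-byte prefix is a zero "magic" byte followed by a 4-byte big-endian schema ID; that layout is an assumption and is not confirmed anywhere in this thread. The names splitHeader, decodeMessage and typesById are hypothetical helpers, not part of avsc or kafka-node.

```javascript
// Sketch only: assumes a 5-byte prefix made of a zero "magic" byte followed by
// a 4-byte big-endian schema ID (an assumption, not confirmed in this thread).
function splitHeader(buf) {
  if (!Buffer.isBuffer(buf) || buf.length < 5) {
    throw new Error('message too short to contain a 5-byte header');
  }
  if (buf[0] !== 0) {
    throw new Error('unexpected magic byte: ' + buf[0]);
  }
  return {
    schemaId: buf.readUInt32BE(1), // Bytes 1 to 4 of the prefix.
    payload: buf.slice(5)          // Remaining bytes: the encoded value.
  };
}

// Hypothetical registry mapping schema IDs to already-parsed avsc types.
var typesById = new Map();

function decodeMessage(buf) {
  var parts = splitHeader(buf);
  var type = typesById.get(parts.schemaId);
  if (!type) {
    throw new Error('unknown schema ID: ' + parts.schemaId);
  }
  return type.fromBuffer(parts.payload);
}
```

With something like typesById.set(1, avro.parse(schema)) done up front, the message handler would call decodeMessage(buf) instead of hard-coding buf.slice(5). Failing loudly on an unexpected first byte makes it easier to notice when the producer does not use this prefix at all.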

@mtth mtth closed this as completed Jul 25, 2016
@fnozarian
Author

Oh! Thanks.
Sorry, it was completely my fault: I passed the wrong argument to type.fromBuffer. Thanks again for your help :)

@BryceCicada

I also had this problem with kafka-node. I solved it by setting the encoding on the consumer to 'buffer'. For example:

let consumer = new kafka.Consumer(client, [], {encoding:'buffer'});

See https://www.npmjs.com/package/kafka-node#consumer
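
A small illustration of why the encoding option matters, using only Node's Buffer. It assumes that, without encoding: 'buffer', the consumer hands back values decoded as UTF-8 text; that is my reading of the kafka-node README and is not verified here. The byte values are arbitrary stand-ins for an Avro-encoded message.

```javascript
// Arbitrary bytes standing in for an Avro-encoded message; 0x80 and 0xff are
// not valid UTF-8 on their own.
var original = Buffer.from([0x00, 0x02, 0x80, 0xff]);

// What a consumer that decodes values as UTF-8 text would hand back.
var asText = original.toString('utf8');

// Converting the text back to bytes does not restore the original bytes.
var roundTripped = Buffer.from(asText, 'binary');

console.log(original.equals(roundTripped)); // false
```

Once invalid byte sequences have been replaced during text decoding, the original data cannot be recovered, so asking for raw buffers avoids the problem entirely.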

@joscha
Contributor

joscha commented Sep 13, 2024

Just for future reference: I ran into this error when I didn't use a checksum whilst encoding/decoding with a custom compression codec. Once I added the checksum, this exact error disappeared.
