Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

使用示例 consumer 的代码得到乱码消息 #236

Open
wangxiqunlucky opened this issue Sep 27, 2018 · 4 comments
Open

使用示例 consumer 的代码得到乱码消息 #236

wangxiqunlucky opened this issue Sep 27, 2018 · 4 comments

Comments

@wangxiqunlucky
Copy link

wangxiqunlucky commented Sep 27, 2018

你好,我的架构是使用 filebeat 采集日志传输进 kafka 中, 使用了示范中的 consumer 代码取出后出现乱码,
但是使用 bin/kafka-console-producer 来生成消息就没问题.
使用示例中 consumer 取出的乱码数据片段 :
YK╦÷µÊÐ┤ë} ¢w▀{Ñk│ÁtZ\t)#\x10ëÂ\x13«ÒñªÄƒ▒ØÁ]Uë\n
ìn\f\t·e\x1F&┴\x00MúÊé║ò¡@ÚÇ~(Ù$@b\x08P\x11\x12Ü║¯\v¹Rÿ\x00mBÒ^█Î)q\x12┐{│7ƒù$ë"█±ƒ°×▀9þ×\x7FÎÎ]º¥:▓ø¤«Ïy¢©t²Þ¬\x7FuØzK|ë[\x07Å$ÂVrÒ┘r%=^L°\té░█ç╝>┬ç\x10¸)¸\tOz.²bó7▒u<[IÅñ+Úä\x7F$▒/ø«ê╗ŵ‗┘Û┼ÌDÕp1+n\x19\t2Ô╩²┘R9\x17\x14─u'╔ÆH■9(µ2Ôz!{░v±ho²IÄ$÷\x07ÕJ!=.\x1F×\x0F2Ú╝╝×¼^\x1A\tãË╣┬îO8þ\x03─ô╦█õôÀ║_9ÿ(eõ=R\x07âÊüT>\x18+º\n
c╣┬íTb<[¿$ËÖLÂ\x16\x0F\e\x13»\x1AîÄû│Ôy1▒¢ÌäXÀrzL>\x18{$ë\x1D7Ú$1ßVƒ°ÌMX¬┐XJë\x15Õ>B>v|ýY}ê#┤Î┌ôÏ▒}╚Jݤµ¾üu¸ðð«\x14NÔ=\tï!f1nÝFIäÞ^K▄»O▄║'1\x10<É╦þË);뼧├╣┬Hp░l}f╚\x12/À╔\x1A\x1E\x1CvXÅı_,µ│├┘};sòöMyÆ:Í·Øw\x0F\r|║Î╩þ\x0Ed¡\x1D┘╠üáÃ║k\x7F)\x18¤ªl×DIÔ╣R╝ÍþÊúÚR«■¿Û\vèù\x15´6W(NT\x17▒.\╣\x06b╔èÑá\╠f*Ai·▀ÄÍi\x13£uîûo§_l\x02nW;ü3ö5s¿Â¼\x19ØIÍ®┘äÝ─PÏS\n
▄By\r¶;┤\x02+Ô:$q'^¾\x7F\vï8\x1Dj(‗\tÐóå─ÜÜ8Ü}Ãq\x15q7HÔ║w\x7F´╗å─▒hêïº\x1A\e\v─ÕÜÂYjÖ×mãÝÍ2E▄2I\x1C║|▀\x15X─-­ıƒ[wfQÀ\x18½\x14A╝aÒÌ%ë¾╬^9\n
ï©ã┐J▄໲ïâ?E▄ìÆ©o²Ó4âE\x1Cê§î\x16t\x00±ªí┴b«;ÅX§¦29▓ò¢·5├õ╚B&╬L \f\x13ªOÙóIÄtJÓ.]}e\x02\x16pæ╩\vÇ\x17]\ü\x11úñ▒ï{Å\x04¯O╗
°2,Ó"^\x00øÆêíÄw·%\x02¿\x15p╦%p\x7F\x1DX÷<,Ó@¿sâON\x1DM>ëOÝV|\x12ä,J%ƒÏÒÝþSOwì]Û{%p┐╗‗╩Ñ\x06pOC\x00NG║L³\x0E']ºj}©█Ú┬▓>Q{\x14ø┌H\x01¸>\t▄‗×=eX└\x19¥1█Í▄»\vX1ÊÇı]éuק¡¸KÓN\x16\x1F╔\x108;\x1AÓÇ»×®F8£╗┌\x1AAÿåF8\x1EPìP└u╔D▄å'?§½&Ó┬%Ô"\x02.╝L░Å\x1CƒÔ02┴╠¡\n

使用 gzdecode 尝试后, 得到 仍然带有部分乱码的片段 :
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02bIè;ƒ\x01\x00\x00\x00\x01f\x19ö\x0E®ÿÿÿÿ\x00\x00\x02L{"@timestamp":"2018-09-27T07:37:27.982Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.0","topic":"newtopic"},"input":{"type":"log"},"prospector":{"type":"log"},"beat":{"name":"localhost.localdomain","hostname":"localhost.localdomain","version":"6.4.0"},"host":{"name":"localhost.localdomain"}
虽然借鉴使用了其他 issues 提到的 gzdecode, 但是还是会发生乱码情况.

使用的各部分版本号 :
filebeat v6.4.0
kafka 2.0.0
php v7.0.11
kafka-php v0.2.0.8

@php-cpm
Copy link

php-cpm commented Oct 17, 2018

kafka 1.1.1
filebeat 6.4.0
php 7.1.11
kafka-php v0.2.0.8

我定位问题出现在
filebeat传输给 kafka 的时候默认配置compression: gzip 所以数据是压缩的,但是 php 的 gzdecode 还原出来的数据仍有乱码,如果 filebeat 收集发给 kafka 时设置 compression: none 就没有问题。

kafka-php 中 nmred/kafka-php/src/Kafka/Protocol/Protocol.php 方法 decodeString($data, $bytes, $compression = self::COMPRESSION_NONE)

第三个参数可以根据压缩类型处理字符串,但是现在项目中都没有用到。

陷入困境。

@php-cpm
Copy link

php-cpm commented Oct 17, 2018

nmred/kafka-php 更新到 dev-master 似乎解决了,继续研究中。

@wangxiqunlucky
Copy link
Author

kafka 1.1.1
filebeat 6.4.0
php 7.1.11
kafka-php v0.2.0.8

我定位问题出现在
filebeat传输给 kafka 的时候默认配置compression: gzip 所以数据是压缩的,但是 php 的 gzdecode 还原出来的数据仍有乱码,如果 filebeat 收集发给 kafka 时设置 compression: none 就没有问题。

kafka-php 中 nmred/kafka-php/src/Kafka/Protocol/Protocol.php 方法 decodeString($data, $bytes, $compression = self::COMPRESSION_NONE)

第三个参数可以根据压缩类型处理字符串,但是现在项目中都没有用到。

陷入困境。

你好, 是的,我也发现该问题,如果使用 compression: none 就没有问题, 至于你说的 ecodeString($data, $bytes, $compression = self::COMPRESSION_NONE) 这个是可以手动修改指明压缩方式从而解决该问题吗?

@php-cpm
Copy link

php-cpm commented Oct 25, 2018

我临时的解决方案是 filebeat 既然是 json
我就手动 gzdecode 之后用 { 截取后面正常的数据然后做处理
我后来改用 dev-master 版本似乎可以识别是否压缩并返回正常数据,但引入了新问题,取不到消费的偏移量,每次消费完以后又重复消费了

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants