This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Upgrade pulsar with LightProto support #337


BewareMyPower
Collaborator

@BewareMyPower BewareMyPower commented Jan 20, 2021

Fixes #336

Besides adopting the new LightProto-generated package name and some new APIs, this PR also fixes the test errors caused by apache/pulsar#9240, which introduced a new method, PulsarService#createLocalMetadataStore.

Some tests were also affected by incorrect parsing of entry data after the Pulsar update; this PR fixes those failures as well.

@BewareMyPower BewareMyPower changed the title Upgrade pulsar with LightProto support [WIP] Upgrade pulsar with LightProto support Jan 20, 2021
@jiazhai jiazhai changed the title [WIP] Upgrade pulsar with LightProto support Upgrade pulsar with LightProto support Jan 20, 2021
@jiazhai
Contributor

jiazhai commented Jan 20, 2021

remove wip to trigger build

@jiazhai jiazhai changed the title Upgrade pulsar with LightProto support [wip]Upgrade pulsar with LightProto support Jan 20, 2021
@BewareMyPower
Collaborator Author

There's a bug: the dispatcher doesn't skip the BrokerEntryMetadata. We need to wait until apache/pulsar#9255 is merged.
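
To illustrate what "skipping the BrokerEntryMetadata" means, here is a minimal sketch that uses only `java.nio.ByteBuffer`. It assumes the entry may start with a 2-byte magic number, followed by a 4-byte size and then the broker entry metadata itself. The class name, the method name and the magic value `0x0e02` are assumptions made for illustration; the actual fix lives in apache/pulsar#9255 and may differ.

```java
import java.nio.ByteBuffer;

public class BrokerEntryMetadataSkipSketch {
    // Assumed 2-byte marker in front of the broker entry metadata (illustrative value).
    static final short MAGIC_BROKER_ENTRY_METADATA = 0x0e02;

    /**
     * If the entry starts with the assumed magic number, move the read position
     * past [magic:2][size:4][broker entry metadata:size]. Otherwise leave it untouched.
     */
    static ByteBuffer skipBrokerEntryMetadataIfExist(ByteBuffer entry) {
        int start = entry.position();
        if (entry.remaining() >= 6 && entry.getShort(start) == MAGIC_BROKER_ENTRY_METADATA) {
            int size = entry.getInt(start + 2);
            entry.position(start + 2 + 4 + size);
        }
        return entry;
    }

    public static void main(String[] args) {
        byte[] brokerMetadata = {1, 2, 3}; // stand-in for the serialized broker entry metadata
        byte[] rest = {42, 43};            // stand-in for the regular message metadata and payload
        ByteBuffer entry = ByteBuffer.allocate(2 + 4 + brokerMetadata.length + rest.length);
        entry.putShort(MAGIC_BROKER_ENTRY_METADATA)
             .putInt(brokerMetadata.length)
             .put(brokerMetadata)
             .put(rest);
        entry.flip();

        skipBrokerEntryMetadataIfExist(entry);
        System.out.println(entry.get()); // prints 42, the first byte after the skipped prefix
    }
}
```

A reader that does not perform this skip would start parsing the message metadata at the wrong offset, which is the kind of failure described above.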

@BewareMyPower
Collaborator Author

I've installed a snapshot build of Pulsar that includes apache/pulsar#9255 in my local Maven repository. There are still a lot of failing tests to resolve. The cause may be related to PulsarEntryFormatter, and I'll look into it.

@BewareMyPower BewareMyPower changed the title [wip]Upgrade pulsar with LightProto support Upgrade pulsar with LightProto support Jan 21, 2021
@BewareMyPower
Collaborator Author

All tests now pass in my local environment with the latest Pulsar. This PR will be ready for review once Pulsar is bumped to the daily build that includes apache/pulsar#9255.

For now the Pulsar version is set to 2.8.0-SNAPSHOT so that the build fails fast and CI doesn't run.

@BewareMyPower
Collaborator Author

BewareMyPower commented Jan 26, 2021

Two tests failed in CI. It looks like there are still some deserialization issues with OffsetFinder:

EntryPublishTimeKafkaFormatTest.testPublishTime:113 expected [NONE] but found [UNKNOWN_SERVER_ERROR]

java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:209) ~[pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at io.streamnative.pulsar.handlers.kop.utils.OffsetFinder.lambda$findMessages$0(OffsetFinder.java:64) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at io.streamnative.pulsar.handlers.kop.utils.OpFindNewestEntry.readEntryComplete(OpFindNewestEntry.java:69) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:227) ~[managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_282]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
04:17:04.039 [bookkeeper-ml-workers-OrderedExecutor-0-0:io.streamnative.pulsar.handlers.kop.utils.OffsetFinder@92] INFO  io.streamnative.pulsar.handlers.kop.utils.OffsetFinder - [4:0][1611634620347] Found position {} closest to provided timestamp {}
04:17:04.040 [bookkeeper-ml-workers-OrderedExecutor-0-0:io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$1@780] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - Find position for topic persistent://public/default/publishTime-partition-0 time 1611634620347. position: 4:0
04:17:04.040 [bookkeeper-ml-workers-OrderedExecutor-0-0:io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$1@791] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [PersistentTopic{topic=persistent://public/default/publishTime-partition-0}] Failed to get offset for position 4:0
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:209) ~[pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils$1.readEntryComplete(MessageIdUtils.java:64) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:227) ~[managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_282]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

KafkaApisTest.testConsumerListOffset:384 expected [NONE] but found [UNKNOWN_SERVER_ERROR]

offset for latest 10
04:19:03.148 [TestNG-method=testConsumerListOffset-1:io.streamnative.pulsar.handlers.kop.utils.OffsetFinder@58] DEBUG io.streamnative.pulsar.handlers.kop.utils.OffsetFinder - [0] Starting message position find at timestamp {}
04:19:03.156 [bookkeeper-ml-workers-OrderedExecutor-1-0:io.streamnative.pulsar.handlers.kop.utils.OffsetFinder@67] ERROR io.streamnative.pulsar.handlers.kop.utils.OffsetFinder - [{}][{}] Error deserializing message for message position find
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:209) ~[pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at io.streamnative.pulsar.handlers.kop.utils.OffsetFinder.lambda$findMessages$0(OffsetFinder.java:64) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at io.streamnative.pulsar.handlers.kop.utils.OpFindNewestEntry.readEntryComplete(OpFindNewestEntry.java:69) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:227) ~[managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_282]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
04:19:03.168 [bookkeeper-ml-workers-OrderedExecutor-1-0:io.streamnative.pulsar.handlers.kop.utils.OffsetFinder@92] INFO  io.streamnative.pulsar.handlers.kop.utils.OffsetFinder - [4:0][0] Found position {} closest to provided timestamp {}
04:19:03.169 [bookkeeper-ml-workers-OrderedExecutor-1-0:io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$1@780] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - Find position for topic persistent://public/default/listOffset-partition-0 time 0. position: 4:0
04:19:03.170 [bookkeeper-ml-workers-OrderedExecutor-1-0:io.streamnative.pulsar.handlers.kop.KafkaRequestHandler$1@791] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [PersistentTopic{topic=persistent://public/default/listOffset-partition-0}] Failed to get offset for position 4:0
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:209) ~[pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils$1.readEntryComplete(MessageIdUtils.java:64) ~[pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:227) ~[managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_282]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_282]
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [bookkeeper-common-4.12.1.jar:4.12.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
offset for timestamp=0 -1
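
For context on the "tag type: 6" and "tag type: 7" values in the traces above: in the protobuf wire format, the low three bits of a field tag are the wire type, and only values 0 through 5 are defined. The sketch below decodes a tag that way; the class and method names are hypothetical, and it assumes that the "tag type" reported by `LightProtoCodec.skipUnknownField` is this wire type. A value of 6 or 7 usually suggests that the parser is reading bytes that are not the start of a protobuf message, though the log alone does not establish which bytes were read here.

```java
public class WireTypeSketch {
    /** The low three bits of a protobuf tag hold the wire type. */
    static int wireType(int tag) {
        return tag & 0x7;
    }

    /** The remaining high bits hold the field number. */
    static int fieldNumber(int tag) {
        return tag >>> 3;
    }

    /** Protobuf defines wire types 0 through 5 only; 6 and 7 are invalid. */
    static boolean isDefinedWireType(int wireType) {
        return wireType >= 0 && wireType <= 5;
    }

    public static void main(String[] args) {
        int validTag = (2 << 3) | 2; // field 2, length-delimited (wire type 2)
        int strayByte = 0x0e;        // an arbitrary non-protobuf byte interpreted as a tag

        System.out.println(fieldNumber(validTag) + " / " + wireType(validTag));   // prints 2 / 2
        System.out.println(fieldNumber(strayByte) + " / " + wireType(strayByte)); // prints 1 / 6
        System.out.println(isDefinedWireType(wireType(strayByte)));               // prints false
    }
}
```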

@BewareMyPower BewareMyPower force-pushed the bewaremypower/upgrade-pulsar-lightproto branch from 3d5f506 to 6ab7201 Compare January 26, 2021 07:56
@BewareMyPower
Collaborator Author

BewareMyPower commented Jan 26, 2021

Since the upgrade introduced (or exposed) some bugs in the existing code, I've just added new commits to fix them: 8912a98

PTAL again, @jiazhai

@BewareMyPower BewareMyPower force-pushed the bewaremypower/upgrade-pulsar-lightproto branch from 8912a98 to 3ef7fbc Compare January 26, 2021 10:40
@jiazhai jiazhai merged commit 597b069 into streamnative:master Jan 26, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/upgrade-pulsar-lightproto branch January 26, 2021 13:00
Development

Successfully merging this pull request may close these issues.

[FEATURE] Upgrade pulsar with LightProto support