Describe the bug:
Hi, I think I found a small bug in the MoP authorization module.
Pulsar version: 3.0.5
MoP version: 3.0.5.8
Configuration: in MoP, when authorization is switched on (mqttAuthorizationEnabled=true), sending to a topic with more than three levels, such as persistent://public/default/a/b or persistent://public/default/a/b/c, always returns an unauthorized error. To reproduce this issue, do not use a superuser token.
messagingProtocols=mqtt
protocolHandlerDirectory=/home/oneby/software/pulsar/apache-pulsar-3.0.5/protocols
mqttListeners=mqtt://127.0.0.1:1883
advertisedAddress=127.0.0.1
mqttAuthenticationEnabled=true
mqttAuthenticationMethods=token
mqttAuthorizationEnabled=true
Solution:
I think the problem is in MQTTBrokerProtocolMethodProcessor. I tried to fix it, and the fix seems to work.
I have pasted my fixed code below. I think it is wrong to call TopicName.get(msg.variableHeader().topicName()) and similar directly. MQTT topic names are separated by /, so the MQTT topic name has to be converted to a Pulsar topic name, which requires URL encoding.
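To illustrate why the encoding matters, here is a minimal sketch of the idea using only the JDK. The class and method names are hypothetical and this is not the actual implementation of PulsarTopicUtils.getEncodedPulsarTopicName; it only assumes a short MQTT topic name (no persistent:// prefix) that is placed under a given tenant and namespace.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not the MoP implementation.
final class MqttTopicEncodingSketch {

    // Builds domain://tenant/namespace/<encoded local name> from a short MQTT topic name.
    static String toPulsarTopic(String mqttTopic, String tenant, String namespace, String domain) {
        // URLEncoder turns '/' into %2F, so the multi-level MQTT name
        // becomes a single Pulsar local name instead of extra path levels.
        String localName = URLEncoder.encode(mqttTopic, StandardCharsets.UTF_8);
        return domain + "://" + tenant + "/" + namespace + "/" + localName;
    }

    public static void main(String[] args) {
        // prints persistent://public/default/a%2Fb%2Fc
        System.out.println(toPulsarTopic("a/b/c", "public", "default", "persistent"));
    }
}
```

With the slashes encoded, the resulting name has exactly the tenant, namespace, and local-name parts that TopicName.get expects.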
- processPublish method:
final String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(msg.variableHeader().topicName(),
        configuration.getDefaultTenant(), configuration.getDefaultNamespace(),
        TopicDomain.getEnum(configuration.getDefaultTopicDomain()));
result = this.authorizationService.canProduceAsync(TopicName.get(pulsarTopicName),
        connection.getUserRole(), connection.getAuthData())
        .thenCompose(authorized -> authorized ? doPublish(adapter) : doUnauthorized(adapter));
- processSubscribe method:
Precondition: topics such as persistent://public/default/%2F%23 (the MQTT wildcard pattern persistent://public/default/#) and persistent://public/default/a%2F%23 (the MQTT wildcard pattern persistent://public/default/a/#) must not exist, or they will cause authorization leaks.
I use this trick to do namespace-level authorization for MQTT wildcards, but tenant admins still receive an unauthorized error (there is no code that handles tenant admins). In my scenario, MQTT topic names are generated by my application, so authorization leaks will not occur. I hope the code can be further improved by refactoring the authorization module.
for (MqttTopicSubscription topic : msg.payload().topicSubscriptions()) {
    final String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(topic.topicName(),
            configuration.getDefaultTenant(), configuration.getDefaultNamespace(),
            TopicDomain.getEnum(configuration.getDefaultTopicDomain()));
    authorizationFutures.add(this.authorizationService.canConsumeAsync(TopicName.get(pulsarTopicName),
            userRole, connection.getAuthData(), userRole).thenAccept((authorized) -> {
                if (!authorized) {
                    authorizedFlag.set(false);
                    log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}",
                            topic.topicName(), userRole, clientId);
                }
            }));
}
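The wildcard precondition above can be made concrete with a small sketch. It shows that an encoded wildcard filter is just another literal topic name, which is why a real topic with that name would share its permissions with the wildcard subscription. All names here are hypothetical and not part of MoP or Pulsar, and the wildcard check is a simplification that only looks for the '#' and '+' characters.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; these names are illustrative and not part of MoP or Pulsar.
final class WildcardFilterSketch {

    // Simplified check: treats a filter as a wildcard if it contains
    // '#' (multi-level) or '+' (single-level).
    static boolean isWildcard(String mqttFilter) {
        return mqttFilter.indexOf('#') >= 0 || mqttFilter.indexOf('+') >= 0;
    }

    // The literal local name that the URL-encoded filter maps to.
    static String encodedLocalName(String mqttFilter) {
        return URLEncoder.encode(mqttFilter, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (String filter : new String[] {"a/b", "a/#", "a/+/c"}) {
            String note = isWildcard(filter) ? " (wildcard: needs a namespace-level decision)" : "";
            System.out.println(filter + " -> " + encodedLocalName(filter) + note);
        }
    }
}
```

A refactored authorization module could use a check like isWildcard to route wildcard filters to an explicit namespace-level permission check instead of relying on the encoded name.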