Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[Schema Registry] Force authorization when kafkaEnableMultiTenantMeta…
Browse files Browse the repository at this point in the history
…data is false

### Motivation

Currently when the schema registry and authorization are both enabled,
the authorization is only performed when
`kafkaEnableMultiTenantMetadata` is false. It's not reasonable because
the role of the token must have the write permission to the schema
registry topic.

### Modifications

- In `performAuthorizationValidation`, do not check
  `kafkaEnableMultiTenantMetadata`.
- Add the `testSchemaWrongAuth` test to verify that a wrong role cannot be
  used to create an Avro producer.
- Separate the default namespace and the default schema namespace in
  `KafkaAuthorizationTestBase` so that the permission requirements
  will be clear.
  • Loading branch information
BewareMyPower committed Jul 14, 2023
1 parent 1dda879 commit fcde386
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
Expand Down Expand Up @@ -135,16 +136,17 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio

private void performAuthorizationValidation(String username, String role, String tenant)
throws SchemaStorageException {
if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
if (kafkaConfig.isAuthorizationEnabled()) {
KafkaPrincipal kafkaPrincipal =
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null, null);
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null,
new AuthenticationDataSource() {});
String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, kafkaConfig);
try {
Boolean tenantExists =
authorizer.canAccessTenantAsync(kafkaPrincipal, Resource.of(ResourceType.TENANT, tenant))
.get();
if (tenantExists == null || !tenantExists) {
log.debug("SchemaRegistry username {} role {} tenant {} does not exist",
log.debug("SchemaRegistry username {} role {} tenant {} does not exist {}",
username, role, tenant, topicName);
throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName + " "
+ "tenant " + tenant + " does not exist (wrong username?)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class KafkaAuthorizationTestBase extends KopProtocolHandlerTestB

protected static final String TENANT = "KafkaAuthorizationTest";
protected static final String NAMESPACE = "ns1";
private static final String SCHEMA_NAMESPACE = "ns2";
private static final String SHORT_TOPIC = "topic1";
private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC;
private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
Expand Down Expand Up @@ -120,7 +121,7 @@ protected void setup() throws Exception {
conf.setKafkaMetadataNamespace("__kafka");
conf.setKafkaTenant(TENANT);
conf.setKafkaNamespace(NAMESPACE);
conf.setKopSchemaRegistryNamespace(NAMESPACE);
conf.setKopSchemaRegistryNamespace(SCHEMA_NAMESPACE);

conf.setClusterName(super.configClusterName);
conf.setAuthorizationEnabled(true);
Expand Down Expand Up @@ -712,18 +713,16 @@ public static Object[][] tokenPrefix() {
// this test creates the schema registry topic, and this may interfere with other tests
@Test(timeOut = 30000, priority = 1000, dataProvider = "tokenPrefix")
public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Exception {

if (conf.isKafkaEnableMultiTenantMetadata()) {
// ensure that the KOP metadata namespace exists and that the user can write to it
// because we require "produce" permissions on the Schema Registry Topic
// while working in Multi Tenant mode
if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/" + conf.getKafkaMetadataNamespace())) {
admin.namespaces().createNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace());
}
admin.namespaces()
.grantPermissionOnNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace(), SIMPLE_USER,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
admin.namespaces().grantPermissionOnNamespace(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace(),
SIMPLE_USER, Sets.newHashSet(AuthAction.produce, AuthAction.consume));
final String tenant = (conf.isKafkaEnableMultiTenantMetadata() ? TENANT : conf.getKafkaMetadataTenant());
final var namespaces = admin.namespaces().getNamespaces(tenant);
final String schemaNamespace = tenant + "/" + conf.getKopSchemaRegistryNamespace();
if (!namespaces.contains(schemaNamespace)) {
admin.namespaces().createNamespace(schemaNamespace);
}
admin.namespaces().grantPermissionOnNamespace(schemaNamespace, SIMPLE_USER,
Sets.newHashSet(AuthAction.produce));

String topic = "SchemaRegistryTest-testAvroProduceAndConsumeWithAuth" + withTokenPrefix;
IndexedRecord avroRecord = createAvroRecord();
Expand Down Expand Up @@ -759,7 +758,7 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex

@Test(timeOut = 30000)
public void testSchemaNoAuth() {
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, false);
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, null);
try {
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
fail();
Expand All @@ -772,6 +771,22 @@ public void testSchemaNoAuth() {
producer.close();
}

    @Test(timeOut = 30000)
    public void testSchemaWrongAuth() {
        // Token signed with the correct secret key but for a role that has no
        // permission on the schema registry topic's namespace.
        final var wrongToken = AuthTokenUtils.createToken(secretKey, "wrong-user", Optional.empty());
        // withTokenPrefix=false; the producer authenticates to the schema
        // registry with the unauthorized token.
        final KafkaProducer<Integer, Object> producer = createAvroProducer(false, wrongToken);
        try {
            // The serializer's schema-registry call must be rejected, so the
            // send future completes exceptionally.
            producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
            fail();
        } catch (Exception e) {
            // ExecutionException wraps the registry client error; verify it is
            // an HTTP 403 whose message names the inaccessible topic.
            assertTrue(e.getCause() instanceof RestClientException);
            var restException = (RestClientException) e.getCause();
            assertEquals(restException.getErrorCode(), HttpResponseStatus.FORBIDDEN.code());
            assertTrue(restException.getMessage().contains("cannot access topic"));
        }
        producer.close();
    }

private IndexedRecord createAvroRecord() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
+ "\"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
Expand All @@ -783,10 +798,10 @@ private IndexedRecord createAvroRecord() {
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
return createAvroProducer(withTokenPrefix, true);
return createAvroProducer(withTokenPrefix, userToken);
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, String schemaToken) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
Expand All @@ -803,10 +818,10 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

if (withSchemaToken) {
if (schemaToken != null) {
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
username + ":" + (withTokenPrefix ? password : schemaToken));
}

return new KafkaProducer<>(props);
Expand Down

0 comments on commit fcde386

Please sign in to comment.