Skip to content

Commit

Permalink
support getting topic defaults from AdminClient
Browse files Browse the repository at this point in the history
  • Loading branch information
norwood committed Dec 4, 2017
1 parent 9204197 commit cafc9d0
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 16 deletions.
Expand Up @@ -42,7 +42,8 @@ public enum Type {
*/
public ConfigResource(Type type, String name) {
Objects.requireNonNull(type, "type should not be null");
Objects.requireNonNull(name, "name should not be null");
if (type != Type.TOPIC)
Objects.requireNonNull(name, "name should not be null");
this.type = type;
this.name = name;
}
Expand All @@ -69,15 +70,13 @@ public boolean equals(Object o) {
return false;

ConfigResource that = (ConfigResource) o;

return type == that.type && name.equals(that.name);
return type == that.type &&
Objects.equals(name, that.name);
}

@Override
public int hashCode() {
int result = type.hashCode();
result = 31 * result + name.hashCode();

This comment has been minimized.

Copy link
@freyayunfu

freyayunfu Jan 17, 2019

Why it was like that?

This comment has been minimized.

Copy link
@norwood

norwood Jan 17, 2019

Author Owner

this is the old way of making hashcode

return result;
return Objects.hash(type, name);
}

@Override
Expand Down
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;

import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
import static org.apache.kafka.common.protocol.types.Type.STRING;

public class DescribeConfigsRequest extends AbstractRequest {
Expand All @@ -42,7 +43,7 @@ public class DescribeConfigsRequest extends AbstractRequest {

private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema(
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
new Field(RESOURCE_NAME_KEY_NAME, STRING),
new Field(RESOURCE_NAME_KEY_NAME, NULLABLE_STRING),
new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING)));

private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class DescribeConfigsResponse extends AbstractResponse {
ERROR_CODE,
ERROR_MESSAGE,
new Field(RESOURCE_TYPE_KEY_NAME, INT8),
new Field(RESOURCE_NAME_KEY_NAME, STRING),
new Field(RESOURCE_NAME_KEY_NAME, NULLABLE_STRING),
new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(new Schema(
new Field(CONFIG_NAME_KEY_NAME, STRING),
new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.kafka.common.requests;

import java.util.Objects;

public final class Resource {
private final ResourceType type;
private final String name;
Expand All @@ -42,15 +44,13 @@ public boolean equals(Object o) {
return false;

Resource resource = (Resource) o;

return type == resource.type && name.equals(resource.name);
return type == resource.type &&
Objects.equals(name, resource.name);
}

@Override
public int hashCode() {
int result = type.hashCode();
result = 31 * result + name.hashCode();
return result;
return Objects.hash(type, name);
}

@Override
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/server/AdminManager.scala
Expand Up @@ -306,9 +306,13 @@ class AdminManager(val config: KafkaConfig,

case ResourceType.TOPIC =>
val topic = resource.name
Topic.validate(topic)
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val topicProps =
if (topic == null) new Properties
else {
Topic.validate(topic)
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
}
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))

Expand Down

0 comments on commit cafc9d0

Please sign in to comment.