Commit
Add describeLogDirs method to AdminClient (#380)
trobert committed Oct 23, 2021
1 parent 9f33eb3 commit d9aed5f
Showing 2 changed files with 49 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/main/scala/zio/kafka/admin/AdminClient.scala
@@ -14,17 +14,20 @@ import org.apache.kafka.clients.admin.{
ListConsumerGroupOffsetsOptions => JListConsumerGroupOffsetsOptions,
ListConsumerGroupsOptions => JListConsumerGroupsOptions,
ListOffsetsOptions => JListOffsetsOptions,
LogDirDescription => JLogDirDescription,
MemberDescription => JMemberDescription,
NewPartitions => JNewPartitions,
NewTopic => JNewTopic,
OffsetSpec => JOffsetSpec,
ReplicaInfo => JReplicaInfo,
TopicDescription => JTopicDescription,
TopicListing => JTopicListing,
_
}
import org.apache.kafka.clients.admin.ListOffsetsResult.{ ListOffsetsResultInfo => JListOffsetsResultInfo }
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata => JOffsetAndMetadata }
import org.apache.kafka.common.config.{ ConfigResource => JConfigResource }
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.{
ConsumerGroupState => JConsumerGroupState,
IsolationLevel => JIsolationLevel,
@@ -165,6 +168,13 @@ trait AdminClient {
* Remove the specified members from a consumer group.
*/
def removeMembersFromConsumerGroup(groupId: String, membersToRemove: Set[String]): Task[Unit]

/**
* Describe the log directories of the specified brokers
*/
def describeLogDirs(
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Map[String, LogDirDescription]]]
}

object AdminClient {
@@ -436,6 +446,17 @@ object AdminClient {
)
).unit
}

override def describeLogDirs(
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Map[String, LogDirDescription]]] =
fromKafkaFuture(
blocking.effectBlocking(
adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).allDescriptions()
)
).map(
_.asScala.toMap.bimap(_.intValue, _.asScala.toMap.bimap(identity, LogDirDescription(_)))
)
}

val live: ZLayer[Has[Blocking.Service] with Has[AdminClientSettings], Throwable, Has[AdminClient]] =
@@ -788,6 +809,19 @@ object AdminClient {
KafkaConfig(jConfig.entries().asScala.map(e => e.name() -> e).toMap)
}

case class LogDirDescription(error: ApiException, replicaInfos: Map[TopicPartition, ReplicaInfo])

object LogDirDescription {
def apply(ld: JLogDirDescription): LogDirDescription =
LogDirDescription(ld.error(), ld.replicaInfos().asScala.toMap.bimap(TopicPartition(_), ReplicaInfo(_)))
}

case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean)

object ReplicaInfo {
def apply(ri: JReplicaInfo): ReplicaInfo = ReplicaInfo(ri.size(), ri.offsetLag(), ri.isFuture)
}

def make(settings: AdminClientSettings): ZManaged[Has[Blocking.Service], Throwable, AdminClient] =
ZManaged.service[Blocking.Service].flatMap { blocking =>
ZManaged.make(
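For reference, a minimal usage sketch of the new API (not part of the commit): it assumes an AdminClientSettings value and a list of existing broker ids are supplied by the caller, acquires the client through the managed make constructor shown above, and sums the reported replica sizes per broker.

import zio.ZIO
import zio.blocking.Blocking
import zio.kafka.admin.{ AdminClient, AdminClientSettings }

// Sketch only: settings and broker ids are placeholders for a real cluster.
def totalLogSizePerBroker(
  settings: AdminClientSettings,
  brokerIds: List[Int]
): ZIO[Blocking, Throwable, Map[Int, Long]] =
  AdminClient.make(settings).use { admin =>
    admin.describeLogDirs(brokerIds).map { byBroker =>
      byBroker.map { case (brokerId, dirsByPath) =>
        // Each LogDirDescription maps TopicPartition -> ReplicaInfo(size, offsetLag, isFuture);
        // ReplicaInfo.size is the replica's on-disk size in bytes within that log directory.
        brokerId -> dirsByPath.values.flatMap(_.replicaInfos.values.map(_.size)).sum
      }
    }
  }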
15 changes: 15 additions & 0 deletions src/test/scala/zio/kafka/AdminSpec.scala
@@ -296,6 +296,21 @@ object AdminSpec extends DefaultRunnableSpec {
description <- getStableConsumerGroupDescription(groupId)
} yield assert(description.groupId)(equalTo(groupId)) && assert(description.members.length)(equalTo(1))
}
},
testM("describe log dirs") {
KafkaTestUtils.withAdmin { implicit admin =>
for {
topicName <- randomTopic
_ <- admin.createTopic(AdminClient.NewTopic(topicName, numPartitions = 1, replicationFactor = 1))
node <- admin.describeClusterNodes().head.orElseFail(new NoSuchElementException())
logDirs <- admin.describeLogDirs(List(node.id))
} yield assert(logDirs)(
hasKey(
node.id,
hasValues(exists(hasField("replicaInfos", _.replicaInfos, hasKey(TopicPartition(topicName, 0)))))
)
)
}
}
).provideSomeLayerShared[TestEnvironment](Kafka.embedded.mapError(TestFailure.fail) ++ Clock.live) @@ sequential

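And a wiring sketch through the live layer declared in AdminClient.scala above (again not part of the commit; the settings value and broker id 0 are assumptions): the layer is built from Blocking plus the settings, and the effect returns one broker's log directories keyed by directory path.

import zio.{ Has, Task, ZIO, ZLayer }
import zio.blocking.Blocking
import zio.kafka.admin.{ AdminClient, AdminClientSettings }

// Sketch only: broker id 0 is a placeholder; an empty map is returned if the broker is unknown.
def logDirsOfBroker(settings: AdminClientSettings): Task[Map[String, AdminClient.LogDirDescription]] =
  ZIO
    .accessM[Has[AdminClient]] {
      _.get
        .describeLogDirs(List(0))
        .map(_.getOrElse(0, Map.empty[String, AdminClient.LogDirDescription]))
    }
    .provideLayer((Blocking.live ++ ZLayer.succeed(settings)) >>> AdminClient.live)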
