GEOMESA-3349 Kafka - ensure topic compaction on catalog topic at startup
Co-authored-by: Kyle Stamper <kstamper@ccri.com>
Co-authored-by: Brady Pittman <brady.pittman@ga-ccri.com>
Co-authored-by: Michael Ronquest <ronquest@ccri.com>
4 people committed Apr 5, 2024
1 parent f53028a commit 1a606e6
Showing 2 changed files with 64 additions and 5 deletions.
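
In short, the change makes the Kafka data store verify at startup that its catalog topic has cleanup.policy=compact, and apply that policy when an externally created topic lacks it. The following is a minimal standalone sketch of the same check-and-set flow against a plain Kafka AdminClient; the broker address and topic name are placeholders, not values from this commit:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource
import scala.collection.JavaConverters._

object EnsureCatalogCompaction {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    val topic = "geomesa-catalog" // placeholder topic name
    val admin = AdminClient.create(props)
    try {
      val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
      // read the topic's current configuration
      val configs = admin.describeConfigs(Collections.singleton(resource)).all().get()
      val compacted = configs.asScala.values.exists { c =>
        c.get("cleanup.policy") != null && c.get("cleanup.policy").value() == "compact"
      }
      if (!compacted) {
        // OpType.SET updates just this one entry, leaving other topic configs untouched
        val op = new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET)
        val ops: java.util.Collection[AlterConfigOp] = Collections.singleton(op)
        admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
      }
    } finally {
      admin.close()
    }
  }
}

Running this against a broker is idempotent: if the policy is already compact, nothing is altered.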
org/locationtech/geomesa/kafka/data/KafkaMetadata.scala
@@ -8,8 +8,9 @@

 package org.locationtech.geomesa.kafka.data
 
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InterruptException, WakeupException}
 import org.locationtech.geomesa.index.metadata.{KeyValueStoreMetadata, MetadataSerializer}
 import org.locationtech.geomesa.kafka.data.KafkaDataStore.KafkaDataStoreConfig
@@ -38,20 +39,44 @@ import scala.util.control.NonFatal
 class KafkaMetadata[T](val config: KafkaDataStoreConfig, val serializer: MetadataSerializer[T])
     extends KeyValueStoreMetadata[T] {
 
+  import KafkaMetadata.{CompactCleanupPolicy, CleanupPolicyConfig}
   import org.apache.kafka.clients.consumer.ConsumerConfig.{AUTO_OFFSET_RESET_CONFIG, GROUP_ID_CONFIG}
 
   import scala.collection.JavaConverters._
 
   private val producer = new LazyProducer(KafkaDataStore.producer(config.brokers, config.producers.properties))
   private val consumer = new LazyCloseable(new TopicMap())
 
-  override protected def checkIfTableExists: Boolean =
-    adminClientOp(_.listTopics().names().get.contains(config.catalog))
+  override protected def checkIfTableExists: Boolean = {
+    adminClientOp { adminClient =>
+      val exists = adminClient.listTopics().names().get.contains(config.catalog)
+      // ensure that the topic has compaction enabled, in case it was created externally
+      if (exists && !checkCompactionPolicy(adminClient)) {
+        setCompactionPolicy(adminClient)
+      }
+      exists
+    }
+  }
+
+  private def checkCompactionPolicy(kClient: AdminClient): Boolean = {
+    val cr = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, config.catalog))
+    val catalogConfigs = kClient.describeConfigs(cr).all().get().values()
+    catalogConfigs.asScala.exists { config =>
+      config.get(CleanupPolicyConfig) != null && config.get(CleanupPolicyConfig).value() == CompactCleanupPolicy
+    }
+  }
+
+  private def setCompactionPolicy(kClient: AdminClient): Unit = {
+    val catalogResource = new ConfigResource(ConfigResource.Type.TOPIC, config.catalog)
+    val catalogConfigEntry = new ConfigEntry(CleanupPolicyConfig, CompactCleanupPolicy)
+    val alterOps = Collections.singleton(new AlterConfigOp(catalogConfigEntry, AlterConfigOp.OpType.SET))
+    kClient.incrementalAlterConfigs(Collections.singletonMap(catalogResource, alterOps), new AlterConfigsOptions()).all().get()
+  }
 
   override protected def createTable(): Unit = {
     val newTopic =
       new NewTopic(config.catalog, 1, config.topics.replication.toShort)
-        .configs(Collections.singletonMap("cleanup.policy", "compact"))
+        .configs(Collections.singletonMap(CleanupPolicyConfig, CompactCleanupPolicy))
     adminClientOp(_.createTopics(Collections.singletonList(newTopic)).all().get)
   }

@@ -234,3 +259,8 @@ class KafkaMetadata[T](val config: KafkaDataStoreConfig, val serializer: Metadat
     }
   }
 }
+
+object KafkaMetadata {
+  private val CleanupPolicyConfig = "cleanup.policy"
+  private val CompactCleanupPolicy = "compact"
+}
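
A note on the API choice above: incrementalAlterConfigs with AlterConfigOp.OpType.SET updates only the named config entry, while the older alterConfigs call is deprecated and replaces a topic's entire configuration, so the repair cannot clobber unrelated topic settings. Topics that the store creates itself get the policy up front in createTable; below is a brief sketch of that create-time pattern, where the topic name, partition count, and replication factor are illustrative:

import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CompactedTopicExample {
  // `admin` is an open AdminClient; topic name, partition count, and replication factor are illustrative
  def createCompactedCatalog(admin: AdminClient): Unit = {
    val topic = new NewTopic("geomesa-catalog", 1, 1.toShort) // 1 partition, replication factor 1
      .configs(Collections.singletonMap("cleanup.policy", "compact"))
    admin.createTopics(Collections.singletonList(topic)).all().get()
  }
}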
org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
@@ -9,9 +9,10 @@
 package org.locationtech.geomesa.kafka.data
 
 import com.codahale.metrics.{MetricRegistry, ScheduledReporter}
 import org.apache.commons.lang3.StringUtils
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.common.config.ConfigResource
 import org.geotools.api.data._
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
@@ -1032,6 +1033,34 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
         ds.dispose()
       }
     }
+
+    "update compaction policy for catalog topics if not set" in {
+      val props = new Properties()
+      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+      val path = getUniquePath
+      val topic = StringUtils.strip(path, " /").replace("/", "-")
+      // create the topic externally, without a compaction policy
+      WithClose(AdminClient.create(props)) { admin =>
+        val newTopic = new NewTopic(topic, 1, 1.toShort)
+        admin.createTopics(Collections.singletonList(newTopic)).all().get
+      }
+      val ds = getStore(path, 0)
+      try {
+        ds.getTypeNames() // loading type names hits the metadata, triggering the topic check
+        // verify that the store applied the compaction policy
+        WithClose(AdminClient.create(props)) { admin =>
+          val configs =
+            admin.describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic)))
+          val config = configs.values().get(new ConfigResource(ConfigResource.Type.TOPIC, topic)).get()
+          config must not(beNull)
+          config.entries().asScala.map(e => e.name() -> e.value()).toMap must
+              containAllOf(Seq("cleanup.policy" -> "compact"))
+        }
+      } finally {
+        ds.dispose()
+      }
+    }
+
   }
 
   "KafkaDataStoreFactory" should {
