Skip to content
Permalink
Browse files

Connect to Zookeeper using EnsembleProvider instead of connection str…

…ing (#975)
  • Loading branch information...
nbroyles committed May 13, 2019
1 parent 6f9920e commit 28b36014ca1e003d49b881bce28960778bfba1f6
@@ -24,12 +24,12 @@ class ZkTestModule(
override fun configure() {
val keystorePath = this::class.java.getResource("/zookeeper/keystore.jks").path
val truststorePath = this::class.java.getResource("/zookeeper/truststore.jks").path

install(ZookeeperModule(ZookeeperConfig(
val config = ZookeeperConfig(
zk_connect = "127.0.0.1:$zkPortKey",
cert_store = CertStoreConfig(keystorePath, "changeit", SslLoader.FORMAT_JKS),
trust_store = TrustStoreConfig(truststorePath, "changeit", SslLoader.FORMAT_JKS)),
qualifier))
trust_store = TrustStoreConfig(truststorePath, "changeit", SslLoader.FORMAT_JKS))

install(ZookeeperModule(config, qualifier))

multibind<Service>().toInstance(StartZookeeperService(qualifier))
val curator = getProvider(keyOf<CuratorFramework>(qualifier))
@@ -0,0 +1,48 @@
package misk.clustering.zookeeper

import com.google.common.util.concurrent.Service
import com.google.inject.Key
import com.google.inject.Provides
import misk.clustering.lease.LeaseManager
import misk.concurrent.ExecutorServiceModule
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.inject.toKey
import misk.tasks.RepeatedTaskQueue
import misk.zookeeper.CuratorFrameworkModule
import java.time.Clock
import java.util.concurrent.ExecutorService
import javax.inject.Singleton


class ZkLeaseCommonModule(private val config: ZookeeperConfig) : KAbstractModule() {
override fun configure() {
install(ZkLeaseManagerModule())
install(ExecutorServiceModule.withFixedThreadPool(ForZkLease::class, "zk-lease-poller", 1))
install(CuratorFrameworkModule(config, ForZkLease::class))
}

companion object {
/** @property Key<*> the Key of the lease manager service */
val leaseManagerKey: Key<*> = Key.get(ZkLeaseManager::class.java)
}

@Provides @ForZkLease @Singleton
fun provideTaskQueue(
clock: Clock,
@ForZkLease executorService: ExecutorService
): RepeatedTaskQueue {
return RepeatedTaskQueue("zk-lease-poller", clock, executorService)
}
}

/**
* Common bindings between [ZkLeaseCommonModule] and [ZkLeaseTestModule].
*/
internal class ZkLeaseManagerModule : KAbstractModule() {
override fun configure() {
multibind<Service>().to<ZkLeaseManager>()
multibind<Service>().to(RepeatedTaskQueue::class.toKey(ForZkLease::class)).asSingleton()
bind<LeaseManager>().to<ZkLeaseManager>()
}
}
@@ -37,7 +37,7 @@ internal class ZkLeaseManager @Inject internal constructor(
@ForZkLease curator: CuratorFramework
) : AbstractExecutionThreadService(), LeaseManager, DependentService {
override val consumedKeys = setOf(keyOf<ZkService>(ForZkLease::class), keyOf<Cluster>())
override val producedKeys = setOf(ZkLeaseModule.leaseManagerKey)
override val producedKeys = setOf(ZkLeaseCommonModule.leaseManagerKey)

internal val leaseNamespace = "$SERVICES_NODE/${appName.asZkNamespace}/leases"
internal val client = lazy { curator.usingNamespace(leaseNamespace) }
@@ -1,50 +1,14 @@
package misk.clustering.zookeeper

import com.google.common.util.concurrent.Service
import com.google.inject.Key
import com.google.inject.Provides
import misk.clustering.lease.LeaseManager
import misk.concurrent.ExecutorServiceModule
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.inject.toKey
import misk.tasks.RepeatedTaskQueue
import misk.zookeeper.ZookeeperModule
import java.time.Clock
import java.util.concurrent.ExecutorService
import javax.inject.Singleton
import misk.zookeeper.FixedEnsembleProviderModule

/**
* Binds a [LeaseManager] that uses Zookeeper.
*/
class ZkLeaseModule(private val config: ZookeeperConfig) : KAbstractModule() {
override fun configure() {
install(ZkLeaseCommonModule())
install(ExecutorServiceModule.withFixedThreadPool(ForZkLease::class, "zk-lease-poller", 1))
install(ZookeeperModule(config, ForZkLease::class))
install(ZkLeaseCommonModule(config))
install(FixedEnsembleProviderModule(config, ForZkLease::class))
}

companion object {
/** @property Key<*> the Key of the lease manager service */
val leaseManagerKey: Key<*> = Key.get(ZkLeaseManager::class.java)
}

@Provides @ForZkLease @Singleton
fun provideTaskQueue(
clock: Clock,
@ForZkLease executorService: ExecutorService
): RepeatedTaskQueue {
return RepeatedTaskQueue("zk-lease-poller", clock, executorService)
}
}

/**
* Common bindings between [ZkLeaseModule] and [ZkLeaseTestModule].
*/
internal class ZkLeaseCommonModule : KAbstractModule() {
override fun configure() {
multibind<Service>().to<ZkLeaseManager>()
multibind<Service>().to(RepeatedTaskQueue::class.toKey(ForZkLease::class)).asSingleton()
bind<LeaseManager>().to<ZkLeaseManager>()
}
}
}
@@ -0,0 +1,45 @@
package misk.zookeeper

import com.google.common.util.concurrent.Service
import misk.clustering.zookeeper.ZookeeperConfig
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.inject.keyOf
import org.apache.curator.ensemble.EnsembleProvider
import org.apache.curator.framework.CuratorFramework
import javax.inject.Provider
import kotlin.reflect.KClass

/**
* Binds a [CuratorFramework] for an application to use. A [Service] is also installed to manage
* the lifecycle of the [CuratorFramework].
*
* If an application needs to connect to multiple Zookeepers, an optional qualifier can be passed
* resulting in an annotated [CuratorFramework] binding.
*
* ```
* @Qualifier
* annotation class MyZk
*
* install(CuratorFrameworkModule(config, MyZk::class)
*
* @Inject @MyZk CuratorFramework
* ```
*/
class CuratorFrameworkModule(
private val config: ZookeeperConfig,
private val qualifier: KClass<out Annotation>? = null
) : KAbstractModule() {
override fun configure() {
val ensembleProvider = getProvider(keyOf<EnsembleProvider>(qualifier))
bind(keyOf<CuratorFramework>(qualifier)).toProvider(
CuratorFrameworkProvider(config, ensembleProvider)).asSingleton()
val curator = getProvider(keyOf<CuratorFramework>(qualifier))
bind(keyOf<ZkService>(qualifier)).toProvider(object : Provider<ZkService> {
override fun get(): ZkService {
return ZkService(curator.get(), qualifier)
}
}).asSingleton()
multibind<Service>().to(keyOf<ZkService>(qualifier))
}
}
@@ -3,6 +3,7 @@ package misk.zookeeper
import com.google.common.util.concurrent.ThreadFactoryBuilder
import misk.clustering.zookeeper.ZookeeperConfig
import misk.clustering.zookeeper.asZkPath
import org.apache.curator.ensemble.EnsembleProvider
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.api.ACLProvider
@@ -28,7 +29,8 @@ const val DEFAULT_PERMS = ZooDefs.Perms.READ or
const val SHARED_DIR_PERMS = ZooDefs.Perms.READ or ZooDefs.Perms.WRITE or ZooDefs.Perms.CREATE

internal class CuratorFrameworkProvider @Inject internal constructor(
private val config: ZookeeperConfig
private val config: ZookeeperConfig,
private val ensembleProvider: Provider<EnsembleProvider>
) : Provider<CuratorFramework> {

override fun get(): CuratorFramework {
@@ -45,7 +47,6 @@ internal class CuratorFrameworkProvider @Inject internal constructor(
// Uses reasonable default values from http://curator.apache.org/getting-started.html
val retryPolicy = ExponentialBackoffRetry(1000, 3)
return CuratorFrameworkFactory.builder()
.connectString(config.zk_connect)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(config.session_timeout_msecs)
.canBeReadOnly(false)
@@ -82,6 +83,7 @@ internal class CuratorFrameworkProvider @Inject internal constructor(
return defaultAcl
}
})
.ensembleProvider(ensembleProvider.get())
.build()
}
}
@@ -0,0 +1,22 @@
package misk.zookeeper

import misk.clustering.zookeeper.ZookeeperConfig
import misk.inject.KAbstractModule
import misk.inject.keyOf
import org.apache.curator.ensemble.EnsembleProvider
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider
import javax.inject.Provider
import kotlin.reflect.KClass

class FixedEnsembleProviderModule(
private val config: ZookeeperConfig,
private val qualifier: KClass<out Annotation>?
) : KAbstractModule() {
override fun configure() {
bind(keyOf<EnsembleProvider>(qualifier)).toProvider(object : Provider<EnsembleProvider> {
override fun get(): EnsembleProvider {
return FixedEnsembleProvider(config.zk_connect)
}
})
}
}
@@ -1,43 +1,15 @@
package misk.zookeeper

import com.google.common.util.concurrent.Service
import com.google.inject.Key
import misk.clustering.zookeeper.ZookeeperConfig
import misk.inject.KAbstractModule
import misk.inject.asSingleton
import misk.inject.keyOf
import org.apache.curator.framework.CuratorFramework
import javax.inject.Provider
import kotlin.reflect.KClass

/**
* Binds a [CuratorFramework] for an application to use. A [Service] is also installed to manage
* the lifecycle of the [CuratorFramework].
*
* If an application needs to connect to multiple Zookeepers, an optional qualifier can be passed
* resulting in an annotated [CuratorFramework] binding.
*
* ```
* @Qualifier
* annotation class MyZk
*
* install(ZookeeperModule(config, MyZk::class)
*
* @Inject @MyZk CuratorFramework
* ```
*/
class ZookeeperModule(
private val config: ZookeeperConfig,
private val qualifier: KClass<out Annotation>? = null
) : KAbstractModule() {
override fun configure() {
bind(keyOf<CuratorFramework>(qualifier)).toProvider(CuratorFrameworkProvider(config)).asSingleton()
val curator = getProvider(keyOf<CuratorFramework>(qualifier))
bind(keyOf<ZkService>(qualifier)).toProvider(object : Provider<ZkService> {
override fun get(): ZkService {
return ZkService(curator.get(), qualifier)
}
}).asSingleton()
multibind<Service>().to(keyOf<ZkService>(qualifier))
install(CuratorFrameworkModule(config, qualifier))
install(FixedEnsembleProviderModule(config, qualifier))
}
}
}
@@ -15,7 +15,7 @@ internal class ZkLeaseTestModule : KAbstractModule() {
bind<String>().annotatedWith<AppName>().toInstance("my-app")
install(FakeClusterModule())
install(ZkTestModule(ForZkLease::class))
install(ZkLeaseCommonModule())
install(ZkLeaseManagerModule())

}

0 comments on commit 28b3601

Please sign in to comment.
You can’t perform that action at this time.