Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: ZooKeeper digest authentication support #3847

Merged
merged 4 commits into from May 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/docs/command-line-flags.md
Expand Up @@ -110,7 +110,10 @@ The core functionality flags can be also set by environment variable `MARATHON_O
Additional callback URLs may also be set dynamically via the REST API.
* `--zk` (Optional. Default: `zk://localhost:2181/marathon`): ZooKeeper URL for storing state.
Format: `zk://host1:port1,host2:port2,.../path`
* `--zk_max_versions` (Optional. Default: 25): Limit the number of versions stored for one entity.
- <span class="label label-default">v1.1.2</span> Format with authentication: `zk://user:pass@host1:port1,host2:port2,.../path`.
When authentication is enabled, the default ACL changes so that nodes are only accessible to the creating user; all subsequent access must use the same credentials.
* `--zk_max_versions` (Optional. Default: None): Limit the number of versions
stored for one entity.
* `--zk_timeout` (Optional. Default: 10000 (10 seconds)): Timeout for ZooKeeper
in milliseconds.
* <span class="label label-default">v0.9.0</span> `--zk_session_timeout` (Optional. Default: 1.800.000 (30 minutes)): Timeout for ZooKeeper
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/logback.xml
Expand Up @@ -11,6 +11,10 @@
<appender-ref ref="stdout" />
<queueSize>1024</queueSize>
</appender>

<!-- Don't log ZooKeeper AuthInfo from com.twitter.zk.NativeConnector -->
<logger name="native-zk-connector" level="WARN" />

<root level="INFO">
<appender-ref ref="async"/>
</root>
Expand Down
14 changes: 10 additions & 4 deletions src/main/scala/mesosphere/marathon/MarathonModule.scala
Expand Up @@ -12,7 +12,7 @@ import com.codahale.metrics.Gauge
import com.google.inject._
import com.google.inject.name.Names
import com.twitter.util.JavaTimer
import com.twitter.zk.{ NativeConnector, ZkClient }
import com.twitter.zk.{ AuthInfo, NativeConnector, ZkClient }
import mesosphere.chaos.http.HttpConf
import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.core.election.ElectionService
Expand All @@ -34,7 +34,6 @@ import mesosphere.util.state.zk.{ CompressionConf, ZKStore }
import mesosphere.util.state.{ FrameworkId, FrameworkIdUtil, PersistentStore, _ }
import mesosphere.util.{ CapConcurrentExecutions, CapConcurrentExecutionsMetrics }
import org.apache.mesos.state.ZooKeeperState
import org.apache.zookeeper.ZooDefs.Ids
import org.slf4j.LoggerFactory

import scala.collection.immutable.Seq
Expand Down Expand Up @@ -135,9 +134,16 @@ class MarathonModule(conf: MarathonConf, http: HttpConf)
def directZK(): PersistentStore = {
import com.twitter.util.TimeConversions._
val sessionTimeout = conf.zooKeeperSessionTimeout().millis
val connector = NativeConnector(conf.zkHosts, None, sessionTimeout, new JavaTimer(isDaemon = true))

val authInfo = (conf.zkUsername, conf.zkPassword) match {
case (Some(user), Some(pass)) => Some(AuthInfo.digest(user, pass))
case _ => None
}

val connector = NativeConnector(conf.zkHosts, None, sessionTimeout, new JavaTimer(isDaemon = true), authInfo)

val client = ZkClient(connector)
.withAcl(Ids.OPEN_ACL_UNSAFE.asScala)
.withAcl(conf.zkDefaultCreationACL.asScala)
.withRetries(3)
val compressionConf = CompressionConf(conf.zooKeeperCompressionEnabled(), conf.zooKeeperCompressionThreshold())
new ZKStore(client, client(conf.zooKeeperStatePath), compressionConf)
Expand Down
19 changes: 15 additions & 4 deletions src/main/scala/mesosphere/marathon/ZookeeperConf.scala
Expand Up @@ -2,6 +2,7 @@ package mesosphere.marathon

import java.net.InetSocketAddress

import org.apache.zookeeper.ZooDefs
import org.rogach.scallop.ScallopConf

import scala.concurrent.duration._
Expand All @@ -10,10 +11,12 @@ trait ZookeeperConf extends ScallopConf {

//scalastyle:off magic.number

private val userAndPass = """[^/@]+"""
private val user = """[^/:]+"""
private val pass = """[^@]+"""
private val hostAndPort = """[A-z0-9-.]+(?::\d+)?"""
private val zkNode = """[^/]+"""
private val zkURLPattern = s"""^zk://(?:$userAndPass@)?($hostAndPort(?:,$hostAndPort)*)(/$zkNode(?:/$zkNode)*)$$""".r
private val zkURLPattern =
s"""^zk://(?:($user):($pass)@)?($hostAndPort(?:,$hostAndPort)*)(/$zkNode(?:/$zkNode)*)$$""".r

lazy val zooKeeperTimeout = opt[Long]("zk_timeout",
descr = "The timeout for ZooKeeper in milliseconds.",
Expand Down Expand Up @@ -71,8 +74,16 @@ trait ZookeeperConf extends ScallopConf {

def zkURL: String = zooKeeperUrl.get.get

lazy val zkHosts = zkURL match { case zkURLPattern(server, _) => server }
lazy val zkPath = zkURL match { case zkURLPattern(_, path) => path }
lazy val zkHosts = zkURL match { case zkURLPattern(_, _, server, _) => server }
lazy val zkPath = zkURL match { case zkURLPattern(_, _, _, path) => path }
lazy val zkUsername = zkURL match { case zkURLPattern(u, _, _, _) => Option(u) }
lazy val zkPassword = zkURL match { case zkURLPattern(_, p, _, _) => Option(p) }

lazy val zkDefaultCreationACL = (zkUsername, zkPassword) match {
case (Some(_), Some(_)) => ZooDefs.Ids.CREATOR_ALL_ACL
case _ => ZooDefs.Ids.OPEN_ACL_UNSAFE
}

lazy val zkTimeoutDuration = Duration(zooKeeperTimeout(), MILLISECONDS)
lazy val zkSessionTimeoutDuration = Duration(zooKeeperSessionTimeout(), MILLISECONDS)
}
@@ -1,21 +1,26 @@
package mesosphere.marathon.core.election.impl

import java.util

import akka.actor.ActorSystem
import akka.event.EventStream
import com.codahale.metrics.MetricRegistry
import mesosphere.chaos.http.HttpConf
import mesosphere.marathon.MarathonConf
import mesosphere.marathon.core.base.ShutdownHooks
import mesosphere.marathon.metrics.Metrics
import org.apache.curator.framework.api.ACLProvider
import org.apache.curator.{ RetrySleeper, RetryPolicy }
import org.apache.curator.framework.{ CuratorFramework, CuratorFrameworkFactory }
import org.apache.curator.framework.{ CuratorFramework, CuratorFrameworkFactory, AuthInfo }
import org.apache.curator.framework.recipes.leader.{ LeaderLatch, LeaderLatchListener }
import org.apache.zookeeper.data.ACL
import org.apache.zookeeper.{ ZooDefs, KeeperException, CreateMode }
import org.slf4j.LoggerFactory

import scala.concurrent.{ Await, Future }
import scala.util.control.NonFatal
import scala.concurrent.duration._
import scala.collection.JavaConversions._

class CuratorElectionService(
config: MarathonConf,
Expand Down Expand Up @@ -94,9 +99,19 @@ class CuratorElectionService(

private def provideCuratorClient(): CuratorFramework = {
log.info(s"Will do leader election through ${config.zkHosts}")
val client = CuratorFrameworkFactory.builder().

// let the world read the leadership information as some setups depend on that to find Marathon
val acl = new util.ArrayList[ACL]()
acl.addAll(config.zkDefaultCreationACL)
acl.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)

val builder = CuratorFrameworkFactory.builder().
connectString(config.zkHosts).
sessionTimeoutMs(config.zooKeeperSessionTimeout().toInt).
aclProvider(new ACLProvider {
override def getDefaultAcl: util.List[ACL] = acl
override def getAclForPath(path: String): util.List[ACL] = acl
}).
retryPolicy(new RetryPolicy {
override def allowRetry(retryCount: Int, elapsedTimeMs: Long, sleeper: RetrySleeper): Boolean = {
log.error("ZooKeeper access failed")
Expand All @@ -121,16 +136,24 @@ class CuratorElectionService(

false
}
}).
build()
})

// optionally authenticate
val client = (config.zkUsername, config.zkPassword) match {
case (Some(user), Some(pass)) =>
builder.authorization(List(
new AuthInfo("digest", (user + ":" + pass).getBytes("UTF-8"))
)).build()
case _ =>
builder.build()
}

client.start()
client.getZookeeperClient.blockUntilConnectedOrTimedOut()
client
}

private object twitterCommonsTombstone {
lazy val acl = ZooDefs.Ids.OPEN_ACL_UNSAFE

def memberPath(member: String): String = {
config.zooKeeperLeaderPath.stripSuffix("/") + "/" + member
}
Expand All @@ -154,7 +177,6 @@ class CuratorElectionService(
client.create().
creatingParentsIfNeeded().
withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
withACL(acl).
forPath(memberPath("member_-1"), hostPort.getBytes("UTF-8"))
fallbackCreated = true
}
Expand All @@ -163,7 +185,6 @@ class CuratorElectionService(
client.create().
creatingParentsIfNeeded().
withMode(CreateMode.EPHEMERAL).
withACL(acl).
forPath(path, hostPort.getBytes("UTF-8"))
}
catch {
Expand Down
@@ -1,6 +1,7 @@
package mesosphere.marathon.core.election.impl

import java.net.InetSocketAddress
import java.util

import akka.actor.ActorSystem
import akka.event.EventStream
Expand All @@ -16,6 +17,7 @@ import mesosphere.marathon.core.base.ShutdownHooks
import mesosphere.marathon.core.election.ElectionService
import mesosphere.marathon.metrics.Metrics
import org.apache.zookeeper._
import org.apache.zookeeper.data.ACL
import org.slf4j.LoggerFactory

import scala.concurrent.{ Await, Future }
Expand Down Expand Up @@ -76,8 +78,14 @@ class TwitterCommonsElectionService(

private def provideCandidate(zk: ZooKeeperClient): Candidate = {
log.info("Registering in ZooKeeper with hostPort:" + hostPort)

// let the world read the leadership information as some setups depend on that to find Marathon
lazy val acl = new util.ArrayList[ACL]()
acl.addAll(config.zkDefaultCreationACL)
acl.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)

new mesosphere.marathon.core.election.impl.CandidateImpl(
new Group(zk, ZooDefs.Ids.OPEN_ACL_UNSAFE, config.zooKeeperLeaderPath),
new Group(zk, acl, config.zooKeeperLeaderPath),
new Supplier[Array[Byte]] {
def get(): Array[Byte] = {
hostPort.getBytes("UTF-8")
Expand All @@ -92,10 +100,14 @@ class TwitterCommonsElectionService(
"ZooKeeper timeout too large!"
)

val client = new ZooKeeperLeaderElectionClient(
Amount.of(config.zooKeeperSessionTimeout().toInt, Time.MILLISECONDS),
config.zooKeeperHostAddresses.asJavaCollection
)
val sessionTimeout = Amount.of(config.zooKeeperSessionTimeout().toInt, Time.MILLISECONDS)
val zooKeeperServers = config.zooKeeperHostAddresses.asJavaCollection
val client = (config.zkUsername, config.zkPassword) match {
case (Some(user), Some(pass)) =>
new ZooKeeperClient(sessionTimeout, ZooKeeperClient.digestCredentials(user, pass), zooKeeperServers)
case _ =>
new ZooKeeperClient(sessionTimeout, zooKeeperServers)
}

// Marathon can't do anything useful without a ZK connection
// so we wait to proceed until one is available
Expand Down
9 changes: 9 additions & 0 deletions src/test/scala/mesosphere/marathon/ZookeeperConfTest.scala
@@ -1,5 +1,6 @@
package mesosphere.marathon

import org.apache.zookeeper.ZooDefs
import org.rogach.scallop.ScallopConf
import scala.util.Try

Expand All @@ -11,6 +12,9 @@ class ZookeeperConfTest extends MarathonSpec {
assert(opts.zkURL == url)
assert(opts.zkHosts == "host1:123,host2,host3:312")
assert(opts.zkPath == "/path")
assert(opts.zkUsername.isEmpty)
assert(opts.zkPassword.isEmpty)
assert(opts.zkDefaultCreationACL == ZooDefs.Ids.OPEN_ACL_UNSAFE)
}

test("urlParameterWithAuthGetParsed") {
Expand All @@ -19,11 +23,16 @@ class ZookeeperConfTest extends MarathonSpec {
assert(opts.zkURL == url)
assert(opts.zkHosts == "host1:123,host2,host3:312")
assert(opts.zkPath == "/path")
assert(opts.zkUsername == Some("user1"))
assert(opts.zkPassword == Some("pass1"))
assert(opts.zkDefaultCreationACL == ZooDefs.Ids.CREATOR_ALL_ACL)
}

test("wrongURLIsNotParsed") {
assert(Try(conf("--zk", "zk://host1:foo/path")).isFailure, "No port number")
assert(Try(conf("--zk", "zk://host1")).isFailure, "No path")
assert(Try(conf("--zk", "zk://user@host1:2181/path")).isFailure, "No password")
assert(Try(conf("--zk", "zk://:pass@host1:2181/path")).isFailure, "No username")
}

def conf(args: String*) = {
Expand Down
75 changes: 75 additions & 0 deletions src/test/scala/mesosphere/marathon/integration/ZooKeeperTest.scala
@@ -0,0 +1,75 @@
package mesosphere.marathon.integration

import java.util

import mesosphere.marathon.integration.facades.MarathonFacade
import mesosphere.marathon.integration.setup._
import org.apache.zookeeper.ZooDefs.Perms
import org.apache.zookeeper.data.{ Id, ACL }
import org.apache.zookeeper.{ ZooDefs, WatchedEvent, Watcher, ZooKeeper }
import org.scalatest.{ ConfigMap, GivenWhenThen, Matchers }

import scala.concurrent.duration._

/**
  * Integration test that inspects the ACLs Marathon sets on its leader node in ZooKeeper.
  *
  * This suite passes no ZooKeeper credentials of its own, and expects the `/leader` node to
  * carry the union of `OPEN_ACL_UNSAFE` and `READ_ACL_UNSAFE`.
  */
class ZooKeeperTest extends IntegrationFunSuite with SingleMarathonIntegrationTest with GivenWhenThen with Matchers {

  test("/marathon has OPEN_ACL_UNSAFE acls") {
    Given("a leader has been elected")
    WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 }

    val watcher = new Watcher { override def process(event: WatchedEvent): Unit = println(event) }
    val zooKeeper = new ZooKeeper(config.zkHostAndPort, 30 * 1000, watcher)

    // Always release the client session, even when an assertion below fails.
    try {
      val leaderPath = config.zkPath + "/leader"

      Then("the /leader node exists")
      val stat = zooKeeper.exists(leaderPath, false)
      Option(stat) should not be empty

      And("it has the default OPEN_ACL_UNSAFE permissions")
      val acls = zooKeeper.getACL(leaderPath, stat)
      val expectedAcl = new util.ArrayList[ACL]
      expectedAcl.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE)
      expectedAcl.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
      // Compare as sets: the order in which ZooKeeper returns ACL entries is not asserted.
      acls.toArray.toSet should equal(expectedAcl.toArray.toSet)
    }
    finally {
      zooKeeper.close()
    }
  }
}

/**
  * Integration test that inspects the ACLs Marathon sets in ZooKeeper when digest
  * credentials are supplied.
  *
  * The credentials are handed to the test setup through the `zkCredentials` entry of the
  * config map. The test expects:
  *  - `/leader` to grant all permissions to the digest identity plus world-readable access;
  *  - `/state` to grant all permissions to the digest identity only.
  */
class AuthorizedZooKeeperTest extends IntegrationFunSuite
    with SingleMarathonIntegrationTest with GivenWhenThen with Matchers {

  /** Cleartext `user:password` pair used for ZooKeeper digest authentication. */
  lazy val credentials = "user:secret"

  /** The `user:<hashed password>` form generated from [[credentials]]; used as the ACL id below. */
  lazy val digest = org.apache.zookeeper.server.auth.DigestAuthenticationProvider.generateDigest(credentials)

  override protected def beforeAll(configMap: ConfigMap): Unit = {
    super.beforeAll(configMap + ("zkCredentials" -> credentials))
  }

  test("/marathon has digest-restricted acls when credentials are configured") {
    Given("a leader has been elected")
    WaitTestSupport.waitUntil("a leader has been elected", 30.seconds) { marathon.leader().code == 200 }

    val watcher = new Watcher { override def process(event: WatchedEvent): Unit = println(event) }
    val zooKeeper = new ZooKeeper(config.zkHostAndPort, 30 * 1000, watcher)

    // Always release the client session, even when an assertion below fails.
    try {
      // Digest authentication takes the cleartext "user:password"; the hashed form is only
      // used as the identity inside ACL entries.
      zooKeeper.addAuthInfo("digest", credentials.getBytes("UTF-8"))

      val creatorAcl = new ACL(Perms.ALL, new Id("digest", digest))
      val leaderPath = config.zkPath + "/leader"
      val statePath = config.zkPath + "/state"

      Then("the /leader node exists")
      val leaderStat = zooKeeper.exists(leaderPath, false)
      Option(leaderStat) should not be empty

      And(s"the /leader node has $credentials:rcdwa + world:r")
      val leaderAcls = zooKeeper.getACL(leaderPath, leaderStat)
      val expectedLeaderAcl = new util.ArrayList[ACL]
      expectedLeaderAcl.add(creatorAcl)
      expectedLeaderAcl.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
      // Compare as sets: the order in which ZooKeeper returns ACL entries is not asserted.
      leaderAcls.toArray.toSet should equal(expectedLeaderAcl.toArray.toSet)

      Then("the /state node exists")
      val stateStat = zooKeeper.exists(statePath, false)
      Option(stateStat) should not be empty

      And(s"the /state node has $credentials:rcdwa")
      val stateAcls = zooKeeper.getACL(statePath, stateStat)
      val expectedStateAcl = new util.ArrayList[ACL]
      expectedStateAcl.add(creatorAcl)
      stateAcls.toArray.toSet should equal(expectedStateAcl.toArray.toSet)
    }
    finally {
      zooKeeper.close()
    }
  }
}