Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

DNS-backed cluster #207

Closed
wants to merge 5 commits into from

8 participants

@agleyzer

Per discussion on the finaglers mailing list: https://groups.google.com/d/msg/finaglers/HqfNWJF3qZk/mWD-By0MtOgJ

DnsCluster continuously resolves a host name into a set of SocketAddress instances, loosely based on ZK cluster implementation .

LingeringCluster is a generic Cluster filter that propagates Rem events after a predefined delay, this helps dealing with firewall that does its own DNS resolution.

@mosesn
Collaborator

I like the idea here, but Clusters are deprecated. It doesn't make sense to add a new one right now. Can you get these to work with the Name api?

@mariusae
Collaborator

It should be pretty simple to adopt to the new APIs.

@mariusae
Collaborator

(This is really cool btw.)

@agleyzer

I've looked at ZkResolver to see how to implement a similar DnsResolver, but it uses a Group internally and there's a comment about taking that out as well (I understand Groups are deprecated too). I wonder if my best strategy is to wait until the dust settles and then take another look? Or am I barking up the wrong tree?

@evnm

The deprecation storm has settled and the Name/Var[Addr] API is the outcome. I think it's safe to assume that it's stable enough to build upon. ZkResolver is still written in terms of Group because no one's had time to migrate StabilizingGroup to the Var[Addr] world.

A DnsResolver would provide a function def bind(arg: String): Var[Addr]. See twitter-server's FlagResolver class for a simple example.

@agleyzer

Thanks for your response, but I am still a bit confused where my DNS stuff would fit in this brave new world... On one hand, it could be used as a poor man's ZK, with multiple machines resolving the same hostname, so that's why I looked at the ZkResolver, but then we can also have a Name implementation, something like this: https://gist.github.com/agleyzer/7363353. What would you guys prefer?

@mariusae
Collaborator

I think we just want this to be part of the inet! resolver.

@mosesn
Collaborator

@agleyzer Not sure if you're still interested in this problem, but I just stumbled on this ticket again. I think @mariusaeriksen is right that this shouldn't be special behavior, this should just be the way that the InetResolver works all the time. Do you still have the bandwidth to make this happen?

@jdanbrown

@mosesn We keep getting bit by inet! resolved names failing to track DNS names that change over time (e.g. AWS ELBs), so I'd be interested in working this into the inet! resolver if no one is tackling it yet.

What's the best approach?

  1. Stick with Name.Bound(Var[Addr]) (Resolver.eval + InetResolver.bind) and periodically push DNS changes to the Var. This would require periodically re-resolving the hostname on a Timer, which seems like it might be overly complicated.

  2. Add some kind of "bind-on-demand" state in Addr so that DNS resolution happens on each request. Is this even possible?—looking at DefaultClient.newStack0 my guess is that it needs the push behavior in (1)...

@roanta roanta closed this
@roanta roanta reopened this
@agleyzer

@mosesn sorry I just noticed your question. I might have some bandwidth next week.

@jdanbrown I ended up writing a subclass for Name with periodic changes: https://gist.github.com/agleyzer/7363353, it's been in use for a while, no complains so far.

@mosesn
Collaborator

@agleyzer that solution looks great. Would you mind if @jdanbrown refashioned it to be the default for "inet!"?

@mariusae
Collaborator

+1, this looks great.

We should consider doing a few things:

  • limit the total amount of permissible concurrency;
  • have resolution timeouts (will thread interruption work here?).

In the long term, we might consider implementing our own DNS resolver (I think that there was a Netty summer-of-code project for this -- @trustin?) so that we can respect the upstream TTLs, etc. But, I know that anything involving "your own DNS resolver" is tantamount to famous last words.

@agleyzer

@mosesn @jdanbrown you are welcome to use that code, I am glad to be of any help

@mosesn
Collaborator

Awesome. @jdanbrown let me know if you need any help.

@trustin

Yeah, Netty has an asynchronous DNS resolution facility which came from last year's GSoC. It has not been merged yet because it needs some clean-up. Will let you know once it's ready to go.

@mosesn
Collaborator

@agleyzer @jdanbrown looks like this fell through the cracks, do either of you have the bandwidth to take this on?

@jixu

@mosesn , we are using the DNS Resolver written by @agleyzer in our product, and it works well. Can I know what is the progress to support automatically DNS resolving in finagle? I can help on issue if @agleyzer doesn't have time.

@mosesn
Collaborator

Sure, if you want to take it on, we'd be glad to take your PR. None of us at twitter has the bandwidth to do it, so it will have to be community driven.

@agleyzer
@jdanbrown

@mosesn @agleyzer @jixu Yeah, sorry, I dropped the ball on this. I still intend to do it eventually, but @jixu feel free to jump on it sooner if you want. Either way, I'll follow up here before I spend much time on it to make sure I'm not racing with someone else.

@jixu

@mosesn I have created a pull request for this issue:
#282
Please help to review when you have time.

@mosesn
Collaborator

#282 implemented this, so I'm going to close this PR. Thanks to @jixu and @agleyzer for this super cool feature!

@mosesn mosesn closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Oct 18, 2013
  1. @agleyzer
  2. @agleyzer

    switched to using SpoolSource

    agleyzer authored
  3. @agleyzer
Commits on Dec 13, 2013
  1. @agleyzer
Commits on Dec 21, 2013
  1. @agleyzer
This page is out of date. Refresh to see the latest.
View
101 finagle-core/src/main/scala/com/twitter/finagle/builder/DnsCluster.scala
@@ -0,0 +1,101 @@
+package com.twitter.finagle.builder
+
+import com.twitter.concurrent.{Spool, SpoolSource}
+import com.twitter.conversions.time._
+import com.twitter.finagle.util.DefaultTimer
+import com.twitter.logging.Logger
+import com.twitter.util.{Duration, Future, FutureCancelledException, FuturePool, Timer}
+import java.net.{InetAddress, InetSocketAddress, SocketAddress, UnknownHostException}
+import java.security.Security
+import scala.collection._
+import scala.collection.JavaConverters._
+
+/**
+ * A dynamic cluster that keeps all DNS-resolved addresses for a host.
+ *
+ * Usese timer to keep itself up to date with DNS changes. Can be
+ * helpful to access resources with frequently-changing addresses
+ * (Akamai/Amazon S3/Google) from behind a firewall which does a
+ * similar DNS lookup.
+ */
+protected[builder] trait DnsCluster extends Cluster[SocketAddress] {
+ import Cluster._
+
+ def ttl: Duration
+ def timer: Timer
+ def resolveHost: Future[Set[SocketAddress]]
+
+ private[this] val log = Logger(this.getClass)
+ private[this] var underlyingSet = Set.empty[SocketAddress]
+ private[this] var changes = new SpoolSource[Cluster.Change[SocketAddress]]
+
+ private[this] def updateAddresses(newSet: Set[SocketAddress]): Unit = synchronized {
+ if (newSet != underlyingSet) {
+ val added = newSet &~ underlyingSet
+ val removed = underlyingSet &~ newSet
+
+ added foreach { address =>
+ changes.offer(Add(address))
+ }
+
+ removed foreach { address =>
+ changes.offer(Rem(address))
+ }
+
+ underlyingSet = newSet
+ }
+ }
+
+ protected[builder] def loop(): Unit = {
+ resolveHost handle { case ex =>
+ log.error(ex, "failed to resolve host")
+ Set.empty[SocketAddress]
+
+ } onSuccess { newSet =>
+ updateAddresses(newSet)
+ timer.doLater(ttl) { loop() }
+ }
+ }
+
+ def snap: (Seq[SocketAddress], Future[Spool[Change[SocketAddress]]]) =
+ synchronized {
+ (underlyingSet.toSeq, changes())
+ }
+}
+
+
+protected[builder] class BlockingDnsCluster(host: String, port: Int,
+ val ttl: Duration, val timer: Timer = DefaultTimer.twitter) extends DnsCluster {
+
+ private def blockingDnsCall: Set[SocketAddress] = {
+ InetAddress.getAllByName(host).map { address =>
+ new InetSocketAddress(address, port): SocketAddress
+ }.toSet
+ }
+
+ def resolveHost: Future[Set[SocketAddress]] =
+ FuturePool.unboundedPool(blockingDnsCall)
+
+ loop()
+}
+
+
+object DnsCluster {
+
+ def apply(host: String, port: Int, ttl: Duration): DnsCluster =
+ new BlockingDnsCluster(host, port, ttl)
+
+ def apply(host: String, port: Int): DnsCluster = {
+ // TTL using Java standard secutity property
+ val ttl = {
+ val minTtl = 5.seconds
+ val defaultTtl = 10.seconds
+ val maxTtl = 1.hour
+
+ val property = Option(Security.getProperty("networkaddress.cache.ttl"))
+ property map (_.toInt.seconds max minTtl min maxTtl) getOrElse defaultTtl
+ }
+
+ apply(host, port, ttl)
+ }
+}
View
47 finagle-core/src/main/scala/com/twitter/finagle/builder/LingeringCluster.scala
@@ -0,0 +1,47 @@
+package com.twitter.finagle.builder
+
+import com.google.common.collect.ConcurrentHashMultiset
+import com.twitter.concurrent.{Spool, SpoolSource}
+import com.twitter.conversions.time._
+import com.twitter.finagle.util.DefaultTimer
+import com.twitter.util.{Duration, Future, Timer}
+import scala.collection.JavaConverters._
+
+/**
+ * A Cluster implementation that allows its element to linger.
+ *
+ * A potential use case is when underlying cluster is DNS-backed,
+ * continuously refreshing addresses for a host with a low TTL
+ * (e.g. Akamai) and we're behind a firewall that updates from DNS as
+ * well. To maximise our chances of getting through, we want old
+ * addresses to stick around for a while, along with the new ones.
+ */
+class LingeringCluster[T](underlying: Cluster[T], delay: Duration,
+ timer: Timer = DefaultTimer.twitter)
+ extends Cluster[T] {
+
+ import Cluster._
+
+ def snap: (Seq[T], Future[Spool[Change[T]]]) = {
+ val outgoing = new SpoolSource[Change[T]]
+
+ val (init, changes) = underlying.snap
+
+ // the same element can be added and removed repeatedly
+ val multiset = ConcurrentHashMultiset.create(init.asJava)
+
+ for (spool <- changes; change <- spool) change match {
+ case Add(node) =>
+ if (multiset.add(node, 1) == 0)
+ outgoing.offer(change)
+
+ case Rem(node) =>
+ timer.doLater(delay) {
+ if (multiset.remove(node, 1) == 1)
+ outgoing.offer(change)
+ }
+ }
+
+ (init, outgoing())
+ }
+}
View
168 finagle-core/src/test/scala/com/twitter/finagle/builder/DnsClusterSpec.scala
@@ -0,0 +1,168 @@
+package com.twitter.finagle.builder
+
+import com.twitter.finagle.integration.StringCodec
+import com.twitter.conversions.time._
+import com.twitter.finagle.{ ClientCodecConfig, Codec, CodecFactory, Service }
+import com.twitter.util._
+import java.net.{ InetAddress, InetSocketAddress, SocketAddress, UnknownHostException }
+import org.jboss.netty.channel._
+import org.jboss.netty.handler.codec.frame.{ DelimiterBasedFrameDecoder, Delimiters }
+import org.jboss.netty.handler.codec.string.{ StringDecoder, StringEncoder }
+import org.jboss.netty.util.CharsetUtil
+import org.scalatest.FunSpec
+import org.scalatest.matchers.MustMatchers
+
+object DnsClusterSpec {
+ def makeServer(name: String, f: String => String) = {
+ val sillyService = new Service[String, String] {
+ def apply(request: String) = Future(f(request))
+ }
+
+ ServerBuilder()
+ .codec(StringCodec)
+ .bindTo(new InetSocketAddress(0)) // ephemeral port
+ .name(name)
+ .build(sillyService)
+ }
+
+ def makeClient(cluster: DnsCluster) =
+ ClientBuilder()
+ .cluster(cluster)
+ .codec(StringCodec)
+ .hostConnectionLimit(10)
+ .build()
+
+}
+
+class DnsClusterSpec extends FunSpec with MustMatchers {
+ import DnsClusterSpec._
+
+ describe("DnsCluster") {
+ it("should be able to block till server set is ready") {
+ Time.withCurrentTimeFrozen { tc =>
+ val server = makeServer("reverse", s => s.reverse)
+
+ val cluster = new DnsCluster {
+ override val ttl = 10.seconds
+ override val timer = new MockTimer()
+ override val resolveHost = Promise[Set[SocketAddress]]()
+ loop()
+ }
+
+ val client = makeClient(cluster)
+
+ val response = client("hello\n")
+
+ cluster.ready.isDefined must be(false)
+
+ cluster.resolveHost.setValue((Set(server.localAddress)))
+
+ tc.advance(10.seconds)
+ cluster.timer.tick()
+
+ cluster.ready.isDefined must be(true)
+
+ Await.result(response, 5.seconds) must be("olleh")
+ }
+ }
+
+ it("should be able to recognize a DNS change") {
+ Time.withCurrentTimeFrozen { tc =>
+ val server1 = makeServer("server1", _ + " server1")
+ val server2 = makeServer("server2", _ + " server2")
+
+ val cluster = new DnsCluster {
+ override val ttl = 10.seconds
+ override val timer = new MockTimer()
+
+ @volatile
+ var currentServer: Server = server1
+
+ override def resolveHost = Future.value(Set(currentServer.localAddress))
+
+ loop()
+ }
+
+ val client = makeClient(cluster)
+
+ cluster.currentServer = server1
+
+ cluster.timer.tick()
+ Await.result(client("hello\n"), 1.second) must be("hello server1")
+
+ cluster.currentServer = server2
+ tc.advance(10.seconds)
+ cluster.timer.tick()
+
+ Await.result(client("hello\n"), 1.second) must be("hello server2")
+ }
+ }
+
+ it("should handle multiple random DNS chages") {
+ Time.withCurrentTimeFrozen { tc =>
+ val addresses = for (n <- 1 to 10) yield {
+ var bytes = Array[Byte](10, 0, 0, n.toByte)
+ new InetSocketAddress(InetAddress.getByAddress(bytes), 80)
+ }
+
+ val cluster = new DnsCluster {
+ override val ttl = 10.seconds
+ override val timer = new MockTimer()
+
+ @volatile
+ var current: Set[SocketAddress] = Set.empty
+
+ override def resolveHost = Future.value(current)
+
+ loop()
+ }
+
+ val rnd = new scala.util.Random
+ val (seq, changes) = cluster.snap
+ var current = seq.toSet
+ changes foreach { spool =>
+ spool foreach {
+ case Cluster.Add(elem) => current += elem
+ case Cluster.Rem(elem) => current -= elem
+ }
+ }
+
+ for (i <- 0 to 100) {
+ cluster.current = Set(rnd.shuffle(addresses).take(2): _*)
+
+ tc.advance(10.seconds)
+ cluster.timer.tick()
+
+ current must equal(cluster.current)
+
+ current = cluster.current
+ }
+ }
+ }
+
+ it("should handle UnknownHostException") {
+ Time.withCurrentTimeFrozen { tc =>
+ val timer = new MockTimer()
+
+ val cluster = new DnsCluster {
+ override val ttl = 10.seconds
+ override val timer = new MockTimer()
+
+ @volatile
+ var current: Set[SocketAddress] = _
+ override def resolveHost = Future.exception(new UnknownHostException)
+
+ loop()
+ }
+
+ cluster.ready.isDefined must be(false)
+
+ tc.advance(20.seconds)
+
+ timer.tick()
+
+ cluster.ready.isDefined must be(false)
+ }
+ }
+ }
+}
View
109 finagle-core/src/test/scala/com/twitter/finagle/builder/LingeringClusterSpec.scala
@@ -0,0 +1,109 @@
+package com.twitter.finagle.builder
+
+import com.twitter.util.MockTimer
+import com.twitter.util.Time
+import org.specs.SpecificationWithJUnit
+import com.twitter.conversions.time._
+import scala.collection._
+import scala.util.Random
+
+class LingeringClusterSpec extends SpecificationWithJUnit {
+ "LingeringCluster" should {
+ val N = 10
+ val inCluster = new ClusterInt()
+ val timer = new MockTimer()
+ val outCluster = new LingeringCluster(inCluster, 10.seconds, timer)
+
+ "delay Rem events" in Time.withCurrentTimeFrozen { tc =>
+ 0 until N foreach { inCluster.add(_) }
+ val (seq, changes) = outCluster.snap
+
+ var set = seq.toSet
+ changes foreach { spool =>
+ spool foreach {
+ case Cluster.Add(elem) => set += elem
+ case Cluster.Rem(elem) => set -= elem
+ }
+ }
+
+ set.size must be_==(N)
+ 0 until N foreach { inCluster.del(_) }
+ set.size must be_==(N)
+
+ tc.advance(10.seconds)
+ timer.tick()
+
+ set.size must be_==(0)
+ }
+
+ "report Add once if the same element is removed and then added again" in Time.withCurrentTimeFrozen { tc =>
+ val (seq, changes) = outCluster.snap
+ var adds, rems = 0
+ changes foreach { spool =>
+ spool foreach {
+ case Cluster.Add(elem) => adds += 1
+ case Cluster.Rem(elem) => rems -= 1
+ }
+ }
+
+ 0 until N foreach { inCluster.add(_) }
+ adds must be_==(N)
+ rems must be_==(0)
+
+ tc.advance(1.second); timer.tick()
+
+ 0 until N foreach { inCluster.del(_) }
+ adds must be_==(N)
+ rems must be_==(0)
+
+ tc.advance(1.second); timer.tick()
+
+ 0 until N foreach { inCluster.add(_) }
+ adds must be_==(N)
+ rems must be_==(0)
+ }
+
+ "handle multiple additions and deletions" in Time.withCurrentTimeFrozen { tc =>
+ // here we simulate a DNS service that on each iteration
+ // resolves 4 "addresses" out of 10
+
+ val rnd = new Random
+ def rndSet = Seq.fill(4) { rnd.nextInt(10) } toSet
+ val (seq, changes) = outCluster.snap
+
+ var set = seq.toSet
+ changes foreach { spool =>
+ spool foreach {
+ case Cluster.Add(elem) => set += elem
+ case Cluster.Rem(elem) => set -= elem
+ }
+ }
+
+ var prevSet = Set.empty[Int]
+
+ for (i <- 0 to 100) {
+ val newSet = rndSet
+ val added = newSet &~ prevSet
+ val removed = prevSet &~ newSet
+
+ added foreach { inCluster.add(_) }
+ removed foreach { inCluster.del(_) }
+
+ set must be_== (prevSet ++ newSet)
+
+ tc.advance(5.seconds)
+ timer.tick()
+
+ set must be_== (prevSet ++ newSet)
+
+ tc.advance(5.seconds)
+ timer.tick()
+
+ set must be_== (newSet)
+
+ prevSet = newSet
+ }
+ }
+
+ }
+}
Something went wrong with that request. Please try again.