Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

serverset integration

  • Loading branch information...
commit cfd05ba6f1463aa19fb0d17e8d7d73395e9414dd 1 parent 5de9091
Alan Liang authored
View
60 cassie-core/src/main/scala/com/twitter/cassie/Cluster.scala
@@ -6,6 +6,7 @@ import scala.collection.JavaConversions._
import com.twitter.cassie.connection.{CCluster, RetryPolicy, SocketAddressCluster}
import com.twitter.util.Duration
import com.twitter.conversions.time._
+import com.twitter.finagle.builder.{Cluster => FCluster}
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.tracing.{Tracer, NullTracer}
@@ -13,8 +14,11 @@ import com.twitter.finagle.tracing.{Tracer, NullTracer}
* A Cassandra cluster.
*
* @param seedHosts list of some hosts in the cluster
- * @param seedPort the port number for '''all''' hosts in the cluster */
-class Cluster(seedHosts: Set[String], seedPort: Int) {
+ * @param seedPort the port number for '''all''' hosts in the cluster
+ * @param mapHostsEvery Cassie will query the cassandra cluster every [[mapHostsEvery]] period
+ * to refresh its host list. */
+class Cluster(seedHosts: Set[String], seedPort:Int) {
+ private var mapHostsEvery: Duration = 10.minutes
/**
* @param seedHosts A comma separated list of seed hosts for a cluster. The rest of the
@@ -29,14 +33,27 @@ class Cluster(seedHosts: Set[String], seedPort: Int) {
/**
* Returns a [[com.twitter.cassie.KeyspaceBuilder]] instance.
* @param name the keyspace's name */
- def keyspace(name: String): KeyspaceBuilder = KeyspaceBuilder(seedHosts, seedPort, name)
+ def keyspace(name: String): KeyspaceBuilder = {
+ val seedAddresses = seedHosts.map{ host => new InetSocketAddress(host, seedPort) }.toSeq
+ val cluster = if (mapHostsEvery > 0)
+ // either map the cluster for this keyspace
+ new ClusterRemapper(name, seedHosts.head, mapHostsEvery)
+ else
+ // or connect directly to the hosts that were given as seeds
+ new SocketAddressCluster(seedAddresses)
+
+ KeyspaceBuilder(name, cluster)
+ }
+
+ def mapHostsEvery(period: Duration): Cluster = {
+ mapHostsEvery = period
+ this
+ }
}
case class KeyspaceBuilder(
- seedHosts: Set[String],
- seedPort: Int,
_name: String,
- _mapHostsEvery: Duration = 10.minutes,
+ _cluster: CCluster,
_retryAttempts: Int = 0,
_requestTimeoutInMS: Int = 10000,
_connectionTimeoutInMS: Int = 10000,
@@ -47,29 +64,25 @@ case class KeyspaceBuilder(
_tracer: Tracer = NullTracer,
_retryPolicy: RetryPolicy = RetryPolicy.Idempotent) {
+ _cluster.statsReceiver(_statsReceiver)
+
/**
* connect to the cluster with the specified parameters */
def connect(): Keyspace = {
- val seedAddresses = seedHosts.map{ host => new InetSocketAddress(host, seedPort) }.toSeq
- val hosts = if (_mapHostsEvery > 0)
- // either map the cluster for this keyspace
- new ClusterRemapper(_name, seedAddresses, _mapHostsEvery, statsReceiver = _statsReceiver)
- else
- // or connect directly to the hosts that were given as seeds
- new SocketAddressCluster(seedAddresses)
-
// TODO: move to builder pattern as well
- val ccp = new ClusterClientProvider(hosts, _name, _retryAttempts,
- _requestTimeoutInMS, _connectionTimeoutInMS, _minConnectionsPerHost,
- _maxConnectionsPerHost, _removeAfterIdleForMS, _statsReceiver, _tracer,
- _retryPolicy)
+ val ccp = new ClusterClientProvider(_cluster,
+ _name,
+ _retryAttempts,
+ _requestTimeoutInMS,
+ _connectionTimeoutInMS,
+ _minConnectionsPerHost,
+ _maxConnectionsPerHost,
+ _removeAfterIdleForMS,
+ _statsReceiver,
+ _tracer,
+ _retryPolicy)
new Keyspace(_name, ccp)
}
-
- /**
- * @param d Cassie will query the cassandra cluster every [[d]] period
- * to refresh its host list. */
- def mapHostsEvery(d: Duration): KeyspaceBuilder = copy(_mapHostsEvery = d)
def retryAttempts(r: Int): KeyspaceBuilder = copy(_retryAttempts = r)
def retryPolicy(r: RetryPolicy): KeyspaceBuilder = copy(_retryPolicy = r)
/**
@@ -93,4 +106,3 @@ case class KeyspaceBuilder(
*/
def tracer(t: Tracer) = copy(_tracer = t)
}
-
View
10 cassie-core/src/main/scala/com/twitter/cassie/ClusterRemapper.scala
@@ -27,15 +27,21 @@ import com.twitter.finagle.WriteException
* @param seeds seed node addresses
* @param port the Thrift port of client nodes
*/
-private class ClusterRemapper(keyspace: String, seeds: Seq[InetSocketAddress], remapPeriod: Duration, port: Int = 9160, statsReceiver: StatsReceiver = NullStatsReceiver) extends CCluster {
+private class ClusterRemapper(keyspace: String, seeds: Seq[InetSocketAddress], remapPeriod: Duration, port: Int = 9160) extends CCluster {
private val log = Logger.get
private[cassie] var timer = new Timer(new HashedWheelTimer())
+ private var statsReceiver = NullStatsReceiver
def close = timer.stop()
// For servers, not clients.
def join(address: SocketAddress) {}
+ def statsReceiver(statsReceiver: StatsReceiver): ClusterRemapper = {
+ this.statsReceiver = statsReceiver
+ this
+ }
+
// Called once to get a Seq-like of ServiceFactories.
def mkFactories[Req, Rep](mkBroker: (SocketAddress) => ServiceFactory[Req, Rep]) = {
new SeqProxy[ServiceFactory[Req, Rep]] {
@@ -95,6 +101,4 @@ private class ClusterRemapper(keyspace: String, seeds: Seq[InetSocketAddress], r
ccp.close()
}
}
-
-
}
View
4 cassie-core/src/test/java/com/twitter/cassie/jtests/ClusterTest.java
@@ -15,12 +15,12 @@
@Before
public void before() throws Exception {
- cluster = new Cluster("host1,host2");
+ cluster = new Cluster("host1,host2").mapHostsEvery(new Duration(0));
}
@Test
public void test() {
- Keyspace ks = cluster.keyspace("blah").mapHostsEvery(new Duration(0)).connect();
+ Keyspace ks = cluster.keyspace("blah").connect();
assertEquals(ks.name(), "blah");
}
}
View
4 cassie-core/src/test/scala/com/twitter/cassie/tests/ClusterTest.scala
@@ -8,10 +8,10 @@ import com.twitter.conversions.time._
class ClusterTest extends Spec with MustMatchers with MockitoSugar {
describe("a cluster") {
- val cluster = new Cluster("nonhost")
+ val cluster = new Cluster("nonhost").mapHostsEvery(0.minutes)
it("creates a keyspace with the given name and provider") {
- val ks = cluster.keyspace("ks").mapHostsEvery(0.minutes).connect()
+ val ks = cluster.mapHostsEvery(0.minutes).keyspace("ks").connect()
ks.name must equal("ks")
}
View
2  cassie-core/src/test/scala/com/twitter/cassie/util/FakeCassandraTest.scala
@@ -31,7 +31,7 @@ class FakeCassandraTest extends Spec with MustMatchers with BeforeAndAfterAll wi
}
def keyspace() = {
- val keyspace = client.keyspace("foo").mapHostsEvery(0.seconds).connect()
+ val keyspace = client.mapHostsEvery(0.seconds).keyspace("foo").connect()
connections.append(keyspace)
keyspace
}
View
11 cassie-hadoop/src/main/scala/com/twitter/cassie/hadoop/CassieReducer.scala
@@ -50,22 +50,27 @@ class CassieReducer extends Reducer[BytesWritable, ColumnWritable, BytesWritable
override def setup(context: ReducerContext) = {
def conf(key: String) = context.getConfiguration.get(key)
cluster = new Cluster(conf(HOSTS))
+ cluster = configureCluster(cluster)
if(conf(MIN_BACKOFF) != null ) minBackoff = Integer.valueOf(conf(MIN_BACKOFF)).intValue
if(conf(MAX_BACKOFF) != null ) maxBackoff = Integer.valueOf(conf(MAX_BACKOFF)).intValue
if(conf(IGNORE_FAILURES) != null ) ignoreFailures = conf(IGNORE_FAILURES) == "true"
if(conf(PAGE_SIZE) != null ) page = Integer.valueOf(conf(PAGE_SIZE)).intValue
- keyspace = configure(cluster.keyspace(conf(KEYSPACE))).connect()
+ keyspace = configureKeyspace(cluster.keyspace(conf(KEYSPACE))).connect()
columnFamily = keyspace.columnFamily[ByteBuffer, ByteBuffer, ByteBuffer](conf(COLUMN_FAMILY),
ByteArrayCodec, ByteArrayCodec, ByteArrayCodec)
batch = columnFamily.batch
}
- def configure(c: KeyspaceBuilder): KeyspaceBuilder = {
+ def configureKeyspace(c: KeyspaceBuilder): KeyspaceBuilder = {
c.retryAttempts(2)
}
+ def configureCluster(cluster: Cluster): Cluster = {
+ cluster
+ }
+
override def reduce(key: BytesWritable, values: java.lang.Iterable[ColumnWritable], context: ReducerContext) = {
for (value <- values) {
val bufKey = bufCopy(ByteBuffer.wrap(key.getBytes, 0, key.getLength))
@@ -114,4 +119,4 @@ class CassieReducer extends Reducer[BytesWritable, ColumnWritable, BytesWritable
}
override def cleanup(context: ReducerContext) = execute(context)
-}
+}
View
10 cassie-hadoop/src/test/scala/com/twitter/cassie/hadoop/CassieReducerTest.scala
@@ -52,8 +52,8 @@ object Fake {
}
class NonMappingCassieReducer extends CassieReducer {
- override def configure(ksb: KeyspaceBuilder) = {
- ksb.mapHostsEvery(0.seconds)
+ override def configureCluster(cluster: Cluster): Cluster = {
+ cluster.mapHostsEvery(0.seconds)
}
}
@@ -106,7 +106,7 @@ class CassieReducerTest extends Spec with MustMatchers{
ToolRunner.run(new Configuration(), new TestScript(), Array("hello", "world"))
implicit val keyCodec = Utf8Codec
val cluster = new Cluster("127.0.0.1")
- val ks = cluster.keyspace("ks").mapHostsEvery(0.seconds).connect()
+ val ks = cluster.mapHostsEvery(0.seconds).keyspace("ks").connect()
val cf = ks.columnFamily[String, String, String]("cf", Utf8Codec,Utf8Codec, Utf8Codec)
cf.getRow("0").get().get("default").value must equal("hello")
@@ -121,10 +121,10 @@ class CassieReducerTest extends Spec with MustMatchers{
ToolRunner.run(new Configuration(), new TestScript(), Array())
implicit val keyCodec = Utf8Codec
val cluster = new Cluster("127.0.0.1")
- val ks = cluster.keyspace("ks").mapHostsEvery(0.seconds).connect()
+ val ks = cluster.mapHostsEvery(0.seconds).keyspace("ks").connect()
val cf = ks.columnFamily[String, String, String]("cf", Utf8Codec,Utf8Codec, Utf8Codec)
fake.stop()
}
}
-}
+}
View
1  cassie-serversets/project/build.properties
@@ -0,0 +1 @@
+project.name=cassie-serversets
View
30 cassie-serversets/src/main/scala/com/twitter/cassie/serversets/ServerSetsCluster.scala
@@ -0,0 +1,30 @@
+package com.twitter.cassie
+
+import java.net.InetSocketAddress
+
+import scala.collection.JavaConversions
+
+import com.twitter.cassie.connection.CCluster
+import com.twitter.common.quantity.Amount
+import com.twitter.common.quantity.Time
+import com.twitter.common.zookeeper.ServerSet
+import com.twitter.common.zookeeper.ServerSetImpl
+import com.twitter.common.zookeeper.ZooKeeperClient
+import com.twitter.finagle.zookeeper.ZookeeperServerSetCluster
+
+class ZookeeperServerSetCCluster(serverSet: ServerSet) extends ZookeeperServerSetCluster(serverSet) with CCluster {
+ def close {}
+}
+
+class ServerSetsCluster(zkHost: String, zkPort: Int, path: String, timeoutMillis: Int) {
+ /**
+ * Returns a [[com.twitter.cassie.KeyspaceBuilder]] instance.
+ * @param name the keyspace's name */
+ def keyspace(name: String): KeyspaceBuilder = {
+ val zkAddress = new InetSocketAddress(zkHost, zkPort)
+ val zkClient = new ZooKeeperClient(Amount.of(timeoutMillis, Time.MILLISECONDS), JavaConversions.asJavaIterable(List(zkAddress)))
+ val serverSet = new ServerSetImpl(zkClient, "/twitter/services/silly")
+ val cluster = new ZookeeperServerSetCCluster(serverSet)
+ KeyspaceBuilder(name, cluster)
+ }
+}
View
22 project/build/Cassie.scala
@@ -6,6 +6,10 @@ class Cassie(info: sbt.ProjectInfo) extends StandardParentProject(info)
override def subversionRepository = Some("http://svn.local.twitter.com/maven/")
+ val slf4jVersion = "1.5.11"
+ val finagleVersion = "1.7.3"
+ val utilVersion = "1.10.2"
+
val coreProject = project(
"cassie-core", "cassie-core",
new CoreProject(_))
@@ -14,6 +18,10 @@ class Cassie(info: sbt.ProjectInfo) extends StandardParentProject(info)
"cassie-hadoop", "cassie-hadoop",
new HadoopProject(_), coreProject)
+ val serversetsProject = project(
+ "cassie-serversets", "cassie-serversets",
+ new ServerSetsProject(_), coreProject)
+
class CoreProject(info: ProjectInfo) extends StandardLibraryProject(info)
with SubversionPublisher with AdhocInlines with CompileThriftFinagle with PublishSite
with NoisyDependencies {
@@ -23,14 +31,11 @@ class Cassie(info: sbt.ProjectInfo) extends StandardParentProject(info)
// Some of the autogenerated java code cause javadoc errors.
override def docSources = sources(mainScalaSourcePath##)
- val slf4jVersion = "1.5.11"
val slf4jApi = "org.slf4j" % "slf4j-api" % slf4jVersion withSources() intransitive()
val slf4jBindings = "org.slf4j" % "slf4j-jdk14" % slf4jVersion withSources() intransitive()
val codecs = "commons-codec" % "commons-codec" % "1.4"
- val finagleVersion = "1.7.3"
- val utilVersion = "1.10.2"
val finagle = "com.twitter" % "finagle-core" % finagleVersion
val finagleThrift = "com.twitter" % "finagle-thrift" % finagleVersion
val utilCore = "com.twitter" % "util-core" % utilVersion
@@ -60,4 +65,15 @@ class Cassie(info: sbt.ProjectInfo) extends StandardParentProject(info)
// Some of the autogenerated java code cause javadoc errors.
override def docSources = sources(mainScalaSourcePath##)
}
+
+ class ServerSetsProject(info: ProjectInfo) extends StandardLibraryProject(info) with SubversionPublisher with AdhocInlines {
+ val finagleServerSets = "com.twitter" % "finagle-serversets" % finagleVersion
+ override def ivyXML =
+ <dependencies>
+ <exclude org="jline"/>
+ <exclude org="javax.jms"/>
+ <exclude org="com.sun.jdmk"/>
+ <exclude org="com.sun.jmx"/>
+ </dependencies>
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.