compiling, replication spec not passing

commit 4fe750c95076a4c026d80711240d1ea6e361da75 (1 parent: 787e9bc)
@freels authored
Showing with 505 additions and 1,222 deletions.
  1. +20 −9 src/main/scala/com/twitter/gizzard/GizzardServer.scala
  2. +0 −17 src/main/scala/com/twitter/gizzard/config/Future.scala
  3. +6 −6 src/main/scala/com/twitter/gizzard/config/GizzardServer.scala
  4. +0 −66 src/main/scala/com/twitter/gizzard/config/TServer.scala
  5. +1 −1  src/main/scala/com/twitter/gizzard/logging/ExceptionJsonFormatter.scala
  6. +3 −9 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
  7. +2 −17 src/main/scala/com/twitter/gizzard/nameserver/NameServerState.scala
  8. +21 −0 src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala
  9. +0 −4 src/main/scala/com/twitter/gizzard/scheduler/ReplicatingJob.scala
  10. +1 −0  src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
  11. +1 −2  src/main/scala/com/twitter/gizzard/test/Reset.scala
  12. +13 −8 src/main/scala/com/twitter/gizzard/thrift/JobInjectorService.scala
  13. +107 −90 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
  14. +0 −255 src/main/scala/com/twitter/gizzard/thrift/TSelectorServer.scala
  15. +0 −139 src/main/scala/com/twitter/gizzard/thrift/TThreadServer.scala
  16. +118 −0 src/main/scala/com/twitter/gizzard/thrift/conversions.scala
  17. +0 −16 src/main/scala/com/twitter/gizzard/thrift/conversions/Busy.scala
  18. +0 −19 src/main/scala/com/twitter/gizzard/thrift/conversions/Forwarding.scala
  19. +0 −30 src/main/scala/com/twitter/gizzard/thrift/conversions/Host.scala
  20. +0 −18 src/main/scala/com/twitter/gizzard/thrift/conversions/LinkInfo.scala
  21. +0 −96 src/main/scala/com/twitter/gizzard/thrift/conversions/Sequences.scala
  22. +0 −17 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardId.scala
  23. +0 −24 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardInfo.scala
  24. +0 −85 src/main/scala/com/twitter/gizzard/util/Future.scala
  25. +5 −3 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
  26. +0 −89 src/test/scala/com/twitter/gizzard/FutureSpec.scala
  27. +28 −30 src/test/scala/com/twitter/gizzard/integration/GizzardIntegrationSpec.scala
  28. +91 −74 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
  29. +0 −1  src/test/scala/com/twitter/gizzard/nameserver/JobRelaySpec.scala
  30. +0 −1  src/test/scala/com/twitter/gizzard/nameserver/SqlShardSpec.scala
  31. +13 −15 src/test/scala/com/twitter/gizzard/proxy/LoggingProxySpec.scala
  32. +11 −10 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobIntegrationSpec.scala
  33. +64 −71 src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
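At a high level, this commit replaces the hand-rolled thrift servers (TSelectorServer, TThreadServer, and the TServer config traits) and the per-type conversions/* helper objects with Scrooge/Finagle-generated ThriftServer traits plus a single implicit-conversions package object. A minimal sketch of the resulting service shape, based only on what the diffs below show (the Echo service and its method are hypothetical, invented for illustration):

import com.twitter.util.Future

// Sketch only: each concrete service supplies serverName and thriftPort and
// extends its generated ThriftServer trait, as ManagerService and
// JobInjectorService do below.
class EchoService(val serverName: String, val thriftPort: Int)
extends Echo.ThriftServer {
  // Handlers now return Futures instead of blocking and throwing.
  def echo(msg: String): Future[String] = Future(msg)
}

// Wiring, mirroring GizzardServer: no more threads spawned around TServer.serve().
//   val echo = new EchoService("Echo", 7922)
//   echo.start()
//   ...
//   echo.shutdown()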
29 src/main/scala/com/twitter/gizzard/GizzardServer.scala
@@ -7,6 +7,7 @@ import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.config.{GizzardServer => ServerConfig}
import com.twitter.gizzard.proxy.LoggingProxy
+import com.twitter.gizzard.thrift.{ManagerService, JobInjectorService}
abstract class GizzardServer(config: ServerConfig) {
@@ -47,25 +48,35 @@ abstract class GizzardServer(config: ServerConfig) {
// service wiring
- lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, jobScheduler)
- lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))
+ lazy val managerService = new ManagerService(
+ config.manager.name,
+ config.manager.port,
+ nameServer,
+ shardManager,
+ adminJobManager,
+ remoteClusterManager,
+ jobScheduler
+ )
- lazy val jobInjectorServer = new thrift.JobInjectorService(jobCodec, jobScheduler)
- lazy val jobInjectorThriftServer = config.jobInjector(new thrift.JobInjector.Processor(jobInjectorServer))
+ lazy val jobInjectorService = new JobInjectorService(
+ config.jobInjector.name,
+ config.jobInjector.port,
+ jobCodec,
+ jobScheduler
+ )
def startGizzard() {
nameServer.reload()
remoteClusterManager.reload()
jobScheduler.start()
-
- new Thread(new Runnable { def run() { managerThriftServer.serve() } }, "GizzardManagerThread").start()
- new Thread(new Runnable { def run() { jobInjectorThriftServer.serve() } }, "JobInjectorThread").start()
+ managerService.start()
+ jobInjectorService.start()
}
def shutdownGizzard(quiesce: Boolean) {
remoteClusterManager.closeRelay()
- managerThriftServer.stop()
- jobInjectorThriftServer.stop()
+ managerService.shutdown()
+ jobInjectorService.shutdown()
while (quiesce && jobScheduler.size > 0) Thread.sleep(100)
jobScheduler.shutdown()
17 src/main/scala/com/twitter/gizzard/config/Future.scala
@@ -1,17 +0,0 @@
-package com.twitter.gizzard.config
-
-import com.twitter.conversions.time._
-import com.twitter.gizzard
-
-
-class Future {
- var poolSize = 1
- var maxPoolSize = 1
- var keepAlive = 5.seconds
- var timeout = 1.second
-
- def apply(name: String) = {
- if (maxPoolSize < poolSize) maxPoolSize = poolSize
- new gizzard.util.Future(name, poolSize, maxPoolSize, keepAlive, timeout)
- }
-}
12 src/main/scala/com/twitter/gizzard/config/GizzardServer.scala
@@ -17,8 +17,8 @@ trait GizzardServer {
var mappingFunction: MappingFunction = Hash
var nameServerReplicas: Seq[Replica] = Seq(Memory)
var jobRelay: JobRelay = new JobRelay
- var manager: Manager = new Manager with TThreadServer
- var jobInjector: JobInjector = new JobInjector with THsHaServer
+ var manager: Manager = new Manager
+ var jobInjector: JobInjector = new JobInjector
var queryStats: StatsCollection = new StatsCollection { }
var jobStats: StatsCollection = new StatsCollection {
@@ -55,13 +55,13 @@ trait GizzardServer {
}
}
-trait Manager extends TServer {
- def name = "GizzardManager"
+class Manager {
+ var name = "GizzardManager"
var port = 7920
}
-trait JobInjector extends TServer {
- def name = "JobInjector"
+class JobInjector {
+ var name = "JobInjector"
var port = 7921
}
66 src/main/scala/com/twitter/gizzard/config/TServer.scala
@@ -1,66 +0,0 @@
-package com.twitter.gizzard.config
-
-import java.util.concurrent.ThreadPoolExecutor
-import com.twitter.util.Duration
-import com.twitter.conversions.time._
-import org.apache.thrift
-import com.twitter.gizzard
-
-class ThreadPool extends (String => ThreadPoolExecutor) {
- var stopTimeout = 60
- var minThreads = 1
- var maxThreads = 1
- var maxWaiters: Option[Int] = None
-
- def maxWaiters_=(n: Int) { maxWaiters = Some(n) }
-
- def apply(name: String): ThreadPoolExecutor = {
- if (maxThreads < minThreads) maxThreads = minThreads
-
- gizzard.thrift.TSelectorServer.makeThreadPoolExecutor(
- name,
- stopTimeout,
- minThreads,
- maxThreads,
- maxWaiters)
- }
-}
-
-trait TServer extends (thrift.TProcessor => thrift.server.TServer) {
- def name: String
- def port: Int
- var timeout = 100.milliseconds
- var idleTimeout = 60.seconds
- var threadPool = new ThreadPool
-
- def getPool = threadPool(name + "_thread_pool")
-
- def apply(processor: thrift.TProcessor): thrift.server.TServer
-}
-
-trait TSelectorServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- gizzard.thrift.TSelectorServer(name, port, processor, getPool, timeout, idleTimeout)
- }
-}
-
-trait TThreadServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- gizzard.thrift.TThreadServer(name, port, idleTimeout.inMillis.toInt, getPool, processor)
- }
-}
-
-trait THsHaServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- val transport = new thrift.transport.TNonblockingServerSocket(port, timeout.inMillis.toInt)
- val options = new thrift.server.TNonblockingServer.Options
- new thrift.server.THsHaServer(
- new thrift.TProcessorFactory(processor),
- transport,
- new thrift.transport.TFramedTransport.Factory(),
- new thrift.protocol.TBinaryProtocol.Factory(),
- new thrift.protocol.TBinaryProtocol.Factory(),
- getPool,
- options)
- }
-}
2  src/main/scala/com/twitter/gizzard/logging/ExceptionJsonFormatter.scala
@@ -12,7 +12,7 @@ import com.twitter.gizzard.util.Json
*/
class ExceptionJsonFormatter extends Formatter {
private def throwableToMap(wrapped: Throwable): Map[String, Any] = {
- var pairs = List(
+ var pairs: List[(String, Any)] = List(
("class" -> wrapped.getClass().getName()),
("trace" -> wrapped.getStackTrace().map(_.toString()))
)
12 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
@@ -84,18 +84,12 @@ extends (Iterable[Array[Byte]] => Unit) {
.name("JobManagerClient")
.reportTo(new OstrichStatsReceiver)
.build()
- val client = new JobInjector.ServiceToClient(service, new TBinaryProtocol.Factory())
+ val client = new JobInjector.FinagledClient(service)
def apply(jobs: Iterable[Array[Byte]]) {
- val jobList = new JLinkedList[ThriftJob]()
+ val thriftJobs = jobs map { j => new ThriftJob(priority, ByteBuffer.wrap(j), Some(true)) } toSeq
- jobs.foreach { j =>
- val tj = new ThriftJob(priority, ByteBuffer.wrap(j))
- tj.setIs_replicated(true)
- jobList.add(tj)
- }
-
- client.inject_jobs(jobList)()
+ client.injectJobs(thriftJobs)()
}
def close() {
19 src/main/scala/com/twitter/gizzard/nameserver/NameServerState.scala
@@ -1,24 +1,9 @@
package com.twitter.gizzard.nameserver
-import scala.collection.mutable.ListBuffer
-import scala.collection.JavaConversions._
import com.twitter.gizzard.shards.{ShardId, ShardInfo, LinkInfo}
-import com.twitter.gizzard.thrift.{NameServerState => ThriftNameServerState}
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
-import com.twitter.gizzard.thrift.conversions.LinkInfo._
-import com.twitter.gizzard.thrift.conversions.Forwarding._
-import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.util.TreeUtils
-
-case class NameServerState(shards: List[ShardInfo], links: List[LinkInfo], forwardings: List[Forwarding], tableId: Int) {
- def toThrift = {
- val thriftForwardings = forwardings.map(_.toThrift)
- val thriftLinks = links.map(_.toThrift)
- val thriftShards = shards.map(_.toThrift)
- new ThriftNameServerState(thriftShards, thriftLinks, thriftForwardings, tableId)
- }
-}
+case class NameServerState(shards: Seq[ShardInfo], links: Seq[LinkInfo], forwardings: Seq[Forwarding], tableId: Int)
object NameServerState {
import TreeUtils._
@@ -32,6 +17,6 @@ object NameServerState {
val links = descendantLinks(forwardings.map(_.shardId))(linksByUpId)
val shards = (forwardings.map(_.shardId) ++ links.map(_.downId)).map(shardsById)
- NameServerState(shards.toList, links.toList, forwardings.toList, tableId)
+ NameServerState(shards.toSeq, links.toSeq, forwardings.toSeq, tableId)
}
}
21 src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala
@@ -1,5 +1,6 @@
package com.twitter.gizzard.scheduler
+import com.twitter.util.Future
import com.twitter.ostrich.stats.{StatsProvider, W3CStats}
import com.twitter.logging.Logger
import com.twitter.gizzard.proxy.LoggingProxy
@@ -44,6 +45,26 @@ trait JsonJob {
}
/**
+ * A JsonJob designed to work with futures. Implementations should override applyFuture and return
+ * Future signalling job completion instead of blocking.
+ */
+trait AsyncJsonJob extends JsonJob {
+ def applyFuture(): Future[Unit]
+
+ def apply() {
+ // todo. this should more intelligently deal with stuff...
+ val f = try {
+ applyFuture()
+ } catch {
+ case e => Future.exception(e)
+ }
+
+ f.apply()
+ }
+}
+
+
+/**
* A NestedJob that can be encoded in json.
*/
class JsonNestedJob(jobs: Iterable[JsonJob]) extends NestedJob(jobs) with JsonJob {
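The AsyncJsonJob trait added above defines the contract: override applyFuture and return a Future that completes when the job is done; the inherited apply() simply awaits it. A rough usage sketch, assuming toMap is the only other abstract member a JsonJob implementation must supply; the job class and the postAsync callback are hypothetical:

import com.twitter.util.Future

// Sketch only: a hypothetical job built on AsyncJsonJob. `postAsync` stands in
// for any client call that already returns a Future.
class HypotheticalPostJob(url: String, body: String, postAsync: (String, String) => Future[Unit])
extends AsyncJsonJob {
  // JsonJob serialization hook (assumed to be the remaining abstract member).
  def toMap: Map[String, Any] = Map("url" -> url, "body" -> body)

  // Completion is signalled through the returned Future rather than by blocking.
  def applyFuture(): Future[Unit] = postAsync(url, body)
}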
4 src/main/scala/com/twitter/gizzard/scheduler/ReplicatingJob.scala
@@ -1,13 +1,10 @@
package com.twitter.gizzard.scheduler
-import java.util.{LinkedList => JLinkedList}
-import java.nio.ByteBuffer
import scala.collection.mutable.Queue
import scala.util.matching.Regex
import com.twitter.ostrich.stats.StatsProvider
import com.twitter.logging.Logger
import com.twitter.util.Duration
-import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.nameserver.JobRelay
import com.twitter.gizzard.proxy.LoggingProxy
@@ -116,4 +113,3 @@ extends JsonJobParser {
new ReplicatingJob(relay, tasks, clusters, serialized)
}
}
-
1  src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
@@ -121,6 +121,7 @@ trait NodeIterable[+T] {
case Throw(e) => _futureAny(iter, promise, f)
}
} else {
+ // XXX: this is incorrect. If e exists at any time, ShardOfflineException masks logical errors.
promise.setException(new ShardOfflineException(rootInfo.id))
}
}
3  src/main/scala/com/twitter/gizzard/test/Reset.scala
@@ -1,6 +1,5 @@
package com.twitter.gizzard.test
-import org.specs.Specification
import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.querulous.evaluator.{StandardQueryEvaluatorFactory, QueryEvaluator}
import com.twitter.util.Time
@@ -38,7 +37,7 @@ import com.twitter.gizzard.config
// }
// }
-trait NameServerDatabase extends Specification {
+trait NameServerDatabase {
def materialize(cfg: config.GizzardServer) {
try {
cfg.nameServerReplicas.map {
21 src/main/scala/com/twitter/gizzard/thrift/JobInjectorService.scala
@@ -1,16 +1,16 @@
package com.twitter.gizzard.thrift
-import scala.collection.JavaConversions._
-import java.util.{List => JList}
-import com.twitter.gizzard.thrift.conversions.Sequences._
+import java.nio.ByteBuffer
+import com.twitter.util.Future
import com.twitter.gizzard.thrift.{Job => ThriftJob}
import com.twitter.gizzard.scheduler._
-
class JobInjectorService(
+ val serverName: String,
+ val thriftPort: Int,
codecParam: JsonCodec,
scheduler: PrioritizingJobScheduler)
-extends JobInjector.Iface {
+extends JobInjector.ThriftServer {
private val codec = codecParam.innerCodec
@@ -36,12 +36,17 @@ extends JobInjector.Iface {
}
}
- def inject_jobs(jobs: JList[ThriftJob]) {
+ def injectJobs(jobs: Seq[ThriftJob]) = {
jobs foreach { j =>
- var job: JsonJob = new InjectedJsonJob(j.getContents())
- if (j.is_replicated) job = new ReplicatedJob(List(job))
+ val bytes = new Array[Byte](j.contents.remaining)
+ j.contents.get(bytes)
+
+ var job: JsonJob = new InjectedJsonJob(bytes)
+ if (j.isReplicated getOrElse false) job = new ReplicatedJob(List(job))
scheduler.put(j.priority, job)
}
+
+ Future.Done
}
}
197 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -1,172 +1,189 @@
package com.twitter.gizzard.thrift
-import java.util.{List => JList}
-import scala.reflect.Manifest
-import scala.collection.JavaConversions._
-import com.twitter.gizzard.thrift.conversions.Sequences._
-import com.twitter.gizzard.thrift.conversions.Busy._
-import com.twitter.gizzard.thrift.conversions.LinkInfo._
-import com.twitter.gizzard.thrift.conversions.ShardId._
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
-import com.twitter.gizzard.thrift.conversions.Forwarding._
-import com.twitter.gizzard.thrift.conversions.Host._
+import com.twitter.logging.Logger
+import com.twitter.util.Future
import com.twitter.gizzard.shards._
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.nameserver._
-import com.twitter.logging.Logger
-
+import com.twitter.gizzard.thrift.conversions._
class ManagerService(
+ val serverName: String,
+ val thriftPort: Int,
nameServer: NameServer,
shardManager: ShardManager,
adminJobManager: AdminJobManager,
remoteClusterManager: RemoteClusterManager,
scheduler: PrioritizingJobScheduler)
-extends Manager.Iface {
+extends Manager.ThriftServer {
- val log = Logger.get(getClass.getName)
+ def wrapEx[A](f: => Future[A]) = {
+ val rv = try { f } catch { case e => Future.exception(e) }
+
+ rv rescue {
+ case e => {
+ log.error(e, "Exception in Gizzard ManagerService: %s", e)
+ Future.exception(new GizzardException(e.getMessage))
+ }
+ }
+ }
- def wrapEx[A](f: => A): A = try { f } catch {
- case ex: Throwable =>
- log.error(ex, "Exception in Gizzard ManagerService: %s", ex)
- throw new GizzardException(ex.getMessage)
+ def reloadUpdatedForwardings() = wrapEx {
+ nameServer.reloadUpdatedForwardings()
+ Future.Done
}
- def reload_updated_forwardings() = wrapEx {
- nameServer.reloadUpdatedForwardings() }
- def reload_config() = wrapEx {
+ def reloadConfig() = wrapEx {
nameServer.reload()
remoteClusterManager.reload()
+ Future.Done
}
- def find_current_forwarding(tableId: Int, id: Long) = {
- wrapEx(nameServer.findCurrentForwarding(tableId, id).shardInfo.toThrift)
+ def findCurrentForwarding(tableId: Int, id: Long) = wrapEx {
+ Future(nameServer.findCurrentForwarding(tableId, id).shardInfo)
}
// Shard Tree Management
// XXX: must be nameserver, in order to materialize. odd exception
- def create_shard(shard: ShardInfo) = wrapEx(shardManager.createAndMaterializeShard(shard.fromThrift))
+ def createShard(shard: ShardInfo) = wrapEx {
+ Future(shardManager.createAndMaterializeShard(shard))
+ }
- def delete_shard(id: ShardId) = wrapEx(shardManager.deleteShard(id.fromThrift))
+ def deleteShard(id: ShardId) = wrapEx {
+ Future(shardManager.deleteShard(id))
+ }
- def add_link(upId: ShardId, downId: ShardId, weight: Int) = {
- wrapEx(shardManager.addLink(upId.fromThrift, downId.fromThrift, weight))
+ def addLink(upId: ShardId, downId: ShardId, weight: Int) = wrapEx {
+ Future(shardManager.addLink(upId, downId, weight))
}
- def remove_link(upId: ShardId, downId: ShardId) = {
- wrapEx(shardManager.removeLink(upId.fromThrift, downId.fromThrift))
+
+ def removeLink(upId: ShardId, downId: ShardId) = wrapEx {
+ Future(shardManager.removeLink(upId, downId))
}
- def set_forwarding(forwarding: Forwarding) = {
- wrapEx(shardManager.setForwarding(forwarding.fromThrift))
+ def setForwarding(forwarding: Forwarding) = wrapEx {
+ Future(shardManager.setForwarding(forwarding))
}
- def replace_forwarding(oldId: ShardId, newId: ShardId) = {
- wrapEx(shardManager.replaceForwarding(oldId.fromThrift, newId.fromThrift))
+
+ def replaceForwarding(oldId: ShardId, newId: ShardId) = wrapEx {
+ Future(shardManager.replaceForwarding(oldId, newId))
}
- def remove_forwarding(forwarding: Forwarding) = {
- wrapEx(shardManager.removeForwarding(forwarding.fromThrift))
+
+ def removeForwarding(forwarding: Forwarding) = wrapEx {
+ Future(shardManager.removeForwarding(forwarding))
}
- def get_shard(id: ShardId): ShardInfo = {
- wrapEx(shardManager.getShard(id.fromThrift).toThrift)
+ def getShard(id: ShardId) = wrapEx {
+ Future(shardManager.getShard(id))
}
- def shards_for_hostname(hostname: String): JList[ShardInfo] = {
- wrapEx(shardManager.shardsForHostname(hostname).map(_.toThrift))
+
+ def shardsForHostname(hostname: String) = wrapEx {
+ Future(shardManager.shardsForHostname(hostname))
}
- def get_busy_shards(): JList[ShardInfo] = {
- wrapEx(shardManager.getBusyShards().map(_.toThrift))
+
+ def getBusyShards() = wrapEx {
+ Future(shardManager.getBusyShards())
}
- def list_upward_links(id: ShardId): JList[LinkInfo] = {
- wrapEx(shardManager.listUpwardLinks(id.fromThrift).map(_.toThrift))
+ def listUpwardLinks(id: ShardId) = wrapEx {
+ Future(shardManager.listUpwardLinks(id))
}
- def list_downward_links(id: ShardId): JList[LinkInfo] = {
- wrapEx(shardManager.listDownwardLinks(id.fromThrift).map(_.toThrift))
+
+ def listDownwardLinks(id: ShardId) = wrapEx {
+ Future(shardManager.listDownwardLinks(id))
}
- def get_forwarding(tableId: Int, baseId: Long) = {
- wrapEx(shardManager.getForwarding(tableId, baseId).toThrift)
+ def getForwarding(tableId: Int, baseId: Long) = wrapEx {
+ Future(shardManager.getForwarding(tableId, baseId))
}
- def get_forwarding_for_shard(id: ShardId) = {
- wrapEx(shardManager.getForwardingForShard(id.fromThrift).toThrift)
+ def getForwardingForShard(id: ShardId) = wrapEx {
+ Future(shardManager.getForwardingForShard(id))
}
- def get_forwardings(): JList[Forwarding] = {
- wrapEx(shardManager.getForwardings().map(_.toThrift))
+
+ def getForwardings() = wrapEx {
+ Future(shardManager.getForwardings())
}
- def list_hostnames() = wrapEx(shardManager.listHostnames)
+ def listHostnames() = wrapEx {
+ Future(shardManager.listHostnames)
+ }
- def mark_shard_busy(id: ShardId, busy: Int) = {
- wrapEx(shardManager.markShardBusy(id.fromThrift, busy.fromThrift))
+ def markShardBusy(id: ShardId, busy: Int) = wrapEx {
+ Future(shardManager.markShardBusy(id, busy))
}
- def list_tables(): JList[java.lang.Integer] = wrapEx(shardManager.listTables)
+ def listTables() = wrapEx {
+ Future(shardManager.listTables)
+ }
- def dump_nameserver(tableIds: JList[java.lang.Integer]) = wrapEx(shardManager.dumpStructure(tableIds.toList).map(_.toThrift))
+ def dumpNameserver(tableIds: Seq[Int]) = wrapEx {
+ Future(shardManager.dumpStructure(tableIds))
+ }
- def copy_shard(shardIds: JList[ShardId]) = {
- wrapEx(adminJobManager.scheduleCopyJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
+ def copyShard(shardIds: Seq[ShardId]) = wrapEx {
+ Future(adminJobManager.scheduleCopyJob(shardIds))
}
- def repair_shard(shardIds: JList[ShardId]) = {
- wrapEx(adminJobManager.scheduleRepairJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
+ def repairShard(shardIds: Seq[ShardId]) = wrapEx {
+ Future(adminJobManager.scheduleRepairJob(shardIds))
}
- def diff_shards(shardIds: JList[ShardId]) = {
- wrapEx(adminJobManager.scheduleDiffJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
+ def diffShards(shardIds: Seq[ShardId]) = wrapEx {
+ Future(adminJobManager.scheduleDiffJob(shardIds))
}
// Job Scheduler Management
- def retry_errors() = wrapEx(scheduler.retryErrors())
- def stop_writes() = wrapEx(scheduler.pause())
- def resume_writes() = wrapEx(scheduler.resume())
+ def retryErrors() = wrapEx(Future(scheduler.retryErrors()))
+ def stopWrites() = wrapEx(Future(scheduler.pause()))
+ def resumeWrites() = wrapEx(Future(scheduler.resume()))
- def retry_errors_for(priority: Int) = wrapEx(scheduler(priority).retryErrors())
- def stop_writes_for(priority: Int) = wrapEx(scheduler(priority).pause())
- def resume_writes_for(priority: Int) = wrapEx(scheduler(priority).resume())
- def is_writing(priority: Int) = wrapEx(!scheduler(priority).isShutdown)
- def queue_size(priority: Int) = wrapEx(scheduler(priority).size)
- def error_queue_size(priority: Int) = wrapEx(scheduler(priority).errorSize)
+ def retryErrorsFor(priority: Int) = wrapEx(Future(scheduler(priority).retryErrors()))
+ def stopWritesFor(priority: Int) = wrapEx(Future(scheduler(priority).pause()))
+ def resumeWritesFor(priority: Int) = wrapEx(Future(scheduler(priority).resume()))
+ def isWriting(priority: Int) = wrapEx(Future(!scheduler(priority).isShutdown))
+ def queueSize(priority: Int) = wrapEx(Future(scheduler(priority).size))
+ def errorQueueSize(priority: Int) = wrapEx(Future(scheduler(priority).errorSize))
- def add_fanout(suffix: String) = wrapEx(scheduler.addFanout(suffix))
- def remove_fanout(suffix: String) = wrapEx(scheduler.removeFanout(suffix))
- def list_fanout() = wrapEx(scheduler.listFanout().toList)
+ def addFanout(suffix: String) = wrapEx(Future(scheduler.addFanout(suffix)))
+ def removeFanout(suffix: String) = wrapEx(Future(scheduler.removeFanout(suffix)))
+ def listFanout() = wrapEx(Future(scheduler.listFanout().toSeq))
- def add_fanout_for(priority: Int, suffix: String) = wrapEx(scheduler(priority).addFanout(suffix))
- def remove_fanout_for(priority: Int, suffix: String) = wrapEx(scheduler(priority).removeFanout(suffix))
+ def addFanoutFor(priority: Int, suffix: String) = wrapEx(Future(scheduler(priority).addFanout(suffix)))
+ def removeFanoutFor(priority: Int, suffix: String) = wrapEx(Future(scheduler(priority).removeFanout(suffix)))
// Remote Host Cluster Management
- def add_remote_host(host: Host) = {
- wrapEx(remoteClusterManager.addRemoteHost(host.fromThrift))
+ def addRemoteHost(host: Host) = wrapEx {
+ Future(remoteClusterManager.addRemoteHost(host))
}
- def remove_remote_host(hostname: String, port: Int) = {
- wrapEx(remoteClusterManager.removeRemoteHost(hostname, port))
+ def removeRemoteHost(hostname: String, port: Int) = wrapEx {
+ Future(remoteClusterManager.removeRemoteHost(hostname, port))
}
- def set_remote_host_status(hostname: String, port: Int, status: HostStatus) = {
- wrapEx(remoteClusterManager.setRemoteHostStatus(hostname, port, status.fromThrift))
+ def setRemoteHostStatus(hostname: String, port: Int, status: HostStatus) = wrapEx {
+ Future(remoteClusterManager.setRemoteHostStatus(hostname, port, status))
}
- def set_remote_cluster_status(cluster: String, status: HostStatus) = {
- wrapEx(remoteClusterManager.setRemoteClusterStatus(cluster, status.fromThrift))
+ def setRemoteClusterStatus(cluster: String, status: HostStatus) = wrapEx {
+ Future(remoteClusterManager.setRemoteClusterStatus(cluster, status))
}
- def get_remote_host(hostname: String, port: Int) = {
- wrapEx(remoteClusterManager.getRemoteHost(hostname, port).toThrift)
+ def getRemoteHost(hostname: String, port: Int) = wrapEx {
+ Future(remoteClusterManager.getRemoteHost(hostname, port))
}
- def list_remote_clusters(): JList[String] = wrapEx(remoteClusterManager.listRemoteClusters)
- def list_remote_hosts(): JList[Host] = wrapEx(remoteClusterManager.listRemoteHosts.map(_.toThrift))
+ def listRemoteClusters() = wrapEx(Future(remoteClusterManager.listRemoteClusters))
+ def listRemoteHosts() = wrapEx(Future(remoteClusterManager.listRemoteHosts))
- def list_remote_hosts_in_cluster(cluster: String): JList[Host] = {
- wrapEx(remoteClusterManager.listRemoteHosts.map(_.toThrift))
+ def listRemoteHostsInCluster(cluster: String) = wrapEx {
+ Future(remoteClusterManager.listRemoteHosts)
}
}
255 src/main/scala/com/twitter/gizzard/thrift/TSelectorServer.scala
@@ -1,255 +0,0 @@
-package com.twitter.gizzard.thrift
-
-import java.io.IOException
-import java.net.InetSocketAddress
-import java.nio.channels._
-import java.util.concurrent._
-import scala.collection.mutable
-import scala.collection.JavaConversions._
-import org.apache.thrift._
-import org.apache.thrift.protocol._
-import org.apache.thrift.transport._
-import org.apache.thrift.server._
-import com.twitter.util.{Duration, Time}
-import com.twitter.conversions.time._
-import com.twitter.logging.Logger
-import com.twitter.gizzard.Stats
-import com.twitter.gizzard.util.NamedPoolThreadFactory
-
-
-object TSelectorServer {
- val log = Logger.get(getClass.getName)
-
- val cache = new mutable.HashMap[String, ThreadPoolExecutor]()
-
- def makeThreadPoolExecutor(name: String, stopTimeout: Int, minThreads: Int, maxThreads: Int, maxWaiters: Option[Int]): ThreadPoolExecutor = {
- cache.get(name) foreach { executor =>
- if (!executor.isShutdown()) {
- return executor
- }
- cache.remove(name)
- }
-
- val queue = maxWaiters.map { new LinkedBlockingQueue[Runnable](_) } getOrElse { new LinkedBlockingQueue[Runnable] }
- val executor = new ThreadPoolExecutor(minThreads, maxThreads, stopTimeout, TimeUnit.SECONDS,
- queue, new NamedPoolThreadFactory(name))
- Stats.addGauge("thrift-" + name + "-worker-threads") { executor.getPoolSize().toDouble }
- Stats.addGauge("thrift-" + name + "-queue-size") { executor.getQueue().size() }
- cache(name) = executor
- executor
- }
-
- def apply(name: String, port: Int, processor: TProcessor, executor: ThreadPoolExecutor,
- timeout: Duration, idleTimeout: Duration): TSelectorServer = {
- val socket = ServerSocketChannel.open()
- socket.socket().setReuseAddress(true)
- socket.socket().bind(new InetSocketAddress(port), 8192)
- log.info("Starting %s (%s) on port %d", name, processor.getClass.getName, port)
- new TSelectorServer(name, processor, socket, executor, timeout, idleTimeout)
- }
-}
-
-class TSelectorServer(name: String, processor: TProcessor, serverSocket: ServerSocketChannel,
- executor: ThreadPoolExecutor, timeout: Duration, idleTimeout: Duration)
- extends TServer(null, null) {
- val log = Logger.get(getClass.getName)
-
- val processorFactory = new TProcessorFactory(processor)
- val inputTransportFactory = new TTransportFactory()
- val outputTransportFactory = new TTransportFactory()
- val inputProtocolFactory = new TBinaryProtocol.Factory(true, true)
- val outputProtocolFactory = new TBinaryProtocol.Factory(true, true)
-
- val clientTimeout = 0
-
- @volatile private var running = false
- var selectorThread: Thread = null
-
- case class Client(socketChannel: SocketChannel, processor: TProcessor, inputProtocol: TProtocol,
- outputProtocol: TProtocol, var activity: Time)
- val clientMap = new mutable.HashMap[SelectableChannel, Client]
- val registerQueue = new ConcurrentLinkedQueue[SocketChannel]
-
- Stats.addGauge("thrift-" + name + "-connections") { clientMap.synchronized { clientMap.size } }
-
- def isRunning = running
-
- def execute(f: => Unit)(onTimeout: => Unit) {
- executor.execute(new Runnable() {
- val startTime = Time.now
-
- def run() {
- if (Time.now - startTime > timeout) {
- Stats.incr("thrift-" + name + "-timeout")
- onTimeout
- } else {
- f
- }
- }
- })
- }
-
- def serve() {
- try {
- serverSocket.socket().setSoTimeout(0)
- } catch {
- case e: IOException => log.warning(e, "Could not set socket timeout.")
- }
-
- selectorThread = new SelectorThread()
- selectorThread.start()
- }
-
- def shutdown() {
- if ((selectorThread ne null) && selectorThread.isAlive()) {
- running = false
- selectorThread.join()
- try {
- serverSocket.close()
- } catch {
- case _ =>
- }
- }
- executor.shutdown()
- while (!executor.isTerminated()) {
- log.info("Waiting for thread-pool executor...")
- try {
- executor.awaitTermination(1, TimeUnit.SECONDS)
- } catch {
- case e: InterruptedException =>
- }
- }
- }
-
-
- class SelectorThread extends Thread("SelectorThread") {
- val selector = Selector.open()
- serverSocket.configureBlocking(false)
- serverSocket.register(selector, SelectionKey.OP_ACCEPT)
-
- var lastScan = Time.now
-
- override def run() {
- running = true
- var errorCount = 0
- while (running) {
- try {
- select()
- errorCount = 0
- } catch {
- case e: IOException =>
- log.error(e, "I/O exception in select: %s", e)
- errorCount += 1
- if (errorCount > 10) {
- log.error(e, "Too many select errors. Dying...")
- // a server with an open thrift-server socket but no thread to handle connections is useless.
- System.exit(1)
- }
- case e: Exception =>
- log.error(e, "Unexpected exception! Dying...")
- System.exit(1)
- }
- }
- }
-
- def select() {
- var channel = registerQueue.poll()
- while (channel ne null) {
- channel.configureBlocking(false)
- channel.register(selector, SelectionKey.OP_READ)
- channel = registerQueue.poll()
- }
-
- // kill off any idle sockets
- if (Time.now - lastScan >= 1.second) {
- lastScan = Time.now
- val toRemove = new mutable.ListBuffer[SelectableChannel]
- clientMap.synchronized {
- for ((socket, client) <- clientMap) {
- if (lastScan - client.activity > idleTimeout) {
- toRemove += socket
- }
- }
- toRemove.foreach { socket =>
- val key = socket.keyFor(selector)
- if (key ne null) {
- key.cancel()
- closeSocket(socket)
- }
- }
- }
- }
-
- selector.select(100)
-
- for (key <- selector.selectedKeys) {
- if (key.isAcceptable()) {
- // there's only one listen socket for now.
- val clientSocket = serverSocket.accept()
-// clientSocket.socket().setTcpNoDelay(true)
- clientSocket.configureBlocking(false)
- clientSocket.register(selector, SelectionKey.OP_READ)
- addSession(clientSocket)
- } else {
- key.cancel()
- execute {
- val client = clientMap.synchronized { clientMap(key.channel) }
- client.activity = Time.now
- try {
- client.socketChannel.configureBlocking(true)
- client.processor.process(client.inputProtocol, client.outputProtocol)
- Stats.incr("thrift-" + name + "-calls")
- registerQueue.add(client.socketChannel)
- selector.wakeup()
- } catch {
- case e: TTransportException =>
- // session ends
- closeSocket(client.socketChannel)
- case e: Throwable =>
- log.error(e, "Exception in client processor")
- closeSocket(client.socketChannel)
- }
- } {
- // if the job spent too long waiting for a thread:
- val client = clientMap.synchronized { clientMap(key.channel) }
- log.debug("Killing session (enqueued too long): %s", client.socketChannel)
- try {
- client.socketChannel.configureBlocking(true)
- new TApplicationException("server is too busy").write(client.outputProtocol)
- } finally {
- closeSocket(client.socketChannel)
- }
- }
- }
- }
- selector.selectedKeys.clear()
- selector.selectNow()
- }
-
- def addSession(clientSocket: SocketChannel) {
- val transport = new TSocket(clientSocket.socket())
- transport.setTimeout(clientTimeout)
- log.debug("Start of session: %s", clientSocket)
-
- // thrift gibberish.
- val processor = processorFactory.getProcessor(transport)
- val inputProtocol = inputProtocolFactory.getProtocol(inputTransportFactory.getTransport(transport))
- val outputProtocol = outputProtocolFactory.getProtocol(inputTransportFactory.getTransport(transport))
-
- clientMap.synchronized {
- clientMap(clientSocket) = Client(clientSocket, processor, inputProtocol, outputProtocol,
- Time.now)
- }
- }
-
- def closeSocket(socket: SelectableChannel) {
- log.debug("End of session: %s", socket)
- try {
- socket.close()
- } catch {
- case _ =>
- }
- clientMap.synchronized { clientMap -= socket }
- }
- }
-}
139 src/main/scala/com/twitter/gizzard/thrift/TThreadServer.scala
@@ -1,139 +0,0 @@
-package com.twitter.gizzard
-package thrift
-
-import java.net.{ServerSocket, Socket, SocketTimeoutException}
-import java.util.concurrent.{CountDownLatch, ExecutorService, SynchronousQueue, ThreadPoolExecutor, TimeUnit}
-import org.apache.thrift.{TProcessor, TProcessorFactory}
-import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol, TProtocolFactory}
-import org.apache.thrift.server.TServer
-import org.apache.thrift.transport._
-import com.twitter.ostrich.stats.Stats
-import com.twitter.logging.Logger
-import com.twitter.gizzard.util.NamedPoolThreadFactory
-
-
-object TThreadServer {
- private val MIN_THREADS = 5
-
- def apply(name: String, port: Int, idleTimeout: Int, executor: ExecutorService,
- processor: TProcessor, framed: Boolean): TThreadServer = {
- new TThreadServer(name, port, idleTimeout, executor, new TProcessorFactory(processor),
- if (framed) new TFramedTransport.Factory else new TTransportFactory(),
- new TBinaryProtocol.Factory())
- }
-
- def apply(name: String, port: Int, idleTimeout: Int, executor: ExecutorService,
- processor: TProcessor): TThreadServer = {
- TThreadServer(name, port, idleTimeout, executor, processor, false)
- }
-
- def apply(name: String, port: Int, idleTimeout: Int, processor: TProcessor): TThreadServer = {
- TThreadServer(name, port, idleTimeout, makeThreadPool(name, MIN_THREADS), processor, false)
- }
-
- def makeThreadPool(name: String, minThreads: Int): ExecutorService = {
- val queue = new SynchronousQueue[Runnable]
- val executor = new ThreadPoolExecutor(minThreads, Int.MaxValue, 60, TimeUnit.SECONDS, queue,
- new NamedPoolThreadFactory(name))
-
- Stats.addGauge("thrift-" + name + "-worker-threads") { executor.getPoolSize().toDouble }
-
- executor
- }
-}
-
-/*
- * Various improvements to the libthrift TThreadPoolServer:
- *
- * Each connection gets its own thread from a thread pool, with an idle timeout. You can pass your
- * own ExecutorService. On shutdown, it'll wait up to 5 seconds for all the worker threads to
- * finish up and realize they're dead. (Unlike the libthrift version, which can't be reliably
- * shut down.)
- */
-class TThreadServer(name: String, port: Int, idleTimeout: Int,
- val executor: ExecutorService,
- processorFactory: TProcessorFactory,
- transportFactory: TTransportFactory,
- protocolFactory: TProtocolFactory)
- extends TServer(processorFactory, null, transportFactory, transportFactory,
- protocolFactory, protocolFactory) {
- private val log = Logger(getClass.getName)
-
- private val ACCEPT_TIMEOUT = 1000
- private val SHUTDOWN_TIMEOUT = 5000
-
- @volatile var running = false
- private val deathSwitch = new CountDownLatch(1)
-
- def start() {
- new Thread(name) {
- override def run() {
- serve()
- }
- }.start()
- }
-
- def serve() {
- log.info("Starting thrift service %s on port %d.", name, port)
-
- val serverSocket = new ServerSocket(port)
- serverSocket.setReuseAddress(true)
- serverSocket.setSoTimeout(ACCEPT_TIMEOUT)
-
- running = true
-
- while (running) {
- try {
- val client = serverSocket.accept()
- client.setSoTimeout(idleTimeout)
- executor.execute(new Runnable() {
- def run() {
- try {
- process(client)
- } catch {
- case x: Exception =>
- log.debug(x, "Client died prematurely: %s", x)
- }
- }
- })
- } catch {
- case x: SocketTimeoutException =>
- // ignore
- case x: Exception =>
- log.error(x, "Error occurred during accept: %s", x)
- running = false
- }
- }
-
- log.info("Shutting down thrift service %s on port %d.", name, port)
-
- serverSocket.close()
- executor.shutdown()
- executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)
- executor.shutdownNow()
-
- log.info("Finished shutting down service %s.", name)
- deathSwitch.countDown()
- }
-
- override def stop() {
- if (running) {
- running = false
- deathSwitch.await()
- }
- }
-
- private def process(client: Socket) {
- val transport = new TSocket(client)
- val processor = processorFactory.getProcessor(transport)
- val protocol = protocolFactory.getProtocol(transportFactory.getTransport(transport))
- while (running && processor.process(protocol, protocol)) {
- // ...
- }
- try {
- client.close()
- } catch {
- case _ =>
- }
- }
-}
118 src/main/scala/com/twitter/gizzard/thrift/conversions.scala
@@ -0,0 +1,118 @@
+package com.twitter.gizzard.thrift
+
+import com.twitter.gizzard.thrift
+import com.twitter.gizzard.shards
+import com.twitter.gizzard.nameserver
+
+package object conversions {
+ implicit def shardIdToThrift(id: shards.ShardId): thrift.ShardId = {
+ thrift.ShardId(id.hostname, id.tablePrefix)
+ }
+
+ implicit def shardIdFromThrift(id: thrift.ShardId): shards.ShardId = {
+ shards.ShardId(id.hostname, id.tablePrefix)
+ }
+
+ implicit def shardIdSeqToThrift(ids: Seq[shards.ShardId]): Seq[thrift.ShardId] = {
+ ids map shardIdToThrift
+ }
+
+ implicit def shardIdSeqFromThrift(ids: Seq[thrift.ShardId]): Seq[shards.ShardId] = {
+ ids map shardIdFromThrift
+ }
+
+
+ implicit def shardInfoToThrift(info: shards.ShardInfo): thrift.ShardInfo = {
+ thrift.ShardInfo(info.id, info.className, info.sourceType, info.destinationType, info.busy)
+ }
+
+ implicit def shardInfoFromThrift(info: thrift.ShardInfo): shards.ShardInfo = {
+ shards.ShardInfo(info.id, info.className, info.sourceType, info.destinationType, info.busy)
+ }
+
+ implicit def shardInfoSeqToThrift(infos: Seq[shards.ShardInfo]): Seq[thrift.ShardInfo] = {
+ infos map shardInfoToThrift
+ }
+
+ implicit def shardInfoSeqFromThrift(infos: Seq[thrift.ShardInfo]): Seq[shards.ShardInfo] = {
+ infos map shardInfoFromThrift
+ }
+
+
+ implicit def intToBusy(b: Int): shards.Busy.Value = shards.Busy(b)
+
+ implicit def busyToInt(b: shards.Busy.Value): Int = b.id
+
+
+ implicit def forwardingToThrift(forwarding: nameserver.Forwarding): thrift.Forwarding = {
+ thrift.Forwarding(forwarding.tableId, forwarding.baseId, forwarding.shardId)
+ }
+
+ implicit def forwardingFromThrift(forwarding: thrift.Forwarding): nameserver.Forwarding = {
+ nameserver.Forwarding(forwarding.tableId, forwarding.baseId, forwarding.shardId)
+ }
+
+ implicit def forwardingSeqToThrift(forwardings: Seq[nameserver.Forwarding]): Seq[thrift.Forwarding] = {
+ forwardings map forwardingToThrift
+ }
+
+ implicit def forwardingSeqFromThrift(forwardings: Seq[thrift.Forwarding]): Seq[nameserver.Forwarding] = {
+ forwardings map forwardingFromThrift
+ }
+
+
+ implicit def linkInfoToThrift(link: shards.LinkInfo): thrift.LinkInfo = {
+ thrift.LinkInfo(link.upId, link.downId, link.weight)
+ }
+
+ implicit def linkInfoFromThrift(link: thrift.LinkInfo): shards.LinkInfo = {
+ shards.LinkInfo(link.upId, link.downId, link.weight)
+ }
+
+ implicit def linkInfoSeqToThrift(links: Seq[shards.LinkInfo]): Seq[thrift.LinkInfo] = {
+ links map linkInfoToThrift
+ }
+
+ implicit def linkInfoSeqFromThrift(links: Seq[thrift.LinkInfo]): Seq[shards.LinkInfo] = {
+ links map linkInfoFromThrift
+ }
+
+
+ implicit def nameServerStateToThrift(state: nameserver.NameServerState): thrift.NameServerState = {
+ thrift.NameServerState(state.shards, state.links, state.forwardings, state.tableId)
+ }
+
+ implicit def nameServerStateFromThrift(state: thrift.NameServerState): nameserver.NameServerState = {
+ nameserver.NameServerState(state.shards, state.links, state.forwardings, state.tableId)
+ }
+
+ implicit def nameServerStateSeqToThrift(states: Seq[nameserver.NameServerState]): Seq[thrift.NameServerState] = {
+ states map nameServerStateToThrift
+ }
+
+ implicit def nameServerStateSeqFromThrift(states: Seq[thrift.NameServerState]): Seq[nameserver.NameServerState] = {
+ states map nameServerStateFromThrift
+ }
+
+
+ implicit def hostToThrift(host: nameserver.Host): thrift.Host = {
+ thrift.Host(host.hostname, host.port, host.cluster, host.status)
+ }
+
+ implicit def hostFromThrift(host: thrift.Host): nameserver.Host = {
+ nameserver.Host(host.hostname, host.port, host.cluster, host.status)
+ }
+
+ implicit def hostSeqToThrift(hosts: Seq[nameserver.Host]): Seq[thrift.Host] = {
+ hosts map hostToThrift
+ }
+
+ implicit def hostSeqFromThrift(hosts: Seq[thrift.Host]): Seq[nameserver.Host] = {
+ hosts map hostFromThrift
+ }
+
+
+ implicit def hostStatusToThrift(s: nameserver.HostStatus.Value): thrift.HostStatus = thrift.HostStatus(s.id)
+
+ implicit def hostStatusFromThrift(s: thrift.HostStatus): nameserver.HostStatus.Value = nameserver.HostStatus(s.value)
+}
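These implicits stand in for the deleted conversions/* objects: instead of calling .toThrift/.fromThrift explicitly, code such as the ManagerService handlers above can pass and return domain types and let the compiler insert the conversion wherever the generated thrift type is expected. A tiny illustration (handleThrift is a hypothetical function, included only to show where the conversion fires):

import com.twitter.gizzard.thrift.conversions._
import com.twitter.gizzard.{shards, thrift}

// Hypothetical consumer expecting the Scrooge-generated type.
def handleThrift(id: thrift.ShardId): Unit = ()

// shardIdToThrift is applied implicitly because the expected type is thrift.ShardId.
handleThrift(shards.ShardId("localhost", "table_001"))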
16 src/main/scala/com/twitter/gizzard/thrift/conversions/Busy.scala
@@ -1,16 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.shards
-
-
-object Busy {
- class RichBusy(busy: shards.Busy.Value) {
- def toThrift = busy.id
- }
- implicit def busyToRichBusy(busy: shards.Busy.Value) = new RichBusy(busy)
-
- class RichInt(busy: Int) {
- def fromThrift = shards.Busy(busy)
- }
- implicit def intToRichInt(busy: Int) = new RichInt(busy)
-}
19 src/main/scala/com/twitter/gizzard/thrift/conversions/Forwarding.scala
@@ -1,19 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.nameserver
-import com.twitter.gizzard.thrift
-import com.twitter.gizzard.thrift.conversions.Sequences._
-import com.twitter.gizzard.thrift.conversions.ShardId._
-
-
-object Forwarding {
- class RichShardingForwarding(forwarding: nameserver.Forwarding) {
- def toThrift = new thrift.Forwarding(forwarding.tableId, forwarding.baseId, forwarding.shardId.toThrift)
- }
- implicit def shardingForwardingToRichShardingForwarding(forwarding: nameserver.Forwarding) = new RichShardingForwarding(forwarding)
-
- class RichThriftForwarding(forwarding: thrift.Forwarding) {
- def fromThrift = new nameserver.Forwarding(forwarding.table_id, forwarding.base_id, forwarding.shard_id.fromThrift)
- }
- implicit def thriftForwardingToRichThriftForwarding(forwarding: thrift.Forwarding) = new RichThriftForwarding(forwarding)
-}
30 src/main/scala/com/twitter/gizzard/thrift/conversions/Host.scala
@@ -1,30 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.thrift
-import com.twitter.gizzard.nameserver
-
-
-object Host {
- class RichNameServerHost(h: nameserver.Host) {
- def toThrift = new thrift.Host(h.hostname, h.port, h.cluster, h.status.toThrift)
- }
- implicit def nameServerHostToRichHost(h: nameserver.Host) = new RichNameServerHost(h)
-
- class RichThriftHost(h: thrift.Host) {
- def fromThrift = new nameserver.Host(h.hostname, h.port, h.cluster, h.status.fromThrift)
- }
- implicit def thriftHostToRichHost(h: thrift.Host) = new RichThriftHost(h)
-
-
- class RichNameServerHostStatus(s: nameserver.HostStatus.Value) {
- def toThrift = thrift.HostStatus.findByValue(s.id)
- }
- implicit def nameServerHostStatusToRichHostStatus(s: nameserver.HostStatus.Value) =
- new RichNameServerHostStatus(s)
-
- class RichThriftHostStatus(s: thrift.HostStatus) {
- def fromThrift = nameserver.HostStatus(s.getValue)
- }
- implicit def thriftHostStatusToRichHostStatus(s: thrift.HostStatus) =
- new RichThriftHostStatus(s)
-}
18 src/main/scala/com/twitter/gizzard/thrift/conversions/LinkInfo.scala
@@ -1,18 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.shards
-import com.twitter.gizzard.thrift
-import com.twitter.gizzard.thrift.conversions.ShardId._
-
-
-object LinkInfo {
- class RichShardingLinkInfo(linkInfo: shards.LinkInfo) {
- def toThrift = new thrift.LinkInfo(linkInfo.upId.toThrift, linkInfo.downId.toThrift, linkInfo.weight)
- }
- implicit def shardingLinkInfoToRichShardingLinkInfo(linkInfo: shards.LinkInfo) = new RichShardingLinkInfo(linkInfo)
-
- class RichThriftLinkInfo(linkInfo: thrift.LinkInfo) {
- def fromThrift = new shards.LinkInfo(linkInfo.up_id.fromThrift, linkInfo.down_id.fromThrift, linkInfo.weight)
- }
- implicit def thriftLinkInfoToRichThriftLinkInfo(linkInfo: thrift.LinkInfo) = new RichThriftLinkInfo(linkInfo)
-}
96 src/main/scala/com/twitter/gizzard/thrift/conversions/Sequences.scala
@@ -1,96 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import scala.collection.{JavaConversions => JC}
-import java.nio.{BufferUnderflowException, ByteBuffer, ByteOrder}
-import java.util.{AbstractList => JAbstractList, List => JList}
-import com.twitter.gizzard.util.{Future, ParallelSeq}
-
-
-object Sequences {
- class RichSeq[A <: AnyRef](seq: Seq[A]) {
- def parallel(future: Future) = new ParallelSeq(seq, future)
-
- @deprecated("rely on implicit conversion from scala.collection.JavaConversions._")
- def toJavaList: JList[A] = JC.asJavaList(seq)
- def double = for (i <- seq) yield (i, i)
- }
-
- implicit def seqToRichSeq[A <: AnyRef](seq: Seq[A]) = new RichSeq(seq)
-
-
- class RichIntSeq(seq: Seq[Int]) {
- def parallel(future: Future) = new ParallelSeq(seq, future)
-
- @deprecated("there is implicit conversion from Seq[Int] to java.util.List[java.lang.Integer]")
- def toJavaList: JList[java.lang.Integer] = intSeqToBoxedJavaList(seq)
- def double = for (i <- seq) yield (i, i)
-
- def pack: ByteBuffer = {
- val buffer = new Array[Byte](seq.size * 4)
- val byteBuffer = ByteBuffer.wrap(buffer)
- byteBuffer.order(ByteOrder.LITTLE_ENDIAN)
- seq.foreach { item => byteBuffer.putInt(item) }
- byteBuffer.rewind
- byteBuffer
- }
- }
-
- implicit def seqToRichIntSeq(seq: Seq[Int]) = new RichIntSeq(seq)
-
- implicit def intSeqToBoxedJavaList(seq: Seq[Int]) = {
- JC.asJavaList(seq.map(_.asInstanceOf[java.lang.Integer]))
- }
-
- implicit def boxedJavaListToIntSeq(list: JList[java.lang.Integer]) = {
- JC.asScalaIterable(list).toSeq.map(_.asInstanceOf[Int])
- }
-
-
- class RichLongSeq(seq: Seq[Long]) {
- def parallel(future: Future) = new ParallelSeq(seq, future)
-
- @deprecated("there is implicit conversion from Seq[Long] to java.util.List[java.lang.Long]")
- def toJavaList: JList[java.lang.Long] = longSeqToBoxedJavaList(seq)
- def double = for (i <- seq) yield (i, i)
-
- def pack: ByteBuffer = {
- val buffer = new Array[Byte](seq.size * 8)
- val byteBuffer = ByteBuffer.wrap(buffer)
- byteBuffer.order(ByteOrder.LITTLE_ENDIAN)
- seq.foreach { item => byteBuffer.putLong(item) }
- byteBuffer.rewind
- byteBuffer
- }
- }
-
- implicit def seqToRichLongSeq(seq: Seq[Long]) = new RichLongSeq(seq)
-
- implicit def longSeqToBoxedJavaList(seq: Seq[Long]) = {
- JC.asJavaList(seq.map(_.asInstanceOf[java.lang.Long]))
- }
-
- implicit def boxedJavaListToLongSeq(list: JList[java.lang.Long]) = {
- JC.asScalaIterable(list).toSeq.map(_.asInstanceOf[Long])
- }
-
-
- class RichByteBuffer(buffer: ByteBuffer) {
- def toIntArray = {
- buffer.order(ByteOrder.LITTLE_ENDIAN)
- val ints = buffer.asIntBuffer
- val results = new Array[Int](ints.limit)
- ints.get(results)
- results
- }
-
- def toLongArray = {
- buffer.order(ByteOrder.LITTLE_ENDIAN)
- val longs = buffer.asLongBuffer
- val results = new Array[Long](longs.limit)
- longs.get(results)
- results
- }
- }
-
- implicit def bufferToRichByteBuffer(buffer: ByteBuffer) = new RichByteBuffer(buffer)
-}
17 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardId.scala
@@ -1,17 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.shards
-import com.twitter.gizzard.thrift
-
-
-object ShardId {
- class RichShardId(shardId: shards.ShardId) {
- def toThrift = new thrift.ShardId(shardId.hostname, shardId.tablePrefix)
- }
- implicit def shardIdToRichShardId(shardId: shards.ShardId) = new RichShardId(shardId)
-
- class RichThriftShardId(shardId: thrift.ShardId) {
- def fromThrift = new shards.ShardId(shardId.hostname, shardId.table_prefix)
- }
- implicit def thriftShardIdToRichShardId(shardId: thrift.ShardId) = new RichThriftShardId(shardId)
-}
24 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardInfo.scala
@@ -1,24 +0,0 @@
-package com.twitter.gizzard.thrift.conversions
-
-import com.twitter.gizzard.shards
-import com.twitter.gizzard.thrift
-import com.twitter.gizzard.thrift.conversions.Busy._
-import com.twitter.gizzard.thrift.conversions.ShardId._
-
-
-object ShardInfo {
- class RichShardingShardInfo(shardInfo: shards.ShardInfo) {
- def toThrift = new thrift.ShardInfo(shardInfo.id.toThrift, shardInfo.className,
- shardInfo.sourceType, shardInfo.destinationType,
- shardInfo.busy.toThrift)
-
- }
- implicit def shardingShardInfoToRichShardingShardInfo(shardInfo: shards.ShardInfo) = new RichShardingShardInfo(shardInfo)
-
- class RichThriftShardInfo(shardInfo: thrift.ShardInfo) {
- def fromThrift = new shards.ShardInfo(shardInfo.id.fromThrift, shardInfo.class_name, shardInfo.source_type,
- shardInfo.destination_type, shardInfo.busy.fromThrift)
-
- }
- implicit def thriftShardInfoToRichThriftShardInfo(shardInfo: thrift.ShardInfo) = new RichThriftShardInfo(shardInfo)
-}
85 src/main/scala/com/twitter/gizzard/util/Future.scala
@@ -1,85 +0,0 @@
-package com.twitter.gizzard.util
-
-import scala.collection.SeqProxy
-import scala.collection.generic.CanBuildFrom
-import java.util.concurrent._
-import com.twitter.util.{Duration, Time}
-import com.twitter.conversions.time._
-import com.twitter.gizzard.Stats
-
-
-class Future(name: String, poolSize: Int, maxPoolSize: Int, keepAlive: Duration,
- val timeout: Duration) {
-
- var executor = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAlive.inSeconds,
- TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable], new NamedPoolThreadFactory(name))
-
- Stats.addGauge("future-" + name + "-queue-size") { executor.getQueue().size() }
-
- def apply[A](a: => A) = {
- val trans = Stats.transactionOpt.map { _.createChild }
-
- val future = new FutureTask(new Callable[A] {
- val startTime = Time.now
- def call = {
- trans.foreach { t => Stats.setTransaction(t) }
- val timeInQueue = Time.now - startTime
- Stats.transaction.record("Time spent in future queue: "+timeInQueue.inMillis)
- if (timeInQueue > timeout) {
- Stats.incr("future-" + name + "-timeout")
- throw new TimeoutException("future spent too long in queue")
- }
-
- val threadExecTime = Time.now
- try {
- a
- } catch {
- case e: Exception =>
- Stats.transaction.record("Caught exception: "+e)
- throw e
- } finally {
- val duration = Time.now - threadExecTime
- Stats.transaction.record("Total duration: "+duration.inMillis)
- trans.foreach { t => Stats.endTransaction() }
- }
- }
- })
- executor.execute(future)
- future
- }
-
- def shutdown() {
- executor.shutdown()
- executor.awaitTermination(60, TimeUnit.SECONDS)
- }
-}
-
-class ParallelSeq[A](seq: Seq[A], future: Future) extends SeqProxy[A] {
- def self = seq
-
- override def map[B, That](f: A => B)(implicit bf: CanBuildFrom[Seq[A], B, That]): That = {
- val coll: Seq[B] = if (seq.size <= 1) {
- seq.map(f)
- } else {
- seq.map { a => future(f(a)) }.map { _.get(future.timeout.inMillis, TimeUnit.MILLISECONDS) }
- }
-
- val b = bf(repr)
- for (x <- coll) b += x
- b.sizeHint(coll)
- b.result
- }
-
- override def flatMap[B, That](f: A => Traversable[B])(implicit bf: CanBuildFrom[Seq[A], B, That]): That = {
- val coll: Seq[B] = if (seq.size <= 1) {
- seq.flatMap(f)
- } else {
- seq.map { a => future(f(a)) }.flatMap { _.get(future.timeout.inMillis, TimeUnit.MILLISECONDS) }
- }
-
- val b = bf(repr)
- for (x <- coll) b += x
- b.sizeHint(coll)
- b.result
- }
-}
8 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
@@ -83,12 +83,14 @@ trait IntegrationSpecification extends Specification {
def testServerClient(s: WithFacts) = {
val i = s.enum
val port = 8000 + (i - 1) * 3
- val client = new testserver.thrift.TestServer.ServiceToClient(ClientBuilder()
+ val client = new testserver.thrift.TestServer.FinagledClient(
+ ClientBuilder()
+ .name("TestServerClient")
.codec(ThriftClientFramedCodec())
.hosts(new InetSocketAddress("localhost", port))
.hostConnectionLimit(1)
- .build(),
- new TBinaryProtocol.Factory())
+ .build()
+ )
new testserver.thrift.TestServer.Iface {
def put(key: Int, value: String) { client.put(key, value)() }
89 src/test/scala/com/twitter/gizzard/FutureSpec.scala
@@ -1,89 +0,0 @@
-package com.twitter.gizzard
-
-import java.util.concurrent._
-import scala.collection.mutable
-import com.twitter.util.Time
-import com.twitter.conversions.time._
-import org.specs.Specification
-import org.specs.mock.{ClassMocker, JMocker}
-import com.twitter.gizzard.util.Future
-import com.twitter.gizzard.thrift.conversions.Sequences._
-
-
-object FutureSpec extends ConfiguredSpecification with JMocker with ClassMocker {
-
- "Future" should {
- var future: Future = null
-
- doBefore {
- future = new Future("test", 1, 1, 1.hour, 50.milliseconds)
- }
-
- doAfter {
- future.shutdown()
- }
-
- "execute in the future" in {
- future { 3 * 4 }.get mustEqual 12
- }
-
- "propagate exceptions that happen in computation" in {
- future { error("whoops"); 1 }.get must throwA[Exception]
- }
-
- "timeout appropriately" in {
- future { Thread.sleep(2000) }.get(10, TimeUnit.MILLISECONDS) must throwA[TimeoutException]
- }
-
- "timeout a stuffed-up queue" in {
- Time.withCurrentTimeFrozen { time =>
- future.executor.shutdown()
- future.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
- // do nothing.
- }
- })
- future.executor.awaitTermination(1, TimeUnit.MINUTES)
- val f = future { 3 * 4 }
- time.advance(23.seconds)
- f.run()
- f.get(1, TimeUnit.MILLISECONDS) must throwA[Exception]
- }
- }
-
- "run sequences in parallel" in {
- val runs = future.executor.getCompletedTaskCount
- List(1, 2, 3).parallel(future).map { _ * 2 } mustEqual List(2, 4, 6)
- future.executor.getCompletedTaskCount mustEqual runs + 3
- }
-
- "run one-element sequences in series" in {
- val runs = future.executor.getCompletedTaskCount
- List(4).parallel(future).map { _ * 2 } mustEqual List(8)
- future.executor.getCompletedTaskCount mustEqual runs
- }
- }
-
- "ParallelSeq" should {
- var future: Future = null
-
- doBefore { future = new Future("test", 1, 1, 1.hour, 50.milliseconds) }
- doAfter { future.shutdown() }
-
- "map" in {
- Seq(1,2,3).parallel(future).map(_ + 1) must haveTheSameElementsAs(Seq(2,3,4))
- }
-
- "flatMap" in {
- Seq(1,2,3).parallel(future).flatMap(_ to 4) must haveTheSameElementsAs(Seq(1,2,3,4,2,3,4,3,4))
- }
-
- "propagate exceptions" in {
- Seq(1,2,3).parallel(future).map(i => if (i == 2) error("whoops") else i) must throwA[Exception]
- }
-
- "must timeout" in {
- Seq(1,2,3).parallel(future).map { i => Thread.sleep(2000); i } must throwA[TimeoutException]
- }
- }
-}
58 src/test/scala/com/twitter/gizzard/integration/GizzardIntegrationSpec.scala
@@ -1,21 +1,19 @@
package com.twitter.gizzard.integration
-import scala.collection.JavaConversions._
import com.twitter.gizzard.nameserver.{Host, HostStatus}
-import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.testserver.thrift.TestResult
import com.twitter.gizzard.{IntegrationSpecification, ConfiguredSpecification}
class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecification {
"Replication" should {
- val servers = List(1, 2, 3).map(testServer)
+ val servers = Seq(1, 2, 3).map(testServer)
val clients = servers.map(testServerClient)
val server1 :: server2 :: server3 :: _ = servers
val client1 :: client2 :: client3 :: _ = clients
- val hostFor1 :: hostFor2 :: hostFor3 :: _ = List(server1, server2, server3).map { s =>
+ val hostFor1 :: hostFor2 :: hostFor3 :: _ = Seq(server1, server2, server3).map { s =>
Host("localhost", s.injectorPort, "c" + s.enum, HostStatus.Normal)
}
@@ -23,9 +21,9 @@ class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecificat
resetTestServerDBs(servers: _*)
setupServers(servers: _*)
- List(server1, server2).foreach(_.remoteClusterManager.addRemoteHost(hostFor3))
- List(server1, server3).foreach(_.remoteClusterManager.addRemoteHost(hostFor2))
- List(server2, server3).foreach(_.remoteClusterManager.addRemoteHost(hostFor1))
+ Seq(server1, server2).foreach(_.remoteClusterManager.addRemoteHost(hostFor3))
+ Seq(server1, server3).foreach(_.remoteClusterManager.addRemoteHost(hostFor2))
+ Seq(server2, server3).foreach(_.remoteClusterManager.addRemoteHost(hostFor1))
servers.foreach(_.remoteClusterManager.reload())
}
@@ -37,41 +35,41 @@ class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecificat
client1.put(1, "foo")
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client2.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client3.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client2.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client3.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
client2.put(2, "bar")
- client1.get(2).toList must eventually(be_==(List(new TestResult(2, "bar", 1))))
- client2.get(2).toList must eventually(be_==(List(new TestResult(2, "bar", 1))))
- client3.get(2).toList must eventually(be_==(List(new TestResult(2, "bar", 1))))
+ client1.get(2).toSeq must eventually(be_==(Seq(TestResult(2, "bar", 1))))
+ client2.get(2).toSeq must eventually(be_==(Seq(TestResult(2, "bar", 1))))
+ client3.get(2).toSeq must eventually(be_==(Seq(TestResult(2, "bar", 1))))
client3.put(3, "baz")
- client1.get(3).toList must eventually(be_==(List(new TestResult(3, "baz", 1))))
- client2.get(3).toList must eventually(be_==(List(new TestResult(3, "baz", 1))))
- client3.get(3).toList must eventually(be_==(List(new TestResult(3, "baz", 1))))
+ client1.get(3).toSeq must eventually(be_==(Seq(TestResult(3, "baz", 1))))
+ client2.get(3).toSeq must eventually(be_==(Seq(TestResult(3, "baz", 1))))
+ client3.get(3).toSeq must eventually(be_==(Seq(TestResult(3, "baz", 1))))
}
"retry replication errors" in {
startServers(server1)
client1.put(1, "foo")
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
startServers(server2)
server1.jobScheduler.retryErrors()
- client2.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client2.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
startServers(server3)
server1.jobScheduler.retryErrors()
- client3.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client2.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client3.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client2.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
}
"retry unblocked clusters" in {
@@ -81,16 +79,16 @@ class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecificat
server1.remoteClusterManager.reload()
client1.put(1, "foo")
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client3.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client3.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
- client2.get(1).toList mustEqual List[TestResult]()
+ client2.get(1).toSeq mustEqual Seq[TestResult]()
server1.remoteClusterManager.setRemoteClusterStatus("c2", HostStatus.Normal)
server1.remoteClusterManager.reload()
server1.jobScheduler.retryErrors()
- client2.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client2.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
}
"drop blackholed clusters" in {
@@ -100,10 +98,10 @@ class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecificat
server1.remoteClusterManager.reload()
client1.put(1, "foo")
- client1.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
- client3.get(1).toList must eventually(be_==(List(new TestResult(1, "foo", 1))))
+ client1.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
+ client3.get(1).toSeq must eventually(be_==(Seq(TestResult(1, "foo", 1))))
- client2.get(1).toList mustEqual List[TestResult]()
+ client2.get(1).toSeq mustEqual Seq[TestResult]()
server1.remoteClusterManager.setRemoteClusterStatus("c2", HostStatus.Normal)
server1.remoteClusterManager.reload()
@@ -111,7 +109,7 @@ class ReplicationSpec extends IntegrationSpecification with ConfiguredSpecificat
Thread.sleep(200)
- client2.get(1).toList mustEqual List[TestResult]()
+ client2.get(1).toSeq mustEqual Seq[TestResult]()
}
}
}
165 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
@@ -1,8 +1,9 @@
package com.twitter.gizzard.testserver
import java.sql.{ResultSet, SQLException}
+import com.twitter.util.Future
import com.twitter.querulous
-import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
+import com.twitter.querulous.async.{AsyncQueryEvaluatorFactory, AsyncQueryEvaluator}
import com.twitter.querulous.config.Connection
import com.twitter.querulous.query.SqlQueryTimeoutException
@@ -27,19 +28,34 @@ package object config {
val hostnames = Seq("localhost")
}
- object TestQueryEvaluator extends querulous.config.QueryEvaluator {
- database.pool = new ApachePoolingDatabase {
- sizeMin = 3
- sizeMax = 3
+ object TestQueryEvaluator extends querulous.config.AsyncQueryEvaluator {
+ query.debug = { s => com.twitter.logging.Logger.get("query").debug(s) }
+
+ //singletonFactory = true
+
+ database.serviceName = "TestGizzardService"
+ database.pool = new ThrottledPoolingDatabase {
+ openTimeout = 10.seconds
+ size = 20
}
}
- trait TestTHsHaServer extends THsHaServer {
- threadPool.minThreads = 10
+ object NSQueryEvaluator extends querulous.config.QueryEvaluator {
+ query.debug = { s => com.twitter.logging.Logger.get("query").debug(s) }
+
+ //singletonFactory = true
+
+ database.serviceName = "TestGizzardService"
+ database.pool = new ThrottledPoolingDatabase {
+ openTimeout = 10.seconds
+ size = 2
+ }
}
trait TestServer extends gizzard.config.GizzardServer {
- def server: TServer
+ var name = "TestGizzardService"
+ var port = 3000
+
def databaseConnection: Connection
val queryEvaluator = TestQueryEvaluator
@@ -79,7 +95,7 @@ package object config {
private def testNameServerReplicas(name: String) = {
Seq(new Mysql {
- queryEvaluator = TestQueryEvaluator
+ queryEvaluator = NSQueryEvaluator
val connection = new TestDBConnection {
val database = "gizzard_test_" + name + "_ns"
}
@@ -92,12 +108,12 @@ package object config {
val queueBase = "gizzard_test_" + name
new TestServer {
+ port = sPort
mappingFunction = Identity
nameServerReplicas = testNameServerReplicas(name)
jobInjector.port = iPort
manager.port = mPort
- val server = new TestTHsHaServer { val name = "TestGizzardService"; val port = sPort }
val databaseConnection = new TestDBConnection { val database = "gizzard_test_" + name }
val jobQueues = Map(
Priority.High.id -> new TestJobScheduler { val name = queueBase+"_high" },
@@ -123,30 +139,30 @@ class TestServer(conf: config.TestServer) extends GizzardServer(conf) {
nameServer.configureForwarder[TestShard](
_.tableId(0)
+ //.copyFactory(new TestCopyFactory(nameServer, jobScheduler(Priority.Low.id)))
.shardFactory(new TestShardFactory(conf.queryEvaluator(), conf.databaseConnection))
- .copyFactory(new TestCopyFactory(nameServer, jobScheduler(Priority.Low.id)))
)
jobCodec += ("Put".r -> new PutParser(nameServer.forwarder[TestShard]))
- jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler(Priority.Low.id)))
+ //jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler(Priority.Low.id)))
// service listener
- val testService = new TestServerIFace(nameServer.forwarder[TestShard], jobScheduler)
-
- lazy val testThriftServer = {
- val processor = new thrift.TestServer.Processor(testService)
- conf.server(processor)
- }
+ lazy val testService = new TestService(
+ conf.name,
+ conf.port,
+ nameServer.forwarder[TestShard],
+ jobScheduler
+ )
def start() {
startGizzard()
- new Thread(new Runnable { def run() { testThriftServer.serve() } }, "TestServerThread").start()
+ testService.start()
}
def shutdown(quiesce: Boolean) {
- testThriftServer.stop()
+ testService.shutdown()
shutdownGizzard(quiesce)
}
}
@@ -154,24 +170,24 @@ class TestServer(conf: config.TestServer) extends GizzardServer(conf) {
// Service Interface
-class TestServerIFace(forwarding: Long => RoutingNode[TestShard], scheduler: PrioritizingJobScheduler)
-extends thrift.TestServer.Iface {
- import scala.collection.JavaConversions._
- import com.twitter.gizzard.thrift.conversions.Sequences._
+class TestService(
+ val serverName: String,
+ val thriftPort: Int,
+ forwarding: Long => RoutingNode[TestShard],
+ scheduler: PrioritizingJobScheduler)
+extends thrift.TestServer.ThriftServer {
- def put(key: Int, value: String) {
- scheduler.put(Priority.High.id, new PutJob(key, value, forwarding))
+ def put(key: Int, value: String) = {
+ Future(scheduler.put(Priority.High.id, new PutJob(key, value, forwarding)))
}
- def get(key: Int) = forwarding(key).read.any(_.get(key)).toList.map(asTestResult)
-
- private def asTestResult(t: (Int, String, Int)) = new thrift.TestResult(t._1, t._2, t._3)
+ def get(key: Int) = forwarding(key).read futureAny { _.get(key) }
}
// Shard Definitions
-class TestShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection) extends ShardFactory[TestShard] {
+class TestShardFactory(qeFactory: AsyncQueryEvaluatorFactory, conn: Connection) extends ShardFactory[TestShard] {
def newEvaluator(host: String) = qeFactory(conn.withHost(host))
def instantiate(info: ShardInfo, weight: Int) = new TestShard(newEvaluator(info.hostname), info, false)
@@ -188,8 +204,8 @@ class TestShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection) exten
) engine=innodb default charset=utf8"""
try {
val e = qeFactory(conn.withHost(info.hostname).withoutDatabase)
- e.execute("create database if not exists " + conn.database)
- e.execute(ddl.format(conn.database + "." + info.tablePrefix))
+ e.execute("create database if not exists " + conn.database)()
+ e.execute(ddl.format(conn.database + "." + info.tablePrefix))()
} catch {
case e: SQLException => throw new ShardException(e.toString)
case e: SqlQueryTimeoutException => throw new ShardTimeoutException(e.timeout, info.id)
@@ -198,7 +214,8 @@ class TestShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection) exten
}
// should enforce read/write perms at the db access level
-class TestShard(evaluator: QueryEvaluator, val shardInfo: ShardInfo, readOnly: Boolean) {
+class TestShard(evaluator: AsyncQueryEvaluator, val shardInfo: ShardInfo, readOnly: Boolean) {
+ import thrift.TestResult
private val table = shardInfo.tablePrefix
@@ -207,19 +224,19 @@ class TestShard(evaluator: QueryEvaluator, val shardInfo: ShardInfo, readOnly: B
private val getSql = "select * from " + table + " where id = ?"
private val getAllSql = "select * from " + table + " where id > ? limit ?"
- private def asResult(r: ResultSet) = (r.getInt("id"), r.getString("value"), r.getInt("count"))
+ private def asResult(r: ResultSet) = TestResult(r.getInt("id"), r.getString("value"), r.getInt("count"))
- def put(key: Int, value: String) {
+ def put(key: Int, value: String) = {
if (readOnly) error("shard is read only!")
evaluator.execute(putSql, key, value)
}
- def putAll(kvs: Seq[(Int, String)]) {
+ def putAll(kvs: Seq[(Int, String)]) = {
if (readOnly) error("shard is read only!")
evaluator.executeBatch(putSql) { b => for ((k,v) <- kvs) b(k,v) }
}
- def get(key: Int) = evaluator.selectOne(getSql, key)(asResult)
+ def get(key: Int) = evaluator.selectOne(getSql, key)(asResult) map { _.toSeq }
def getAll(key: Int, count: Int) = evaluator.select(getAllSql, key, count)(asResult)
}
@@ -232,43 +249,43 @@ class PutParser(forwarding: Long => RoutingNode[TestShard]) extends JsonJobParse
}
}
-class PutJob(key: Int, value: String, forwarding: Long => RoutingNode[TestShard]) extends JsonJob {
+class PutJob(key: Int, value: String, forwarding: Long => RoutingNode[TestShard]) extends AsyncJsonJob {
def toMap = Map("key" -> key, "value" -> value)
- def apply() { forwarding(key).write.foreach(_.put(key, value)) }
+ def applyFuture() = Future.join(forwarding(key).write fmap { _.put(key, value) })
}
-class TestCopyFactory(ns: NameServer, s: JobScheduler)
-extends CopyJobFactory[TestShard] {
- def apply(shardIds: Seq[ShardId]) = new TestCopy(shardIds, 0, 500, ns, s)
-}
-
-class TestCopyParser(ns: NameServer, s: JobScheduler)
-extends CopyJobParser[TestShard] {
- def deserialize(m: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
- val cursor = m("cursor").asInstanceOf[Int]
- val count = m("count").asInstanceOf[Int]
- new TestCopy(shardIds, cursor, count, ns, s)
- }
-}
-
-class TestCopy(
- shardIds: Seq[ShardId],
- cursor: Int,
- count: Int,
- ns: NameServer,
- s: JobScheduler)
-extends CopyJob[TestShard](shardIds, count, ns, s) {
-
- def copyPage(nodes: Seq[RoutingNode[TestShard]], count: Int) = {
- val rows = nodes.map { _.read.any(_.getAll(cursor, count)) map { case (k,v,c) => (k,v) }}.flatten
-
- if (rows.isEmpty) {
- None
- } else {
- nodes.map { _.write.foreach(_.putAll(rows)) }
- Some(new TestCopy(shardIds, rows.last._1, count, ns, s))
- }
- }
-
- def serialize = Map("cursor" -> cursor)
-}
+// class TestCopyFactory(ns: NameServer, s: JobScheduler)
+// extends CopyJobFactory[TestShard] {
+// def apply(shardIds: Seq[ShardId]) = new TestCopy(shardIds, 0, 500, ns, s)
+// }
+
+// class TestCopyParser(ns: NameServer, s: JobScheduler)
+// extends CopyJobParser[TestShard] {
+// def deserialize(m: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
+// val cursor = m("cursor").asInstanceOf[Int]
+// val count = m("count").asInstanceOf[Int]
+// new TestCopy(shardIds, cursor, count, ns, s)
+// }
+// }
+
+// class TestCopy(
+// shardIds: Seq[ShardId],
+// cursor: Int,
+// count: Int,
+// ns: NameServer,
+// s: JobScheduler)
+// extends CopyJob[TestShard](shardIds, count, ns, s) {
+
+// def copyPage(nodes: Seq[RoutingNode[TestShard]], count: Int) = {
+// val rows = nodes.map { _.read.any(_.getAll(cursor, count)) map { case (k,v,c) => (k,v) }}.flatten
+
+// if (rows.isEmpty) {
+// None
+// } else {
+// nodes.map { _.write.foreach(_.putAll(rows)) }
+// Some(new TestCopy(shardIds, rows.last._1, count, ns, s))
+// }
+// }
+
+// def serialize = Map("cursor" -> cursor)
+// }
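Note: in the hunks above, PutJob#applyFuture now composes one future per write replica and waits on all of them via Future.join (fmap and futureAny appear to be gizzard RoutingNode helpers that return futures; that is inferred from this diff, not verified). A self-contained sketch of the join pattern, assuming only com.twitter.util:

import com.twitter.util.{Await, Future, FuturePool}

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val pool = FuturePool.unboundedPool
    // three independent "writes"; join completes only when every one of them has
    val writes: Seq[Future[Unit]] = Seq("a", "b", "c").map(v => pool { println("put " + v) })
    Await.result(Future.join(writes))
  }
}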
1  src/test/scala/com/twitter/gizzard/nameserver/JobRelaySpec.scala
@@ -2,7 +2,6 @@ package com.twitter.gizzard.nameserver
import org.specs.mock.{ClassMocker, JMocker}
import com.twitter.conversions.time._
-import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
import com.twitter.gizzard.ConfiguredSpecification
1  src/test/scala/com/twitter/gizzard/nameserver/SqlShardSpec.scala
@@ -3,7 +3,6 @@ package com.twitter.gizzard.nameserver
import com.twitter.util.Duration
import com.twitter.conversions.time._
import com.twitter.gizzard.shards._
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.test.NameServerDatabase
import com.twitter.gizzard.ConfiguredSpecification
import org.specs.Specification
28 src/test/scala/com/twitter/gizzard/proxy/LoggingProxySpec.scala
@@ -1,10 +1,10 @@
package com.twitter.gizzard.proxy
+import com.twitter.util.FuturePool
import com.twitter.logging.Logger
import com.twitter.util.TimeConversions._
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
-import com.twitter.gizzard.util.Future
import com.twitter.gizzard.{Stats, TransactionalStatsProvider, TransactionalStatsConsumer, SampledTransactionalStatsConsumer}
import com.twitter.gizzard.ConfiguredSpecification
@@ -91,8 +91,6 @@ object LoggingProxySpec extends ConfiguredSpecification with JMocker with ClassM
} */
"New School Logging Proxy" should {
- val future = new Future("test", 1, 1, 1.second, 1.second)
-
val bob = new Named {
def name = {
Stats.transaction.record("ack")
@@ -101,11 +99,11 @@ object LoggingProxySpec extends ConfiguredSpecification with JMocker with ClassM
def nameParts = throw new Exception("yarrg!")
def namePartsSeq = {
Stats.transaction.record("before thread")
- val f = future {
+ val f = FuturePool.defaultPool {
Stats.transaction.record("in thread")
Seq("bob", "marley")
}
- f.get()
+ f()
}
}
@@ -137,16 +135,16 @@ object LoggingProxySpec extends ConfiguredSpecification with JMocker with ClassM
messages(1) must startWith("Total duration:")
}
- "log a trace across threads" in {
- bobProxy.namePartsSeq
- val messages = sampledStats.stats.toSeq.map { _.message }
- messages(0) mustEqual "before thread"
- messages(1) must startWith("Total duration:")
- val children = sampledStats.stats.children.map { _.toSeq.map { _.message } }
- children(0)(0) must startWith("Time spent in future queue")
- children(0)(1) mustEqual "in thread"
- children(0)(2) must startWith("Total duration:")
- }
+ // "log a trace across threads" in {
+ // bobProxy.namePartsSeq
+ // val messages = sampledStats.stats.toSeq.map { _.message }
+ // messages(0) mustEqual "before thread"
+ // messages(1) must startWith("Total duration:")
+ // val children = sampledStats.stats.children.map { _.toSeq.map { _.message } }
+ // children(0)(0) must startWith("Time spent in future queue")
+ // children(0)(1) mustEqual "in thread"
+ // children(0)(2) must startWith("Total duration:")
+ // }
"log exceptions" in {
bobProxy.nameParts must throwA[Exception]
21 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobIntegrationSpec.scala
@@ -6,8 +6,7 @@ import org.specs.mock.{ClassMocker, JMocker}
import com.twitter.conversions.time._
import com.twitter.gizzard
-import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
-import com.twitter.gizzard.config.{THsHaServer => THsHaServerConfig}
+import com.twitter.gizzard.thrift.{JobInjectorService, JobInjector}
import com.twitter.gizzard.nameserver.{Host, HostStatus, JobRelay}
import com.twitter.gizzard.ConfiguredSpecification
@@ -20,7 +19,7 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
println(new String(badJob, "UTF-8"))
})
- var jobsApplied = new AtomicInteger
+ val jobsApplied = new AtomicInteger
val testJobParser = new JsonJobParser {
def apply(json: Map[String, Any]) = new JsonJob {
@@ -47,22 +46,22 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
val queue = scheduler(1).queue.asInstanceOf[KestrelJobQueue].queue
- val service = new JobInjectorService(codec, scheduler)
- val processor = new JobInjector.Processor(service)
- val server = (new THsHaServerConfig { val name = "injector"; val port = 12313 }).apply(processor)
+ val service = new JobInjectorService("injector", 12313, codec, scheduler)
doBefore {
- (new Thread { override def run() { server.serve() } }).start()
+ service.start()
scheduler.start()
queue.flush()
}
doAfter {
- server.stop()
+ service.shutdown()
scheduler.shutdown()
}
"replicate and replay jobs" in {
+ jobsApplied.set(0)
+
val testJob = testJobParser(Map("dummy" -> 1, "job" -> true, "blah" -> "blop"))
scheduler.put(1, testJob)
@@ -70,9 +69,12 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
}
"replicate and replay nested jobs" in {
+ jobsApplied.set(0)
+
val jobs = Seq(
Map("asdf" -> 1, "bleep" -> "bloop"),
- Map("this" -> 4, "is" -> true, "a test job" -> "teh"))
+ Map("this" -> 4, "is" -> true, "a test job" -> "teh")
+ )
val nestedJsonJob = new JsonNestedJob(jobs.map(testJobParser(_)))
scheduler.put(1, nestedJsonJob)
@@ -80,4 +82,3 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
}
}
}
-
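Note: in this spec jobsApplied becomes a val and each example now zeroes it up front, so assertions on the counter no longer depend on which examples ran earlier. A trivial standalone illustration of that reset pattern (hypothetical code, not from the commit):

import java.util.concurrent.atomic.AtomicInteger

object CounterResetSketch {
  val jobsApplied = new AtomicInteger

  def main(args: Array[String]): Unit = {
    jobsApplied.set(0)            // reset before each example
    jobsApplied.incrementAndGet() // work done by this example
    assert(jobsApplied.get == 1)  // assertion is independent of prior runs
  }
}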
135 src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
@@ -1,16 +1,11 @@
package com.twitter.gizzard
package thrift
-import scala.collection.JavaConversions._
+import com.twitter.util.Future
import org.specs.mock.{ClassMocker, JMocker}
import org.specs.Specification
-import com.twitter.gizzard.thrift.conversions.Sequences._
-import com.twitter.gizzard.thrift.conversions.ShardId._
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
-import shards.{Busy, RoutingNode}
-import scheduler.{CopyJob, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob, AdminJobManager}
-
-
+import com.twitter.gizzard.shards.{Busy, RoutingNode}
+import com.twitter.gizzard.scheduler._
object ManagerServiceSpec extends ConfiguredSpecification with JMocker with ClassMocker {
val nameServer = mock[nameserver.NameServer]
@@ -20,7 +15,7 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas