Permalink
Browse files

compiling, replication spec not passing

  • Loading branch information...
1 parent 787e9bc commit 4fe750c95076a4c026d80711240d1ea6e361da75 @freels freels committed Feb 21, 2012
Showing with 505 additions and 1,222 deletions.
  1. +20 −9 src/main/scala/com/twitter/gizzard/GizzardServer.scala
  2. +0 −17 src/main/scala/com/twitter/gizzard/config/Future.scala
  3. +6 −6 src/main/scala/com/twitter/gizzard/config/GizzardServer.scala
  4. +0 −66 src/main/scala/com/twitter/gizzard/config/TServer.scala
  5. +1 −1 src/main/scala/com/twitter/gizzard/logging/ExceptionJsonFormatter.scala
  6. +3 −9 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
  7. +2 −17 src/main/scala/com/twitter/gizzard/nameserver/NameServerState.scala
  8. +21 −0 src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala
  9. +0 −4 src/main/scala/com/twitter/gizzard/scheduler/ReplicatingJob.scala
  10. +1 −0 src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
  11. +1 −2 src/main/scala/com/twitter/gizzard/test/Reset.scala
  12. +13 −8 src/main/scala/com/twitter/gizzard/thrift/JobInjectorService.scala
  13. +107 −90 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
  14. +0 −255 src/main/scala/com/twitter/gizzard/thrift/TSelectorServer.scala
  15. +0 −139 src/main/scala/com/twitter/gizzard/thrift/TThreadServer.scala
  16. +118 −0 src/main/scala/com/twitter/gizzard/thrift/conversions.scala
  17. +0 −16 src/main/scala/com/twitter/gizzard/thrift/conversions/Busy.scala
  18. +0 −19 src/main/scala/com/twitter/gizzard/thrift/conversions/Forwarding.scala
  19. +0 −30 src/main/scala/com/twitter/gizzard/thrift/conversions/Host.scala
  20. +0 −18 src/main/scala/com/twitter/gizzard/thrift/conversions/LinkInfo.scala
  21. +0 −96 src/main/scala/com/twitter/gizzard/thrift/conversions/Sequences.scala
  22. +0 −17 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardId.scala
  23. +0 −24 src/main/scala/com/twitter/gizzard/thrift/conversions/ShardInfo.scala
  24. +0 −85 src/main/scala/com/twitter/gizzard/util/Future.scala
  25. +5 −3 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
  26. +0 −89 src/test/scala/com/twitter/gizzard/FutureSpec.scala
  27. +28 −30 src/test/scala/com/twitter/gizzard/integration/GizzardIntegrationSpec.scala
  28. +91 −74 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
  29. +0 −1 src/test/scala/com/twitter/gizzard/nameserver/JobRelaySpec.scala
  30. +0 −1 src/test/scala/com/twitter/gizzard/nameserver/SqlShardSpec.scala
  31. +13 −15 src/test/scala/com/twitter/gizzard/proxy/LoggingProxySpec.scala
  32. +11 −10 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobIntegrationSpec.scala
  33. +64 −71 src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
@@ -7,6 +7,7 @@ import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.config.{GizzardServer => ServerConfig}
import com.twitter.gizzard.proxy.LoggingProxy
+import com.twitter.gizzard.thrift.{ManagerService, JobInjectorService}
abstract class GizzardServer(config: ServerConfig) {
@@ -47,25 +48,35 @@ abstract class GizzardServer(config: ServerConfig) {
// service wiring
- lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, jobScheduler)
- lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))
+ lazy val managerService = new ManagerService(
+ config.manager.name,
+ config.manager.port,
+ nameServer,
+ shardManager,
+ adminJobManager,
+ remoteClusterManager,
+ jobScheduler
+ )
- lazy val jobInjectorServer = new thrift.JobInjectorService(jobCodec, jobScheduler)
- lazy val jobInjectorThriftServer = config.jobInjector(new thrift.JobInjector.Processor(jobInjectorServer))
+ lazy val jobInjectorService = new JobInjectorService(
+ config.jobInjector.name,
+ config.jobInjector.port,
+ jobCodec,
+ jobScheduler
+ )
def startGizzard() {
nameServer.reload()
remoteClusterManager.reload()
jobScheduler.start()
-
- new Thread(new Runnable { def run() { managerThriftServer.serve() } }, "GizzardManagerThread").start()
- new Thread(new Runnable { def run() { jobInjectorThriftServer.serve() } }, "JobInjectorThread").start()
+ managerService.start()
+ jobInjectorService.start()
}
def shutdownGizzard(quiesce: Boolean) {
remoteClusterManager.closeRelay()
- managerThriftServer.stop()
- jobInjectorThriftServer.stop()
+ managerService.shutdown()
+ jobInjectorService.shutdown()
while (quiesce && jobScheduler.size > 0) Thread.sleep(100)
jobScheduler.shutdown()
@@ -1,17 +0,0 @@
-package com.twitter.gizzard.config
-
-import com.twitter.conversions.time._
-import com.twitter.gizzard
-
-
-class Future {
- var poolSize = 1
- var maxPoolSize = 1
- var keepAlive = 5.seconds
- var timeout = 1.second
-
- def apply(name: String) = {
- if (maxPoolSize < poolSize) maxPoolSize = poolSize
- new gizzard.util.Future(name, poolSize, maxPoolSize, keepAlive, timeout)
- }
-}
@@ -17,8 +17,8 @@ trait GizzardServer {
var mappingFunction: MappingFunction = Hash
var nameServerReplicas: Seq[Replica] = Seq(Memory)
var jobRelay: JobRelay = new JobRelay
- var manager: Manager = new Manager with TThreadServer
- var jobInjector: JobInjector = new JobInjector with THsHaServer
+ var manager: Manager = new Manager
+ var jobInjector: JobInjector = new JobInjector
var queryStats: StatsCollection = new StatsCollection { }
var jobStats: StatsCollection = new StatsCollection {
@@ -55,13 +55,13 @@ trait GizzardServer {
}
}
-trait Manager extends TServer {
- def name = "GizzardManager"
+class Manager {
+ var name = "GizzardManager"
var port = 7920
}
-trait JobInjector extends TServer {
- def name = "JobInjector"
+class JobInjector {
+ var name = "JobInjector"
var port = 7921
}
@@ -1,66 +0,0 @@
-package com.twitter.gizzard.config
-
-import java.util.concurrent.ThreadPoolExecutor
-import com.twitter.util.Duration
-import com.twitter.conversions.time._
-import org.apache.thrift
-import com.twitter.gizzard
-
-class ThreadPool extends (String => ThreadPoolExecutor) {
- var stopTimeout = 60
- var minThreads = 1
- var maxThreads = 1
- var maxWaiters: Option[Int] = None
-
- def maxWaiters_=(n: Int) { maxWaiters = Some(n) }
-
- def apply(name: String): ThreadPoolExecutor = {
- if (maxThreads < minThreads) maxThreads = minThreads
-
- gizzard.thrift.TSelectorServer.makeThreadPoolExecutor(
- name,
- stopTimeout,
- minThreads,
- maxThreads,
- maxWaiters)
- }
-}
-
-trait TServer extends (thrift.TProcessor => thrift.server.TServer) {
- def name: String
- def port: Int
- var timeout = 100.milliseconds
- var idleTimeout = 60.seconds
- var threadPool = new ThreadPool
-
- def getPool = threadPool(name + "_thread_pool")
-
- def apply(processor: thrift.TProcessor): thrift.server.TServer
-}
-
-trait TSelectorServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- gizzard.thrift.TSelectorServer(name, port, processor, getPool, timeout, idleTimeout)
- }
-}
-
-trait TThreadServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- gizzard.thrift.TThreadServer(name, port, idleTimeout.inMillis.toInt, getPool, processor)
- }
-}
-
-trait THsHaServer extends TServer {
- def apply(processor: thrift.TProcessor) = {
- val transport = new thrift.transport.TNonblockingServerSocket(port, timeout.inMillis.toInt)
- val options = new thrift.server.TNonblockingServer.Options
- new thrift.server.THsHaServer(
- new thrift.TProcessorFactory(processor),
- transport,
- new thrift.transport.TFramedTransport.Factory(),
- new thrift.protocol.TBinaryProtocol.Factory(),
- new thrift.protocol.TBinaryProtocol.Factory(),
- getPool,
- options)
- }
-}
@@ -12,7 +12,7 @@ import com.twitter.gizzard.util.Json
*/
class ExceptionJsonFormatter extends Formatter {
private def throwableToMap(wrapped: Throwable): Map[String, Any] = {
- var pairs = List(
+ var pairs: List[(String, Any)] = List(
("class" -> wrapped.getClass().getName()),
("trace" -> wrapped.getStackTrace().map(_.toString()))
)
@@ -84,18 +84,12 @@ extends (Iterable[Array[Byte]] => Unit) {
.name("JobManagerClient")
.reportTo(new OstrichStatsReceiver)
.build()
- val client = new JobInjector.ServiceToClient(service, new TBinaryProtocol.Factory())
+ val client = new JobInjector.FinagledClient(service)
def apply(jobs: Iterable[Array[Byte]]) {
- val jobList = new JLinkedList[ThriftJob]()
+ val thriftJobs = jobs map { j => new ThriftJob(priority, ByteBuffer.wrap(j), Some(true)) } toSeq
- jobs.foreach { j =>
- val tj = new ThriftJob(priority, ByteBuffer.wrap(j))
- tj.setIs_replicated(true)
- jobList.add(tj)
- }
-
- client.inject_jobs(jobList)()
+ client.injectJobs(thriftJobs)()
}
def close() {
@@ -1,24 +1,9 @@
package com.twitter.gizzard.nameserver
-import scala.collection.mutable.ListBuffer
-import scala.collection.JavaConversions._
import com.twitter.gizzard.shards.{ShardId, ShardInfo, LinkInfo}
-import com.twitter.gizzard.thrift.{NameServerState => ThriftNameServerState}
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
-import com.twitter.gizzard.thrift.conversions.LinkInfo._
-import com.twitter.gizzard.thrift.conversions.Forwarding._
-import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.util.TreeUtils
-
-case class NameServerState(shards: List[ShardInfo], links: List[LinkInfo], forwardings: List[Forwarding], tableId: Int) {
- def toThrift = {
- val thriftForwardings = forwardings.map(_.toThrift)
- val thriftLinks = links.map(_.toThrift)
- val thriftShards = shards.map(_.toThrift)
- new ThriftNameServerState(thriftShards, thriftLinks, thriftForwardings, tableId)
- }
-}
+case class NameServerState(shards: Seq[ShardInfo], links: Seq[LinkInfo], forwardings: Seq[Forwarding], tableId: Int)
object NameServerState {
import TreeUtils._
@@ -32,6 +17,6 @@ object NameServerState {
val links = descendantLinks(forwardings.map(_.shardId))(linksByUpId)
val shards = (forwardings.map(_.shardId) ++ links.map(_.downId)).map(shardsById)
- NameServerState(shards.toList, links.toList, forwardings.toList, tableId)
+ NameServerState(shards.toSeq, links.toSeq, forwardings.toSeq, tableId)
}
}
@@ -1,5 +1,6 @@
package com.twitter.gizzard.scheduler
+import com.twitter.util.Future
import com.twitter.ostrich.stats.{StatsProvider, W3CStats}
import com.twitter.logging.Logger
import com.twitter.gizzard.proxy.LoggingProxy
@@ -44,6 +45,26 @@ trait JsonJob {
}
/**
+ * A JsonJob designed to work with futures. Implementations should override applyFuture and return
+ * Future signalling job completion instead of blocking.
+ */
+trait AsyncJsonJob extends JsonJob {
+ def applyFuture(): Future[Unit]
+
+ def apply() {
+ // todo. this should more intelligently deal with stuff...
+ val f = try {
+ applyFuture()
+ } catch {
+ case e => Future.exception(e)
+ }
+
+ f.apply()
+ }
+}
+
+
+/**
* A NestedJob that can be encoded in json.
*/
class JsonNestedJob(jobs: Iterable[JsonJob]) extends NestedJob(jobs) with JsonJob {
@@ -1,13 +1,10 @@
package com.twitter.gizzard.scheduler
-import java.util.{LinkedList => JLinkedList}
-import java.nio.ByteBuffer
import scala.collection.mutable.Queue
import scala.util.matching.Regex
import com.twitter.ostrich.stats.StatsProvider
import com.twitter.logging.Logger
import com.twitter.util.Duration
-import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.nameserver.JobRelay
import com.twitter.gizzard.proxy.LoggingProxy
@@ -116,4 +113,3 @@ extends JsonJobParser {
new ReplicatingJob(relay, tasks, clusters, serialized)
}
}
-
@@ -121,6 +121,7 @@ trait NodeIterable[+T] {
case Throw(e) => _futureAny(iter, promise, f)
}
} else {
+ // XXX: this is incorrect. If e exists at any time, ShardOfflineException masks logical errors.
promise.setException(new ShardOfflineException(rootInfo.id))
}
}
@@ -1,6 +1,5 @@
package com.twitter.gizzard.test
-import org.specs.Specification
import com.twitter.querulous.query.SqlQueryFactory
import com.twitter.querulous.evaluator.{StandardQueryEvaluatorFactory, QueryEvaluator}
import com.twitter.util.Time
@@ -38,7 +37,7 @@ import com.twitter.gizzard.config
// }
// }
-trait NameServerDatabase extends Specification {
+trait NameServerDatabase {
def materialize(cfg: config.GizzardServer) {
try {
cfg.nameServerReplicas.map {
@@ -1,16 +1,16 @@
package com.twitter.gizzard.thrift
-import scala.collection.JavaConversions._
-import java.util.{List => JList}
-import com.twitter.gizzard.thrift.conversions.Sequences._
+import java.nio.ByteBuffer
+import com.twitter.util.Future
import com.twitter.gizzard.thrift.{Job => ThriftJob}
import com.twitter.gizzard.scheduler._
-
class JobInjectorService(
+ val serverName: String,
+ val thriftPort: Int,
codecParam: JsonCodec,
scheduler: PrioritizingJobScheduler)
-extends JobInjector.Iface {
+extends JobInjector.ThriftServer {
private val codec = codecParam.innerCodec
@@ -36,12 +36,17 @@ extends JobInjector.Iface {
}
}
- def inject_jobs(jobs: JList[ThriftJob]) {
+ def injectJobs(jobs: Seq[ThriftJob]) = {
jobs foreach { j =>
- var job: JsonJob = new InjectedJsonJob(j.getContents())
- if (j.is_replicated) job = new ReplicatedJob(List(job))
+ val bytes = new Array[Byte](j.contents.remaining)
+ j.contents.get(bytes)
+
+ var job: JsonJob = new InjectedJsonJob(bytes)
+ if (j.isReplicated getOrElse false) job = new ReplicatedJob(List(job))
scheduler.put(j.priority, job)
}
+
+ Future.Done
}
}
Oops, something went wrong.

0 comments on commit 4fe750c

Please sign in to comment.