Permalink
Browse files

more wip

  • Loading branch information...
freels committed Apr 11, 2011
1 parent c1efcb7 commit 1544917ad882facb7a32254106ca127a0bf60d9d
@@ -1,72 +1,49 @@
package com.twitter.rowz
-import com.twitter.gizzard.jobs.CopyFactory
-import com.twitter.gizzard.nameserver.NameServer
-import com.twitter.gizzard.proxy.LoggingProxy
-import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, Priority}
-import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager,
- ShardManagerService}
-import com.twitter.ostrich.{W3CStats, Stats}
-import com.twitter.xrayspecs.TimeConversions._
-import net.lag.configgy.{Configgy, RuntimeEnvironment, ConfigMap}
-import net.lag.logging.Logger
-import org.apache.thrift.server.{TServer, TThreadPoolServer}
-import org.apache.thrift.transport.{TServerSocket, TTransportFactory}
+import com.twitter.ostrich.{Service, ServiceTracker, RuntimeEnvironment}
+import com.twitter.util.Eval
+import net.lag.configgy.{Config => CConfig}
+import java.io.File
+import com.twitter.rowz.config.{Rowz => RowzConfig}
-object Main {
- var state: Rowz.State = null
- var rowzServer: TSelectorServer = null
- var jobServer: TSelectorServer = null
- var shardServer: TSelectorServer = null
- var config: ConfigMap = null
- val runtime = new RuntimeEnvironment(getClass)
+object Main extends Service {
+ var service: Rowz = _
+ var config: RowzConfig = _
def main(args: Array[String]) {
- try {
- runtime.load(args)
- config = Configgy.config
- val w3c = new W3CStats(Logger.get("w3c"), config.getList("rowz.w3c").toArray)
- state = Rowz(config, w3c)
- state.start()
- startThrift(w3c)
- println("Running.")
- } catch {
- case _ =>
- println("Exception in initialization!")
- shutdown()
- }
- }
+ config = Eval[RowzConfig](args.map(new File(_)): _*)
+ service = new Rowz(config)
- def startThrift(w3c: W3CStats) {
- val timeout = config("rowz.timeout_msec").toInt.milliseconds
- val idleTimeout = config("rowz.idle_timeout_sec").toInt.seconds
- val executor = TSelectorServer.makeThreadPoolExecutor(config.configMap("rowz"))
- val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", state.rowzService))
- rowzServer = TSelectorServer("rowz", config("rowz.server_port").toInt, processor, executor, timeout, idleTimeout)
+ start()
+ println("Running.")
+ }
- val jobManagerService = new JobManagerService(state.prioritizingScheduler)
- val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, w3c, "RowzJobs", jobManagerService))
- jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout, idleTimeout)
+ def start() {
+ val adminConfig = new CConfig
+ adminConfig.setInt("admin_http_port", config.admin.httpPort)
+ adminConfig.setInt("admin_text_port", config.admin.textPort)
- val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
- val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy(LoggingProxy[ShardManager.Iface](Stats, w3c, "RowzShards", shardManagerService)))
- shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout, idleTimeout)
+ ServiceTracker.register(this)
+ ServiceTracker.startAdmin(adminConfig, new RuntimeEnvironment(this.getClass))
- rowzServer.serve()
- jobServer.serve()
- shardServer.serve()
+ service.start()
}
def shutdown() {
- try {
- rowzServer.stop()
- jobServer.stop()
- state.shutdown()
- } finally {
- println("Exiting!")
- System.exit(0)
- }
+ println("Exiting.")
+
+ if (service ne null) service.shutdown()
+ service = null
+ ServiceTracker.stopAdmin()
+ }
+
+ def quiesce() {
+ println("Exiting.")
+
+ if (service ne null) service.shutdown(true)
+ service = null
+ ServiceTracker.stopAdmin()
}
}
@@ -1,15 +1,8 @@
package com.twitter.rowz
-import java.sql.{ResultSet, SQLException}
-import com.twitter.querulous
-import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
-import com.twitter.querulous.config.Connection
-import com.twitter.querulous.query.SqlQueryTimeoutException
-
-import com.twitter.gizzard
-import nameserver.NameServer
-import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException}
-import scheduler.{JobScheduler, JsonJob, CopyJob, CopyJobParser, CopyJobFactory, JsonJobParser, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler._
+import com.twitter.gizzard.nameserver
+import com.twitter.gizzard.GizzardServer
object Priority extends Enumeration {
val High, Medium, Low = Value
@@ -27,7 +20,7 @@ class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard
shardRepo += ("RowzShard" -> new RowzShardFactory(config.queryEvaluator(), config.databaseConnection))
- jobCodec += ("Create".r -> new CreateJobParser())
+ jobCodec += ("Ceate".r -> new CreateJobParser())
jobCodec += ("Destroy".r -> new DestroyJobParser())
@@ -41,12 +34,11 @@ class Rowz(config: com.twitter.rowz.config.Rowz) extends GizzardServer[RowzShard
// set up the service listener
-
val rowzService = new RowzService(findForwarding, jobScheduler, idGenerator)
lazy val rowzThriftServer = {
val processor = new thrift.TestServer.Processor(rowzService)
- config.thriftServer(processor)
+ config.server(processor)
}
def start() {
@@ -1,45 +1,37 @@
package com.twitter.rowz
+import java.sql.SQLException
import java.sql.{SQLIntegrityConstraintViolationException, ResultSet}
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
-import net.lag.configgy.ConfigMap
-import com.twitter.gizzard.shards
import com.twitter.querulous.query.SqlQueryTimeoutException
-import java.sql.SQLException
+import com.twitter.gizzard.shards.{ShardException, ShardInfo}
import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
-import com.twitter.xrayspecs.Time
-import com.twitter.xrayspecs.TimeConversions._
-import Shard.Cursor
-class SqlShardFactory(queryEvaluatorFactory: QueryEvaluatorFactory, config: ConfigMap)
- extends shards.ShardFactory[Shard] {
+class SqlShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection)
+extends shards.ShardFactory[Shard] {
val TABLE_DDL = """
CREATE TABLE IF NOT EXISTS %s (
- id BIGINT NOT NULL,
+ id BIGINT UNSIGNED NOT NULL,
name VARCHAR(255) NOT NULL,
- created_at INT UNSIGNED NOT NULL,
- updated_at INT UNSIGNED NOT NULL,
+ created_at BIGINT UNSIGNED NOT NULL,
+ updated_at BIGINT UNSIGNED NOT NULL,
state TINYINT NOT NULL,
PRIMARY KEY (id)
) TYPE=INNODB"""
- def instantiate(shardInfo: shards.ShardInfo, weight: Int, children: Seq[Shard]) = {
- val queryEvaluator = queryEvaluatorFactory(List(shardInfo.hostname), config("rowz.db.name"), config("rowz.db.username"), config("rowz.db.password"))
- SqlExceptionWrappingProxy[Shard](new SqlShard(queryEvaluator, shardInfo, weight, children))
+ def instantiate(shardInfo: ShardInfo, weight: Int, children: Seq[RowzShard]) = {
+ val queryEvaluator = qeFactory(conn.withHost(shardInfo.hostname))
+ new SqlShard(queryEvaluator, shardInfo, weight, children)
}
def materialize(shardInfo: shards.ShardInfo) = {
try {
- val queryEvaluator = queryEvaluatorFactory(
- List(shardInfo.hostname),
- config("rowz.db.name"),
- config("rowz.db.username"),
- config("rowz.db.password"))
- queryEvaluatorFactory(shardInfo.hostname, null, config("rowz.db.username"), config("rowz.db.password")).execute("CREATE DATABASE IF NOT EXISTS " + config("rowz.db.name"))
- queryEvaluator.execute(TABLE_DDL.format(shardInfo.tablePrefix + "_rowz"))
+ val evaluator = qeFactory(connection.withHost(shardInfo.hostname).withoutDatabase)
+ evaluator.execute("CREATE DATABASE IF NOT EXISTS " + conn.database)
+ evaluator.execute(ddl.format(conn.database +"."+ info.tablePrefix))
} catch {
case e: SQLException => throw new shards.ShardException(e.toString)
case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException
@@ -48,10 +40,13 @@ CREATE TABLE IF NOT EXISTS %s (
}
-class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardInfo,
- val weight: Int, val children: Seq[Shard]) extends Shard {
+class SqlShard(
+ queryEvaluator: QueryEvaluator,
+ val shardInfo: shards.ShardInfo,
+ val weight: Int,
+ val children: Seq[RowzShard]) extends RowzShard {
- private val table = shardInfo.tablePrefix + "_rowz"
+ private val table = shardInfo.tablePrefix
def create(id: Long, name: String, at: Time) = write(new Row(id, name, at, at, State.Normal))
def destroy(row: Row, at: Time) = write(new Row(row.id, row.name, row.createdAt, at, State.Destroyed))
@@ -91,4 +86,4 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private def makeRow(row: ResultSet) = {
new Row(row.getLong("id"), row.getString("name"), Time(row.getLong("created_at").seconds), Time(row.getLong("updated_at").seconds), State(row.getInt("state")))
}
-}
+}
@@ -0,0 +1,27 @@
+package com.twitter.rowz.config
+
+import com.twitter.gizzard
+import com.twitter.gizzard.config._
+import com.twitter.querulous.config.{Connection, QueryEvaluator}
+import com.twitter.util.Duration
+import com.twitter.conversions.time._
+
+
+class AdminService {
+ var httpPort = 9990
+ var textPort = 9991
+}
+
+trait RowzThriftServer extends TServer {
+ var name = "rowz"
+ var port = 7919
+}
+
+trait Rowz {
+ def server: RowzThriftServer
+
+ def databaseConnection: Connection
+ def rowzQueryEvaluator: QueryEvaluator
+
+ def admin: AdminService
+}
@@ -15,7 +15,7 @@ class CreateParser(findForwarding: Long => RowzShard) extends JsonJobParser {
}
}
-class CreateJob(id: Long, name: String, at: Time, findForwarding) extends JsonJob {
+class CreateJob(id: Long, name: String, at: Time, findForwarding: Long => RowzShard) extends JsonJob {
def toMap = {
Map("id" -> id, "name" -> name, "at" -> at.inMilliseconds)
}

0 comments on commit 1544917

Please sign in to comment.