Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
some more stuff compiles!
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen authored and Nick Kallen committed Mar 30, 2010
1 parent 44bdfb1 commit fb6af56
Show file tree
Hide file tree
Showing 20 changed files with 236 additions and 64 deletions.
3 changes: 2 additions & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
shardRepository += ("com.twitter.gizzard.ReadOnlyShard" -> new gizzard.ReadOnlyShardFactory)
shardRepository += ("com.twitter.gizzard.BlockedShard" -> new gizzard.BlockedShardFactory)
- Make ThrottledLogger, etc. part of Gizzard
- Make ThrottledLogger, etc. part of Gizzard
- fromthrift/tothrift
Binary file modified libs/gizzard-1.0.jar
Binary file not shown.
11 changes: 5 additions & 6 deletions src/main/scala/com/twitter/rowz/ForwardingManager.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.twitter.rowz

import com.twitter.gizzard.nameserver
import com.twitter.querulous.evaluator.QueryEvaluator
import com.twitter.gizzard.nameserver.{Forwarding, NameServer}
import com.twitter.gizzard.shards.ShardException


class ForwardingManager(mappingFunction: Long => Long, protected val queryEvaluator: QueryEvaluator)
extends nameserver.ForwardingManager[Shard] {

}
class ForwardingManager(nameServer: NameServer[Shard]) extends (Long => Shard) {
def apply(id: Long) = nameServer.findCurrentForwarding(0, id)
}
34 changes: 20 additions & 14 deletions src/main/scala/com/twitter/rowz/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package com.twitter.rowz
import net.lag.configgy.Configgy
import net.lag.logging.Logger
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.gizzard.thrift.{TSelectorServer, JobManagerService}
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
import com.twitter.gizzard.thrift.{TSelectorServer, JobManager, JobManagerService, ShardManager, ShardManagerService}
import com.facebook.thrift.server.{TServer, TThreadPoolServer}
import com.facebook.thrift.transport.{TServerSocket, TTransportFactory}
import com.twitter.ostrich.W3CStats
import com.twitter.ostrich.{W3CStats, Stats}
import com.twitter.xrayspecs.TimeConversions._
import com.twitter.gizzard.proxy.LoggingProxy


object Main {
var rowzService: RowzService = null
var nameServer: NameServer[Shard] = null
var scheduler: JobScheduler = null
var scheduler: PrioritizingJobScheduler = null
var copyManager: CopyManager = null

var rowzServer: TSelectorServer = null
var jobServer: TSelectorServer = null
var shardServer: TSelectorServer = null
Expand All @@ -33,25 +37,27 @@ object Main {

def main(args: Array[String]) {
val state = Rowz(config)
rowzService = state._0
nameServer = state._1
scheduler = state._2
rowzService = state._1
nameServer = state._2
scheduler = state._3
copyManager = state._4

startThrift()
}

def startThrift() {
val timeout = config("timeout").toInt.milliseconds
val executor = TSelectorServer.makeThreadPoolExecutor(config)
val processor = new rowz.thrift.Rowz.Processor(ExceptionWrappingProxy[rowz.thrift.Rowz.Iface](LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", rowzService)))
rowzServer = TSelectorServer("rowz", config("port").toInt, processor, executor, config("timeout").toInt.milliseconds)
val processor = new rowz.thrift.Rowz.Processor(LoggingProxy[rowz.thrift.Rowz.Iface](Stats, w3c, "Rowz", rowzService))
rowzServer = TSelectorServer("rowz", config("port").toInt, processor, executor, timeout)

val jobService = new JobManagerService(scheduler)
val jobProcessor = new JobManager.Processor(ExceptionWrappingProxy[JobManager.Iface](LoggingProxy[JobManager.Iface](Stats, Main.w3c, "RowzJobs", jobService)))
jobServer = TSelectorServer("rowz-jobs", config("edges.job_server_port").toInt, jobProcessor, executor, config("timeout").toInt.milliseconds)
val jobProcessor = new JobManager.Processor(LoggingProxy[JobManager.Iface](Stats, Main.w3c, "RowzJobs", jobService))
jobServer = TSelectorServer("rowz-jobs", config("rowz.job_server_port").toInt, jobProcessor, executor, timeout)

val shardService = new ShardManagerService(nameServer)
val shardProcessor = new ShardManager.Processor(ExceptionWrappingProxy[ShardManager.Iface](LoggingProxy[ShardManager.Iface](Stats, Main.w3c, "RowzShards", shardService)))
shardServer = TSelectorServer("edges-shards", config("edges.shard_server_port").toInt, edgesShardProcessor, edgesExecutor, edgesTimeout)
val shardService = new ShardManagerService(nameServer, copyManager)
val shardProcessor = new ShardManager.Processor(LoggingProxy[ShardManager.Iface](Stats, Main.w3c, "RowzShards", shardService)) // XXX exception wrapping proxy
shardServer = TSelectorServer("rowz-shards", config("rowz.shard_server_port").toInt, shardProcessor, executor, timeout)

rowzServer.serve()
jobServer.serve()
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/com/twitter/rowz/Priority.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.twitter.rowz


object Priority extends Enumeration {
val High, Low = Value
}
14 changes: 14 additions & 0 deletions src/main/scala/com/twitter/rowz/ReadWriteShardAdapter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.twitter.rowz

import com.twitter.gizzard.shards
import com.twitter.gizzard.shards.ReadWriteShard
import com.twitter.xrayspecs.Time


class ReadWriteShardAdapter(shard: ReadWriteShard[Shard])
extends shards.ReadWriteShardAdapter(shard) with Shard {

def create(info: RowInfo, at: Time) = shard.writeOperation(_.create(info, at))
def destroy(id: Long, at: Time) = shard.writeOperation(_.destroy(id, at))
def read(id: Long) = shard.readOperation(_.read(id))
}
4 changes: 4 additions & 0 deletions src/main/scala/com/twitter/rowz/Row.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.twitter.rowz


case class Row(id: Long, info: RowInfo)
4 changes: 4 additions & 0 deletions src/main/scala/com/twitter/rowz/RowInfo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.twitter.rowz


case class RowInfo(name: String, state: State.Value)
66 changes: 42 additions & 24 deletions src/main/scala/com/twitter/rowz/Rowz.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
import com.twitter.xrayspecs.TimeConversions._
import net.lag.logging.{Logger, ThrottledLogger}
import com.twitter.gizzard.Future
import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.gizzard.nameserver.{NameServer, ShardRepository}
import com.twitter.gizzard.jobs.PolymorphicJobParser
import com.twitter.gizzard.scheduler.{JobScheduler, PrioritizingJobScheduler}
import com.twitter.gizzard.shards._
import com.twitter.gizzard.nameserver.{NameServer, ShardRepository, SqlNameServerStore}
import com.twitter.gizzard.jobs.{PolymorphicJobParser, BoundJobParser}
import scala.collection.mutable


object Rowz {
def apply(config: Config) = {
val log = Logger.get
val databaseFactory = new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(
config("rowz.db.connection_pool.size_min").toInt,
config("rowz.db.connection_pool.size_max").toInt,
Expand All @@ -22,36 +25,51 @@ object Rowz {
config("rowz.db.connection_pool.test_on_borrow").toBoolean,
config("rowz.db.connection_pool.min_evictable_idle_msec").toLong.millis))

val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)
val nameServerQueryEvaluator = queryEvaluatorFactory(
config("nameserver.hostname"),
config("nameserver.database"),
config("nameserver.username"),
config("nameserver.password"))
val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, new SqlQueryFactory)

val throttledLogger = new ThrottledLogger[String](Logger(), config("throttled_log.period_msec").toInt, config("throttled_log.rate").toInt)
val future = new Future("ReplicatingFuture", config.configMap("replication.future"))

val shardRepository = new ShardRepository[Shard]
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
shardRepository += ("com.twitter.gizzard.ReadOnlyShard" -> new gizzard.ReadOnlyShardFactory)
shardRepository += ("com.twitter.gizzard.BlockedShard" -> new gizzard.BlockedShardFactory)
shardRepository += ("com.twitter.gizzard.WriteOnlyShard" -> new gizzard.WriteOnlyShardFactory)
shardRepository += ("com.twitter.gizzard.ReplicatingShard" -> new gizzard.ReplicatingShardFactory(throttledLogger, replicatingFuture))
val shardRepository = new ShardRepository[Shard]
shardRepository += ("com.twitter.rowz.SqlShard" -> new SqlShardFactory(queryEvaluatorFactory, config))
shardRepository += ("com.twitter.gizzard.shards.ReadOnlyShard" -> new ReadOnlyShardFactory(new ReadWriteShardAdapter(_)))
/* shardRepository += ("com.twitter.gizzard.shards.BlockedShard" -> new BlockedShardFactory)
shardRepository += ("com.twitter.gizzard.shards.WriteOnlyShard" -> new WriteOnlyShardFactory)
shardRepository += ("com.twitter.gizzard.shards.ReplicatingShard" -> new ReplicatingShardFactory(throttledLogger, future))
*/

val polymorphicJobParser = new PolymorphicJobParser
val scheduler = JobScheduler("jobs", queueConfig, jobParser, Main.w3c)
val nameServerStores = config.getList("rowz.nameserver.databases").map { hostname =>
new SqlNameServerStore(
queryEvaluatorFactory(
hostname,
config("rowz.nameserver.name"),
config("rowz.nameserver.username"),
config("rowz.nameserver.password")))
}
/* val replicatingNameServerStore = new ReplicatingNameServerStore(nameServerStores, log, future)*/
val nameServer = new NameServer(nameServerStores.first, shardRepository, Hash)
val forwardingManager = new ForwardingManager(nameServer)

/* val copyJobParser = new BoundJobParser((nameServer, scheduler))*/
val rowzJobParser = new BoundJobParser(forwardingManager)

val queryExecutorFuture = new Future("QueryExecutorFuture", config.configMap("groups.query_executor_future"))
val polymorphicJobParser = new PolymorphicJobParser
/* polymorphicJobParser += ("rowz\\.jobs\\.(Copy|Migrate)".r, copyJobParser)*/
polymorphicJobParser += ("rowz\\.jobs\\.(Create|Destroy)".r, rowzJobParser)
val schedulerMap = new mutable.HashMap[Int, JobScheduler]
List((2, "high"), (1, "low")).foreach { case (priority, configName) =>
val queueConfig = config.configMap("edges.queue")
val scheduler = JobScheduler(configName, queueConfig, polymorphicJobParser, Main.w3c) // XXX
schedulerMap(priority) = scheduler
}
val prioritizingScheduler = new PrioritizingJobScheduler(schedulerMap)
val copyManager = new CopyManager(prioritizingScheduler(1)) // XXX

val forwardingManager = new ForwardingManager(Hash, nameServerQueryEvaluator)
val copyManager = new CopyManager(scheduler)
val nameServer = new NameServer[Shard](nameServerQueryEvaluator, shardRepository, forwardingManager, copyManager)
val rowzService = new RowzService(nameServer, forwardingManager, scheduler)
val rowzService = new RowzService(forwardingManager, prioritizingScheduler)

nameServer.reload()
scheduler.start()
prioritizingScheduler.start()

(rowzService, nameServer, scheduler)
(rowzService, nameServer, prioritizingScheduler, copyManager)
}
}
9 changes: 7 additions & 2 deletions src/main/scala/com/twitter/rowz/Shard.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.twitter.rowz

import com.twitter.gizzard.shards
import com.twitter.xrayspecs.Time

trait Shard {


trait Shard extends shards.Shard {
def create(info: RowInfo, at: Time)
def destroy(id: Long, at: Time)
def read(id: Long): RowInfo
}
52 changes: 52 additions & 0 deletions src/main/scala/com/twitter/rowz/SqlShard.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.twitter.rowz

import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import net.lag.configgy.ConfigMap
import com.twitter.gizzard.shards
import com.twitter.querulous.query.SqlQueryTimeoutException
import java.sql.SQLException
import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy


class SqlShardFactory(queryEvaluatorFactory: QueryEvaluatorFactory, config: ConfigMap)
extends shards.ShardFactory[Shard] {

val TABLE_DDL = """
CREATE TABLE IF NOT EXISTS %s (
source_id %s NOT NULL,
position BIGINT NOT NULL,
updated_at INT UNSIGNED NOT NULL,
destination_id %s NOT NULL,
count TINYINT UNSIGNED NOT NULL,
state TINYINT NOT NULL,
PRIMARY KEY (source_id, state, position),
UNIQUE unique_source_id_destination_id (source_id, destination_id)
) TYPE=INNODB"""

def instantiate(shardInfo: shards.ShardInfo, weight: Int, children: Seq[Shard]) = {
val queryEvaluator = queryEvaluatorFactory(List(shardInfo.hostname), config("rowz.db.name"), config("rowz.db.username"), config("rowz.db.password"))
SqlExceptionWrappingProxy[Shard](new SqlShard(queryEvaluator, shardInfo, weight, children, config))
}

def materialize(shardInfo: shards.ShardInfo) = {
try {
val queryEvaluator = queryEvaluatorFactory(
List(shardInfo.hostname),
config("rowz.db.name"),
config("rowz.db.username"),
config("rowz.db.password"))
queryEvaluatorFactory(shardInfo.hostname, null, config("rowz.db.username"), config("rowz.db.password")).execute("CREATE DATABASE IF NOT EXISTS " + config("rowz.db.name"))
queryEvaluator.execute(TABLE_DDL.format(shardInfo.tablePrefix + "_rowz", shardInfo.sourceType, shardInfo.destinationType))
} catch {
case e: SQLException => throw new shards.ShardException(e.toString)
case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException
}
}
}


class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardInfo,
val weight: Int, val children: Seq[Shard], config: ConfigMap) extends Shard {
}
6 changes: 6 additions & 0 deletions src/main/scala/com/twitter/rowz/State.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.twitter.rowz


object State extends Enumeration {
val Normal, Removed = Value
}
5 changes: 4 additions & 1 deletion src/main/scala/com/twitter/rowz/jobs/Create.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.twitter.rowz.jobs

import com.twitter.gizzard.jobs.UnboundJob
import com.twitter.xrayspecs.Time

class Create {

class Create(id: Long, info: RowInfo, at: Time) extends UnboundJob[ForwardingManager] {

}
6 changes: 0 additions & 6 deletions src/main/scala/com/twitter/rowz/jobs/Delete.scala

This file was deleted.

9 changes: 9 additions & 0 deletions src/main/scala/com/twitter/rowz/jobs/Destroy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.twitter.rowz.jobs

import com.twitter.xrayspecs.Time
import com.twitter.gizzard.jobs.UnboundJob


class Destroy(id: Long, at: Time) extends UnboundJob[ForwardingManager] {

}
23 changes: 14 additions & 9 deletions src/main/scala/com/twitter/rowz/thrift/RowzService.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package com.twitter.rowz

import net.lag.configgy.Config
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.scheduler.JobScheduler
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
import jobs.{Create, Destroy}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
import thrift.conversions.Row._
import thrift.conversions.RowInfo._


class RowzService(nameServer: NameServer[Shard], forwardingManager: ForwardingManager, scheduler: JobScheduler) {
class RowzService(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler) extends thrift.Rowz.Iface {
def create(rowInfo: RowInfo, at: Int) = {
val id = makeId()
scheduler(new Create(id, rowInfo.fromThrift, at))
/* val id = makeId()*/
val id = 1
scheduler(0)(new Create(id, rowInfo, Time(at.seconds))) // XXX priority const
id
}

def delete(rowInfo: RowInfo, at: Int) {
scheduler(new Delete(id, rowInfo.fromThrift, at))
def destroy(id: Long, at: Int) {
scheduler(1)(new Destroy(id, Time(at.seconds))) // XXX
}

def get(id: Long) = {
nameServer.find(id).get(id)
def read(id: Long) = {
forwardingManager(id).read(id).toThrift
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/twitter/rowz/thrift/conversions/Row.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.twitter.rowz.thrift.conversions

import RowInfo._


object Row {
class RichShardingRow(row: rowz.Row) {
def toThrift = new thrift.Row(row.id, row.info.toThrift)
}
implicit def shardingRowToRichShardingRow(row: rowz.Row) = new RichShardingRow(row)

class RichThriftRow(row: thrift.Row) {
def fromThrift = new rowz.Row(row.id, row.info.fromThrift)
}
implicit def thriftRowToRichThriftRow(row: thrift.Row) = new RichThriftRow(row)
}
16 changes: 16 additions & 0 deletions src/main/scala/com/twitter/rowz/thrift/conversions/RowInfo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.twitter.rowz.thrift.conversions

import State._


object RowInfo {
class RichShardingRowInfo(rowInfo: rowz.RowInfo) {
def toThrift = new thrift.RowInfo(rowInfo.name, rowInfo.state.toThrift)
}
implicit def shardingRowInfoToRichShardingRowInfo(rowInfo: rowz.RowInfo) = new RichShardingRowInfo(rowInfo)

class RichThriftRowInfo(rowInfo: thrift.RowInfo) {
def fromThrift = new rowz.RowInfo(rowInfo.name, rowInfo.state_id.fromThrift)
}
implicit def thriftRowInfoToRichThriftRowInfo(rowInfo: thrift.RowInfo) = new RichThriftRowInfo(rowInfo)
}
Loading

0 comments on commit fb6af56

Please sign in to comment.