Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'gizzard15'
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Nov 16, 2010
2 parents 34a02dc + 632f8b9 commit e69656d
Show file tree
Hide file tree
Showing 41 changed files with 460 additions and 338 deletions.
2 changes: 1 addition & 1 deletion project/build.properties
Expand Up @@ -3,6 +3,6 @@
project.organization=com.twitter
project.name=flockdb
sbt.version=0.7.4
project.version=1.4.6-SNAPSHOT
project.version=1.5.0-SNAPSHOT
build.scala.versions=2.7.7
project.initialize=false
3 changes: 1 addition & 2 deletions project/build/FlockDBProject.scala
Expand Up @@ -3,8 +3,7 @@ import Process._
import com.twitter.sbt._

class FlockDBProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher {
val gizzard = "com.twitter" % "gizzard" % "1.4.12"
val results = "com.twitter" % "results" % "1.0"
val gizzard = "com.twitter" % "gizzard" % "1.5.4"

val asm = "asm" % "asm" % "1.5.3" % "test"
val cglib = "cglib" % "cglib" % "2.1_3" % "test"
Expand Down
30 changes: 30 additions & 0 deletions src/main/scala/com/twitter/flockdb/Cursor.scala
@@ -0,0 +1,30 @@
/*
* Copyright 2010 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.flockdb

/**
 * Well-known cursor values and helpers for building cursors.
 */
object Cursor {
  /** The cursor at position 0. */
  val End: Cursor = Cursor(0)

  /** The cursor at position -1. */
  val Start: Cursor = Cursor(-1)

  /**
   * Pairs each value of `seq` with a cursor positioned at that same value.
   *
   * @param seq the values to tag
   * @return one `(value, Cursor(value))` tuple per input element, in input order
   */
  def cursorZip(seq: Seq[Long]): Seq[(Long, Cursor)] =
    seq.map { value => (value, Cursor(value)) }
}

/**
 * A position within a result set, ordered by its numeric `position`.
 *
 * @param position the raw numeric position this cursor wraps
 */
case class Cursor(position: Long) extends Ordered[Cursor] {
  /** Orders cursors by comparing their positions numerically. */
  def compare(that: Cursor): Int = position compare that.position

  /** A cursor whose position is the negation of this one's. */
  def reverse: Cursor = Cursor(-position)

  /** A cursor whose position is the absolute value of this one's. */
  def magnitude: Cursor = Cursor(Math.abs(position))
}
3 changes: 0 additions & 3 deletions src/main/scala/com/twitter/flockdb/EdgeQuery.scala
Expand Up @@ -16,7 +16,4 @@

package com.twitter.flockdb

import com.twitter.results.Page


/**
 * Bundles a query term with the page of results requested for it.
 *
 * @param term the term to query
 * @param page the paging parameters (count and cursor) to apply
 */
case class EdgeQuery(term: QueryTerm, page: Page)
8 changes: 3 additions & 5 deletions src/main/scala/com/twitter/flockdb/EdgesService.scala
Expand Up @@ -17,12 +17,10 @@
package com.twitter.flockdb

import com.twitter.gizzard.Future
import com.twitter.gizzard.jobs.CopyFactory
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
import com.twitter.gizzard.scheduler.{CopyJobFactory, JsonJob, PrioritizingJobScheduler}
import com.twitter.gizzard.shards.{ShardBlackHoleException, ShardDatabaseTimeoutException, ShardTimeoutException}
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.{Cursor, ResultWindow}
import operations.{ExecuteOperations, SelectOperation}
import com.twitter.ostrich.Stats
import queries._
Expand All @@ -31,8 +29,8 @@ import net.lag.logging.Logger

class EdgesService(val nameServer: NameServer[shards.Shard],
val forwardingManager: ForwardingManager,
val copyFactory: CopyFactory[shards.Shard],
val schedule: PrioritizingJobScheduler,
val copyFactory: CopyJobFactory[shards.Shard],
val schedule: PrioritizingJobScheduler[JsonJob],
future: Future, replicationFuture: Future) {

private val log = Logger.get(getClass.getName)
Expand Down
81 changes: 53 additions & 28 deletions src/main/scala/com/twitter/flockdb/FlockDB.scala
Expand Up @@ -20,12 +20,10 @@ import java.lang.{Long => JLong, String}
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.mutable
import com.twitter.gizzard.Future
import com.twitter.gizzard.jobs._
import com.twitter.gizzard.scheduler.{KestrelMessageQueue, JobScheduler, PrioritizingJobScheduler}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.shards.{ShardException, ShardInfo, ReplicatingShard}
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.{Cursor, ResultWindow}
import com.twitter.ostrich.{Stats, W3CStats}
import com.twitter.querulous.StatsCollector
import com.twitter.querulous.database.DatabaseFactory
Expand All @@ -51,6 +49,8 @@ import thrift.FlockException


object FlockDB {
private val log = Logger.get(getClass.getName)

def statsCollector(w3c: W3CStats) = {
new StatsCollector {
def incr(name: String, count: Int) = w3c.incr(name, count)
Expand All @@ -66,6 +66,18 @@ object FlockDB {
val dbQueryEvaluatorFactory = QueryEvaluatorFactory.fromConfig(config.configMap("db"), Some(stats))
val materializingQueryEvaluatorFactory = QueryEvaluatorFactory.fromConfig(config.configMap("materializing_db"), Some(stats))


val codec = new JsonCodec[JsonJob]({ unparsable: Array[Byte] =>
log.error("Unparsable job: %s", unparsable.map { n => "%02x".format(n.toInt & 0xff) }.mkString(", "))
})

val badJobQueue = new JsonJobLogger[JsonJob](Logger.get("bad_jobs"))
// :( val jobParser = new LoggingJobParser(Stats, w3c, new JobWithTasksParser(polymorphicJobParser))
val scheduler = PrioritizingJobScheduler(config.configMap("edges.queue"), codec,
Map(Priority.High.id -> "primary", Priority.Medium.id -> "copy", Priority.Low.id -> "slow"),
Some(badJobQueue))


val replicationFuture = new Future("ReplicationFuture", config.configMap("edges.replication.future"))
val shardRepository = new nameserver.BasicShardRepository[shards.Shard](
new shards.ReadWriteShardAdapter(_), Some(replicationFuture))
Expand All @@ -77,41 +89,54 @@ object FlockDB {
val nameServer = nameserver.NameServer(config.configMap("edges.nameservers"), Some(stats),
shardRepository, Some(replicationFuture))

val polymorphicJobParser = new PolymorphicJobParser
val jobParser = new LoggingJobParser(Stats, w3c, new JobWithTasksParser(polymorphicJobParser))
val scheduler = PrioritizingJobScheduler(config.configMap("edges.queue"), jobParser,
Map(Priority.High.id -> "primary", Priority.Medium.id -> "copy", Priority.Low.id -> "slow"))
// val polymorphicJobParser = new PolymorphicJobParser
// val jobParser = new LoggingJobParser(Stats, w3c, new JobWithTasksParser(polymorphicJobParser))
// val scheduler = PrioritizingJobScheduler(config.configMap("edges.queue"), jobParser,
// Map(Priority.High.id -> "primary", Priority.Medium.id -> "copy", Priority.Low.id -> "slow"))

val forwardingManager = new ForwardingManager(nameServer)
nameServer.reload()

val singleJobEnvironment = (forwardingManager, OrderedUuidGenerator)
List((jobs.single.AddParser, "single.Add".r), (jobs.single.RemoveParser, "single.Remove".r),
(jobs.single.ArchiveParser, "single.Archive".r), (jobs.single.NegateParser, "single.Negate".r)).foreach {
case (unboundJobParser, regex) =>
val boundJobParser = new BoundJobParser(unboundJobParser, singleJobEnvironment)
polymorphicJobParser += (regex, boundJobParser)
}

val multiJobEnvironment = (forwardingManager, scheduler)
List((jobs.multi.ArchiveParser, "multi.Archive".r), (jobs.multi.UnarchiveParser, "multi.Unarchive".r),
(jobs.multi.RemoveAllParser, "multi.RemoveAll".r), (jobs.multi.NegateParser, "multi.Negate".r)).foreach {
case (unboundJobParser, regex) =>
val boundJobParser = new BoundJobParser(unboundJobParser, multiJobEnvironment)
polymorphicJobParser += (regex, boundJobParser)
}

val copyJobParser = new BoundJobParser(jobs.CopyParser, (nameServer, scheduler(Priority.Medium.id)))
val metadataCopyJobParser = new BoundJobParser(jobs.MetadataCopyParser, (nameServer, scheduler(Priority.Medium.id)))
codec += ("single.Add".r, new jobs.single.AddParser(forwardingManager, OrderedUuidGenerator))
codec += ("single.Remove".r, new jobs.single.RemoveParser(forwardingManager, OrderedUuidGenerator))
codec += ("single.Archive".r, new jobs.single.ArchiveParser(forwardingManager, OrderedUuidGenerator))
codec += ("single.Negate".r, new jobs.single.NegateParser(forwardingManager, OrderedUuidGenerator))
codec += ("multi.Archive".r, new jobs.multi.ArchiveParser(forwardingManager, scheduler))
codec += ("multi.Unarchive".r, new jobs.multi.UnarchiveParser(forwardingManager, scheduler))
codec += ("multi.RemoveAll".r, new jobs.multi.RemoveAllParser(forwardingManager, scheduler))
codec += ("multi.Negate".r, new jobs.multi.NegateParser(forwardingManager, scheduler))

codec += ("(Copy|Migrate)".r, new jobs.CopyParser(nameServer, scheduler(Priority.Medium.id)))
codec += ("(MetadataCopy|MetadataMigrate)".r, new jobs.MetadataCopyParser(nameServer, scheduler(Priority.Medium.id)))

// val singleJobEnvironment = (forwardingManager, OrderedUuidGenerator)
// List((jobs.single.AddParser, "single.Add".r), (jobs.single.RemoveParser, "single.Remove".r),
// (jobs.single.ArchiveParser, "single.Archive".r), (jobs.single.NegateParser, "single.Negate".r)).foreach {
// case (unboundJobParser, regex) =>
// val boundJobParser = new BoundJobParser(unboundJobParser, singleJobEnvironment)
// polymorphicJobParser += (regex, boundJobParser)
// }

// val multiJobEnvironment = (forwardingManager, scheduler)
// List((jobs.multi.ArchiveParser, "multi.Archive".r), (jobs.multi.UnarchiveParser, "multi.Unarchive".r),
// (jobs.multi.RemoveAllParser, "multi.RemoveAll".r), (jobs.multi.NegateParser, "multi.Negate".r)).foreach {
// case (unboundJobParser, regex) =>
// val boundJobParser = new BoundJobParser(unboundJobParser, multiJobEnvironment)
// polymorphicJobParser += (regex, boundJobParser)
// }

// val copyJobParser = new BoundJobParser(jobs.CopyParser, (nameServer, scheduler(Priority.Medium.id)))
// val metadataCopyJobParser = new BoundJobParser(jobs.MetadataCopyParser, (nameServer, scheduler(Priority.Medium.id)))

val future = new Future("EdgesFuture", config.configMap("edges.future"))

polymorphicJobParser += ("(Copy|Migrate)".r, copyJobParser)
polymorphicJobParser += ("(MetadataCopy|MetadataMigrate)".r, metadataCopyJobParser)
// polymorphicJobParser += ("(Copy|Migrate)".r, copyJobParser)
// polymorphicJobParser += ("(MetadataCopy|MetadataMigrate)".r, metadataCopyJobParser)

scheduler.start()

new FlockDB(new EdgesService(nameServer, forwardingManager, jobs.CopyFactory, scheduler,
val copyFactory = new jobs.CopyFactory(nameServer, scheduler(Priority.Medium.id))
new FlockDB(new EdgesService(nameServer, forwardingManager, copyFactory, scheduler,
future, replicationFuture))
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/com/twitter/flockdb/Main.scala
Expand Up @@ -23,8 +23,8 @@ import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.server.{TServer, TThreadPoolServer}
import org.apache.thrift.transport.{TServerSocket, TTransportFactory}
import com.twitter.gizzard.proxy.{ExceptionHandlingProxy, LoggingProxy}
import com.twitter.gizzard.thrift.{GizzardServices, JobManager, JobManagerService, ShardManager,
ShardManagerService, TSelectorServer}
import com.twitter.gizzard.scheduler.JsonJob
import com.twitter.gizzard.thrift._
import com.twitter.ostrich.{JsonStatsLogger, Service, ServiceTracker, Stats, StatsMBean, W3CStats}
import com.twitter.xrayspecs.TimeConversions._
import net.lag.configgy.{Config, RuntimeEnvironment, ConfigMap, Configgy}
Expand All @@ -36,7 +36,7 @@ object Main extends Service {
val runtime = new RuntimeEnvironment(getClass)

var thriftServer: TSelectorServer = null
var gizzardServices: GizzardServices[shards.Shard] = null
var gizzardServices: GizzardServices[shards.Shard, JsonJob] = null
var flock: FlockDB = null

var config: ConfigMap = null
Expand Down Expand Up @@ -118,7 +118,7 @@ object Main extends Service {
flock.edges.nameServer,
flock.edges.copyFactory,
flock.edges.schedule,
Priority.Medium.id)
flock.edges.schedule(Priority.Medium.id))
gizzardServices.start()
thriftServer.serve()
} catch {
Expand Down
19 changes: 19 additions & 0 deletions src/main/scala/com/twitter/flockdb/Page.scala
@@ -0,0 +1,19 @@
/*
* Copyright 2010 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.flockdb

/**
 * Paging parameters: a count together with a cursor.
 *
 * @param count  presumably the maximum number of results wanted for the page -- TODO confirm against callers
 * @param cursor presumably the position from which the page is read -- TODO confirm against callers
 */
case class Page(count: Int, cursor: Cursor)
96 changes: 96 additions & 0 deletions src/main/scala/com/twitter/flockdb/ResultWindow.scala
@@ -0,0 +1,96 @@
/*
* Copyright 2010 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.flockdb

import scala.util.Sorting
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._


/**
 * A single result value tagged with the cursor that identifies its position.
 *
 * @param id     the result value
 * @param cursor the cursor associated with this value
 */
case class ResultWindowRow[T](id: T, cursor: Cursor) extends Ordered[ResultWindowRow[T]] {
  /**
   * Compares rows by cursor with the operands swapped (`that` against `this`),
   * so a row with a larger cursor sorts before a row with a smaller one.
   */
  def compare(that: ResultWindowRow[T]): Int = that.cursor compare this.cursor
}

/**
 * A `Seq` of result rows that forwards every operation to the wrapped `data`.
 *
 * @param data the rows to expose
 */
class ResultWindowRows[T](data: Seq[ResultWindowRow[T]]) extends Seq[ResultWindowRow[T]] {
  /** Iterator over the wrapped rows. */
  def elements: Iterator[ResultWindowRow[T]] = data.elements

  /** The row at index `i` of the wrapped sequence. */
  def apply(i: Int): ResultWindowRow[T] = data.apply(i)

  /** Number of wrapped rows. */
  def length: Int = data.length
}

/**
 * One page of results cut out of a sequence of cursor-tagged rows.
 *
 * On construction the window selects at most `count` rows from `data` according to `cursor`
 * and exposes the selected values as a `Seq[T]`. It also derives the cursors used to continue
 * in either direction (`nextCursor`, `prevCursor`).
 *
 * NOTE(review): the paging logic uses `takeWhile`/`dropWhile` on the row cursors, which looks
 * like it expects `data` to be ordered by descending cursor -- confirm against callers.
 *
 * @param data         the candidate rows to page over
 * @param inNextCursor the next-cursor to report when paging does not determine one
 * @param inPrevCursor the previous-cursor to report when paging does not determine one
 * @param count        the maximum number of rows kept on the page
 * @param cursor       the requested position; `Cursor.Start` reads from the beginning of `data`,
 *                     a cursor below `Cursor.Start` selects the rows preceding its magnitude,
 *                     any other cursor selects the rows following it
 */
class ResultWindow[T](val data: ResultWindowRows[T], val inNextCursor: Cursor, val inPrevCursor: Cursor, val count: Int, val cursor: Cursor) extends Seq[T] {
  /** Builds a window from `(value, cursor)` tuples instead of prepared rows. */
  def this(data: Seq[(T, Cursor)], inNextCursor: Cursor, inPrevCursor: Cursor, count: Int, cursor: Cursor) =
    this(new ResultWindowRows(data.map { datum => ResultWindowRow(datum._1, datum._2) }), inNextCursor, inPrevCursor, count, cursor)

  /** Builds a window from tuples with both incoming cursors set to `Cursor.End`. */
  def this(data: Seq[(T, Cursor)], count: Int, cursor: Cursor) =
    this(data, Cursor.End, Cursor.End, count, cursor)

  /** Builds an empty window: no rows, a count of 0 and `Cursor.End` as the cursor. */
  def this() =
    this(List[(T, Cursor)](), 0, Cursor.End)

  // The rows that made it onto this page; narrowed below according to `cursor`.
  var page: Seq[ResultWindowRow[T]] = data
  // Whether paging left rows out after (next) or before (prev) the page, which means the
  // corresponding continuation cursor must be taken from the page itself.
  var nextChanged, prevChanged = false
  if (cursor < Cursor.Start) {
    // Keep the leading rows whose cursor is above |cursor|, then the last `count` of those.
    page = data.takeWhile(_.cursor > cursor.magnitude)
    nextChanged = page.size < data.size
    prevChanged = page.size > count
    page = page.drop(page.size - count)
  } else if (cursor == Cursor.Start) {
    // Keep the first `count` rows.
    nextChanged = page.size > count
    page = page.take(count)
  } else {
    // Skip the leading rows at or above `cursor`, then keep the first `count` of the rest.
    page = data.dropWhile(_.cursor >= cursor)
    nextChanged = page.size > count
    prevChanged = page.size < data.size
    page = page.take(count)
  }

  /** Cursor of the last row on the page when rows were left out after it, else `inNextCursor`. */
  val nextCursor = if (nextChanged && !page.isEmpty) page(page.size - 1).cursor else inNextCursor

  /** Reversed cursor of the first row on the page when rows were left out before it, else `inPrevCursor`. */
  val prevCursor = if (prevChanged && !page.isEmpty) page(0).cursor.reverse else inPrevCursor

  /**
   * Concatenates this page with `other`'s page into a new window with the same `count` and `cursor`.
   *
   * When `cursor` is below `Cursor.Start`, `other`'s rows are placed first and the result takes
   * this window's `nextCursor` and `other`'s `prevCursor`. Otherwise this window's rows are
   * placed first and the result takes `other`'s `nextCursor` and this window's `prevCursor`.
   * The new window applies the paging rules again to the combined rows when it is constructed.
   */
  def ++(other: ResultWindow[T]) = {
    if (cursor < Cursor.Start) {
      new ResultWindow(new ResultWindowRows(other.page ++ page), nextCursor, other.prevCursor, count, cursor)
    } else {
      new ResultWindow(new ResultWindowRows(page ++ other.page), other.nextCursor, prevCursor, count, cursor)
    }
  }

  /**
   * Merges this page with `other`'s page: duplicate rows are removed and the remainder is
   * sorted by the rows' own ordering.
   *
   * The continuation cursors are `Cursor.End` when both inputs report `Cursor.End` for that
   * direction, or when the merged page is empty (there is no row to take a cursor from);
   * otherwise they come from the last / first row of the merged page.
   */
  def merge(other: ResultWindow[T]) = {
    val newPage = Sorting.stableSort((Set((page ++ other.page): _*)).toSeq)
    // Guard against an empty merged page before indexing into it, as `--` does.
    val newNextCursor = if (newPage.size == 0 || (nextCursor == Cursor.End && other.nextCursor == Cursor.End)) Cursor.End else newPage(newPage.size - 1).cursor
    val newPrevCursor = if (newPage.size == 0 || (prevCursor == Cursor.End && other.prevCursor == Cursor.End)) Cursor.End else newPage(0).cursor.reverse
    new ResultWindow(new ResultWindowRows(newPage), newNextCursor, newPrevCursor, count, cursor)
  }

  /**
   * Returns a window without the rows whose value appears in `values`.
   *
   * A continuation cursor stays `Cursor.End` if it already was, becomes `Cursor.End` when no
   * rows remain, and is otherwise taken from the last / first remaining row.
   */
  def --(values: Seq[T]) = {
    val rejects = Set(values: _*)
    val newPage = page.filter { row => !rejects.contains(row.id) }
    val newNextCursor = if (nextCursor == Cursor.End || newPage.size == 0) Cursor.End else newPage(newPage.size - 1).cursor
    val newPrevCursor = if (prevCursor == Cursor.End || newPage.size == 0) Cursor.End else newPage(0).cursor.reverse
    new ResultWindow(new ResultWindowRows(newPage), newNextCursor, newPrevCursor, count, cursor)
  }

  /** Number of rows on the page. */
  def length = page.length

  /** The value of the `i`-th row on the page. */
  def apply(i: Int) = page(i).id

  /** Iterator over the values on the page. */
  def elements = page.projection.map(_.id).elements

  /** The cursor to continue with in the direction of `cursor`: previous when it is below `Cursor.Start`, next otherwise. */
  def continueCursor = if (cursor < Cursor.Start) prevCursor else nextCursor

  override def firstOption = page.firstOption.map { _.id }

  override def toString = (elements.toList, nextCursor, prevCursor, count, cursor).toString

  /** Two windows are equal when their page values, next/prev cursors and cursor all match. */
  override def equals(that: Any) = that match {
    case that: ResultWindow[_] => elements.toList == that.elements.toList && nextCursor == that.nextCursor && prevCursor == that.prevCursor && cursor == that.cursor
    case _ => false
  }

  /** Hashes exactly the fields `equals` compares, so equal windows produce equal hash codes. */
  override def hashCode(): Int = (elements.toList, nextCursor, prevCursor, cursor).hashCode
}
2 changes: 0 additions & 2 deletions src/main/scala/com/twitter/flockdb/SelectQuery.scala
Expand Up @@ -16,8 +16,6 @@

package com.twitter.flockdb

import com.twitter.results.Page
import operations.SelectOperation


/**
 * Bundles a sequence of select operations with the page of results requested for them.
 *
 * @param operations the select operations making up the query
 * @param page       the paging parameters (count and cursor) to apply
 */
case class SelectQuery(operations: Seq[SelectOperation], page: Page)
Expand Up @@ -17,10 +17,8 @@
package com.twitter.flockdb.conversions

import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.ResultWindow
import conversions.Edge._


object EdgeResults {
class RichResultWindowOfEdges(resultWindow: ResultWindow[Edge]) {
def toEdgeResults = new thrift.EdgeResults(resultWindow.map { _.toThrift }.toJavaList,
Expand Down
9 changes: 3 additions & 6 deletions src/main/scala/com/twitter/flockdb/conversions/Page.scala
Expand Up @@ -16,17 +16,14 @@

package com.twitter.flockdb.conversions

import com.twitter.results


object Page {
class RichFlockPage(page: results.Page) {
class RichFlockPage(page: Page) {
def toThrift = new thrift.Page(page.count, page.cursor.position)
}
implicit def richFlockPage(page: results.Page) = new RichFlockPage(page)
implicit def richFlockPage(page: Page) = new RichFlockPage(page)

class RichThriftPage(page: thrift.Page) {
def fromThrift = new results.Page(page.count, results.Cursor(page.cursor))
def fromThrift = new Page(page.count, Cursor(page.cursor))
}
implicit def richThriftPage(page: thrift.Page) = new RichThriftPage(page)
}
2 changes: 0 additions & 2 deletions src/main/scala/com/twitter/flockdb/conversions/Results.scala
Expand Up @@ -17,8 +17,6 @@
package com.twitter.flockdb.conversions

import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.ResultWindow


object Results {
class RichResultWindowOfLongs(resultWindow: ResultWindow[Long]) {
Expand Down

0 comments on commit e69656d

Please sign in to comment.