initial gizzard conversions
Josh Hull committed Feb 25, 2011
1 parent 19309cd commit 1f3b23d
Showing 23 changed files with 54 additions and 45 deletions.
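The conversion applied throughout this commit is dropping the JsonJob type parameter from gizzard's scheduler types: gizzard 2.0.x took PrioritizingJobScheduler[JsonJob] and JobScheduler[JsonJob], while 2.1.x takes them unparameterized. A minimal before/after sketch of that signature change, in Scala; the EdgeWriter class is hypothetical, and only the scheduler types and the put(priority, job) call shape are taken from the diff below.

import com.twitter.gizzard.scheduler._

// Before (gizzard 2.0.x): the scheduler type carried the job type.
// class EdgeWriter(scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) {
//   def write(job: JsonJob) = scheduler.put(priority, job)
// }

// After (gizzard 2.1.x): the type parameter is gone; call sites stay the same.
class EdgeWriter(scheduler: PrioritizingJobScheduler, priority: Int) {
  def write(job: JsonJob) = scheduler.put(priority, job)
}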
2 changes: 1 addition & 1 deletion project/build/FlockDBProject.scala
@@ -8,7 +8,7 @@ with SubversionPublisher with DefaultRepos {
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1"

val gizzard = "com.twitter" % "gizzard" % "2.0.1"
val gizzard = "com.twitter" % "gizzard" % "2.1.2-SNAPSHOT"
val asm = "asm" % "asm" % "1.5.3" % "test"
val cglib = "cglib" % "cglib" % "2.1_3" % "test"
val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/flockdb/Edge.scala
@@ -32,7 +32,7 @@ case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSe

val updatedAt = Time.fromSeconds(updatedAtSeconds)

- def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) = {
+ def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = {
scheduler.put(priority, toJob(tableId, forwardingManager))
}

2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/flockdb/EdgesService.scala
@@ -31,7 +31,7 @@ import net.lag.logging.Logger
class EdgesService(val nameServer: NameServer[shards.Shard],
var forwardingManager: ForwardingManager,
val copyFactory: CopyJobFactory[shards.Shard],
- val schedule: PrioritizingJobScheduler[JsonJob],
+ val schedule: PrioritizingJobScheduler,
future: Future,
intersectionQueryConfig: config.IntersectionQuery,
aggregateJobsPageSize: Int) {
7 changes: 4 additions & 3 deletions src/main/scala/com/twitter/flockdb/FlockDB.scala
@@ -19,6 +19,7 @@ package com.twitter.flockdb
import java.lang.{Long => JLong, String}
import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.mutable
+ import scala.collection.JavaConversions._
import com.twitter.gizzard.{Future, GizzardServer}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.nameserver
@@ -48,7 +49,7 @@ import Direction._
import thrift.FlockException
import config.{FlockDB => FlockDBConfig}

- class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards.Shard, JsonJob](config) {
+ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards.Shard](config) {
object FlockExceptionWrappingProxyFactory extends ExceptionHandlingProxyFactory[thrift.FlockDB.Iface]({ (flock, e) =>
e match {
case _: thrift.FlockException =>
@@ -129,7 +130,7 @@ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards
}
}

- class FlockDBThriftAdapter(val edges: EdgesService, val scheduler: PrioritizingJobScheduler[JsonJob]) extends thrift.FlockDB.Iface {
+ class FlockDBThriftAdapter(val edges: EdgesService, val scheduler: PrioritizingJobScheduler) extends thrift.FlockDB.Iface {
def contains(source_id: Long, graph_id: Int, destination_id: Long) = {
edges.contains(source_id, graph_id, destination_id)
}
@@ -152,7 +153,7 @@ class FlockDBThriftAdapter(val edges: EdgesService, val scheduler: PrioritizingJ
}

def select2(queries: JList[thrift.SelectQuery]): JList[thrift.Results] = {
- edges.select(queries.toSeq.map { _.fromThrift }).map { _.toThrift }.toJavaList
+ edges.select(queries.toSeq.map { _.fromThrift }).map { _.toThrift }
}

def select_edges(queries: JList[thrift.EdgeQuery]) = {
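The select2 change above leans on the scala.collection.JavaConversions._ import added at the top of FlockDB.scala: its implicit conversions produce the java.util.List the Thrift-generated interface expects, so the explicit .toJavaList helper can be dropped. A standalone sketch of that mechanism; the JavaConversionsSketch object and doubled method are made up for illustration and are not part of FlockDB or gizzard.

import java.util.{List => JList}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

object JavaConversionsSketch {
  // JavaConversions supplies an implicit Buffer => java.util.List conversion,
  // so a Scala collection can be returned where a JList is required.
  def doubled(ids: ArrayBuffer[Long]): JList[Long] = ids.map(_ * 2)

  def main(args: Array[String]) {
    println(doubled(ArrayBuffer(1L, 2L, 3L)))  // prints [2, 4, 6]
  }
}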
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/flockdb/JobSchedulable.scala
@@ -3,5 +3,5 @@ package com.twitter.flockdb
import com.twitter.gizzard.scheduler._

trait JobSchedulable {
- def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int)
+ def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int)
}
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/flockdb/Metadata.scala
@@ -47,7 +47,7 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:

def max(other: Metadata) = if (this > other) this else other

- def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) = {
+ def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = {
val job = state match {
case State.Normal => Unarchive
case State.Removed => RemoveAll
@@ -17,6 +17,7 @@
package com.twitter.flockdb
package conversions

+ import scala.collection.JavaConversions._
import com.twitter.gizzard.thrift.conversions.Sequences._
import conversions.ExecuteOperation._
import conversions.Priority._
@@ -1,3 +1,4 @@
+
/*
* Copyright 2010 Twitter, Inc.
*
@@ -17,6 +17,7 @@
package com.twitter.flockdb
package conversions

+ import scala.collection.JavaConversions._
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.flockdb
import Page._
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/jobs/Copy.scala
@@ -34,14 +34,14 @@ object Copy {
val COUNT = 10000
}

- class CopyFactory(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class CopyFactory(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobFactory[Shard] {
def apply(sourceShardId: ShardId, destinationShardId: ShardId) =
new MetadataCopy(sourceShardId, destinationShardId, MetadataCopy.START, Copy.COUNT,
nameServer, scheduler)
}

- class CopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class CopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong),
@@ -51,7 +51,7 @@ class CopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob]
}

class Copy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: Copy.CopyCursor,
- count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
val (items, newCursor) = sourceShard.selectAll(cursor, count)
@@ -73,7 +73,7 @@ object MetadataCopy {
val END = Cursor.End
}

- class MetadataCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class MetadataCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
@@ -82,7 +82,7 @@ class MetadataCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[
}

class MetadataCopy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.CopyCursor,
- count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)
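Copy, MetadataCopy, and their parsers above now take the unparameterized JobScheduler, while the paging structure itself is unchanged: read a page of items at the current cursor, write it to the destination, then continue from the cursor the read returned. A schematic, self-contained sketch of that cursor-paging loop; Page, readPage, and copyAll are hypothetical stand-ins rather than the gizzard or FlockDB shard API.

// Hypothetical stand-ins; only the paging shape mirrors copyPage above.
case class Page(items: Seq[Long], nextCursor: Option[Int])

object CursorPagingSketch {
  // Read up to `count` items starting at `cursor`; report where to resume.
  def readPage(data: Seq[Long], cursor: Int, count: Int): Page = {
    val items = data.drop(cursor).take(count)
    val next = cursor + items.size
    Page(items, if (next < data.size) Some(next) else None)
  }

  // Page through the whole source; a real copy job would reschedule itself
  // with the new cursor instead of looping in place.
  def copyAll(source: Seq[Long], count: Int): Seq[Long] = {
    var copied = Seq.empty[Long]
    var cursor: Option[Int] = Some(0)
    while (cursor.isDefined) {
      val page = readPage(source, cursor.get, count)
      copied = copied ++ page.items   // "write" the page to the destination
      cursor = page.nextCursor
    }
    copied
  }

  def main(args: Array[String]) {
    println(copyAll((1L to 25L).toSeq, 10))  // copies in three pages of at most 10
  }
}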
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/jobs/Diff.scala
@@ -29,14 +29,14 @@ import com.twitter.gizzard.shards.{ShardDatabaseTimeoutException, ShardTimeoutEx
import collection.mutable.ListBuffer
import shards.{Shard}

- class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends RepairJobFactory[Shard] {
override def apply(shardIds: Seq[ShardId]) = {
new MetadataDiff(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
}
}

- class DiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class DiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends RepairParser(nameServer, scheduler) {
override def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong),
@@ -46,7 +46,7 @@ class DiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobSchedu
}

class Diff(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends Repair(shardIds, cursor, count, nameServer, scheduler) {

private val log = Logger.get(getClass.getName)
@@ -69,7 +69,7 @@ class Diff(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
}
}

- class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends MetadataRepairParser(nameServer, scheduler) {
override def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
@@ -78,7 +78,7 @@ class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJ
}

class MetadataDiff(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends MetadataRepair(shardIds, cursor, count, nameServer, scheduler) {

private val log = Logger.get(getClass.getName)
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/jobs/Repair.scala
@@ -37,14 +37,14 @@ object Repair {
val PRIORITY = Priority.Medium.id
}

- class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends RepairJobFactory[Shard] {
def apply(shardIds: Seq[ShardId]) = {
new MetadataRepair(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
}
}

- class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong),
@@ -54,7 +54,7 @@ class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobSche
}

class Repair(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends MultiShardRepair[Shard, Edge, Repair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {

private val log = Logger.get(getClass.getName)
@@ -127,7 +127,7 @@ object MetadataRepair {
val PRIORITY = Priority.Medium.id
}

- class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
@@ -136,7 +136,7 @@ class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: Prioritizin
}

class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
extends MultiShardRepair[Shard, Metadata, MetadataRepair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {

private val log = Logger.get(getClass.getName)
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/jobs/UnsafeCopy.scala
@@ -34,14 +34,14 @@ object UnsafeCopy {
val COUNT = 10000
}

- class UnsafeCopyFactory(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class UnsafeCopyFactory(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobFactory[Shard] {
def apply(sourceShardId: ShardId, destinationShardId: ShardId) =
new MetadataUnsafeCopy(sourceShardId, destinationShardId, MetadataUnsafeCopy.START,
UnsafeCopy.COUNT, nameServer, scheduler)
}

- class UnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class UnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toInt),
@@ -51,7 +51,7 @@ class UnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[Js
}

class UnsafeCopy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: UnsafeCopy.CopyCursor,
- count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
val (items, newCursor) = sourceShard.selectAll(cursor, count)
@@ -73,7 +73,7 @@ object MetadataUnsafeCopy {
val END = Cursor.End
}

- class MetadataUnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ class MetadataUnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toInt)
@@ -83,7 +83,7 @@ class MetadataUnsafeCopyParser(nameServer: NameServer[Shard], scheduler: JobSche

class MetadataUnsafeCopy(sourceShardId: ShardId, destinationShardId: ShardId,
cursor: MetadataUnsafeCopy.CopyCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
+ nameServer: NameServer[Shard], scheduler: JobScheduler)
extends CopyJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)
