Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
allow copy cancellation and add extra flags to shard busy status
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Dec 30, 2010
1 parent 89c8da7 commit 010ba8e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 22 deletions.
54 changes: 35 additions & 19 deletions src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala
Expand Up @@ -70,33 +70,49 @@ abstract case class CopyJob[S <: Shard](sourceId: ShardId,

def apply() {
try {
log.info("Copying shard block (type %s) from %s to %s: state=%s",
getClass.getName.split("\\.").last, sourceId, destinationId, toMap)
val sourceShard = nameServer.findShardById(sourceId)
val destinationShard = nameServer.findShardById(destinationId)
// do this on each iteration, so it happens in the queue and can be retried if the db is busy:
nameServer.markShardBusy(destinationId, shards.Busy.Busy)

val nextJob = copyPage(sourceShard, destinationShard, count)
nextJob match {
case Some(job) =>
incrGauge
scheduler.put(job)
case None =>
finish()
if (nameServer.getShard(destinationId).busy == shards.Busy.Cancelled) {

This comment has been minimized.

Copy link
@freels

freels Dec 30, 2010

Author Contributor

This could also be made to check randomly some percentage of the time.

log.info("Copying cancelled for (type %s) from %s to %s",
getClass.getName.split("\\.").last, sourceId, destinationId)
Stats.clearGauge(gaugeName)

} else {

val sourceShard = nameServer.findShardById(sourceId)
val destinationShard = nameServer.findShardById(destinationId)

log.info("Copying shard block (type %s) from %s to %s: state=%s",
getClass.getName.split("\\.").last, sourceId, destinationId, toMap)
// do this on each iteration, so it happens in the queue and can be retried if the db is busy:
nameServer.markShardBusy(destinationId, shards.Busy.Busy)

val nextJob = copyPage(sourceShard, destinationShard, count)
nextJob match {
case Some(job) =>
incrGauge
scheduler.put(job)
case None =>
finish()
}
}
} catch {
case e: NonExistentShard =>
log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.")
case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) =>
log.warning("Shard block copy timed out; trying a smaller block size.")
count = (count * 0.9).toInt
scheduler.put(this)
case e: ShardTimeoutException =>
if (count > CopyJob.MIN_COPY) {
log.warning("Shard block copy timed out; trying a smaller block size.")
count = (count * 0.9).toInt
scheduler.put(this)
} else {
nameServer.markShardBusy(destinationId, shards.Busy.Error)
log.error("Shard block copy timed out on minimum block size.")
throw e
}
case e: ShardDatabaseTimeoutException =>
log.warning("Shard block copy failed to get a database connection; retrying.")
scheduler.put(this)
case e: Throwable =>
log.warning("Shard block copy stopped due to exception: %s", e)
nameServer.markShardBusy(destinationId, shards.Busy.Error)
log.error("Shard block copy stopped due to exception: %s", e)
throw e
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/twitter/gizzard/shards/Busy.scala
Expand Up @@ -2,6 +2,8 @@ package com.twitter.gizzard.shards


object Busy extends Enumeration {
val Normal = Value("Normal")
val Busy = Value("Busy")
val Normal = Value("Normal")
val Busy = Value("Busy")
val Error = Value("Error")
val Cancelled = Value("Cancelled")
}
Expand Up @@ -25,6 +25,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"CopyJob" should {
val sourceShardId = shards.ShardId("testhost", "1")
val destinationShardId = shards.ShardId("testhost", "2")
val destinationShardInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Normal)
val count = CopyJob.MIN_COPY + 1
val nextCopy = mock[FakeCopy]
val nameServer = mock[nameserver.NameServer[shards.Shard]]
Expand Down Expand Up @@ -59,6 +60,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"normally" in {
val copy = makeCopy(Some(nextCopy))
expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
Expand All @@ -68,19 +70,31 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
copy.apply()
}

"no shard" in {
"no source shard" in {
val copy = makeCopy(Some(nextCopy))
expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo")
never(jobScheduler).put(nextCopy)
}

copy.apply()
}

"no destination shard" in {
val copy = makeCopy(Some(nextCopy))
expect {
one(nameServer).getShard(destinationShardId) willThrow new nameserver.NonExistentShard("foo")
never(jobScheduler).put(nextCopy)
}

copy.apply()
}

"with a database connection timeout" in {
val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId))
expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
Expand All @@ -94,9 +108,11 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"with a random exception" in {
val copy = makeCopy(throw new Exception("boo"))
expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error)
never(jobScheduler).put(nextCopy)
}

Expand All @@ -107,6 +123,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"early on" in {
val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))
expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
Expand All @@ -121,20 +138,38 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))

expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error)
never(jobScheduler).put(nextCopy)
}

copy.apply() must throwA[Exception]
}
}

"when cancelled" in {
val copy = makeCopy(Some(nextCopy))
val cancelledInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Cancelled)

expect {
one(nameServer).getShard(destinationShardId) willReturn cancelledInfo
never(nameServer).findShardById(sourceShardId)
never(nameServer).findShardById(destinationShardId)
never(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
never(jobScheduler).put(nextCopy)
}

copy.apply()
}

"when finished" in {
val copy = makeCopy(None)

expect {
one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo
one(nameServer).findShardById(sourceShardId) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
Expand Down

2 comments on commit 010ba8e

@fizx
Copy link

@fizx fizx commented on 010ba8e Dec 30, 2010

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like that the only level of granularity that we have on the busyness flag is at a shard level. I think we should cancel jobs, not cancel shards. Also, I'd like to see the error message logged in the nameserver for easy retrieval.

This code looks good though, and is a step in the right direction, so I still say ship it.

@freels
Copy link
Contributor Author

@freels freels commented on 010ba8e Dec 31, 2010

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, the busy flag as it is is extremely limited. I was going for incremental change with this patch. The real solution should also:

  • Track individual jobs.
  • Allow space for jobs to record arbitrary status.
  • Allow operator to tune job in flight (say page size, perhaps.)
  • Prevent multiple copies to the same shard at once.
  • Prevent multiple pages of the same copy job from re-spawning due to races in error condition handling. (Same mechanism as above, different application)
  • Completely handle its own error retrying: The error retrying here is in addition to that of the containing queue. The logic here and that above can have bad interactions.

Please sign in to comment.