diff --git a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala index 6092eaa7..d8787731 100644 --- a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala +++ b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala @@ -70,33 +70,49 @@ abstract case class CopyJob[S <: Shard](sourceId: ShardId, def apply() { try { - log.info("Copying shard block (type %s) from %s to %s: state=%s", - getClass.getName.split("\\.").last, sourceId, destinationId, toMap) - val sourceShard = nameServer.findShardById(sourceId) - val destinationShard = nameServer.findShardById(destinationId) - // do this on each iteration, so it happens in the queue and can be retried if the db is busy: - nameServer.markShardBusy(destinationId, shards.Busy.Busy) - - val nextJob = copyPage(sourceShard, destinationShard, count) - nextJob match { - case Some(job) => - incrGauge - scheduler.put(job) - case None => - finish() + if (nameServer.getShard(destinationId).busy == shards.Busy.Cancelled) { + log.info("Copying cancelled for (type %s) from %s to %s", + getClass.getName.split("\\.").last, sourceId, destinationId) + Stats.clearGauge(gaugeName) + + } else { + + val sourceShard = nameServer.findShardById(sourceId) + val destinationShard = nameServer.findShardById(destinationId) + + log.info("Copying shard block (type %s) from %s to %s: state=%s", + getClass.getName.split("\\.").last, sourceId, destinationId, toMap) + // do this on each iteration, so it happens in the queue and can be retried if the db is busy: + nameServer.markShardBusy(destinationId, shards.Busy.Busy) + + val nextJob = copyPage(sourceShard, destinationShard, count) + nextJob match { + case Some(job) => + incrGauge + scheduler.put(job) + case None => + finish() + } } } catch { case e: NonExistentShard => log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.") - case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) => - log.warning("Shard block copy timed out; trying a smaller block size.") - count = (count * 0.9).toInt - scheduler.put(this) + case e: ShardTimeoutException => + if (count > CopyJob.MIN_COPY) { + log.warning("Shard block copy timed out; trying a smaller block size.") + count = (count * 0.9).toInt + scheduler.put(this) + } else { + nameServer.markShardBusy(destinationId, shards.Busy.Error) + log.error("Shard block copy timed out on minimum block size.") + throw e + } case e: ShardDatabaseTimeoutException => log.warning("Shard block copy failed to get a database connection; retrying.") scheduler.put(this) case e: Throwable => - log.warning("Shard block copy stopped due to exception: %s", e) + nameServer.markShardBusy(destinationId, shards.Busy.Error) + log.error("Shard block copy stopped due to exception: %s", e) throw e } } diff --git a/src/main/scala/com/twitter/gizzard/shards/Busy.scala b/src/main/scala/com/twitter/gizzard/shards/Busy.scala index 5f569d65..e04ca248 100644 --- a/src/main/scala/com/twitter/gizzard/shards/Busy.scala +++ b/src/main/scala/com/twitter/gizzard/shards/Busy.scala @@ -2,6 +2,8 @@ package com.twitter.gizzard.shards object Busy extends Enumeration { - val Normal = Value("Normal") - val Busy = Value("Busy") + val Normal = Value("Normal") + val Busy = Value("Busy") + val Error = Value("Error") + val Cancelled = Value("Cancelled") } diff --git a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala b/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala index 86c56e96..d350b5cb 100644 --- a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala +++ b/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala @@ -25,6 +25,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker "CopyJob" should { val sourceShardId = shards.ShardId("testhost", "1") val destinationShardId = shards.ShardId("testhost", "2") + val destinationShardInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Normal) val count = CopyJob.MIN_COPY + 1 val nextCopy = mock[FakeCopy] val nameServer = mock[nameserver.NameServer[shards.Shard]] @@ -59,6 +60,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker "normally" in { val copy = makeCopy(Some(nextCopy)) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) @@ -68,9 +70,10 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker copy.apply() } - "no shard" in { + "no source shard" in { val copy = makeCopy(Some(nextCopy)) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo") never(jobScheduler).put(nextCopy) } @@ -78,9 +81,20 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker copy.apply() } + "no destination shard" in { + val copy = makeCopy(Some(nextCopy)) + expect { + one(nameServer).getShard(destinationShardId) willThrow new nameserver.NonExistentShard("foo") + never(jobScheduler).put(nextCopy) + } + + copy.apply() + } + "with a database connection timeout" in { val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId)) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) @@ -94,9 +108,11 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker "with a random exception" in { val copy = makeCopy(throw new Exception("boo")) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) + one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) never(jobScheduler).put(nextCopy) } @@ -107,6 +123,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker "early on" in { val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) @@ -121,9 +138,11 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) + one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) never(jobScheduler).put(nextCopy) } @@ -131,10 +150,26 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker } } + "when cancelled" in { + val copy = makeCopy(Some(nextCopy)) + val cancelledInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Cancelled) + + expect { + one(nameServer).getShard(destinationShardId) willReturn cancelledInfo + never(nameServer).findShardById(sourceShardId) + never(nameServer).findShardById(destinationShardId) + never(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) + never(jobScheduler).put(nextCopy) + } + + copy.apply() + } + "when finished" in { val copy = makeCopy(None) expect { + one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo one(nameServer).findShardById(sourceShardId) willReturn shard1 one(nameServer).findShardById(destinationShardId) willReturn shard2 one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)