Skip to content

Commit

Permalink
code revert (apache#391)
Browse files Browse the repository at this point in the history
  • Loading branch information
aiceflower committed Jan 2, 2024
1 parent 1acd382 commit 7964c2d
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
}

override def get(index: Int): Option[SchedulerEvent] = {
getOrRemove(index, false)
}

private def getOrRemove(index: Int, removeFlag: Boolean = false): Option[SchedulerEvent] = {
var event: SchedulerEvent = null
eventQueue synchronized {
val _max = max
Expand All @@ -86,9 +82,6 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
}
val _index = (flag + (index - realSize)) % maxCapacity
event = eventQueue(_index).asInstanceOf[SchedulerEvent]
if (removeFlag) {
eventQueue(_index) = null
}
}
Option(event)
}
Expand Down Expand Up @@ -178,7 +171,7 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
readLock.wait(1000)
}
if (takeIndex < min) takeIndex = min
val t = getOrRemove(takeIndex, true)
val t = get(takeIndex)
takeIndex += 1
t
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ class FIFOUserConsumer(
case _ =>
}
}
// clear cache
queue.clearAll()

this.runningJobs.foreach { job =>
if (job != null && !job.isCompleted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)

override def destroyConsumer(groupName: String): Unit =
consumerGroupMap.get(groupName).foreach { tmpConsumer =>
Utils.tryAndWarn(tmpConsumer.shutdown())
Utils.tryAndWarn(consumerGroupMap.remove(groupName))
tmpConsumer.shutdown()
consumerGroupMap.remove(groupName)
consumerListener.foreach(_.onConsumerDestroyed(tmpConsumer))
logger.warn(s"Consumer of group ($groupName) in $schedulerName is destroyed.")
}
Expand Down

0 comments on commit 7964c2d

Please sign in to comment.