Skip to content

Commit

Permalink
closing scanner in any case
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Wewerka committed Nov 23, 2018
1 parent e6a17fb commit adbe789
Showing 1 changed file with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.util.{Failure, Success, Try}

class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace: String)
extends LEvents with Logging {
Expand Down Expand Up @@ -185,37 +186,50 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace
Future[Iterator[Event]] = {
Future {
blocking {
require(!((reversed == Some(true)) && (entityType.isEmpty || entityId.isEmpty)),
require(!(reversed.contains(true) && (entityType.isEmpty || entityId.isEmpty)),
"the parameter reversed can only be used with both entityType and entityId specified.")

val table = getTable(appId, channelId)

val scan = HBEventsUtil.createScan(
startTime = startTime,
untilTime = untilTime,
entityType = entityType,
entityId = entityId,
eventNames = eventNames,
targetEntityType = targetEntityType,
targetEntityId = targetEntityId,
reversed = reversed)
val scanner = table.getScanner(scan)
table.close()

val eventsIter = scanner.iterator()

// Get all events if None or Some(-1)
val results: Iterator[Result] = limit match {
case Some(-1) => eventsIter
case None => eventsIter
case Some(x) => eventsIter.take(x)
val tableTry = Try(getTable(appId, channelId))
val scannerTry = tableTry.flatMap {
table =>
Try {
val scan = HBEventsUtil.createScan(
startTime = startTime,
untilTime = untilTime,
entityType = entityType,
entityId = entityId,
eventNames = eventNames,
targetEntityType = targetEntityType,
targetEntityId = targetEntityId,
reversed = reversed)
table.getScanner(scan)
}
}

val eventsIt = results.map {
resultToEvent(_, appId)
try {
scannerTry match {
case Success(scanner) =>
val eventsIter = scanner.iterator()

// Get all events if None or Some(-1)
val results: Iterator[Result] = limit match {
case Some(-1) => eventsIter
case None => eventsIter
case Some(x) => eventsIter.take(x)
}

val eventsIt = results.map {
resultToEvent(_, appId)
}
eventsIt
case Failure(e) =>
logger.error("Cannot create HBase Scanner", e)
Iterator.empty
}
} finally {
Try(tableTry.map(_.close()))
Try(scannerTry.map(_.close()))
}

eventsIt
}
}(blockingThreadPoolExecutor)
}
Expand Down

0 comments on commit adbe789

Please sign in to comment.