Skip to content
Permalink
Browse files

Creation of Processor to help better thread-managed processing of work

  • Loading branch information...
darkfrog26 committed Mar 11, 2019
1 parent 586586a commit 1ff7b19568a61baf712a3700c490b7a7ee1a0c45
@@ -3,7 +3,7 @@ import sbtcrossproject.CrossPlugin.autoImport.crossProject

name := "youi"
organization in ThisBuild := "io.youi"
version in ThisBuild := "0.10.6"
version in ThisBuild := "0.10.7-SNAPSHOT"
scalaVersion in ThisBuild := "2.12.8"
crossScalaVersions in ThisBuild := List("2.12.8", "2.11.12")
resolvers in ThisBuild += Resolver.sonatypeRepo("releases")
@@ -0,0 +1,75 @@
package io.youi.processor

import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

object Processor {
lazy val DefaultThreads: Int = Runtime.getRuntime.availableProcessors() * 2

private lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

def apply[Input, Output](name: String,
handler: Input => Future[Output],
executionContext: ExecutionContext = executionContext): Processor[Input, Output] = {
new Processor[Input, Output](name, handler, executionContext)
}
}

class Processor[Input, Output](name: String,
handler: Input => Future[Output],
executionContext: ExecutionContext) {
private implicit val implicitExecutionContext: ExecutionContext = executionContext

def process(input: List[Input],
threads: Int = Processor.DefaultThreads,
monitor: ProcessorMonitor = ProcessorMonitor.Default): Future[List[Output]] = {
val started = System.currentTimeMillis()
val processed = new AtomicLong(0L)
val total = input.length

val queue = new ConcurrentLinkedQueue[WorkUnit]
val work = input.map { i =>
val wu = WorkUnit(i)
queue.add(wu)
wu
}

def processWork(): Future[Unit] = Option(queue.poll()) match {
case None => Future.successful(())
case Some(wu) => handler(wu.input).flatMap { out =>
processed.incrementAndGet()
wu.output = Some(out)
processWork()
}
}

val futures = (0 until threads).map(_ => processWork()).toList
val future = Future.sequence(futures).map(_ => work.flatMap(_.output))

// Monitor
Future {
val logEvery = monitor.resolution.toMillis
var lastRun: Long = System.currentTimeMillis()
while (!future.isCompleted) {
if (System.currentTimeMillis() > lastRun + logEvery) {
val now = System.currentTimeMillis()
val elapsed = (now - started) / 1000.0
val perSecond = processed.get() / elapsed
monitor.monitor(name, processed.get(), total, elapsed, perSecond, threads, finished = false)
lastRun = now
}
Thread.sleep(10L)
}
val elapsed = (System.currentTimeMillis() - started) / 1000.0
val perSecond = processed.get() / elapsed
monitor.monitor(name, processed.get(), total, elapsed, perSecond, threads, finished = true)
}

future
}

case class WorkUnit(input: Input, var output: Option[Output] = None)
}

@@ -0,0 +1,44 @@
package io.youi.processor

import scala.concurrent.duration._

trait ProcessorMonitor {
def monitor(name: String,
processed: Long,
total: Long,
elapsed: Double,
perSecond: Double,
threads: Int,
finished: Boolean): Unit

def resolution: FiniteDuration
}

object ProcessorMonitor {
lazy val Default: ProcessorMonitor = Logging()

case object Void extends ProcessorMonitor {
override def monitor(name: String,
processed: Long,
total: Long,
elapsed: Double,
perSecond: Double,
threads: Int,
finished: Boolean): Unit = {}

override def resolution: FiniteDuration = 1.hour
}

case class Logging(resolution: FiniteDuration = 5.seconds) extends ProcessorMonitor {
override def monitor(name: String,
processed: Long,
total: Long,
elapsed: Double,
perSecond: Double,
threads: Int, finished: Boolean): Unit = if (finished) {
scribe.info(s"$name: Processed $processed of $total in $elapsed ($perSecond per second in $threads threads)")
} else {
scribe.info(s"$name: Finished Processing $processed of $total in $elapsed ($perSecond per second in $threads threads)")
}
}
}

0 comments on commit 1ff7b19

Please sign in to comment.
You can’t perform that action at this time.