Skip to content

Commit

Permalink
move more classes out of org.apache.pekko package (#5)
Browse files Browse the repository at this point in the history
* move more classes out of org.apache.pekko package

* move more code

* broken tests

* stray imports
  • Loading branch information
pjfanning committed Apr 24, 2024
1 parent b334535 commit 1a6edd2
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 132 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ pekko {
# main/global/default dispatcher
default-dispatcher {
type = "org.apache.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "org.apache.pekko.sensors.dispatch.InstrumentedExecutor"
type = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedExecutor"
instrumented-executor {
delegate = "fork-join-executor"
Expand All @@ -138,8 +138,8 @@ pekko {
# some other dispatcher used in your app
default-blocking-io-dispatcher {
type = "org.apache.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "org.apache.pekko.sensors.dispatch.InstrumentedExecutor"
type = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedExecutor"
instrumented-executor {
delegate = "thread-pool-executor"
Expand All @@ -160,8 +160,8 @@ pekko {

```
default-dispatcher {
type = "org.apache.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "org.apache.pekko.sensors.dispatch.InstrumentedExecutor"
type = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedExecutor"
instrumented-executor {
delegate = "fork-join-executor"
Expand Down
8 changes: 4 additions & 4 deletions examples/app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pekko {
provider = "org.apache.pekko.cluster.ClusterActorRefProvider"
allow-java-serialization = true // for in-memory unit tests only!
default-dispatcher {
type = "org.apache.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "org.apache.pekko.sensors.dispatch.InstrumentedExecutor"
type = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedExecutor"

instrumented-executor {
delegate = "cats.effect.unsafe.CatsEffectPool"
Expand All @@ -24,8 +24,8 @@ pekko {
}

default-blocking-io-dispatcher {
type = "org.apache.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "org.apache.pekko.sensors.dispatch.InstrumentedExecutor"
type = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedDispatcherConfigurator"
executor = "nl.pragmasoft.pekko.sensors.dispatch.InstrumentedExecutor"

instrumented-executor {
delegate = "thread-pool-executor"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.pekko.sensors
package nl.pragmasoft.pekko.sensors

import io.prometheus.client.{Collector, CollectorRegistry, Gauge, Histogram}
import nl.pragmasoft.pekko.sensors.BasicMetricBuilders

final case class DispatcherMetrics(
queueTime: Histogram,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.apache.pekko.sensors.dispatch
package nl.pragmasoft.pekko.sensors.dispatch

import io.prometheus.client.{Gauge, Histogram}
import nl.pragmasoft.pekko.sensors.MetricsBuilders
import org.apache.pekko.sensors.DispatcherMetrics
import nl.pragmasoft.pekko.sensors.{DispatcherMetrics, MetricsBuilders}

/** Creates and registers Dispatcher metrics in the global registry */
private[dispatch] object DispatcherMetricsRegistration extends MetricsBuilders {
object DispatcherMetricsRegistration extends MetricsBuilders {
def namespace: String = "pekko_sensors"
def subsystem: String = "dispatchers"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,16 @@
package org.apache.pekko.sensors.dispatch
package nl.pragmasoft.pekko.sensors.dispatch

import java.lang.management.{ManagementFactory, ThreadInfo, ThreadMXBean}
import java.util.concurrent._
import java.util.concurrent.atomic.LongAdder
import org.apache.pekko.dispatch._
import org.apache.pekko.event.Logging.{Error, Warning}
import org.apache.pekko.event.Logging.Warning
import org.apache.pekko.sensors.dispatch.{InstrumentedDispatcherBase, PekkoRunnableWrapper, ScalaRunnableWrapper}
import DispatcherInstrumentationWrapper.Run
import com.typesafe.config.Config
import nl.pragmasoft.pekko.sensors.PekkoSensors
import nl.pragmasoft.pekko.sensors.{MetricsBuilders, PekkoSensors, RunnableWatcher}
import nl.pragmasoft.pekko.sensors.RunnableWatcher

import scala.PartialFunction.condOpt
import scala.concurrent.duration.{Duration, FiniteDuration}

object PekkoRunnableWrapper {
def unapply(runnable: Runnable): Option[Run => Runnable] =
condOpt(runnable) {
case runnable: Batchable => new BatchableWrapper(runnable, _)
case runnable: Mailbox => new MailboxWrapper(runnable, _)
}

class BatchableWrapper(self: Batchable, r: Run) extends Batchable {
def run(): Unit = r(() => self.run())
def isBatchable: Boolean = self.isBatchable
}

class MailboxWrapper(self: Mailbox, r: Run) extends ForkJoinTask[Unit] with Runnable {
def getRawResult: Unit = self.getRawResult()
def setRawResult(v: Unit): Unit = self.setRawResult(v)
def exec(): Boolean = r(() => self.exec())
def run(): Unit = { exec(); () }
}
}
import scala.concurrent.duration.Duration

class DispatcherInstrumentationWrapper(config: Config) {
import DispatcherInstrumentationWrapper._
Expand Down Expand Up @@ -205,60 +184,7 @@ class InstrumentedExecutor(val config: Config, val prerequisites: DispatcherPrer

}

trait InstrumentedDispatcher extends Dispatcher {

def actorSystemName: String
private lazy val wrapper = new DispatcherInstrumentationWrapper(configurator.config)

private val threadMXBean: ThreadMXBean = ManagementFactory.getThreadMXBean
private val interestingStateNames = Set("runnable", "waiting", "timed_waiting", "blocked")
private val interestingStates = Thread.State.values.filter(s => interestingStateNames.contains(s.name().toLowerCase))

PekkoSensors.schedule(
s"$id-states",
() => {
val threads = threadMXBean
.getThreadInfo(threadMXBean.getAllThreadIds, 0)
.filter(t =>
t != null
&& t.getThreadName.startsWith(s"$actorSystemName-$id")
)

interestingStates foreach { state =>
val stateLabel = state.toString.toLowerCase
DispatcherMetricsRegistration.threadStates
.labels(id, stateLabel)
.set(threads.count(_.getThreadState.name().equalsIgnoreCase(stateLabel)))
}
DispatcherMetricsRegistration.threads
.labels(id)
.set(threads.length)
}
)

override def execute(runnable: Runnable): Unit = wrapper(runnable, super.execute)

protected[pekko] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean =
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint))
if (mbox.setAsScheduled())
try {
wrapper(mbox, executorService.execute)
true
} catch {
case _: RejectedExecutionException =>
try {
wrapper(mbox, executorService.execute)
true
} catch { //Retry once
case e: RejectedExecutionException =>
mbox.setAsIdle()
eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
throw e
}
}
else false
else false
}
trait InstrumentedDispatcher extends InstrumentedDispatcherBase

class InstrumentedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.apache.pekko.sensors.metered
package nl.pragmasoft.pekko.sensors.metered

import nl.pragmasoft.pekko.sensors.DispatcherMetrics
import org.apache.pekko.dispatch.Dispatcher
import org.apache.pekko.sensors.DispatcherMetrics
import org.apache.pekko.sensors.metered.MeteredDispatcherInstrumentation

class MeteredDispatcher(settings: MeteredDispatcherSettings)
extends Dispatcher(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package org.apache.pekko.sensors.metered
package nl.pragmasoft.pekko.sensors.metered

import com.typesafe.config.Config
import nl.pragmasoft.pekko.sensors.metered.MeteredDispatcherSetup
import org.apache.pekko.dispatch.{DispatcherPrerequisites, MessageDispatcher, MessageDispatcherConfigurator}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

private object MeteredDispatcherConfigurator {
final implicit class ConfigOps(val config: Config) extends AnyVal {
def getMillisDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.MILLISECONDS)

def getNanosDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.NANOSECONDS)

private def getDuration(path: String, unit: TimeUnit): FiniteDuration =
Duration(config.getDuration(path, unit), unit)
}
}

class MeteredDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) {
import org.apache.pekko.util.Helpers._
import MeteredDispatcherConfigurator.ConfigOps

private val instance: MessageDispatcher = {
val _metrics = MeteredDispatcherSetup.setupOrThrow(prerequisites).metrics
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package org.apache.pekko.sensors.metered
package nl.pragmasoft.pekko.sensors.metered

import org.apache.pekko.dispatch.MessageDispatcherConfigurator
import nl.pragmasoft.pekko.sensors.DispatcherMetrics
import org.apache.pekko.dispatch.{ExecutorServiceFactoryProvider, MessageDispatcherConfigurator}

import scala.concurrent.duration._
import org.apache.pekko.dispatch.ExecutorServiceFactoryProvider
import org.apache.pekko.sensors.DispatcherMetrics

private[metered] case class MeteredDispatcherSettings(
case class MeteredDispatcherSettings(
name: String,
metrics: DispatcherMetrics,
_configurator: MessageDispatcherConfigurator,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package nl.pragmasoft.pekko.sensors.metered

import nl.pragmasoft.pekko.sensors.DispatcherMetrics
import org.apache.pekko.actor.setup.Setup
import org.apache.pekko.dispatch.DispatcherPrerequisites
import org.apache.pekko.sensors.DispatcherMetrics

final case class MeteredDispatcherSetup(metrics: DispatcherMetrics) extends Setup

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package nl.pragmasoft.pekko.sensors.metered

import org.apache.pekko.sensors.dispatch.DispatcherInstrumentationWrapper.{InstrumentedRun, Run}
import com.typesafe.config.Config
import nl.pragmasoft.pekko.sensors.RunnableWatcher
import org.apache.pekko.sensors.DispatcherMetrics
import org.apache.pekko.sensors.dispatch.RunnableWrapper
import nl.pragmasoft.pekko.sensors.{DispatcherMetrics, RunnableWatcher}
import nl.pragmasoft.pekko.sensors.dispatch.DispatcherInstrumentationWrapper.{InstrumentedRun, Run}
import nl.pragmasoft.pekko.sensors.dispatch.RunnableWrapper

import java.util.concurrent.atomic.LongAdder
import scala.concurrent.duration.Duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import org.apache.pekko.persistence.{JournalProtocol => P}
import nl.pragmasoft.pekko.sensors.MetricOps._
import com.typesafe.scalalogging.LazyLogging
import nl.pragmasoft.pekko.sensors.{ClassNameUtil, SensorMetrics}
import nl.pragmasoft.pekko.sensors.{ClassNameUtil, PekkoSensorsExtension, SensorMetrics}

import scala.annotation.tailrec
import scala.reflect.ClassTag
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.apache.pekko.sensors.actor

import nl.pragmasoft.pekko.sensors.{ClassNameUtil, PekkoSensorsExtension}
import org.apache.pekko.actor.{Actor, ActorContext, ActorLogging, ActorRef, ReceiveTimeout}
import org.apache.pekko.persistence.{PersistentActor, RecoveryCompleted}
import org.apache.pekko.actor.{Actor, ActorLogging, ReceiveTimeout}
import org.apache.pekko.persistence.PersistentActor
import nl.pragmasoft.pekko.sensors.MetricOps._

import scala.collection.immutable
import scala.util.control.NonFatal
import nl.pragmasoft.pekko.sensors.MetricOps._

trait ActorMetrics extends Actor with ActorLogging {
self: Actor =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.apache.pekko.sensors.dispatch

import nl.pragmasoft.pekko.sensors.PekkoSensors
import nl.pragmasoft.pekko.sensors.dispatch.{DispatcherInstrumentationWrapper, DispatcherMetricsRegistration}
import org.apache.pekko.dispatch.{Dispatcher, Mailbox}
import org.apache.pekko.event.Logging.Error

import java.lang.management.{ManagementFactory, ThreadMXBean}
import java.util.concurrent.RejectedExecutionException

trait InstrumentedDispatcherBase extends Dispatcher {

def actorSystemName: String
private lazy val wrapper = new DispatcherInstrumentationWrapper(configurator.config)

private val threadMXBean: ThreadMXBean = ManagementFactory.getThreadMXBean
private val interestingStateNames = Set("runnable", "waiting", "timed_waiting", "blocked")
private val interestingStates = Thread.State.values.filter(s => interestingStateNames.contains(s.name().toLowerCase))

PekkoSensors.schedule(
s"$id-states",
() => {
val threads = threadMXBean
.getThreadInfo(threadMXBean.getAllThreadIds, 0)
.filter(t =>
t != null
&& t.getThreadName.startsWith(s"$actorSystemName-$id")
)

interestingStates foreach { state =>
val stateLabel = state.toString.toLowerCase
DispatcherMetricsRegistration.threadStates
.labels(id, stateLabel)
.set(threads.count(_.getThreadState.name().equalsIgnoreCase(stateLabel)))
}
DispatcherMetricsRegistration.threads
.labels(id)
.set(threads.length)
}
)

override def execute(runnable: Runnable): Unit = wrapper(runnable, super.execute)

protected[pekko] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean =
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint))
if (mbox.setAsScheduled())
try {
wrapper(mbox, executorService.execute)
true
} catch {
case _: RejectedExecutionException =>
try {
wrapper(mbox, executorService.execute)
true
} catch { //Retry once
case e: RejectedExecutionException =>
mbox.setAsIdle()
eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
throw e
}
}
else false
else false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.pekko.sensors.dispatch

import nl.pragmasoft.pekko.sensors.dispatch.DispatcherInstrumentationWrapper.Run
import org.apache.pekko.dispatch.{Batchable, Mailbox}

import java.util.concurrent.ForkJoinTask
import scala.PartialFunction.condOpt

object PekkoRunnableWrapper {
def unapply(runnable: Runnable): Option[Run => Runnable] =
condOpt(runnable) {
case runnable: Batchable => new BatchableWrapper(runnable, _)
case runnable: Mailbox => new MailboxWrapper(runnable, _)
}

class BatchableWrapper(self: Batchable, r: Run) extends Batchable {
def run(): Unit = r(() => self.run())
def isBatchable: Boolean = self.isBatchable
}

class MailboxWrapper(self: Mailbox, r: Run) extends ForkJoinTask[Unit] with Runnable {
def getRawResult: Unit = self.getRawResult()
def setRawResult(v: Unit): Unit = self.setRawResult(v)
def exec(): Boolean = r(() => self.exec())
def run(): Unit = { exec(); () }
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.apache.pekko.sensors.dispatch

import org.apache.pekko.dispatch.Batchable
import DispatcherInstrumentationWrapper.Run
import nl.pragmasoft.pekko.sensors.dispatch.DispatcherInstrumentationWrapper.Run

import scala.PartialFunction.condOpt

Expand Down

0 comments on commit 1a6edd2

Please sign in to comment.