custom collectors 2.0 (#51)
* custom collectors 2.0

A new implementation of custom collectors that can be exposed in the
public API, because it hides collector implementation details.

Non thread-specific system metrics have been moved to custom collectors,
so they're being updated at collection time.
stefantalpalaru committed Feb 14, 2022
1 parent 71e0f0e commit 858f73b
Showing 2 changed files with 88 additions and 52 deletions.
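For orientation, here is roughly what the new public API lets a user write. This is a sketch modelled on the `ProcessInfo` collector in the diff below, not code from the commit; the `BatteryLevel` type, metric name, and constant value are invented for illustration:

```nim
import metrics

when defined(metrics):
  # A custom collector is a ref object inheriting from an existing collector
  # type; `buildCollector` fills in name/help/labels and initialises the
  # internal lock, so none of those details leak into user code.
  type BatteryLevel = ref object of Gauge
  var batteryLevel = BatteryLevel.buildCollector("battery_level", "battery level, in percent")

  # `collect` runs at scrape time, so the reported value is always fresh.
  method collect(collector: BatteryLevel): Metrics =
    let timestamp = getTime().toMilliseconds()
    result[@[]] = @[
      Metric(name: "battery_level", value: 84.0, timestamp: timestamp)
    ]

  batteryLevel.register(defaultRegistry)
```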
6 changes: 3 additions & 3 deletions README.md
@@ -363,9 +363,10 @@ The `process_*` metrics are only available on Linux, for now.
 their total heap usage (from all threads), at the time the metric is created.
 Since this set changes with time, you'll see more than 10 types in Grafana.
 
-These system metrics are being updated automatically when a user-defined metric
+The thread-specific metrics are being updated automatically when a user-defined metric
 is changed in the main thread, but only if a minimal interval has passed since
-the last update (defaults to 10 second).
+the last update (defaults to 10 seconds). All other system metrics are custom
+collectors which are updated at collection time.
 
 ```nim
 import times
@@ -384,7 +385,6 @@ metrics yourself.
 # disable automatic updates
 setSystemMetricsAutomaticUpdate(false)
 # somewhere in your event loop, at an interval of your choice
-updateSystemMetrics()
 updateThreadMetrics()
 ```

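A possible way to wire that remaining manual call into an event loop (a sketch using Nim's stdlib `asyncdispatch`; the 10-second cadence simply mirrors the default interval mentioned above):

```nim
import std/asyncdispatch
import metrics

setSystemMetricsAutomaticUpdate(false)

proc metricsUpdater() {.async.} =
  while true:
    updateThreadMetrics()
    await sleepAsync(10_000) # interval of your choice, in milliseconds

asyncCheck metricsUpdater()
```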
134 changes: 85 additions & 49 deletions metrics.nim
@@ -1,4 +1,4 @@
-# Copyright (c) 2019-2021 Status Research & Development GmbH
+# Copyright (c) 2019-2022 Status Research & Development GmbH
 # Licensed and distributed under either of
 # * MIT license: http://opensource.org/licenses/MIT
 # * Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
@@ -194,6 +194,18 @@ when defined(metrics):
   proc `$`*(collector: Collector): string =
     collector.toText()
 
+  # Used for custom collectors, to shield the API user from having to deal with
+  # internal details like lock initialisation.
+  proc buildCollector*[T](typ: typedesc[T], name: string, help: string, labels: LabelsParam = @[]): T {.raises: [Defect, ValueError].} =
+    validateName(name)
+    validateLabels(labels)
+    result = T(name: name,
+               help: help,
+               typ: "gauge", # Prometheus does not support a non-standard value here
+               labels: @labels,
+               creationThreadId: getThreadId())
+    result.lock.initLock()
+
 proc `$`*(collector: type IgnoredCollector): string = ""
 
 # for testing
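Note that `buildCollector` hard-codes `typ: "gauge"`: the `# TYPE` line in Prometheus's text exposition format only accepts the standard metric types, so a custom collector is always advertised as a gauge, whatever samples its `collect` method chooses to emit.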
@@ -250,7 +262,6 @@ proc unregister*(collector: type IgnoredCollector, registry = defaultRegistry)
 
 proc collect*(registry: Registry): OrderedTable[Collector, Metrics] =
   when defined(metrics):
-    result = initOrderedTable[Collector, Metrics]()
     withLock registry.lock:
       for collector in registry.collectors:
         var collectorCopy: Collector
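Since custom collectors now do their work at collection time, iterating the registry is what triggers them. A minimal sketch (the `echo` reporting is invented; `collect` over `defaultRegistry` and the `$` rendering are from the library):

```nim
import metrics

# Each call runs every registered collector's `collect` method, so custom
# collectors compute fresh values on demand.
for collector, metrics in defaultRegistry.collect():
  echo collector # `$` renders the collector in the Prometheus text format
```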
@@ -484,7 +495,6 @@ when defined(metrics):
                      help: help,
                      typ: "counter",
                      labels: @labels,
-                     metrics: initOrderedTable[Labels, seq[Metric]](),
                      creationThreadId: getThreadId(),
                      sampleRate: sampleRate)
     result.lock.initLock()
@@ -612,7 +622,6 @@ when defined(metrics):
                    help: help,
                    typ: "gauge",
                    labels: @labels,
-                   metrics: initOrderedTable[Labels, seq[Metric]](),
                    creationThreadId: getThreadId())
     result.lock.initLock()
     if labels.len == 0:
@@ -767,7 +776,6 @@ when defined(metrics):
                      help: help,
                      typ: "summary",
                      labels: @labels,
-                     metrics: initOrderedTable[Labels, seq[Metric]](),
                      creationThreadId: getThreadId())
     result.lock.initLock()
     if labels.len == 0:
@@ -883,7 +891,6 @@ when defined(metrics):
                        help: help,
                        typ: "histogram",
                        labels: @labels,
-                       metrics: initOrderedTable[Labels, seq[Metric]](),
                        creationThreadId: getThreadId(),
                        buckets: bucketsSeq)
     result.lock.initLock()
@@ -958,8 +965,6 @@ when defined(metrics):
     ThreadMetricsUpdateProc = proc() {.gcsafe, nimcall.}
   let mainThreadID = getThreadId()
   var
-    systemMetricsUpdateProcs: array[metrics_max_hooks, SystemMetricsUpdateProc]
-    systemMetricsUpdateProcsIndex = 0
     threadMetricsUpdateProcs: array[metrics_max_hooks, ThreadMetricsUpdateProc]
     threadMetricsUpdateProcsIndex = 0
     systemMetricsUpdateInterval = initDuration(seconds = 10)
@@ -980,32 +985,19 @@ when defined(metrics):
       except Exception as e:
         raise newException(Defect, e.msg)
 
+  # No longer used for all system metrics, which now are custom collectors, but
+  # still used for main-thread metrics.
   proc updateSystemMetrics*() {.gcsafe.} =
-    var doUpdate = false
-
     if systemMetricsAutomaticUpdate:
       # Update system metrics if at least systemMetricsUpdateInterval seconds
       # have passed and if we are being called from the main thread.
       if getThreadId() == mainThreadID:
         let currTime = now()
         if currTime >= (systemMetricsLastUpdated + systemMetricsUpdateInterval):
           systemMetricsLastUpdated = currTime
-          doUpdate = true
           # Update thread metrics, only when automation is on and we're in the
           # main thread.
           updateThreadMetrics()
-    else:
-      # We're being called directly by the API user, so don't introduce any conditions.
-      doUpdate = true
-
-    if doUpdate:
-      for i in 0 ..< systemMetricsUpdateProcsIndex:
-        try:
-          systemMetricsUpdateProcs[i]()
-        except CatchableError as e:
-          printError(e.msg)
-        except Exception as e:
-          raise newException(Defect, e.msg)
 
 ################
 # process info #
@@ -1014,13 +1006,6 @@
 when defined(metrics) and defined(linux):
   import posix
 
-  declareGauge process_virtual_memory_bytes, "virtual memory size in bytes"
-  declareGauge process_resident_memory_bytes, "resident memory size in bytes"
-  declareGauge process_start_time_seconds, "start time of the process since unix epoch in seconds"
-  declareGauge process_cpu_seconds_total, "total user and system CPU time spent in seconds"
-  declareGauge process_max_fds, "maximum number of open file descriptors"
-  declareGauge process_open_fds, "number of open file descriptors"
-
   var
     btime {.global.}: float64 = 0
     ticks {.global.}: float64 # clock ticks per second
@@ -1037,7 +1022,13 @@ when defined(metrics) and defined(linux):
     ticks = sysconf(SC_CLK_TCK).float64
     pagesize = sysconf(SC_PAGE_SIZE).float64
 
-  proc updateProcessInfo() =
+  type ProcessInfo = ref object of Gauge
+  var processInfo* {.global.} = ProcessInfo.buildCollector("process_info", "CPU and memory usage")
+
+  method collect*(collector: ProcessInfo): Metrics =
+    let timestamp = getTime().toMilliseconds()
+    result[@[]] = @[]
+
     try:
       if btime == 0:
         # we couldn't access /proc
@@ -1048,35 +1039,64 @@ when defined(metrics) and defined(linux):
       # $ cat /proc/self/stat
       # 30494 (cat) R 3022 30494 3022 34830 30494 4210688 98 0 0 0 0 0 0 0 20 0 1 0 73800491 10379264 189 18446744073709551615 94060049248256 94060049282149 140735229395104 0 0 0 0 0 0 0 0 0 17 6 0 0 0 0 0 94060049300560 94060049302112 94060076990464 140735229397011 140735229397031 140735229397031 140735229403119 0
       let selfStat = readFile("/proc/self/stat").split(") ")[^1].split(' ')
 
-      process_virtual_memory_bytes.set(selfStat[20].parseFloat(), doUpdateSystemMetrics = false)
-      process_resident_memory_bytes.set(selfStat[21].parseFloat() * pagesize, doUpdateSystemMetrics = false)
-      process_start_time_seconds.set(selfStat[19].parseFloat() / ticks + btime, doUpdateSystemMetrics = false)
-      process_cpu_seconds_total.set((selfStat[11].parseFloat() + selfStat[12].parseFloat()) / ticks, doUpdateSystemMetrics = false)
+      result[@[]] = @[
+        Metric(
+          name: "process_virtual_memory_bytes", # Virtual memory size in bytes.
+          value: selfStat[20].parseFloat(),
+          timestamp: timestamp,
+        ),
+        Metric(
+          name: "process_resident_memory_bytes", # Resident memory size in bytes.
+          value: selfStat[21].parseFloat() * pagesize,
+          timestamp: timestamp,
+        ),
+        Metric(
+          name: "process_start_time_seconds", # Start time of the process since unix epoch in seconds.
+          value: selfStat[19].parseFloat() / ticks + btime,
+          timestamp: timestamp,
+        ),
+        Metric(
+          name: "process_cpu_seconds_total", # Total user and system CPU time spent in seconds.
+          value: (selfStat[11].parseFloat() + selfStat[12].parseFloat()) / ticks,
+          timestamp: timestamp,
+        ),
+      ]
 
       for line in lines("/proc/self/limits"):
         if line.startsWith("Max open files"):
-          process_max_fds.set(line.splitWhiteSpace()[3].parseFloat(), doUpdateSystemMetrics = false) # a simple `split()` does not combine adjacent whitespace
+          result[@[]].add(
+            Metric(
+              name: "process_max_fds", # Maximum number of open file descriptors.
+              value: line.splitWhiteSpace()[3].parseFloat(), # a simple `split()` does not combine adjacent whitespace
+              timestamp: timestamp,
+            )
+          )
           break
 
-      process_open_fds.set(toSeq(walkDir("/proc/self/fd")).len.float64, doUpdateSystemMetrics = false)
+      result[@[]].add(
+        Metric(
+          name: "process_open_fds", # Number of open file descriptors.
+          value: toSeq(walkDir("/proc/self/fd")).len.float64,
+          timestamp: timestamp,
+        )
+      )
     except CatchableError as e:
       printError(e.msg)
 
-  systemMetricsUpdateProcs[systemMetricsUpdateProcsIndex] = updateProcessInfo
-  systemMetricsUpdateProcsIndex += 1
+  processInfo.register(defaultRegistry)
 
 ####################
 # Nim runtime info #
 ####################
 
 when defined(metrics):
-  declareGauge nim_gc_mem_bytes, "the number of bytes that are owned by a thread's GC", ["thread_id"]
-  declareGauge nim_gc_mem_occupied_bytes, "the number of bytes that are owned by a thread's GC and hold data", ["thread_id"]
-  declareGauge nim_gc_heap_instance_occupied_bytes, "total bytes occupied, by instance type (all threads)", ["type_name"]
-  declareGauge nim_gc_heap_instance_occupied_summed_bytes, "total bytes occupied by all instance types, in all threads - should be equal to 'sum(nim_gc_mem_occupied_bytes)' when 'updateThreadMetrics()' is being called in all threads, but it's somewhat smaller"
-
-  proc updateNimRuntimeInfoGlobal() =
+  type NimRuntimeInfo = ref object of Gauge
+  var nimRuntimeInfo* {.global.} = NimRuntimeInfo.buildCollector("nim_runtime_info", "Nim runtime info")
+
+  method collect*(collector: NimRuntimeInfo): Metrics =
+    let timestamp = getTime().toMilliseconds()
+    result[@[]] = @[]
     try:
       when defined(nimTypeNames) and declared(dumpHeapInstances):
         # Too high cardinality causes performance issues in Prometheus.
@@ -1105,13 +1125,29 @@ when defined(metrics):
         # Lower the number of metrics to reduce metric cardinality.
         for i in 0..<labelsLimit:
           let (typeName, size) = heapSizes[i]
-          nim_gc_heap_instance_occupied_bytes.set(size.float64, labelValues = @[$typeName], doUpdateSystemMetrics = false)
-        nim_gc_heap_instance_occupied_summed_bytes.set(heapSum.float64, doUpdateSystemMetrics = false)
+          result[@[]].add(
+            Metric(
+              name: "nim_gc_heap_instance_occupied_bytes", # total bytes occupied, by instance type (all threads)
+              value: size.float64,
+              timestamp: timestamp,
+              labels: @["type_name"],
+              labelValues: @[$typeName],
+            )
+          )
+        result[@[]].add(
+          Metric(
+            name: "nim_gc_heap_instance_occupied_summed_bytes", # total bytes occupied by all instance types, in all threads - should be equal to 'sum(nim_gc_mem_occupied_bytes)' when 'updateThreadMetrics()' is being called in all threads, but it's somewhat smaller
+            value: heapSum.float64,
+            timestamp: timestamp,
+          )
+        )
     except CatchableError as e:
       printError(e.msg)
 
-  systemMetricsUpdateProcs[systemMetricsUpdateProcsIndex] = updateNimRuntimeInfoGlobal
-  systemMetricsUpdateProcsIndex += 1
+  nimRuntimeInfo.register(defaultRegistry)
+
+  declareGauge nim_gc_mem_bytes, "the number of bytes that are owned by a thread's GC", ["thread_id"]
+  declareGauge nim_gc_mem_occupied_bytes, "the number of bytes that are owned by a thread's GC and hold data", ["thread_id"]
 
   proc updateNimRuntimeInfoThread() =
     try:

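Taken together, a scrape on Linux should now render the process collector roughly like this in the Prometheus text format (values and timestamps invented for illustration; the exact layout comes from the library's `toText`):

```text
# HELP process_info CPU and memory usage
# TYPE process_info gauge
process_virtual_memory_bytes 974848.0 1644840000000
process_resident_memory_bytes 2152448.0 1644840000000
process_start_time_seconds 1644838800.0 1644840000000
process_cpu_seconds_total 0.42 1644840000000
process_max_fds 1024.0 1644840000000
process_open_fds 8.0 1644840000000
```

Note that all of this is compiled in only with `-d:metrics`, and the per-type heap metrics additionally require Nim's `-d:nimTypeNames` flag, per the `when defined(nimTypeNames)` guard in the diff.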