Commit

Merge a5de322 into cbbd04a
c0c0n3 committed Dec 24, 2020
2 parents cbbd04a + a5de322 commit bce839d
Showing 24 changed files with 2,713 additions and 132 deletions.
4 changes: 4 additions & 0 deletions Pipfile
@@ -27,7 +27,11 @@ rethinkdb = "==2.3"
pickle-mixin = "==1.0.2"
pytest-lazy-fixture = "~=0.6.3"

# run `pipenv install --dev` to get the packages below in your env
[dev-packages]
aiohttp = "~=3.7"
matplotlib = "~=3.3"
pandas = "~=1.1"

[requires]
python_version = "3.8.5"
628 changes: 496 additions & 132 deletions Pipfile.lock

Large diffs are not rendered by default.

199 changes: 199 additions & 0 deletions docs/manuals/admin/telemetry.md
@@ -0,0 +1,199 @@
# Telemetry

QuantumLeap ships with a telemetry component for the concurrent, efficient,
low-memory-footprint collection of time-varying quantities. At present, it
can collect:

* Duration of selected code blocks;
* Python garbage collection metrics;
* Python profiler (cProfile) data;
* Operating system resource usage: maximum resident set size, user and
kernel time.

Profiler data are collected in files that can be loaded into Python's
built-in analyser (`pstats`), whereas all other sampled quantities are
assembled into time series and written to CSV files, which can easily be
imported into data analysis tools such as Pandas or into a time series
database such as Crate or Timescale.


### Output files

As QuantumLeap collects telemetry data, files are written to a monitoring
directory of your choice. Duration time series are output to CSV files
having a "duration" prefix and a "csv" extension. Likewise, garbage
collection and operating system resource usage time series are collected
in CSV files having a prefix of "runtime" and an extension of "csv".
Finally, profiler data go into files named "profiler.PID.data", where PID
is the operating system ID of the process being profiled, e.g.
"profiler.5662.data". CSV files can be read and deleted at will without
interfering with QuantumLeap's telemetry collection process, even if
QuantumLeap is restarted multiple times. On the other hand, profiler data
files should only be opened after stopping QuantumLeap. (These files are
produced by cProfile, not by QuantumLeap, so it is best not to touch them
until cProfile exits.)


### Output format

The profiler data files are binary files in the cProfile format as
documented in the Python standard library, hence they will not be
discussed here. The CSV files contain time series data and fields
are arranged as follows:

- **Timepoint**: time at which the measurement was taken, expressed as
  the number of nanoseconds since the epoch. (Integer value.)
- **Measurement**: the sampled quantity. (Float value.)
- **Label**: name used to identify a particular kind of measurement
  when sampling. (String value.)
- **PID**: operating system ID of the process that sampled the quantity.
  (Integer value.)
For convenience, each CSV file starts with a header of:

Timepoint, Measurement, Label, PID

For duration files the sampled quantity is the amount of time, in
fractional seconds, that an HTTP request took to complete, and the label
identifies the request by a combination of path and verb, as shown in the
duration file excerpt below:

Timepoint, Measurement, Label, PID
...
1607092101580206000, 0.237, "/v2/notify [POST]", 5659
...
1607092101580275000, 0.291, "/v2/notify [POST]", 5662
...

Runtime files contain both Python garbage collection and operating
system resource usage time series. Labels and measurements are as
follows; see the Pandas sketch after this list for one way to load them.

- **GC collections**. Each measurement in the series represents the total
number of times the GC collector swept memory since the interpreter
was started. (This is the total across all generations.) The series
is labelled with "gc collections".
- **GC collected**. Each measurement in the series represents the total
number of objects the GC collector freed since the interpreter was
started. (This is the total across all generations.) The series is
labelled with "gc collected".
- **GC uncollectable**. Each measurement in the series represents the
total number of objects the GC collector couldn't free since the
interpreter was started. (This is the total across all generations.)
The series is labelled with "gc uncollectable".
- **User Time**. Each measurement in the series is the total amount of
time, in seconds, the process spent executing in user mode. The
series is labelled with "user time".
- **System Time**. Each measurement in the series is the total amount of
time, in seconds, the process spent executing in kernel mode. The
series is labelled with "system time".
- **Maximum RSS**. Each measurement in the series is the maximum resident
  set size used. The value is in kilobytes on Linux and in bytes on macOS.
  The series is labelled with "max rss".
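
As a minimal sketch (the file name below is made up; the header and labels
are the ones described above), here is one way to load a runtime file with
Pandas and extract the "max rss" series per process:

    import pandas as pd

    # The header is "Timepoint, Measurement, Label, PID"; skipinitialspace
    # strips the blanks after the commas. Pick any "runtime.*.csv" file
    # from your monitoring directory in place of the name below.
    df = pd.read_csv('_monitoring/runtime.1a2b3c.csv', skipinitialspace=True)
    df['Timepoint'] = pd.to_datetime(df['Timepoint'], unit='ns')
    max_rss = df[df['Label'] == 'max rss'] \
        .groupby('PID')['Measurement'].max()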


### Basic usage

Telemetry is turned off by default but can easily be switched on using
the Gunicorn configuration file provided in the `server` package,
`gconfig_telemetry.py`. With this configuration, QuantumLeap will collect:

* The duration of each HTTP request;
* Python garbage collection metrics;
* Operating system resource usage: maximum resident set size, user and
kernel time.

If profiling data are needed too, edit `gconfig_telemetry.py` to enable
Python's built-in profiler (cProfile):

def post_worker_init(worker):
...
monitor.start(monitoring_dir=monitoring_dir,
with_runtime=True,
with_profiler=False)
# ^ set this to True

By default, telemetry data are written to files in the `_monitoring`
directory under QuantumLeap's current working directory; if the directory
doesn't exist, it is created automatically. To choose a different location,
set the `monitoring_dir` variable in `gconfig_telemetry.py` accordingly.

#### Turning telemetry on
As mentioned earlier, telemetry is turned off by default. To turn it on,
start QuantumLeap this way:

$ python app.py --config server/gconfig_telemetry.py

or, to use your own Gunicorn instead of QuantumLeap's embedded one:

$ gunicorn server.wsgi --config server/gconfig_telemetry.py

If you are using the Docker image, pass the telemetry configuration
as a command argument, as in the Docker Compose snippet below:

quantumleap:
image: smartsdk/quantumleap:latest
command: --config server/gconfig_telemetry.py
...

At the moment the only way to turn telemetry off is to stop QuantumLeap
and then restart it with its default configuration, i.e. `gconfig.py`.

#### Analysing telemetry data
Profiler data can be analysed interactively with the Python `pstats`
module, as explained in the Python standard library documentation, e.g.

$ python -m pstats profiler.5662.data
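
For instance, once inside the `pstats` browser you could sort entries by
cumulative time and print the ten most expensive ones (a sketch; the
prompt's rendering may differ):

    % sort cumulative
    % stats 10
    % quit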

CSV files can easily be imported into data analysis tools such as Pandas,
or into a time series database such as Crate or Timescale, e.g. with a
`COPY FROM` statement. For added convenience, there is a `pandas_import`
module in the `telemetry` package that you can use to import all duration
and runtime CSV files found in the monitoring directory:

$ cd ngsi-timeseries-api
$ pipenv install --dev
$ python
>>> import pandas as pd
>>> from server.telemetry.pandas_import import TelemetryDB
>>>
>>> db = TelemetryDB('/path/to/_monitoring')

Then you can use the `TelemetryDB` methods to populate Pandas frames with
duration and runtime data combed from the CSV files. For example, here is
how to calculate requests-per-second statistics for the version endpoint
and plot requests per second over time:

>>> get_version = db.duration().time_series('/version [GET]')
>>> rps = get_version.data().resample('1S').count()
>>> rps.describe()
...
>>> fig = rps.plot().get_figure()
>>> fig.savefig("get-version-rps.pdf")
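
Assuming, as the snippet above does, that `data()` returns an ordinary
Pandas time series of request durations, the usual Pandas statistics are
available too, for instance a latency percentile:

    >>> get_version.data().quantile(0.95)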

For further inspiration, you can have a look at the `analysis` module
in the `tests/benchmark` directory.


### Advanced usage

Power users who need to instrument the code to investigate performance
bottlenecks can do so by decorating functions with a duration sampler, as
in the example below, where a `time_it` decorator is added to the version
endpoint's handler.

from server.telemetry.monitor import time_it

@time_it(label='version()')
def version():
...

It is also possible to time specific blocks of code inside functions or
methods, or in the enclosing module's scope, as sketched below; please
refer to the documentation of the `monitor` module for the details.
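
As a minimal sketch, here is one way to time a block by reusing the same
sampling calls `gconfig_telemetry.py` uses for HTTP requests (the timed
block itself is hypothetical):

    import server.telemetry.monitor as monitor

    sample_id = monitor.start_duration_sample()
    run_expensive_query()  # hypothetical block of code to time
    monitor.stop_duration_sample('expensive query', sample_id)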

For more advanced scenarios, or to write your own samplers, first
familiarise yourself with the `observation` module (the core functionality;
it comes with numerous examples), then have a look at the samplers in the
`sampler` module to see how to write one. Finally, you can use the
implementation of the `monitor` module as a starting point for wiring
together the building blocks to fit your use case. The sketch below gives
the flavour.
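
As a minimal sketch of the wiring (the observation store here is
hypothetical; building and populating one is covered by the `observation`
module's documentation), the `flush` module's CSV action can be plugged in
like this:

    from server.telemetry.flush import flush_to_csv

    # flush_to_csv returns an action: a function that takes an
    # ObservationStore and atomically streams its contents to a uniquely
    # named CSV file in the target directory.
    save = flush_to_csv(target_dir='_monitoring', filename_prefix='runtime')
    save(my_observation_store)  # my_observation_store: built elsewhere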
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -23,3 +23,4 @@ nav:
- 'Grafana': 'admin/grafana.md'
- 'Data-Migration': 'admin/dataMigration.md'
- 'Benchmarks': 'admin/benchmarks.md'
- 'Telemetry': 'admin/telemetry.md'
34 changes: 34 additions & 0 deletions src/server/gconfig_telemetry.py
@@ -0,0 +1,34 @@
import multiprocessing
import os
import server
import server.telemetry.monitor as monitor


bind = f"{server.DEFAULT_HOST}:{server.DEFAULT_PORT}"
workers = multiprocessing.cpu_count() * 4 + 1
worker_class = 'gthread'
threads = 1
loglevel = 'error'


monitoring_dir = '_monitoring'


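# Gunicorn server hook: called in each worker process just after the
# worker has been initialised; starts the telemetry monitor for it.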
def post_worker_init(worker):
os.makedirs(monitoring_dir, exist_ok=True)
monitor.start(monitoring_dir=monitoring_dir,
with_runtime=True,
with_profiler=False)


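# Gunicorn server hook: called just before a worker processes a request;
# starts a duration sample for the request.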
def pre_request(worker, req):
req.duration_sample_id = monitor.start_duration_sample()


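# Gunicorn server hook: called after a worker has processed a request;
# stops the duration sample, labelling it with the request's path and verb.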
def post_request(worker, req, environ, resp):
key = f"{req.path} [{req.method}]"
monitor.stop_duration_sample(key, req.duration_sample_id)


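# Gunicorn server hook: called in the worker process just after it has
# exited; stops the telemetry monitor.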
def worker_exit(servo, worker):
monitor.stop()
14 changes: 14 additions & 0 deletions src/server/telemetry/__init__.py
@@ -0,0 +1,14 @@
"""
Thread-safe, low memory footprint, and efficient collection of time-varying
quantities.
For common telemetry scenarios (timing, profiling, GC) you should just be
able to use the ``monitor`` module as is. See there for details and usage.
For more advanced scenarios, or to write your own samplers, first
familiarise yourself with the ``observation`` module (the core
functionality; it comes with lots of examples), then have a look at the
samplers in the ``sampler`` module to see how to write one. Finally, you
can use the implementation of the ``monitor`` module as a starting point
for wiring together the building blocks to fit your use case.
"""
110 changes: 110 additions & 0 deletions src/server/telemetry/flush.py
@@ -0,0 +1,110 @@
"""
Flushing of time series memory buffers to permanent storage.
Each buffer is saved to its own file to avoid race conditions among
processes and threads. Saving data is efficient since it is stream-based
and lock-free, i.e. there is no need to acquire global locks to coordinate
writers. Files are written to a configured target directory atomically and
with unique names, which avoids interference with other programs that may
be processing previously written files. For example, another program can
safely scan the directory, aggregate the data in each file, process the
aggregate and then delete the processed files with no risk of race
conditions with respect to the writers in this module.
"""

import csv
import os
from uuid import uuid4

from server.telemetry.observation import ObservationStore, \
ObservationStoreAction, tabulate


TIMEPOINT_CSV_FIELD = 'Timepoint'
"""
Name of the observation's timepoint field in the CSV header.
"""
MEASUREMENT_CSV_FIELD = 'Measurement'
"""
Name of the observation's measurement field in the CSV header.
"""
LABEL_CSV_FIELD = 'Label'
"""
Name of the observation's label field in the CSV header.
"""
PID_CSV_FIELD = 'PID'
"""
Name of the PID field in the CSV header.
"""

OBSERVATION_STORE_HEADER = [TIMEPOINT_CSV_FIELD, MEASUREMENT_CSV_FIELD,
LABEL_CSV_FIELD, PID_CSV_FIELD]
"""
Header of the CSV file where observation store contents get written.
"""


def flush_to_csv(target_dir: str, filename_prefix: str) \
-> ObservationStoreAction:
"""
Build an action to stream the contents of an observation store to a CSV
file. Write the file *atomically* to the specified target directory and
with a unique file name. Write CSV fields in this order: time point,
measurement, label, PID. Notice PID is the process ID of the current
process; it isn't part of the observation store but is added to each row
by this function.
:param target_dir: the directory where to write the file.
:param filename_prefix: a string to prepend to the generated unique file
name.
:return: a function that takes an observation store and writes its contents
to file.
"""
return lambda store: _save_csv(target_dir, filename_prefix, store)


def _save_csv(target_dir: str, filename_prefix: str,
store: ObservationStore):
temp_name, filename = _file_names(filename_prefix)
temp_path = os.path.join(target_dir, temp_name) # (*)
target_path = os.path.join(target_dir, filename)

_write_csv(temp_path, store)
os.rename(temp_path, target_path) # (*)

# NOTE. Atomic move. Rename is atomic but won't work across file systems,
# see
# - https://alexwlchan.net/2019/03/atomic-cross-filesystem-moves-in-python/
# If you try moving a file across file systems you get an error similar to:
#
# OSError: [Errno 18] Cross-device link:
# '/tmp/file.csv' -> '/dir/on/other/fs/file.csv'
#
# This is why we write the file directly to the target dir with a temp name
# and then do the move. In fact, putting the file in a temp dir and then
# moving it to the target dir may fail if the two dirs are on different
# file systems.


def _file_names(filename_prefix: str) -> (str, str):
fid = uuid4().hex
temp_name = f"{filename_prefix}.{fid}.tmp"
target_name = f"{filename_prefix}.{fid}.csv"
return temp_name, target_name


def _write_csv(path: str, content: ObservationStore):
pid = os.getpid()
ts = ((t, m, k, pid) for t, m, k in tabulate(content)) # (1)
with open(path, mode='w') as fd:
w = csv.writer(fd, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL) # (2)
w.writerow(OBSERVATION_STORE_HEADER)
w.writerows(ts)

# NOTES.
# 1. Lazy evaluation. Parens, contrary to square brackets, don't force
# evaluation, so we won't wind up with double the memory of the store.
# See:
# - https://stackoverflow.com/questions/18883414
# 2. CSV quoting. Only quoting fields if they contain a delimiter or the
# quote char.