# rAdvisor graphs

## Notebook inputs

In [None]:
import os

# Experiment directory name, relative to `../data`
EXPERIMENT_DIRNAME: str = ""
# Node directory name, relative to `../data/{experiment_dirname}/logs`
NODE_NAME: str = ""
# ID of the Docker container whose statistics should be parsed.
# Either the short (12-character) or the full (64-character) ID works,
# because log file names are matched against it as a prefix.
CONTAINER_ID: str = ""

# Collection interval; only shown in graph titles
INTERVAL: str = "50ms"

## Setup

In [None]:
# Import libraries
%matplotlib inline
import io
import math
import tarfile
from typing import Any, Callable, Dict, Iterable, List, Literal, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import lib.radvisor_parser as radvisor_parser

In [None]:
def construct_dataframe(log_file: Iterable[str]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Constructs a dataframe containing all of the parsed rAdvisor log data,
    plus the YAML metadata included at the top of the log file.

    Two derived columns used by the analysis cells are added:

    - ``time``: the ``read`` timestamp relative to the earliest one in the log
    - ``time_diff``: difference in ``time`` from the previous row
      (NaN for the first row)

    :param log_file: iterable of text lines from a single rAdvisor log file
    :return: tuple of ``(dataframe, metadata)`` where ``metadata`` is the
        header object returned by ``radvisor_parser.parse_target_log``
    """
    log_iter, metadata = radvisor_parser.parse_target_log(log_file)
    dataframe = pd.DataFrame(log_iter)

    # Normalize timestamps by subtracting the minimum. A vectorized column
    # subtraction keeps integer timestamps exact. A row-wise apply(axis=1) is
    # far slower and can upcast each row to float64, which cannot represent
    # large integer timestamps (e.g. nanoseconds since the epoch) exactly.
    dataframe["time"] = dataframe["read"] - dataframe["read"].min()
    dataframe["time_diff"] = dataframe["time"].diff()

    return (dataframe, metadata)

In [None]:
# Locate the container's log inside the node's rAdvisor tarball and parse it.
# The log is streamed straight out of the archive; nothing is written to disk.
tarball_path = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs", NODE_NAME, "radvisor.tar.gz")
with tarfile.open(tarball_path, "r:gz") as tar:
    all_filepaths = tar.getnames()
    # CONTAINER_ID may be a short or long ID, so it is matched as a file name prefix
    matching_filepaths = [
        filepath
        for filepath in all_filepaths
        if os.path.basename(filepath).startswith(CONTAINER_ID)
        and os.path.basename(filepath).endswith(".log")
    ]

    if not matching_filepaths:
        raise FileNotFoundError(f"Could not find container log file with prefix {CONTAINER_ID}"
                                f" in tarball at {tarball_path}; all files in archive: {all_filepaths}")

    # The first match in archive order is used. If the prefix is ambiguous
    # (e.g. CONTAINER_ID is empty or too short), say so instead of choosing silently.
    container_log_filepath = matching_filepaths[0]
    if len(matching_filepaths) > 1:
        print(f"Warning: {len(matching_filepaths)} log files match prefix '{CONTAINER_ID}';"
              f" using {container_log_filepath}. All matches: {matching_filepaths}")

    log_file_byte_reader = tar.extractfile(container_log_filepath)
    if log_file_byte_reader is None:
        # extractfile() returns None for members that are not regular files or links
        raise FileNotFoundError(f"{container_log_filepath} in tarball at {tarball_path} is not a regular file")

    # rAdvisor, written in Rust, always uses and outputs UTF-8 text.
    # Closing the text wrapper also closes the underlying byte stream.
    with io.TextIOWrapper(log_file_byte_reader, encoding="utf-8") as log_file_string_reader:
        dataframe, metadata = construct_dataframe(log_file_string_reader)
        print(f"Generated dataframe for '{metadata['Metadata']['Names'][0]}' ({metadata['Metadata']['Id']})")

## Point-in-time / line graphs

- CPU (total usage as a single time-series) - `cpu_stat__usage_usec`
- Memory - `memory_current`
- I/O (read, write) - `io_stat__rbytes`, `io_stat__wbytes`

### CPU Point-in-time / line graph

In [None]:
# Graph inputs (CPU line graph)

# Width of each aggregation window in milliseconds; collection samples that
# fall in the same window are combined into a single point
WINDOW_SIZE_MS: int = 1000

# Reduction applied to the samples of each window (peak utilization)
WINDOW_AGGREGATION_FUNCTION = max

In [None]:
def generate_cpu_series(
    dataframe: pd.DataFrame,
    window_size_ms: Optional[float] = None,
    aggregation_function: Optional[Callable] = None,
) -> pd.Series:
    """
    Generates the CPU time-series by aggregating the data into fixed windows.

    Per-sample utilization is the CPU time consumed since the previous sample
    divided by the wall-clock time elapsed since it (1.0 == one busy core).
    Samples are grouped into windows of ``window_size_ms`` and each window is
    reduced with ``aggregation_function``.

    This mutates the dataframe by adding the ``cpu_diff``, ``cpu`` and
    ``cpu_window`` columns.

    :param dataframe: dataframe with ``time`` / ``time_diff`` (nanoseconds)
        and ``cpu_stat__usage_usec`` columns
    :param window_size_ms: window width in milliseconds; defaults to the
        notebook-level ``WINDOW_SIZE_MS`` at call time
    :param aggregation_function: reduction for each window (anything
        ``SeriesGroupBy.agg`` accepts); defaults to the notebook-level
        ``WINDOW_AGGREGATION_FUNCTION`` at call time
    :return: series indexed by window start time in seconds, valued in
        core-percent (100 == one fully busy core)
    """
    if window_size_ms is None:
        window_size_ms = WINDOW_SIZE_MS
    if aggregation_function is None:
        aggregation_function = WINDOW_AGGREGATION_FUNCTION

    # `time`/`time_diff` are in nanoseconds, while `cpu_stat__usage_usec` is
    # (per its name) in microseconds; convert before dividing.
    ns_per_usec = 1_000
    window_size_ns = int(window_size_ms * 1e6)

    dataframe["cpu_diff"] = dataframe["cpu_stat__usage_usec"].diff()
    time_diff = dataframe["time_diff"]
    # Samples with identical timestamps would divide by zero; report 0 for those
    dataframe["cpu"] = ((dataframe["cpu_diff"] * ns_per_usec) / time_diff).where(time_diff != 0, 0)
    dataframe["cpu_window"] = dataframe["time"] // window_size_ns

    cpu_series = dataframe.groupby("cpu_window")["cpu"].agg(aggregation_function)
    # Convert the series index from window indices to seconds
    cpu_series.index = cpu_series.index * (window_size_ms / 1e3)
    # Convert from fraction-of-a-core to core-percentage
    return cpu_series * 100

In [None]:
# Plot the windowed CPU utilization over the course of the experiment
cpu_series = generate_cpu_series(dataframe)

fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
max_time = cpu_series.index.max()
cpu_series.plot(ax=ax, kind="line", grid=True)
ax.set_xlim((0, max_time))
# Grid visibility is passed positionally: Matplotlib 3.5 renamed the `b`
# keyword to `visible`, and later releases removed `b=`
ax.grid(True, alpha=0.75)
ax.set_title(f"CPU Utilization ({WINDOW_SIZE_MS / 1e3:.1f}s window)")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Total CPU utilization per window (core-%)")

# Make the cell's last expression None so only the figure is rendered
None

### Memory Point-in-time / line graph

In [None]:
# Graph inputs (memory line graph)

# Width of each aggregation window in milliseconds; collection samples that
# fall in the same window are combined into a single point
WINDOW_SIZE_MS: int = 1000

# Reduction applied to the samples of each window (arithmetic mean)
WINDOW_AGGREGATION_FUNCTION = np.mean

In [None]:
def generate_memory_series(
    dataframe: pd.DataFrame,
    window_size_ms: Optional[float] = None,
    aggregation_function: Optional[Callable] = None,
) -> pd.Series:
    """
    Generates the memory time-series by aggregating the data into fixed windows.

    This mutates the dataframe by adding the ``memory_window`` column.

    :param dataframe: dataframe with ``time`` (nanoseconds) and
        ``memory_current`` (bytes) columns
    :param window_size_ms: window width in milliseconds; defaults to the
        notebook-level ``WINDOW_SIZE_MS`` at call time
    :param aggregation_function: reduction for each window (anything
        ``SeriesGroupBy.agg`` accepts); defaults to the notebook-level
        ``WINDOW_AGGREGATION_FUNCTION`` at call time
    :return: series indexed by window start time in seconds, valued in MiB
    """
    if window_size_ms is None:
        window_size_ms = WINDOW_SIZE_MS
    if aggregation_function is None:
        aggregation_function = WINDOW_AGGREGATION_FUNCTION

    window_size_ns = int(window_size_ms * 1e6)
    # Vectorized window assignment (a row-wise apply is much slower on long logs)
    dataframe["memory_window"] = dataframe["time"] // window_size_ns
    memory_series = dataframe.groupby("memory_window")["memory_current"].agg(aggregation_function)
    # Convert the series index from window indices to seconds
    memory_series.index = memory_series.index * (window_size_ms / 1e3)
    # Convert bytes to MiB
    return memory_series / (1024 * 1024)

In [None]:
# Plot the windowed memory usage over the course of the experiment
memory_series = generate_memory_series(dataframe)

fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
max_time = memory_series.index.max()
max_memory = memory_series.max()
memory_series.plot(ax=ax, kind="line", grid=True)
ax.set_xlim((0, max_time))
# 10% headroom above the peak; fall back to 1 MiB when the peak is 0 or NaN,
# since a (0, 0) or NaN limit is invalid
ax.set_ylim((0, max_memory * 1.1 if max_memory > 0 else 1))
# Grid visibility is passed positionally: Matplotlib 3.5 renamed the `b`
# keyword to `visible`, and later releases removed `b=`
ax.grid(True, alpha=0.75)
ax.set_title(f"Memory usage ({WINDOW_SIZE_MS / 1e3:.1f}s window)")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Memory used (MiB)")

# Make the cell's last expression None so only the figure is rendered
None

### I/O (Read, Write) Point-in-time / line graph

In [None]:
# Graph inputs (I/O line graphs)

# Width of each aggregation window in milliseconds; collection samples that
# fall in the same window are combined into a single point
WINDOW_SIZE_MS: int = 1000

# Reduction applied to the samples of each window (total transferred)
WINDOW_AGGREGATION_FUNCTION = sum

In [None]:
def generate_io_series(
    dataframe: pd.DataFrame,
    metric: Literal["read", "write"],
    window_size_ms: Optional[float] = None,
    aggregation_function: Optional[Callable] = None,
) -> pd.Series:
    """
    Generates the I/O time-series by aggregating the data into fixed windows.

    The ``io_stat__rbytes`` / ``io_stat__wbytes`` counter is differenced, so
    each row holds the bytes transferred since the previous sample. This
    assumes the counter is cumulative. Those per-sample values are then
    reduced per window.

    This mutates the dataframe by adding the ``io_{metric}_window`` and
    ``io_{metric}_diff`` columns.

    :param dataframe: dataframe with ``time`` (nanoseconds) and the
        ``io_stat__{r,w}bytes`` column for ``metric``
    :param metric: ``"read"`` or ``"write"``
    :param window_size_ms: window width in milliseconds; defaults to the
        notebook-level ``WINDOW_SIZE_MS`` at call time
    :param aggregation_function: reduction for each window (anything
        ``SeriesGroupBy.agg`` accepts); defaults to the notebook-level
        ``WINDOW_AGGREGATION_FUNCTION`` at call time
    :return: series indexed by window start time in seconds, valued in KiB
    """
    if window_size_ms is None:
        window_size_ms = WINDOW_SIZE_MS
    if aggregation_function is None:
        aggregation_function = WINDOW_AGGREGATION_FUNCTION

    window_size_ns = int(window_size_ms * 1e6)
    window_column = f"io_{metric}_window"
    diff_column = f"io_{metric}_diff"

    dataframe[window_column] = dataframe["time"] // window_size_ns
    dataframe[diff_column] = dataframe[f"io_stat__{metric[0]}bytes"].diff()
    io_series = dataframe.groupby(window_column)[diff_column].agg(aggregation_function)
    # Convert the series index from window indices to seconds
    io_series.index = io_series.index * (window_size_ms / 1e3)
    # Convert bytes to KiB
    return io_series / 1024

In [None]:
# Plot windowed I/O reads and writes as two stacked line graphs
io_series = {
    "read": generate_io_series(dataframe, "read"),
    "write": generate_io_series(dataframe, "write"),
}

# Use the same max time for both plots
max_time = max(series.index.max() for series in io_series.values())

fig = plt.figure(figsize=(10, 10))
for (idx, (metric, series)) in enumerate(io_series.items()):
    ax = fig.add_subplot(2, 1, idx + 1)
    max_io = series.max()
    series.plot(ax=ax, kind="line", grid=True)
    ax.set_xlim((0, max_time))
    # Upper limit of at least 1 KiB. `max(max_io, 1)` would return NaN for an
    # all-NaN series, which set_ylim rejects; `max_io > 1` is False for NaN.
    ax.set_ylim((0, max_io if max_io > 1 else 1))
    # Grid visibility is passed positionally: Matplotlib 3.5 renamed the `b`
    # keyword to `visible`, and later releases removed `b=`
    ax.grid(True, alpha=0.75)
    ax.set_title(f"I/O {metric}s ({WINDOW_SIZE_MS / 1e3:.1f}s window)")
    ax.set_xlabel("Time (seconds)")
    ax.set_ylabel("Data transferred (KiB)")
fig.tight_layout(h_pad=1.2)

# Make the cell's last expression None so only the figure is rendered
None

## Histograms

- CPU (total usage per collection interval) - `cpu_stat__usage_usec`
- Memory - `memory_current`
- I/O (read, write) - `io_stat__rbytes`, `io_stat__wbytes`

In [None]:
def generate_histogram_series(src_series: pd.Series, bin_width: float) -> pd.Series:
    """
    Maps each point of src_series to the lower bound of the bin it falls in.

    Bins are ``[k * bin_width, (k + 1) * bin_width)`` for integer ``k``.
    NaN points are dropped, so the result may be shorter than ``src_series``.
    Surviving points keep their original index labels.

    :param src_series: numeric values to bin
    :param bin_width: width of each bin; must be positive
    :return: series of bin lower bounds aligned to ``src_series``'s index
    :raises ValueError: if ``bin_width`` is not positive
    """
    if bin_width <= 0:
        raise ValueError(f"bin_width must be positive, got {bin_width}")
    bin_indices = (src_series // bin_width).dropna().astype("int64")
    return bin_indices * bin_width

### CPU Histogram

In [None]:
# Graph inputs (CPU histogram)

# How many equal-width bins the CPU utilization histogram is divided into
CPU_NUM_BINS: int = 20

In [None]:
def generate_cpu_histogram_series(dataframe: pd.DataFrame) -> pd.Series:
    """
    Generates per-sample CPU utilization (no windowing) for a histogram.

    Each value is the CPU time consumed since the previous sample divided by
    the wall-clock time elapsed since it, in core-percent (100 == one busy
    core). The first sample has no predecessor and is NaN.

    This mutates the dataframe by adding the ``cpu_histogram_diff`` column.

    :param dataframe: dataframe with ``time_diff`` (nanoseconds) and
        ``cpu_stat__usage_usec`` columns
    :return: series aligned to the dataframe's index
    """
    # `time_diff` is in nanoseconds, while `cpu_stat__usage_usec` is (per its
    # name) in microseconds; convert before dividing.
    ns_per_usec = 1_000

    dataframe["cpu_histogram_diff"] = dataframe["cpu_stat__usage_usec"].diff()
    time_diff = dataframe["time_diff"]
    # Samples with identical timestamps would divide by zero; report 0 for those
    cpu_series = ((dataframe["cpu_histogram_diff"] * ns_per_usec) / time_diff).where(time_diff != 0, 0)
    # Convert to core-percentage
    return cpu_series * 100


In [None]:
# Plot the distribution of per-sample CPU utilization (log-scaled frequency)
cpu_series = generate_cpu_histogram_series(dataframe)

fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
cpu_series.plot(ax=ax, kind='hist', edgecolor='white', linewidth=1.2,
    bins=CPU_NUM_BINS)
# Grid visibility is passed positionally: Matplotlib 3.5 renamed the `b`
# keyword to `visible`, and later releases removed `b=`
ax.grid(True, alpha=0.75, axis='y')
ax.set_yscale("log")
ax.set_title(f"CPU Utilization Distribution (for utilization each {INTERVAL})")
ax.set_xlabel("CPU Utilization (%)")
ax.set_ylabel("Frequency")

# Make the cell's last expression None so only the figure is rendered
None

### Memory Histogram

In [None]:
# Graph inputs (memory histogram)

# How many equal-width bins the memory usage histogram is divided into
MEMORY_NUM_BINS: int = 20

In [None]:
def generate_memory_histogram_series(dataframe: pd.DataFrame) -> pd.Series:
    """
    Returns per-sample memory usage in MiB, for plotting as a histogram.

    No aggregation is performed: there is one value per collected sample.
    The dataframe is not modified.

    :param dataframe: dataframe with a ``memory_current`` column in bytes
    :return: ``memory_current`` converted to MiB (same index and name)
    """
    return dataframe["memory_current"] / (1024 * 1024)

In [None]:
memory_series = generate_memory_histogram_series(dataframe)

fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
memory_series.plot(ax=ax, kind='hist', edgecolor='white', linewidth=1.2,
    bins=MEMORY_NUM_BINS)
ax.grid(b=True, alpha=0.75, axis='y')
ax.set_yscale("log")
ax.set_title(f"Memory Usage Distribution (for measurements collected each {INTERVAL})")
ax.set_xlabel("Memory Usage (MiB)")
ax.set_ylabel("Frequency")

None

### I/O (Read, Write) Histograms

In [None]:
# Graph inputs (I/O histograms)

# How many equal-width bins each I/O histogram is divided into
IO_NUM_BINS: int = 20

In [None]:
def generate_io_histogram_series(dataframe: pd.DataFrame, metric: Literal["read", "write"]) -> pd.Series:
    """
    Generates per-sample I/O volumes (no windowing) for a histogram.

    The ``io_stat__rbytes`` / ``io_stat__wbytes`` counter is differenced, so
    each value is the data transferred since the previous sample. This
    assumes the counter is cumulative. The first sample is NaN. The diff is
    computed here rather than read from columns added by other cells, so this
    does not depend on cell execution order. The dataframe is not modified.

    :param dataframe: dataframe with the ``io_stat__{r,w}bytes`` column for ``metric``
    :param metric: ``"read"`` or ``"write"``
    :return: KiB transferred per sample, aligned to the dataframe's index
    """
    # Previously this read the `io_{metric}_window` column, i.e. window
    # indices rather than bytes transferred.
    bytes_transferred = dataframe[f"io_stat__{metric[0]}bytes"].diff()
    # Convert bytes to KiB
    return bytes_transferred / 1024

In [None]:
# Plot the distributions of per-sample I/O reads and writes (log-scaled frequency)
io_histogram_series = {
    "read": generate_io_histogram_series(dataframe, "read"),
    "write": generate_io_histogram_series(dataframe, "write"),
}

fig = plt.figure(figsize=(10, 10))
# Iterate the histogram data, not the windowed `io_series` time-series from the line graphs
for (idx, (metric, series)) in enumerate(io_histogram_series.items()):
    ax = fig.add_subplot(2, 1, idx + 1)
    series.plot(ax=ax, kind='hist', edgecolor='white', linewidth=1.2, bins=IO_NUM_BINS)
    # Grid visibility is passed positionally: Matplotlib 3.5 renamed the `b`
    # keyword to `visible`, and later releases removed `b=`
    ax.grid(True, alpha=0.75, axis='y')
    ax.set_yscale("log")
    ax.set_title(f"I/O {metric.capitalize()} Distribution (for data transferred in {INTERVAL})")
    ax.set_xlabel("Data transferred (KiB)")
    ax.set_ylabel("Frequency")
fig.tight_layout(h_pad=1.2)

# Make the cell's last expression None so only the figure is rendered
None