# Collectl Log Analysis

## Functionalities
- Plot CPU utilization graphs.
- Plot memory utilization graphs.
- Plot disk I/O utilization graphs.

## Input
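Log files are read from the experiment directory `../data/[EXPERIMENT_DIRNAME]`, where `EXPERIMENT_DIRNAME` is set in the notebook configuration below. This directory is assumed to have the following structure: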
```
logs/
  [node-1]/
    collectl.tar.gz
  ...
  [node-n]/
    collectl.tar.gz
```
Each `collectl.tar.gz` tarball contains gzip-compressed collectl log files. The extension of a log file identifies the type of resource monitored:
- `.cpu.gz`: CPU monitoring log file.
- `.numa.gz`: memory monitoring log file.
- `.dsk.gz`: disk I/O monitoring log file.

## Notebook Configuration

In [None]:
########## GENERAL
# Name of the experiment directory in `../data`; logs are read from its `logs` subdirectory.
EXPERIMENT_DIRNAME = "BuzzBlogBenchmark_2021-10-10-18-35-22"

########## CPU
# Fine-grained window size (in ms) used to bucket samples in the fine-grained CPU plot
COLLECTL_CPU_FG_WINDOW_SIZE = 50
# Aggregation function applied to the samples of each core that fall in the same window
# (passed by name to pandas `agg`)
COLLECTL_CPU_AGGREGATION_FUNC = "max"
# Analyzed metric (options: "user", "nice", "system", "wait", "irq", "soft",
# "steal", "idle", "total", "guest", "guest_n", "intrpt")
COLLECTL_CPU_METRIC = "total"

########## Memory
# Fine-grained window size (in ms) used to bucket samples in the fine-grained memory plot
COLLECTL_MEM_FG_WINDOW_SIZE = 50
# Aggregation function applied to the samples of each NUMA node that fall in the same window
# (passed by name to pandas `agg`)
COLLECTL_MEM_AGGREGATION_FUNC = "min"
# Analyzed metric (options: "used", "free", "slab", "mapped", "anon", "anonh", "inactive", "hits")
COLLECTL_MEM_METRIC = "free"

########## Disk I/O
# Fine-grained window size (in ms) used to bucket samples in the fine-grained disk I/O plot
COLLECTL_DSK_FG_WINDOW_SIZE = 50
# Aggregation function applied to the samples of each disk that fall in the same window
# (passed by name to pandas `agg`)
COLLECTL_DSK_AGGREGATION_FUNC = "max"
# Analyzed metric (options: "reads", "rmerge", "rkbytes", "waitr", "writes", "wmerge", "wkbytes", "waitw", "request",
# "quelen", "wait", "svctim", "util")
COLLECTL_DSK_METRIC = "quelen"

## Notebook Setup

In [None]:
# Import libraries.
%matplotlib inline
import datetime
import gzip
import matplotlib.pyplot as plt
import os
import pandas as pd
import tarfile
import warnings
warnings.filterwarnings("ignore")

# Each subdirectory of `logs` corresponds to one monitored node. `os.listdir` returns entries in
# arbitrary order and also lists stray files, so keep directories only and sort them to get a
# deterministic plot order.
logs_dirpath = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs")
node_names = sorted(name for name in os.listdir(logs_dirpath) if os.path.isdir(os.path.join(logs_dirpath, name)))

## CPU Monitoring

In [None]:
# Parse logs
# Per-core metric columns, in the order in which they are read from each CPU log record.
CPU_METRICS = ["user", "nice", "system", "wait", "irq", "soft", "steal", "idle", "total", "guest", "guest_n", "intrpt"]
# Long-format accumulator: one entry per (node, core, timestamp, metric) sample.
cpu = {"node_name": [], "core_no": [], "timestamp": [], "metric": [], "value": []}
n_cpu_metrics = len(CPU_METRICS)
for node_name in node_names:
    tarball_path = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs", node_name, "collectl.tar.gz")
    with tarfile.open(tarball_path, "r:gz") as tar:
        for filename in tar.getnames():
            if not filename.endswith(".cpu.gz"):
                continue
            with gzip.open(tar.extractfile(filename), "rt") as cpu_log_file:
                for log in cpu_log_file:
                    log_entry = log.split()
                    if not log_entry or log_entry[0].startswith("#"):
                        # Skip blank lines (which would otherwise fail timestamp parsing) and comments.
                        continue
                    # The first two fields are the date and the time of the sample.
                    timestamp = datetime.datetime.strptime(" ".join(log_entry[:2]), "%Y%m%d %H:%M:%S.%f")
                    # The remaining fields are consecutive groups of metrics, one group per core.
                    for core_no in range((len(log_entry) - 2) // n_cpu_metrics):
                        first_field = 2 + core_no * n_cpu_metrics
                        for (i, metric) in enumerate(CPU_METRICS):
                            cpu["node_name"].append(node_name)
                            cpu["core_no"].append(core_no)
                            cpu["timestamp"].append(timestamp)
                            cpu["metric"].append(metric)
                            cpu["value"].append(float(log_entry[first_field + i]))
            # Only the first CPU log file of each tarball is analyzed.
            break

In [None]:
# Build data frame
cpu = pd.DataFrame.from_dict(cpu)
# Time elapsed since the earliest sample across all nodes.
min_timestamp = cpu["timestamp"].min()
elapsed = cpu["timestamp"] - min_timestamp
# Column-wise operations replace the row-by-row `apply`, which is very slow on long-format logs.
cpu["time"] = elapsed.dt.total_seconds()
# Window numbers are computed with integer timedelta floor-division, so a sample that falls exactly
# on a window boundary cannot be misplaced by floating-point rounding.
cpu["window"] = elapsed // pd.Timedelta(seconds=1)
cpu["fg_window"] = elapsed // pd.Timedelta(milliseconds=COLLECTL_CPU_FG_WINDOW_SIZE)

### 1-second window

In [None]:
# Plot CPU utilization of each node (1-second window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
for (subplot_no, node_name) in enumerate(node_names, start=1):
    # Aggregate the selected metric of this node per (window, core).
    is_selected = (cpu["node_name"] == node_name) & (cpu["metric"] == COLLECTL_CPU_METRIC)
    per_window = cpu[is_selected].groupby(["window", "core_no"])["value"].agg(COLLECTL_CPU_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, core) grid; combinations without samples are set to 0.
    full_grid = [(window, core_no) for window in range(cpu["window"].max() + 1)
        for core_no in range(cpu["core_no"].max() + 1)]
    # One row per window, one column (plotted line) per core.
    df = per_window.reindex(full_grid, fill_value=0).unstack()
    last_window = df.index.max()
    ax = fig.add_subplot(len(node_names), 1, subplot_no)
    ax.set_xlim((0, last_window))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line", grid=True, legend=False,
        title="%s - CPU Utilization (1-second window)" % node_name, xlabel="Time (seconds)",
        ylabel="%s(%s) (%%)" % (COLLECTL_CPU_AGGREGATION_FUNC, COLLECTL_CPU_METRIC),
        xticks=range(0, last_window + 1, 60), yticks=range(0, 101, 10))

### Fine-grained window

In [None]:
# Plot CPU utilization of each node (fine-grained window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
for (i, node_name) in enumerate(node_names):
    # Aggregate the selected metric of this node per (fine-grained window, core).
    df = cpu[(cpu["node_name"] == node_name) & (cpu["metric"] == COLLECTL_CPU_METRIC)]
    df = df.groupby(["fg_window", "core_no"])["value"].agg(COLLECTL_CPU_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, core) grid; combinations without samples are set to 0.
    df = df.reindex([(w, c) for w in range(cpu["fg_window"].max() + 1) for c in range(cpu["core_no"].max() + 1)],
        fill_value=0)
    df = df.unstack()
    # Convert window numbers to elapsed seconds so that the x axis really shows what its label
    # says. This also keeps the 60-second tick step valid for window sizes above 1000 ms.
    df.index = df.index * COLLECTL_CPU_FG_WINDOW_SIZE / 1000
    ax = fig.add_subplot(len(node_names), 1, i + 1)
    ax.set_xlim((0, df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line", title="%s - CPU Utilization (%s-ms window)" % (node_name, COLLECTL_CPU_FG_WINDOW_SIZE),
        xlabel="Time (seconds)", ylabel="%s(%s) (%%)" % (COLLECTL_CPU_AGGREGATION_FUNC, COLLECTL_CPU_METRIC),
        grid=True, legend=False, yticks=range(0, 101, 10),
        xticks=range(0, int(df.index.max()) + 1, 60))

## Memory Monitoring

In [None]:
# Parse logs
# Per-NUMA-node metric columns, in the order in which they are read from each memory log record.
MEM_METRICS = ["used", "free", "slab", "mapped", "anon", "anonh", "inactive", "hits"]
# Long-format accumulator: one entry per (node, NUMA node, timestamp, metric) sample.
mem = {"node_name": [], "numa_node": [], "timestamp": [], "metric": [], "value": []}
n_mem_metrics = len(MEM_METRICS)
for node_name in node_names:
    tarball_path = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs", node_name, "collectl.tar.gz")
    with tarfile.open(tarball_path, "r:gz") as tar:
        for filename in tar.getnames():
            if not filename.endswith(".numa.gz"):
                continue
            with gzip.open(tar.extractfile(filename), "rt") as mem_log_file:
                for log in mem_log_file:
                    log_entry = log.split()
                    if not log_entry or log_entry[0].startswith("#"):
                        # Skip blank lines (which would otherwise fail timestamp parsing) and comments.
                        continue
                    # The first two fields are the date and the time of the sample.
                    timestamp = datetime.datetime.strptime(" ".join(log_entry[:2]), "%Y%m%d %H:%M:%S.%f")
                    # The remaining fields are consecutive groups of metrics, one group per NUMA node.
                    for numa_node in range((len(log_entry) - 2) // n_mem_metrics):
                        first_field = 2 + numa_node * n_mem_metrics
                        for (i, metric) in enumerate(MEM_METRICS):
                            mem["node_name"].append(node_name)
                            mem["numa_node"].append(numa_node)
                            mem["timestamp"].append(timestamp)
                            mem["metric"].append(metric)
                            mem["value"].append(float(log_entry[first_field + i]))
            # Only the first memory log file of each tarball is analyzed.
            break

In [None]:
# Build data frame
mem = pd.DataFrame.from_dict(mem)
# Time elapsed since the earliest sample across all nodes.
min_timestamp = mem["timestamp"].min()
elapsed = mem["timestamp"] - min_timestamp
# Column-wise operations replace the row-by-row `apply`, which is very slow on long-format logs.
mem["time"] = elapsed.dt.total_seconds()
# Window numbers are computed with integer timedelta floor-division, so a sample that falls exactly
# on a window boundary cannot be misplaced by floating-point rounding.
mem["window"] = elapsed // pd.Timedelta(seconds=1)
mem["fg_window"] = elapsed // pd.Timedelta(milliseconds=COLLECTL_MEM_FG_WINDOW_SIZE)

### 1-second window

In [None]:
# Plot memory utilization of each node (1-second window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
for (subplot_no, node_name) in enumerate(node_names, start=1):
    # Aggregate the selected metric of this node per (window, NUMA node).
    is_selected = (mem["node_name"] == node_name) & (mem["metric"] == COLLECTL_MEM_METRIC)
    per_window = mem[is_selected].groupby(["window", "numa_node"])["value"].agg(COLLECTL_MEM_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, NUMA node) grid; combinations without samples are set to 0.
    full_grid = [(window, numa_node) for window in range(mem["window"].max() + 1)
        for numa_node in range(mem["numa_node"].max() + 1)]
    # One row per window, one column (plotted line) per NUMA node.
    df = per_window.reindex(full_grid, fill_value=0).unstack()
    last_window = df.index.max()
    ax = fig.add_subplot(len(node_names), 1, subplot_no)
    ax.set_xlim((0, last_window))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line", grid=True,
        title="%s - Memory Utilization (1-second window)" % node_name, xlabel="Time (seconds)",
        ylabel="%s(%s)" % (COLLECTL_MEM_AGGREGATION_FUNC, COLLECTL_MEM_METRIC),
        xticks=range(0, last_window + 1, 60))

### Fine-grained window

In [None]:
# Plot memory utilization of each node (fine-grained window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
for (i, node_name) in enumerate(node_names):
    # Aggregate the selected metric of this node per (fine-grained window, NUMA node).
    df = mem[(mem["node_name"] == node_name) & (mem["metric"] == COLLECTL_MEM_METRIC)]
    df = df.groupby(["fg_window", "numa_node"])["value"].agg(COLLECTL_MEM_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, NUMA node) grid; combinations without samples are set to 0.
    df = df.reindex([(w, n) for w in range(mem["fg_window"].max() + 1) for n in range(mem["numa_node"].max() + 1)],
        fill_value=0)
    df = df.unstack()
    # Convert window numbers to elapsed seconds so that the x axis really shows what its label
    # says. This also keeps the 60-second tick step valid for window sizes above 1000 ms.
    df.index = df.index * COLLECTL_MEM_FG_WINDOW_SIZE / 1000
    ax = fig.add_subplot(len(node_names), 1, i + 1)
    ax.set_xlim((0, df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line",
        title="%s - Memory Utilization (%s-ms window)" % (node_name, COLLECTL_MEM_FG_WINDOW_SIZE),
        xlabel="Time (seconds)", ylabel="%s(%s)" % (COLLECTL_MEM_AGGREGATION_FUNC, COLLECTL_MEM_METRIC), grid=True,
        xticks=range(0, int(df.index.max()) + 1, 60))

## Disk Monitoring

In [None]:
# Parse logs
# Per-disk metric columns, in the order in which they are read from each disk log record.
DSK_METRICS = ["reads", "rmerge", "rkbytes", "waitr", "writes", "wmerge", "wkbytes", "waitw", "request",
    "quelen", "wait", "svctim", "util"]
# Long-format accumulator: one entry per (node, disk, timestamp, metric) sample.
dsk = {"node_name": [], "dsk_no": [], "timestamp": [], "metric": [], "value": []}
# Each disk contributes one extra leading field that is not parsed as a metric
# (presumably the device name -- TODO confirm against the collectl output format).
fields_per_dsk = len(DSK_METRICS) + 1
for node_name in node_names:
    tarball_path = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs", node_name, "collectl.tar.gz")
    with tarfile.open(tarball_path, "r:gz") as tar:
        for filename in tar.getnames():
            if not filename.endswith(".dsk.gz"):
                continue
            with gzip.open(tar.extractfile(filename), "rt") as dsk_log_file:
                for log in dsk_log_file:
                    log_entry = log.split()
                    if not log_entry or log_entry[0].startswith("#"):
                        # Skip blank lines (which would otherwise fail timestamp parsing) and comments.
                        continue
                    # The first two fields are the date and the time of the sample.
                    timestamp = datetime.datetime.strptime(" ".join(log_entry[:2]), "%Y%m%d %H:%M:%S.%f")
                    # The remaining fields are consecutive groups of fields, one group per disk.
                    for dsk_no in range((len(log_entry) - 2) // fields_per_dsk):
                        # Skip date, time and the leading non-metric field of this disk.
                        first_field = 3 + dsk_no * fields_per_dsk
                        for (i, metric) in enumerate(DSK_METRICS):
                            dsk["node_name"].append(node_name)
                            dsk["dsk_no"].append(dsk_no)
                            dsk["timestamp"].append(timestamp)
                            dsk["metric"].append(metric)
                            dsk["value"].append(float(log_entry[first_field + i]))
            # Only the first disk log file of each tarball is analyzed.
            break

In [None]:
# Build data frame
dsk = pd.DataFrame.from_dict(dsk)
# Time elapsed since the earliest sample across all nodes.
min_timestamp = dsk["timestamp"].min()
elapsed = dsk["timestamp"] - min_timestamp
# Column-wise operations replace the row-by-row `apply`, which is very slow on long-format logs.
dsk["time"] = elapsed.dt.total_seconds()
# Window numbers are computed with integer timedelta floor-division, so a sample that falls exactly
# on a window boundary cannot be misplaced by floating-point rounding.
dsk["window"] = elapsed // pd.Timedelta(seconds=1)
dsk["fg_window"] = elapsed // pd.Timedelta(milliseconds=COLLECTL_DSK_FG_WINDOW_SIZE)

### 1-second window

In [None]:
# Plot disk I/O utilization of each node (1-second window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
# Only the "util" metric is a percentage; other metrics (e.g. "quelen") must not be labelled with "%".
dsk_unit = " (%)" if COLLECTL_DSK_METRIC == "util" else ""
for (i, node_name) in enumerate(node_names):
    # Aggregate the selected metric of this node per (window, disk).
    df = dsk[(dsk["node_name"] == node_name) & (dsk["metric"] == COLLECTL_DSK_METRIC)]
    df = df.groupby(["window", "dsk_no"])["value"].agg(COLLECTL_DSK_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, disk) grid; combinations without samples are set to 0.
    df = df.reindex([(w, d) for w in range(dsk["window"].max() + 1) for d in range(dsk["dsk_no"].max() + 1)],
        fill_value=0)
    df = df.unstack()
    ax = fig.add_subplot(len(node_names), 1, i + 1)
    ax.set_xlim((0, df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line", title="%s - Disk I/O Utilization (1-second window)" % node_name,
        xlabel="Time (seconds)",
        ylabel="%s(%s)%s" % (COLLECTL_DSK_AGGREGATION_FUNC, COLLECTL_DSK_METRIC, dsk_unit),
        grid=True, xticks=range(0, df.index.max() + 1, 60))

### Fine-grained window

In [None]:
# Plot disk I/O utilization of each node (fine-grained window)
fig = plt.figure(figsize=(12, len(node_names) * 6))
# Only the "util" metric is a percentage; other metrics (e.g. "quelen") must not be labelled with "%".
dsk_unit = " (%)" if COLLECTL_DSK_METRIC == "util" else ""
for (i, node_name) in enumerate(node_names):
    # Aggregate the selected metric of this node per (fine-grained window, disk).
    df = dsk[(dsk["node_name"] == node_name) & (dsk["metric"] == COLLECTL_DSK_METRIC)]
    df = df.groupby(["fg_window", "dsk_no"])["value"].agg(COLLECTL_DSK_AGGREGATION_FUNC)
    # Every node is laid out on the same (window, disk) grid; combinations without samples are set to 0.
    df = df.reindex([(w, d) for w in range(dsk["fg_window"].max() + 1) for d in range(dsk["dsk_no"].max() + 1)],
        fill_value=0)
    df = df.unstack()
    # Convert window numbers to elapsed seconds so that the x axis really shows what its label
    # says. Ticks are then placed every 60 seconds, as in the CPU and memory plots; the previous
    # step of 60 *windows* produced one tick every few seconds.
    df.index = df.index * COLLECTL_DSK_FG_WINDOW_SIZE / 1000
    ax = fig.add_subplot(len(node_names), 1, i + 1)
    ax.set_xlim((0, df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.grid(alpha=0.75)
    df.plot(ax=ax, kind="line",
        title="%s - Disk I/O Utilization (%s-ms window)" % (node_name, COLLECTL_DSK_FG_WINDOW_SIZE),
        xlabel="Time (seconds)",
        ylabel="%s(%s)%s" % (COLLECTL_DSK_AGGREGATION_FUNC, COLLECTL_DSK_METRIC, dsk_unit),
        grid=True, xticks=range(0, int(df.index.max()) + 1, 60))