# Queue Length Analysis

## Functionalities
- Plot the number of connections to each server and database, along with the thresholds for queueing and packet dropping (the maximum number of threads/connections, and that maximum plus the listen backlog size `SOMAXCONN`, respectively).
- Plot the queue length of each server and database.

## Input
Log files are read from `../data/<EXPERIMENT_DIRNAME>/logs`, which is assumed to have the following structure:
```
logs/
  [node-1]/
    *_service*.tar.gz
    ...
    apigateway*.tar.gz
  ...
  [node-n]/
    *_service*.tar.gz
    ...
    apigateway*.tar.gz
```
`*_service*.tar.gz` and `apigateway*.tar.gz` tarballs contain RPC log files named `calls.log` and database query log files named `queries.log`.

## Notebook Configuration

In [None]:
########## GENERAL
# Experiment directory, looked up under `../data`
EXPERIMENT_DIRNAME = "BuzzBlogBenchmark_2021-11-04-04-05-41"

########## CONNECTION
# Width of each counting window, in milliseconds
WINDOW_IN_MS = 1

########## THRIFT SERVERS
# Listen backlog size; connections beyond threads + backlog are dropped
THRIFT_SOMAXCONN = 128
# Worker threads per server; connections beyond this are queued
THRIFT_THREADS = 8

########## POSTGRES SERVERS
# Listen backlog size; connections beyond max_connections + backlog are dropped
PG_SOMAXCONN = 128
# Postgres connection limit; connections beyond this are queued
PG_MAX_CONNECTIONS = 128

## Notebook Setup

In [None]:
# Import libraries.
%matplotlib inline
import datetime
import matplotlib.pyplot as plt
import os
import pandas as pd
import re
import tarfile
import warnings
warnings.filterwarnings("ignore")

# Constants
# Component tarballs to parse: the API gateway and every "*_service" component.
COMPONENT_TARBALL_PATTERNS = [r"^apigateway.*\.tar\.gz$", r"^.+_service.*\.tar\.gz$"]
# One line of `calls.log`:
# [timestamp] pid tid request_id server function latency
RPC_LOG_PATTERN = r"^\[(.+)\] pid=(.+) tid=(.+) request_id=(.+) server=(.+) function=(.+) latency=(.+)$"
# One line of `queries.log`:
# [timestamp] pid tid request_id dbname latency "query"
QUERY_LOG_PATTERN = r"^\[(.+)\] pid=(.+) tid=(.+) request_id=(.+) dbname=(.+) latency=(.+) query=\"(.+)\"$"

## RPC Log Parsing

In [None]:
# Parse logs
# Collects every matching line of every `calls.log` found in the component tarballs
# of every node directory under ../data/<EXPERIMENT_DIRNAME>/logs.
# NOTE(review): timestamps carry only the time of day (%H:%M:%S), so an experiment
# crossing midnight would yield misordered timestamps — confirm this cannot happen.
logs_dirpath = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs")
rpc_log_regex = re.compile(RPC_LOG_PATTERN)
rpc = {"node_name": [], "timestamp": [], "request_id": [], "server": [], "function": [], "latency": []}
node_names = os.listdir(logs_dirpath)
for node_name in node_names:
  node_dirpath = os.path.join(logs_dirpath, node_name)
  # Skip stray files in `logs/`; only node directories contain tarballs.
  if not os.path.isdir(node_dirpath):
    continue
  for tarball_name in os.listdir(node_dirpath):
    if not any(re.match(tarball_pattern, tarball_name) for tarball_pattern in COMPONENT_TARBALL_PATTERNS):
      continue
    tarball_path = os.path.join(node_dirpath, tarball_name)
    with tarfile.open(tarball_path, "r:gz") as tar:
      # Iterate members directly: extractfile(name) re-scans the archive per call and
      # resolves duplicate names to the last member only.
      for member in tar.getmembers():
        if not member.name.endswith("calls.log"):
          continue
        log_file = tar.extractfile(member)
        # extractfile() returns None for directories and other non-file members.
        if log_file is None:
          continue
        with log_file:
          for row in log_file:
            rpc_log_match = rpc_log_regex.match(row.decode("utf-8"))
            if not rpc_log_match:
              continue
            timestamp, pid, tid, request_id, server, function, latency = rpc_log_match.groups()
            rpc["node_name"].append(node_name)
            # %f accepts at most 6 fractional digits; the last 3 are dropped
            # (presumably the log records nanoseconds — confirm).
            rpc["timestamp"].append(datetime.datetime.strptime(timestamp[:-3], "%H:%M:%S.%f"))
            rpc["request_id"].append(request_id)
            rpc["server"].append(server)
            rpc["function"].append(function)
            # Latency is logged in seconds; stored in milliseconds.
            rpc["latency"].append(float(latency) * 1000)

In [None]:
# Build data frame
# NOTE: this cell overwrites `rpc` (dict -> DataFrame) and rewrites its `timestamp`
# column, so re-run the parsing cell before re-running this one.
rpc = pd.DataFrame.from_dict(rpc)

# Get values
servers = sorted(rpc["server"].unique())
min_timestamp = rpc["timestamp"].values.min()

# (Re) Build columns
# Seconds elapsed since the earliest RPC (vectorized instead of a per-row apply).
rpc["timestamp"] = (rpc["timestamp"] - min_timestamp).dt.total_seconds()
# Indices of the WINDOW_IN_MS-wide windows each RPC spans, from its start to its
# end (start + latency), both inclusive. Latency is in milliseconds.
rpc["window"] = [
    range(int(start * 1000) // WINDOW_IN_MS,
          int((start + latency / 1000) * 1000) // WINDOW_IN_MS + 1)
    for start, latency in zip(rpc["timestamp"], rpc["latency"])]
# One row per (RPC, window) pair, so counting rows per window gives concurrency.
rpc = rpc.explode("window")

## Microservices Queue Length Analysis

In [None]:
########## LOCAL CONFIG
# Minimum time (in seconds since the earliest RPC); None disables this bound
MIN_TIME = None
# Maximum time (in seconds since the earliest RPC); None disables this bound
MAX_TIME = None

# Plot, per server, the number of connections per window together with the
# queueing threshold (blue, THRIFT_THREADS) and the dropping threshold
# (red, THRIFT_THREADS + THRIFT_SOMAXCONN).
fig = plt.figure(figsize=(24, len(servers) * 12))
for (i, server) in enumerate(servers):
    ax = fig.add_subplot(len(servers), 1, i + 1)
    df = rpc[(rpc["server"] == server)]
    # Compare against None so that an explicit bound of 0 is honored.
    if MIN_TIME is not None:
        df = df[(df["timestamp"] >= MIN_TIME)]
    if MAX_TIME is not None:
        df = df[(df["timestamp"] <= MAX_TIME)]
    # Rows were exploded per window, so rows per window = RPCs active in it.
    df = df.groupby(["window"])["window"].count()
    if df.empty:
        # Nothing to plot; int(NaN) in the reindex below would otherwise raise.
        ax.set_title("%s - No RPCs in the selected time range" % server)
        continue
    # Windows without any active RPC count as zero connections.
    df = df.reindex(range(int(df.index.min()), int(df.index.max()) + 1), fill_value=0)
    # Convert window indices to seconds so the x-axis matches its "Time (s)" label.
    df.index = df.index * WINDOW_IN_MS / 1000
    ax.grid(alpha=0.75)
    ax.set_xlim((df.index.min(), df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.axhline(y=THRIFT_THREADS, ls="--", color="blue", linewidth=5)
    ax.axhline(y=THRIFT_THREADS + THRIFT_SOMAXCONN, ls="--", color="red", linewidth=5)
    df.plot(ax=ax, kind="line",
        title="%s - Number of connections (%s-millisecond window)" % (server, WINDOW_IN_MS),
        xlabel="Time (s)", ylabel="Connections (count)", color="black", grid=True,
        # One tick per minute.
        xticks=range(int(df.index.min()), int(df.index.max()) + 1, 60))

In [None]:
########## LOCAL CONFIG
# Minimum time (in seconds since the earliest RPC); None disables this bound
MIN_TIME = None
# Maximum time (in seconds since the earliest RPC); None disables this bound
MAX_TIME = None

# Plot, per server, the number of queued connections per window (connections beyond
# the THRIFT_THREADS being served) together with the backlog capacity (red).
fig = plt.figure(figsize=(24, len(servers) * 12))
for (i, server) in enumerate(servers):
    ax = fig.add_subplot(len(servers), 1, i + 1)
    df = rpc[(rpc["server"] == server)]
    # Compare against None so that an explicit bound of 0 is honored.
    if MIN_TIME is not None:
        df = df[(df["timestamp"] >= MIN_TIME)]
    if MAX_TIME is not None:
        df = df[(df["timestamp"] <= MAX_TIME)]
    # Rows were exploded per window, so rows per window = RPCs active in it.
    df = df.groupby(["window"])["window"].count()
    if df.empty:
        # Nothing to plot; int(NaN) in the reindex below would otherwise raise.
        ax.set_title("%s - No RPCs in the selected time range" % server)
        continue
    # Connections beyond the THRIFT_THREADS being served wait in the queue.
    df = (df - THRIFT_THREADS).clip(lower=0)
    # Windows without any active RPC have an empty queue.
    df = df.reindex(range(int(df.index.min()), int(df.index.max()) + 1), fill_value=0)
    # Convert window indices to seconds so the x-axis matches its "Time (s)" label.
    df.index = df.index * WINDOW_IN_MS / 1000
    ax.grid(alpha=0.75)
    ax.set_xlim((df.index.min(), df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.axhline(y=THRIFT_SOMAXCONN, ls="--", color="red", linewidth=5)
    df.plot(ax=ax, kind="line",
        title="%s - Queue Length (%s-millisecond window)" % (server, WINDOW_IN_MS),
        xlabel="Time (s)", ylabel="Connections (count)", color="black", grid=True,
        # One tick per minute.
        xticks=range(int(df.index.min()), int(df.index.max()) + 1, 60))

## Query Log Parsing

In [None]:
# Parse logs
# Collects every matching line of every `queries.log` found in the component
# tarballs of every node directory under ../data/<EXPERIMENT_DIRNAME>/logs.
# NOTE(review): timestamps carry only the time of day (%H:%M:%S), so an experiment
# crossing midnight would yield misordered timestamps — confirm this cannot happen.
logs_dirpath = os.path.join(os.pardir, "data", EXPERIMENT_DIRNAME, "logs")
query_log_regex = re.compile(QUERY_LOG_PATTERN)
query = {"node_name": [], "timestamp": [], "request_id": [], "dbname": [], "type": [], "latency": []}
node_names = os.listdir(logs_dirpath)
for node_name in node_names:
  node_dirpath = os.path.join(logs_dirpath, node_name)
  # Skip stray files in `logs/`; only node directories contain tarballs.
  if not os.path.isdir(node_dirpath):
    continue
  for tarball_name in os.listdir(node_dirpath):
    if not any(re.match(tarball_pattern, tarball_name) for tarball_pattern in COMPONENT_TARBALL_PATTERNS):
      continue
    tarball_path = os.path.join(node_dirpath, tarball_name)
    with tarfile.open(tarball_path, "r:gz") as tar:
      # Iterate members directly: extractfile(name) re-scans the archive per call and
      # resolves duplicate names to the last member only.
      for member in tar.getmembers():
        if not member.name.endswith("queries.log"):
          continue
        log_file = tar.extractfile(member)
        # extractfile() returns None for directories and other non-file members.
        if log_file is None:
          continue
        with log_file:
          for row in log_file:
            query_log_match = query_log_regex.match(row.decode("utf-8"))
            if not query_log_match:
              continue
            timestamp, pid, tid, request_id, dbname, latency, query_str = query_log_match.groups()
            query["node_name"].append(node_name)
            # %f accepts at most 6 fractional digits; the last 3 are dropped
            # (presumably the log records nanoseconds — confirm).
            query["timestamp"].append(datetime.datetime.strptime(timestamp[:-3], "%H:%M:%S.%f"))
            query["request_id"].append(request_id)
            query["dbname"].append(dbname)
            # Query type is the first SQL keyword (e.g. SELECT); a whitespace-only
            # query string has none and would otherwise raise IndexError.
            query_tokens = query_str.split()
            query["type"].append(query_tokens[0].upper() if query_tokens else "")
            # Latency is logged in seconds; stored in milliseconds.
            query["latency"].append(float(latency) * 1000)

In [None]:
# Build data frame
# NOTE: this cell overwrites `query` (dict -> DataFrame) and rewrites its
# `timestamp` column, so re-run the parsing cell before re-running this one.
query = pd.DataFrame.from_dict(query)

# Get values
dbnames = sorted(query["dbname"].unique())
min_timestamp = query["timestamp"].values.min()

# (Re) Build columns
# Seconds elapsed since the earliest query (vectorized instead of a per-row apply).
query["timestamp"] = (query["timestamp"] - min_timestamp).dt.total_seconds()
# Indices of the WINDOW_IN_MS-wide windows each query spans, from its start to its
# end (start + latency), both inclusive. Latency is in milliseconds.
query["window"] = [
    range(int(start * 1000) // WINDOW_IN_MS,
          int((start + latency / 1000) * 1000) // WINDOW_IN_MS + 1)
    for start, latency in zip(query["timestamp"], query["latency"])]
# One row per (query, window) pair, so counting rows per window gives concurrency.
query = query.explode("window")

## Databases Queue Length Analysis

In [None]:
########## LOCAL CONFIG
# Minimum time (in seconds since the earliest query); None disables this bound
MIN_TIME = None
# Maximum time (in seconds since the earliest query); None disables this bound
MAX_TIME = None

# Plot, per database, the number of connections per window together with the
# queueing threshold (blue, PG_MAX_CONNECTIONS) and the dropping threshold
# (red, PG_MAX_CONNECTIONS + PG_SOMAXCONN).
# The grid is sized by the number of databases (not servers): add_subplot raises
# when the subplot index exceeds the number of rows.
fig = plt.figure(figsize=(24, len(dbnames) * 12))
for (i, dbname) in enumerate(dbnames):
    ax = fig.add_subplot(len(dbnames), 1, i + 1)
    df = query[(query["dbname"] == dbname)]
    # Compare against None so that an explicit bound of 0 is honored.
    if MIN_TIME is not None:
        df = df[(df["timestamp"] >= MIN_TIME)]
    if MAX_TIME is not None:
        df = df[(df["timestamp"] <= MAX_TIME)]
    # Rows were exploded per window, so rows per window = queries active in it.
    df = df.groupby(["window"])["window"].count()
    if df.empty:
        # Nothing to plot; int(NaN) in the reindex below would otherwise raise.
        ax.set_title("%s - No queries in the selected time range" % dbname)
        continue
    # Windows without any active query count as zero connections.
    df = df.reindex(range(int(df.index.min()), int(df.index.max()) + 1), fill_value=0)
    # Convert window indices to seconds so the x-axis matches its "Time (s)" label.
    df.index = df.index * WINDOW_IN_MS / 1000
    ax.grid(alpha=0.75)
    ax.set_xlim((df.index.min(), df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.axhline(y=PG_MAX_CONNECTIONS, ls="--", color="blue", linewidth=5)
    ax.axhline(y=PG_MAX_CONNECTIONS + PG_SOMAXCONN, ls="--", color="red", linewidth=5)
    df.plot(ax=ax, kind="line",
        title="%s - Number of connections (%s-millisecond window)" % (dbname, WINDOW_IN_MS),
        xlabel="Time (s)", ylabel="Connections (count)", color="black", grid=True,
        # One tick per minute.
        xticks=range(int(df.index.min()), int(df.index.max()) + 1, 60))

In [None]:
########## LOCAL CONFIG
# Minimum time (in seconds since the earliest query); None disables this bound
MIN_TIME = None
# Maximum time (in seconds since the earliest query); None disables this bound
MAX_TIME = None

# Plot, per database, the number of queued connections per window (connections
# beyond PG_MAX_CONNECTIONS) together with the backlog capacity (red).
# The grid is sized by the number of databases (not servers): add_subplot raises
# when the subplot index exceeds the number of rows.
fig = plt.figure(figsize=(24, len(dbnames) * 12))
for (i, dbname) in enumerate(dbnames):
    ax = fig.add_subplot(len(dbnames), 1, i + 1)
    df = query[(query["dbname"] == dbname)]
    # Compare against None so that an explicit bound of 0 is honored.
    if MIN_TIME is not None:
        df = df[(df["timestamp"] >= MIN_TIME)]
    if MAX_TIME is not None:
        df = df[(df["timestamp"] <= MAX_TIME)]
    # Rows were exploded per window, so rows per window = queries active in it.
    df = df.groupby(["window"])["window"].count()
    if df.empty:
        # Nothing to plot; int(NaN) in the reindex below would otherwise raise.
        ax.set_title("%s - No queries in the selected time range" % dbname)
        continue
    # Connections beyond PG_MAX_CONNECTIONS wait in the queue.
    df = (df - PG_MAX_CONNECTIONS).clip(lower=0)
    # Windows without any active query have an empty queue.
    df = df.reindex(range(int(df.index.min()), int(df.index.max()) + 1), fill_value=0)
    # Convert window indices to seconds so the x-axis matches its "Time (s)" label.
    df.index = df.index * WINDOW_IN_MS / 1000
    ax.grid(alpha=0.75)
    ax.set_xlim((df.index.min(), df.index.max()))
    ax.set_ylim((0, df.values.max()))
    ax.axhline(y=PG_SOMAXCONN, ls="--", color="red", linewidth=5)
    df.plot(ax=ax, kind="line",
        title="%s - Queue Length (%s-millisecond window)" % (dbname, WINDOW_IN_MS),
        xlabel="Time (s)", ylabel="Connections (count)", color="black", grid=True,
        # One tick per minute.
        xticks=range(int(df.index.min()), int(df.index.max()) + 1, 60))