In [None]:
"""Setup for communication with InfluxDBv2."""

from pathlib import Path

# Connection settings for the InfluxDBv2 server that holds the monitoring data.
influx_org = "square"
influx_url = "https://monitoring.lsst.cloud"
# The API token is read from a file in the working directory, with
# surrounding whitespace (such as a trailing newline) stripped.
influx_token = (
    Path("./influxdb.token")
    .read_text()
    .strip()
)

In [None]:
"""Imports for the query."""

import os
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError

In [None]:
class QueryClient:
    """Query an InfluxDBv2 server and retrieve our monitoring data.

    The ``templates`` directory must contain ``mem_query.flux`` and
    ``cpu_query.flux`` (query templates with ``{bucket}`` and ``{duration}``
    placeholders, filled in with ``str.format``) and ``applications`` (a
    whitespace-separated list of application names).

    Parameters
    ----------
    url, token, org
        Passed straight through to ``InfluxDBClient``.
    templates
        Directory holding the query templates and the application list.
    outputs
        Directory that receives one ``<app>-<cpu|mem>.json`` file per query
        that returned data.  It is only created once there is data to write.
    duration
        Value substituted for ``{duration}`` in the query templates.
    """

    def __init__(self, *, url: str, token: str, org: str,
                 templates: Path = Path("./templates"),
                 outputs: Path = Path("./data/query"),
                 duration: str = "7d") -> None:
        self._client = InfluxDBClient(url=url, token=token, org=org)
        self._mem_q = (templates / "mem_query.flux").read_text()
        self._cpu_q = (templates / "cpu_query.flux").read_text()
        self._outputs = outputs
        self._applications = (templates / "applications").read_text().split()
        self.api = self._client.query_api()
        self._duration = duration

    def main(self) -> None:
        """Run the Influx query for each application and each of cpu/mem.

        Raises
        ------
        InfluxDBError
            Re-raised for any query failure other than an HTTP 404.
        """
        for app in self._applications:
            # The bucket name is the application name with "-" mapped to "_".
            bucket = app.replace("-", "_")
            queries = {
                "mem": self._mem_q.format(
                    bucket=bucket, duration=self._duration
                ),
                "cpu": self._cpu_q.format(
                    bucket=bucket, duration=self._duration
                ),
            }
            for kind, query in queries.items():
                try:
                    tables = self.api.query(query)
                except InfluxDBError as exc:
                    # A 404 is skipped (presumably the application has no
                    # bucket on this server).  The error may carry no
                    # response object at all, so look the status up
                    # defensively rather than masking the real error with
                    # an AttributeError.
                    if getattr(exc.response, "status", None) == 404:
                        continue
                    raise
                if not tables:
                    continue
                # Create the directory lazily: the notebook treats the
                # existence of the output directory as "query already run".
                self._outputs.mkdir(parents=True, exist_ok=True)
                out_file = self._outputs / f"{app}-{kind}.json"
                out_file.write_text(tables.to_json(indent=2))

In [None]:
def query() -> None:
    """Run the query stage against InfluxDBv2 with the notebook's settings."""
    QueryClient(
        url=influx_url,
        token=influx_token,
        org=influx_org,
    ).main()

In [None]:
"""Imports for the Burster."""

import json
from pathlib import Path
from typing import Union

# On Python 3.12+ these aliases would be declared with the "type" statement.

# The measurements collected for one app/container/mtype/cluster combination.
Metric = list[int]

# Nested mapping, outermost key first: app -> container -> mtype -> cluster.
PathMetric = dict[
    str,  # app
    dict[
        str,  # container
        dict[
            str,  # mtype
            dict[str, Metric],  # cluster -> measurements
        ],
    ],
]

In [None]:
class Burster:
    """Take query data and split it into smaller files arranged for ease of analysis.

    Each ``<app>-<mtype>.json`` file in ``inputs`` is expected to hold a JSON
    list of records carrying ``container_name``, ``cluster`` and ``_value``
    keys.  The values are regrouped into
    ``<outputs>/<app>/<container>/<mtype>/<cluster>/measurements.json``.

    Parameters
    ----------
    inputs
        Directory containing the query result files.
    outputs
        Root directory of the burst-apart tree.
    """

    def __init__(self, *, inputs: Path = Path("./data/query"),
                 outputs: Path = Path("./data/burst")
                 ) -> None:
        self._inp = inputs
        self._outp = outputs
        # app -> container -> mtype -> cluster -> list of measurements
        self._burst: PathMetric = {}

    def main(self) -> None:
        """Burst apart and rearrange each of the input files."""
        # Sorted only so that runs are reproducible; each file maps to its
        # own (app, mtype) pair, so the order does not affect the result.
        for inp in sorted(self._inp.glob("*.json")):
            self.burst(inp)
        self.write_object()

    def burst(self, inp: Path) -> None:
        """Take the InfluxDB query data and rearrange it for ease of use.

        The application and metric type are taken from the file name, which
        must look like ``<app>-<mtype>.json``; the application name may
        itself contain dashes.  Results accumulate in memory until
        ``write_object`` is called.

        Raises
        ------
        ValueError
            If the file name does not have the ``<app>-<mtype>.json`` form.
        KeyError
            If a record lacks one of the expected keys.
        """
        app, dash, mtype = inp.stem.rpartition("-")
        if not dash or not app or not mtype:
            raise ValueError(
                f"Cannot derive application and metric type from {inp.name!r}"
            )
        self._burst.setdefault(app, {})
        for record in json.loads(inp.read_text()):
            self._add_to_burst(
                record["_value"],
                app,
                record["container_name"],
                mtype,
                record["cluster"],
            )

    def _add_to_burst(self, val: int, app: str, ctr: str, mtype: str, cluster: str) -> None:
        """Accumulate measurements in the right place."""
        leaf = (
            self._burst
            .setdefault(app, {})
            .setdefault(ctr, {})
            .setdefault(mtype, {})
            .setdefault(cluster, [])
        )
        leaf.append(val)

    def write_object(self) -> None:
        """Write each leaf of the tree (a list of measurements) to its own file."""
        for app, containers in self._burst.items():
            for ctr, mtypes in containers.items():
                for mtype, clusters in mtypes.items():
                    for cluster, measurements in clusters.items():
                        outdir = self._outp / app / ctr / mtype / cluster
                        outdir.mkdir(parents=True, exist_ok=True)
                        outf = outdir / "measurements.json"
                        outf.write_text(json.dumps(measurements))
  

In [None]:
def burst() -> None:
    """Run the burst stage with the Burster's default directories."""
    Burster().main()

In [None]:
"""Imports for the Summarizer."""

from dataclasses import dataclass
from statistics import median_low, mean

from humanfriendly import format_number, format_size

# As with the other aliases, Python 3.12+ would use the "type" statement here.

# One summary record: field name -> numeric statistic or formatted string.
MetricContainer = dict[
    str,
    Union[float, int, str],
]

# Nested mapping, outermost key first: app -> container -> mtype -> cluster.
Summarypath = dict[
    str,  # app
    dict[
        str,  # container
        dict[
            str,  # mtype
            dict[str, MetricContainer],  # cluster -> summary record
        ],
    ],
]

In [None]:
class Summarizer:
    """Take the data from the burst-apart queries, summarize it, and make recommendations.

    Reads every
    ``<inputs>/<app>/<container>/<cpu|mem>/<cluster>/measurements.json``
    file, computes summary statistics for it, adds human-friendly
    ``recommended_limit`` / ``recommended_request`` fields, and writes the
    whole tree to ``<outputs>/summary.json``.

    Parameters
    ----------
    inputs
        Root of the burst-apart measurement tree.
    outputs
        Directory that receives ``summary.json``.
    cpu_multiplier
        Factor applied to the maximum observed CPU usage to get the
        recommended CPU limit.
    mem_multiplier
        Factor applied to the maximum observed memory usage to get the
        recommended memory limit.
    """

    def __init__(self, *,
                 inputs: Path = Path("./data/burst"),
                 outputs: Path = Path("./data/summary"),
                 cpu_multiplier: int = 3,
                 mem_multiplier: int = 2
                 ) -> None:
        self._inp = inputs
        self._outp = outputs
        # app -> container -> mtype -> cluster -> summary record
        self._summary: Summarypath = {}
        self._cpu_multiplier = cpu_multiplier
        self._mem_multiplier = mem_multiplier

    def main(self) -> None:
        """Make summary statistics from measurements and report them."""
        self.build_summary()
        self.write_output()

    def build_summary(self) -> None:
        """Make the summary statistics.

        Walks the input tree; anything that is not a directory at the
        app, container or cluster level is ignored, and only the ``cpu``
        and ``mem`` metric types are considered.
        """
        for app in self._inp.glob("*"):
            if not app.is_dir():
                continue
            for ctr in app.glob("*"):
                if not ctr.is_dir():
                    continue
                for mtype in ("cpu", "mem"):
                    mtype_dir = ctr / mtype
                    if not mtype_dir.is_dir():
                        continue
                    for cluster in mtype_dir.glob("*"):
                        if not cluster.is_dir():
                            continue
                        summary = self.summarize(
                            app.name, ctr.name, mtype, cluster.name
                        )
                        self._add_recommendations(summary, mtype)
                        by_cluster = (
                            self._summary
                            .setdefault(app.name, {})
                            .setdefault(ctr.name, {})
                            .setdefault(mtype, {})
                        )
                        by_cluster[cluster.name] = summary

    def _add_recommendations(
        self, summary: "MetricContainer", mtype: str
    ) -> None:
        """Add human-friendly recommended limit/request fields to *summary*."""
        if mtype == "cpu":
            # If we've never seen more than 1 CPU in use, assume it's one
            # of our FastAPI services, and set max to 1.
            #
            # Note that we're changing the scale from nanocores to cores
            # or millicores.
            max_obs = summary["max"]
            if max_obs > 1E9:
                summary["recommended_limit"] = format_number(
                    float(self._cpu_multiplier * max_obs) / float(1E9)
                )
            else:
                # NOTE(review): this branch stores a float while the other
                # stores a formatted string; kept as-is so the output
                # does not change.
                summary["recommended_limit"] = float(1.0)
            n_mcores = float(summary["rolling_avg"]) / float(1E6)
            if n_mcores > 1000:
                # More than a whole core: express the request in cores.
                summary["recommended_request"] = format_number(
                    n_mcores / 1000
                )
            else:
                summary["recommended_request"] = (
                    f"{format_number(n_mcores)}m"
                )
        else:
            summary["recommended_limit"] = format_size(
                self._mem_multiplier * summary["max"],
                binary=True
            )
            summary["recommended_request"] = format_size(
                summary["median"],
                binary=True
            )

    @staticmethod
    def _rolling_avg(inp: list[int], window: int = 5) -> float:
        """Return the max of the mean-taken-*window*-at-a-time from the input list.

        Every full window is considered, including the one that ends on
        the last measurement.  A list no longer than one window yields its
        plain mean, and an empty list yields 0.0.
        """
        if not inp:  # No data?  That's weird.
            return 0.0
        if len(inp) <= window:
            # Too short to build a rolling average?  Return the mean.
            return mean(inp)
        return max(
            mean(inp[start:start + window])
            for start in range(len(inp) - window + 1)
        )

    def summarize(
        self, app: str, ctr: str, mtype: str, cluster: str
    ) -> "MetricContainer":
        """Return summary statistics for one measurements file.

        The result has three keys: ``max`` (largest measurement),
        ``median`` (low median) and ``rolling_avg`` (the largest mean over
        any five consecutive measurements).

        Raises
        ------
        ValueError
            If the measurements file holds an empty list.
        """
        meas_file = (
            self._inp / app / ctr / mtype / cluster / "measurements.json"
        )
        meas_list = json.loads(meas_file.read_text())
        if not meas_list:
            raise ValueError(f"No measurements found in {meas_file}")
        return {
            "max": max(meas_list),
            "median": median_low(meas_list),
            "rolling_avg": self._rolling_avg(meas_list),
        }

    def write_output(self) -> None:
        """Write the summary data to a file."""
        self._outp.mkdir(parents=True, exist_ok=True)
        sum_j = json.dumps(self._summary, indent=2, sort_keys=True)
        (self._outp / "summary.json").write_text(sum_j)

In [None]:
def summarize() -> None:
    """Run the summary stage with the Summarizer's default settings."""
    Summarizer().main()

In [None]:
"""This lets us cache our results.  Clear the appropriate directories under data to force re-run."""
# Each stage is skipped when its output directory already exists.  The
# check for a stage happens only after the previous stage has had its
# chance to run, so the stages still execute strictly in order.
for _stage_dir, _run_stage in (
    ("./data/query", query),
    ("./data/burst", burst),
    ("./data/summary", summarize),
):
    if Path(_stage_dir).is_dir():
        continue
    _run_stage()