#  Convert PM100 dataset into Performance Co-Pilot archive format

### https://doi.org/10.1145/3624062.3624263

In [1]:
import pandas as pd
import datetime
import shutil
import cpmapi
import glob
import math
import re
import os

from pcp import pmapi, pmi
from cpmapi import (
    PM_SEM_DISCRETE, PM_SEM_INSTANT, PM_SEM_COUNTER,
    PM_SPACE_KBYTE, PM_SPACE_BYTE, PM_TIME_SEC, PM_TIME_USEC, PM_TIME_MSEC,
    PM_TYPE_FLOAT, PM_TYPE_U32, PM_TYPE_U64, PM_TYPE_STRING, PM_TYPE_DOUBLE,
    PM_ID_NULL, PM_IN_NULL, PM_INDOM_NULL)
from cpmi import (PMI_ERR_DUPMETRICNAME, PMI_ERR_DUPINSTNAME)

Helper for analysing time spent loading and transforming

In [2]:
from time import process_time, perf_counter

def start_timer():
    t0 = process_time()
    c0 = perf_counter()
    return (t0, c0)

def stop_timer(t0, c0):
    t1 = process_time() - t0
    c1 = perf_counter() - c0
    return 'Completed in %.5f seconds CPU time, %.5f elapsed time' % (t1, c1)

Helper for discarding some PM-100 information (e.g. weather details)

In [3]:
def ignore_metric(plugin, name):
    #print('Ignoring', plugin, 'metric', name)
    return None

Helper for consistently assigning metric identifiers within a domain

In [4]:
metric_dict = {}  # tracks all pmID assignments

def metric_pmid(log, cluster, metric_name):
    if metric_name in metric_dict:
        return metric_dict[metric_name]
    pmid = log.pmiID(1, cluster, len(metric_dict))
    metric_dict[metric_name] = pmid
    return pmid

### Functions providing PCP metric metadata for IPMI metrics

In [5]:
def ipmi_pmid(log, filename):
    return metric_pmid(log, 1, filename)

def ipmi_ambient(log, filename): return {
    'name': 'ipmi.ambient_temperature', 'pmid': ipmi_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

def ipmi_total_power(log, filename): return {
    'name': 'ipmi.total_power', 'pmid': ipmi_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

In [6]:
file_ipmi_metrics = {
    'ambient': ipmi_ambient,
    'total_power': ipmi_total_power,
}

def file_ipmi_metric(log, filename):
    try:
        ipmi_metric = file_ipmi_metrics[filename]
        return ipmi_metric(log, filename)
    except:
        print('FAIL', filename)
        pass # ignore misc environmental readings
    return None

### Functions providing PCP metric metadata for all GPU metrics

In [7]:
def gpu_pmid(log, filename):
    return metric_pmid(log, 2, filename)

def gpu_indom(log):
    return log.pmiInDom(2, 0)
    
def generic_gpu_metric(log, metricname, instid, instname): return {
    'name': 'gpu.' + metricname, 'pmid': gpu_pmid(log, metricname),
    'indom': gpu_indom(log), 'instid': instid, 'instname': instname,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

def file_gpu_metric(log, metricname):
    # match on input like: "Gpu2_xid_errors"
    # PCP metric becomes: gpu.xid_errors[GPU2]
    scan = re.search(r'Gpu([0-9].*?)_(.*)', metricname)
    if not scan:
        return None
    gpu_id = int(scan.group(1))
    gpu_name = 'GPU' + scan.group(1)
    metric_name = 'gpu.' + scan.group(2)
    return generic_gpu_metric(log, metric_name, gpu_id, gpu_name)

### Functions providing PCP metric metadata for all kernel metrics

In [8]:
def kernel_pmid(log, filename):
    return metric_pmid(log, 3, filename)

def load_indom(log):
    return log.pmiInDom(3, 0)

def hinv_ncpu(log, filename): return {
    'name': 'hinv.ncpu', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U32, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

def kernel_all_load(log, filename, instid, instname): return {
    'name': 'kernel.all.load', 'pmid': kernel_pmid(log, filename),
    'indom': load_indom(log), 'instid': instid, 'instname': instname,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}
def kernel_all_load1(log, file): return kernel_all_load(log, 'load', 1, '1 minute')
def kernel_all_load5(log, file): return kernel_all_load(log, 'load', 5, '5 minute')
def kernel_all_load15(log, file): return kernel_all_load(log, 'load', 15, '15 minute')

def mem_util_bufmem(log, filename): return {
    'name': 'mem.util.bufmem', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_KBYTE, 0, 0)
}

def mem_util_cached(log, filename): return {
    'name': 'mem.util.cached', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_KBYTE, 0, 0)
}

def mem_util_free(log, filename): return {
    'name': 'mem.util.free', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_KBYTE, 0, 0)
}

def mem_util_shared(log, filename): return {
    'name': 'mem.util.shared', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_KBYTE, 0, 0)
}

def mem_physmem(log, filename): return {
    'name': 'mem.physmem', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_KBYTE, 0, 0)
}

def kernel_all_nprocs(log, filename): return {
    'name': 'kernel.all.nprocs', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

def kernel_all_running(log, filename): return {
    'name': 'kernel.all.running', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}

def swap_length(log, filename): return {
    'name': 'swap.length', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_BYTE, 0, 0)
}
    
def swap_free(log, filename): return {
    'name': 'swap.free', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_BYTE, 0, 0)
}
    
def kernel_uname_sysname(log, filename): return {
    'name': 'kernel.uname.sysname', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_STRING, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}
    
def kernel_uname_release(log, filename): return {
    'name': 'kernel.uname.release', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_STRING, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}
    
def kernel_uname_machine(log, filename): return {
    'name': 'kernel.uname.machine', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_STRING, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, 0, 0, 0, 0, 0)
}
    
def kernel_all_boottime(log, filename): return {
    'name': 'kernel.all.boottime', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_U64, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, 1, 0, 0, PM_TIME_SEC, 0)
}
    
def kernel_all_idletime(log, filename): return {
    'name': 'kernel.all.idletime', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_DOUBLE, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}

def hinv_all_cpu_clock(log, filename): return {
    'name': 'hinv.all.cpu.clock', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_USEC, 0)
}
    
def kernel_all_cpu_user(log, filename): return {
    'name': 'kernel.all.cpu.user', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}

def kernel_all_cpu_wait_total(log, filename): return {
    'name': 'kernel.all.cpu.wait.total', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}
    
def kernel_all_cpu_steal(log, filename): return {
    'name': 'kernel.all.cpu.steal', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}
    
def kernel_all_cpu_nice(log, filename): return {
    'name': 'kernel.all.cpu.nice', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}
    
def kernel_all_cpu_idle(log, filename): return {
    'name': 'kernel.all.cpu.idle', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}
    
def kernel_all_cpu_sys(log, filename): return {
    'name': 'kernel.all.cpu.sys', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, -1, 0, 0, PM_TIME_SEC, 0)
}
    
def network_all_out_packets(log, filename): return {
    'name': 'network.all.out.packets', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, -1, 0, 0, 1)
}

def network_all_in_packets(log, filename): return {
    'name': 'network.all.in.packets', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(0, 0, -1, 0, 0, 1)
}
    
def network_all_out_bytes(log, filename): return {
    'name': 'network.all.out.bytes', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(-1, 0, 0, PM_SPACE_BYTE, 0, 0)
}
    
def network_all_in_bytes(log, filename): return {
    'name': 'network.all.in.bytes', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(-1, 0, 0, PM_SPACE_BYTE, 0, 0)
}

def filesys_all_free(log, filename): return {
    'name': 'filesys.all.free', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_MBYTE, 0, 0)
}
    
def filesys_all_capacity(log, filename): return {
    'name': 'filesys.all.capacity', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_DISCRETE,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_MBYTE, 0, 0)
}
    
def filesys_all_maxused(log, filename): return {
    'name': 'filesys.all.maxused', 'pmid': kernel_pmid(log, filename),
    'indom': None, 'instid': None, 'instname': None,
    'type': PM_TYPE_FLOAT, 'sem': PM_SEM_INSTANT,
    'units': log.pmiUnits(1, 0, 0, PM_SPACE_MBYTE, 0, 0)
}

In [9]:
# Mapping individual PM100 file names to specific PCP metrics
file_kernel_metrics = {
    'cpu_num': hinv_ncpu,
    'mem_buffers': mem_util_bufmem,
    'mem_cached': mem_util_cached,
    'mem_free': mem_util_free,
    'mem_shared': mem_util_shared,
    'mem_total': mem_physmem,
    'proc_total': kernel_all_nprocs,
    'proc_run': kernel_all_running,
    'swap_total': swap_length,
    'swap_free': swap_free,
    'os_name': kernel_uname_sysname,
    'os_release': kernel_uname_release,
    'machine_type': kernel_uname_machine,
    'boottime': kernel_all_boottime,
    'cpu_aidle': kernel_all_idletime,
    'cpu_speed': hinv_all_cpu_clock,
    'cpu_user': kernel_all_cpu_user,
    'cpu_wio': kernel_all_cpu_wait_total,
    'cpu_steal': kernel_all_cpu_steal,
    'cpu_nice': kernel_all_cpu_nice,
    'cpu_idle': kernel_all_cpu_idle,
    'cpu_system': kernel_all_cpu_sys,
    'load_one': kernel_all_load1,
    'load_five': kernel_all_load5,
    'load_fifteen': kernel_all_load15,
    'pkts_out': network_all_out_packets,
    'pkts_in': network_all_in_packets,
    'bytes_out': network_all_out_bytes,
    'bytes_in': network_all_in_bytes,
    'disk_free': filesys_all_free,
    'disk_total': filesys_all_capacity,
    'part_max_used': filesys_all_maxused,
    'gexec': None, # "scalable cluster remote execution system"
}

def file_kernel_metric(log, filename):
    try:
        kernel_metric = file_kernel_metrics[filename]
        if not kernel_metric:
            raise TypeError(filename)
        return kernel_metric(log, filename)
    except:
        print('Kernel metric missing:', filename)
    return None

In [10]:
def get_file_mapping(log, plugin, metric):
    if plugin == 'ganglia_pub':
        if metric[:3] == 'Gpu':
            return file_gpu_metric(log, metric)
        return file_kernel_metric(log, metric)
    elif plugin == 'ipmi_pub':
        return file_ipmi_metric(log, metric)

    # ignore all other recorded subsystems
    elif plugin in ['weather_pub', 'nagios_pub']:  # misc monitoring
        return ignore_metric(plugin, metric)
    elif plugin in ['job_table', 'slurm_pub']:  # HPC job scheduler
        return ignore_metric(plugin, metric)
    elif plugin in ['logics_pub', 'vertiv_pub', 'schneider_pub']:
        return ignore_metric(plugin, metric)

    print('Plugin handler missing:', plugin)
    return None

In [11]:
MAX_NODES = 1000

def instance_mapping(pcp_metric, node, node_indom=True):
    inst_name = 'node' + str(node)
    if node_indom:
        inst_id = node
    else:
        inst_name += '::' + pcp_metric['instname']
        inst_id = node + (pcp_metric['instid'] * MAX_NODES)
    return inst_name, inst_id

In [12]:
def write_metric_archives(logpath, host, timezone, year, month, day):
    """
    Reads from the PM100 dataset for a single metric for a single day.
    Writes a PCP archive with metric containing values for all nodes.
    These share a hostname and metric metadata so can be merged later.
    """
    datestring = '%d-%02d-%02d' % (year, month, day)
    hostname = '%s.cineca.it' % (host)
    archive = '%4d%02d%02d' % (year, month, day)
    epoch = datetime.datetime(1970,1,1, tzinfo=datetime.timezone.utc)

    files = glob.glob('year_month=*/plugin=*/metric=*/*.parquet')
    for file in sorted(files):
        print('Input File:', file)
        result = re.search(r'/plugin=(.*?)/metric=(.*?)/', file)
        plugin, metric = result.group(1, 2)

        filename = '%s/%s/%s' % (logpath, metric, archive)
        dirname = os.path.dirname(filename)
        os.makedirs(dirname, exist_ok=True)
        print('Output File:', filename)
        log = pmi.pmiLogImport(filename)
        log.pmiSetHostname(hostname)
        log.pmiSetTimezone(timezone)

        pcp_metric = get_file_mapping(log, plugin, metric)
        if not pcp_metric:
            continue

        tt, cc = start_timer()    
        df = pd.read_parquet(file)
        ss = stop_timer(tt, cc)
    
        if 'node' not in df.columns:
            print('No node column in', file)
            continue
        print('*** Parquet', df.shape)
        print(ss)

        df.set_index('timestamp', inplace=True)
        try:  # there may be no data on this day
            df = df.loc[datestring]  # just this day
        except KeyError:
            continue
        df['node'] = df['node'].astype(int)
        print('*** Processed', df.shape)

        name = pcp_metric['name']
        indom = pcp_metric['indom']
        if not indom or indom == PM_INDOM_NULL:
            node_indom = True
            indom = log.pmiInDom(1, 0)
        else:
            node_indom = False

        tt, cc = start_timer()
        pcp_handle = {}
        #print('AddMetric:', name)
        log.pmiAddMetric(name, pcp_metric['pmid'], pcp_metric['type'],
                     indom, pcp_metric['sem'], pcp_metric['units'])
        for node in sorted(df.node.unique()):
            inst_name, inst_id = instance_mapping(pcp_metric, node, node_indom)
            log.pmiAddInstance(indom, inst_name, inst_id)
            pcp_handle[inst_name] = log.pmiGetHandle(name, inst_name)
            #print('AddInstance:', inst_id, inst_name, pcp_handle[inst_name])
        ss = stop_timer(tt, cc)
        print('*** Metadata', df.shape)
        print(ss)

        tt, cc = start_timer()
        (previous, seconds) = (None, None)
        (count, total) = (0, 0)

        for row in df.sort_values('timestamp').itertuples(index=True, name='sample'):
            previous = seconds
            seconds = int((row[0] - epoch).total_seconds())
            # if this is a new sample, flush the previous one
            if seconds != previous and count > 0:
                try:
                    log.pmiWrite(previous, 0)
                    #print('Wrote sample with', count, 'values')
                    count = 0
                except pmi.pmiErr as error:
                    print('log flush failed at time', previous, '\n', error)
                    pass

            value = row[1]
            node = row[2]
            #print('row:', seconds, value, node)
            if not isinstance(value, str):
                if math.isnan(value):
                    continue
                value = str(value).rstrip('.0')
            inst_name, _ = instance_mapping(pcp_metric, node, node_indom)

            try:
                log.pmiPutValueHandle(pcp_handle[inst_name], value)
                if previous is None:
                    previous = seconds
                count += 1
                total += 1
            except pmi.pmiErr as error:
                print(name, inst_name, 'failed at time', seconds, '\n', error)
                continue

        if count != 0:  # possible need to flush last sample
            try:
                log.pmiWrite(seconds, 0)
                #print('Wrote last sample with', count, 'values')
            except pmi.pmiErr as error:
                print('final log flush failed at time', seconds, '\n', error)
                pass
        del log
        ss = stop_timer(tt, cc)
        print('*** Values', total, name)
        print(ss)

In [13]:
days = [(2022, 9, 1)]
host = 'marconi100'
logpath = 'archives'
timezone = 'CET'

for (year, month, day) in days:
    df = write_metric_archives(logpath, host, timezone, year, month, day)

Input File: year_month=22-09/plugin=ganglia_pub/metric=Gpu0_board_limit_violation/a_0.parquet
Output File: archives/Gpu0_board_limit_violation/20220901
PM ganglia_pub Gpu0_board_limit_violation
*** Parquet (117818442, 3)
Completed in 10.49830 seconds CPU time, 3.43325 elapsed time
*** Processed (4258870, 2)
*** Metadata (4258870, 2)
Completed in 0.02297 seconds CPU time, 0.02326 elapsed time
*** Values 4258870 gpu.gpu.board_limit_violation
Completed in 29.58409 seconds CPU time, 30.43204 elapsed time
Input File: year_month=22-09/plugin=ganglia_pub/metric=Gpu0_current_clock_throttle_reasons/a_0.parquet
Output File: archives/Gpu0_current_clock_throttle_reasons/20220901
PM ganglia_pub Gpu0_current_clock_throttle_reasons
*** Parquet (117818478, 3)
Completed in 6.29319 seconds CPU time, 2.60694 elapsed time
*** Processed (4258914, 2)
*** Metadata (4258914, 2)
Completed in 0.02643 seconds CPU time, 0.02910 elapsed time
*** Values 4258914 gpu.gpu.current_clock_throttle_reasons
Completed in 30