In [1]:
from pathlib import Path
from typing import List, Tuple, Optional, Union
from pydantic import BaseModel
from enum import Enum
import os
from joblib import Parallel, delayed
import dataclasses
from dataclasses import dataclass
import itertools

import importlib
import modeling
importlib.reload(modeling)
import modeling.riscv_dv as riscv_dv
importlib.reload(modeling.riscv_dv)
import modeling.coverage_parser as coverage_parser
importlib.reload(modeling.coverage_parser)

<module 'modeling.coverage_parser' from '/nscratch/vighneshiyer/cov-proxy-model/modeling/coverage_parser.py'>

In [2]:
class Timeout:
    """Marker type with no fields.

    NOTE(review): nothing in the visible notebook code references this class; timeouts appear to be
    encoded as `Fail(fail_cycles=<int>)` and `spike_status=None` instead -- confirm before relying on it.
    """
    pass
@dataclass
class Pass:
    """RTL simulation outcome for a test whose sim log ends with a '*** PASSED ***' line."""
    pass_cycles: int  # cycle count parsed from the last line of the sim log
@dataclass
class Fail:
    """RTL simulation outcome for a test that did not pass."""
    fail_cycles: Optional[int] # None = failed for unknown reason (e.g. bad syscall), int = failed due to timeout in given cycles
    
@dataclass
class TestData:
    """Everything recorded for one iteration of one test: generator config, coverage, and simulator outcomes."""
    config: riscv_dv.RiscvDvConfig  # riscv-dv configuration parsed from the iteration's .json file
    cov_data: Optional[List[coverage_parser.cov_t]]  # None when the iteration has no .cov.dat file
    spike_status: Optional[int]  # None = timeout, int value = number of lines of spike.log (roughly correlated with number of instructions executed)
    rtl_sim_status: Union[Pass, Fail] # pass: cycles, fail: timeout and cycles, fail: no reason
    test_id: int  # NOTE(review): parse_test passes test_id_folder.name (a str) here -- confirm the intended type
    test_iteration: int  # iteration index i of the `<i>.*` files this record was parsed from
    
def read_last_line(file: Path) -> str:
    """Return the final line of `file` (decoded, trailing newline kept) without reading the whole file.

    The file is scanned backwards from its end, one byte at a time, until the newline that
    precedes the last line is found. The very last byte is never inspected, so a trailing
    newline does not count as the start of a new (empty) line.
    """
    with open(file, 'rb') as handle:
        try:
            # Start on the second-to-last byte; this seek fails for files shorter than two bytes.
            handle.seek(-2, os.SEEK_END)
            while True:
                if handle.read(1) == b'\n':
                    break
                # read(1) advanced by one byte, so step back two to examine the previous byte.
                handle.seek(-2, os.SEEK_CUR)
        except OSError:
            # Walked past the start of the file: it has a single line (or is empty).
            handle.seek(0)
        return handle.readline().decode()

def clean_test_data(data: TestData) -> Optional[TestData]:
    """Filter and normalize the coverage data of one test iteration, or reject the data point.

    Returns None when the data point has no coverage data, or when its RTL simulation is a
    `Fail` with no cycle count (failed for no obvious reason). Otherwise returns a copy of
    `data` whose `cov_data`:
      * no longer contains entries whose field 3 is "block",
      * has field 5 divided by the simulation's cycle count (pass or timeout cycles),
      * is sorted by fields (0, 1, 3): file, then line number, then type (if vs else).
    """
    if not data.cov_data:
        return None  # nothing to learn from a run without coverage

    status = data.rtl_sim_status
    if isinstance(status, Fail) and status.fail_cycles is None:
        return None  # unexplained failure: cycle count unknown, so the point is unusable

    # Drop 'block'-type coverage points.
    rows = [row for row in data.cov_data if row[3] != "block"]

    def per_cycle(cycles_attr: str) -> list:
        # Rebuild each coverage tuple with its last field divided by the run's cycle count.
        return [
            (row[0], row[1], row[2], row[3], row[4], float(row[5]) / float(getattr(status, cycles_attr)))
            for row in rows
        ]

    if hasattr(status, "pass_cycles"):
        rows = per_cycle("pass_cycles")
    elif hasattr(status, "fail_cycles"):
        # Timed-out tests are kept: their cycle count is still meaningful.
        rows = per_cycle("fail_cycles")
    else:
        print(status)
        assert False

    ordered = sorted(rows, key=lambda entry: (entry[0], entry[1], entry[3]))
    return dataclasses.replace(data, cov_data=ordered)

In [3]:
def parse_test(test_id_folder: Path) -> List[TestData]:
    """Parse every recorded iteration of one test into `TestData` records.

    For each iteration index ``i`` the folder is read for ``i.json`` (riscv-dv config),
    ``i.spike.status``, ``i.sim.log`` and, if present, ``i.cov.dat``.

    Args:
        test_id_folder: Directory holding the files of a single test id. Its name is stored
            as the `test_id` of every returned record.

    Returns:
        One `TestData` per iteration index from 0 up to the largest index found, in order.
        An empty list if the folder contains no entry whose leading name component is numeric.

    Raises:
        OSError: if a required per-iteration file (.json, .spike.status, .sim.log) cannot be read.
        ValueError: if a spike status is neither "TIMEOUT" nor an integer, or a cycle count
            in the sim log is not an integer.
    """
    # Iteration indices are the leading dot-separated component of each file name ("3.cov.dat" -> 3).
    # They must be compared as integers: max() over the raw strings is lexicographic ("9" > "10"),
    # and keeping only the first character of the winner limits the result to a single digit.
    iteration_ids = [
        int(stem)
        for stem in (entry.name.split('.')[0] for entry in test_id_folder.iterdir())
        if stem.isdecimal()  # ignore stray entries that are not named after an iteration
    ]
    test_data: List[TestData] = []
    print(f"Parsing test {test_id_folder}")
    if not iteration_ids:
        return test_data  # nothing was recorded for this test id
    n_iterations: int = max(iteration_ids)

    for i in range(n_iterations+1):
        riscv_dv_config = riscv_dv.RiscvDvConfig.parse_raw((test_id_folder / f"{i}.json").read_text())

        # Coverage is optional; the first line of the file is skipped and blank lines are ignored.
        cov_dat_file = (test_id_folder / f"{i}.cov.dat")
        if cov_dat_file.exists():
            cov_data = [coverage_parser.parse_line(line) for line in cov_dat_file.read_text().split('\n')[1:] if len(line) != 0]
        else:
            cov_data = None

        # Strip surrounding whitespace so a trailing newline does not turn "TIMEOUT" into an int() error.
        spike_status_str = (test_id_folder / f"{i}.spike.status").read_text().strip()
        if spike_status_str == "TIMEOUT":
            spike_status = None
        else:
            spike_status = int(spike_status_str)

        # The verdict is on the last line of the sim log; the cycle count sits at a fixed
        # space-separated token position that differs between the PASSED and FAILED messages.
        sim_log_file = (test_id_folder / f"{i}.sim.log")
        sim_log_str = read_last_line(sim_log_file)
        if sim_log_str.startswith("*** PASSED ***"):
            rtl_sim_status = Pass(pass_cycles=int(sim_log_str.split(' ')[5]))
        elif sim_log_str.startswith("*** FAILED *** via trace_count"):
            rtl_sim_status = Fail(fail_cycles=int(sim_log_str.split(' ')[9]))
        else:
            # Any other ending is a failure without a usable cycle count.
            rtl_sim_status = Fail(fail_cycles=None)
        test_data.append(TestData(config=riscv_dv_config, cov_data=cov_data, spike_status=spike_status, rtl_sim_status=rtl_sim_status, test_id=test_id_folder.name, test_iteration=i))
    return test_data

In [4]:
# Get coverage data and riscv-dv configs for each test.
# Only directories are treated as test-id folders: parse_test calls iterdir() on its argument,
# so a stray regular file under configs/ would abort the whole parallel run.
# Path.iterdir() yields entries in arbitrary order; sorting keeps the order of `data`
# (and anything later indexed from it) reproducible across runs and machines.
test_id_folders = sorted(entry for entry in (Path.cwd() / "configs").iterdir() if entry.is_dir())
data = Parallel(n_jobs=24)(delayed(parse_test)(test_id_folder) for test_id_folder in test_id_folders)
len(data)

100

In [5]:
# Flatten the per-test lists into a single flat list of records
data = [record for per_test in data for record in per_test]

In [7]:
# Clean every data point and keep only those that clean_test_data did not reject (None = rejected)
cleaned_data = [cleaned for cleaned in map(clean_test_data, data) if cleaned is not None]

In [26]:
def data_to_vector(data: TestData) -> Tuple[List[float], List[float]]: # returns a row of the A matrix and a row of the B matrix
    """Convert one data point into a feature row (A) and a coverage row (B).

    Args:
        data: A data point whose `cov_data` is not None.

    Returns:
        A tuple ``(A, B)`` where
          * ``A`` holds the plusarg config values as floats, ordered by plusarg name,
            with the 'boot_mode' plusarg left out;
          * ``B`` holds field 5 of every coverage entry as a float, in `cov_data` order.

    The input is not modified.
    """
    plusargs = data.config.plusarg_config
    # Leave 'boot_mode' out by filtering instead of deleting it from the config: deleting would
    # permanently alter the caller's data point as a side effect of a read-only conversion.
    keys = sorted(key for key in plusargs if key != 'boot_mode')
    A = [float(plusargs[key]) for key in keys]

    B = [float(d[5]) for d in data.cov_data]

    return A, B

In [27]:
# Spot-check: display the (A row, B row) pair produced for the first cleaned data point
data_to_vector(cleaned_data[0])

([0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  1.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  200.0,
  1.0,
  1.0,
  1.0,
  1.0,
  0.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  1.0,
  0.0,
  1.0,
  5.0,
  1.0,
  0.0,
  0.0,
  0.0,
  1.0,
  0.0,
  2.0,
  0.0],
 [0.853315420347937,
  0.1466663630567447,
  0.809946261043811,
  0.19003552236087076,
  2.280435376628108,
  0.7601329811458238,
  0.7166408598232991,
  0.28334092358138263,
  0.9925403042171418,
  0.007441479187539849,
  0.021445486838509883,
  0.008320429911649514,
  0.34606521541123964,
  0.653916567993442,
  0.9912742508425175,
  0.008707532562164132,
  0.026122597686492395,
  0.008707532562164132,
  0.34606521541123964,
  0.653916567993442,
  0.8606612624100556,
  0.1393205209946261,
  0.4179615629838783,
  0.1393205209946261,
  0.9532698788596411,
  0.04671190454504053,
  0.9733855542399126,
  0.026596229164769104,
  0

In [None]:
    
for point in cleaned_data:
    input_features = point.config.plusarg_config
    
    int_features = {k:v for k, v in input_features.items()} # sort keys by name
    boot_mode