def pestpp_runstorage_file_test(tmp_path):
    """Round-trip test of ``pyemu.helpers.RunStor`` against a reference file.

    Copies ``utils/runstor.rns`` into ``tmp_path``, reads it, overwrites the
    run status / one parameter / one observation for every run, re-reads the
    file to confirm the edits were stored, restores the original contents and
    finally compares the parameter values with ``utils/runstor.0.par.csv``.

    Args:
        tmp_path (str): directory that receives the working copy of the
            run storage file

    """
    import os
    import shutil
    import numpy as np
    import pandas as pd
    import pyemu

    org_rns_file = os.path.join("utils", "runstor.rns")
    rns_file = os.path.join(tmp_path, "runstor.rns")
    if os.path.exists(rns_file):
        os.remove(rns_file)
    shutil.copy2(org_rns_file, rns_file)

    rs = pyemu.helpers.RunStor(rns_file)
    header, par_names, obs_names = rs.file_info(rns_file)
    for col in ["n_runs", "run_size", "p_name_size", "o_name_size", "run_start"]:
        assert col in header
        assert header[col] > 0

    df = rs.get_data()
    assert "run_status_label" in df.columns
    assert np.all(df.run_pos.values > 0)
    for entry in df.info_txt:
        assert len(str(entry)) > 0
        assert "realization:" in entry
        assert "da_cycle:-9999" in entry
    assert df.run_status.sum() == 0

    # keep an untouched copy so the file can be restored further down
    org_df = df.copy()
    df["run_status"] = -100
    df.loc[:, par_names[0]] = -111
    df.loc[:, obs_names[-1]] = -222
    df["buffer_status"] = 1
    rs.update(df)

    # re-open the file through a fresh instance to check what was stored
    rs2 = pyemu.helpers.RunStor(rns_file)
    header, par_names, obs_names = rs2.file_info(rns_file)
    df2 = rs2.get_data()
    assert df.shape == df2.shape
    assert np.all(df2.run_status.values == -100)
    assert np.all(df2.loc[:, par_names[0]].values == -111)
    assert np.all(df2.loc[:, obs_names[-1]].values == -222)
    # buffer status should always be 0 no matter what values are put in the dataframe
    assert df2.buffer_status.sum() == 0

    # restore the original contents and compare with the reference csv
    rs2.update(org_df)
    p1, o1, meta = pyemu.helpers.read_pestpp_runstorage(
        rns_file, irun="all", with_metadata=True
    )
    p2 = pd.read_csv(os.path.join("utils", "runstor.0.par.csv"), index_col=0)
    diff = np.abs(p1.loc[:, p2.columns].values - p2.values)
    assert diff.max() < 1.0e-7
class RunStor(object):
    """Read and update the runs held in a pest++ run storage file."""

    def __init__(self, filename):
        """access to the pest++ run storage file.  Can be used to support
        usage of the pest++ external run manager

        Args:
            filename (str): the name of a pest++ run storage file (ie pest.rns)

        Example::

            rns = pyemu.helpers.RunStor("pest.rns")
            # get a dataframe of both parameter and observation
            # values for all runs in the file.
            df = rns.get_data()
            # a function that processes the runs stored
            # in df; the observation values in df should
            # be updated "in place"
            failed_idxs = process_my_model_runs(df)
            # mark the failed runs (a single .loc call - chained
            # assignment would not modify df under copy-on-write)
            df.loc[df.index[failed_idxs], "run_status"] = -99
            # update the parameter and observation values
            # stored in the rns file
            rns.update(df)

        """
        assert os.path.exists(filename), "run storage file '{0}' not found".format(
            filename
        )
        self.filename = filename
        # byte width of the info text field that follows each run's status flag
        self.info_txt_size = 1001

    @staticmethod
    def header_dtype():
        """the numpy header dtype of the file

        Returns:
            dtype (np.dtype): four int64 fields - n_runs, run_size,
            p_name_size and o_name_size

        """
        return np.dtype(
            [
                ("n_runs", np.int64),
                ("run_size", np.int64),
                ("p_name_size", np.int64),
                ("o_name_size", np.int64),
            ]
        )

    @staticmethod
    def _read_names(f, nbytes, label):
        """private method to read a block of null-terminated names

        Args:
            f (file): the open binary file handle
            nbytes (int): size of the name block in bytes
            label (str): what is being read, only used in the error message

        Returns:
            names (list): lower-cased names in file order

        """
        nbytes = int(nbytes)
        raw = f.read(nbytes)
        if len(raw) != nbytes:
            raise Exception(
                "unexpected end of file reading {0} names".format(label)
            )
        # every name is null terminated, so the last split element is empty
        return raw.strip().lower().decode().split("\0")[:-1]

    @staticmethod
    def file_info(filename):
        """get information about whats stored in the file

        Args:
            filename (str): the run storage file name

        Returns:
            header (dict): the file header, plus a "run_start" entry holding
            the byte position of the first run
            par_names (list): parameter names ordered as they occur in the file
            obs_names (list): observation names ordered as they occur in the file

        Raises:
            Exception: if the file ends before the header or names are complete
        """
        dtype = RunStor.header_dtype()
        with open(filename, "rb") as f:
            raw_header = np.fromfile(f, dtype=dtype, count=1)
            if raw_header.shape[0] != 1:
                raise Exception(
                    "unexpected end of file reading header of '{0}'".format(filename)
                )
            header = {name: raw_header[name][0] for name in dtype.names}
            par_names = RunStor._read_names(f, header["p_name_size"], "parameter")
            obs_names = RunStor._read_names(f, header["o_name_size"], "observation")
            # the runs start right after the two name blocks
            header["run_start"] = f.tell()
        return header, par_names, obs_names

    @staticmethod
    def status_str(r_status):
        """convert the integer run status flag to a txt label

        Args:
            r_status (int): the int run status from the file

        Returns:
            status (str): run status label

        """
        if r_status == 0:
            return "not completed"
        if r_status == 1:
            return "completed"
        if r_status == -100:
            return "canceled"
        # -99 and every other value are reported as a failure
        return "failed"

    def _read_run(self, f, npar, nobs):
        """private method to read a run from the file

        The values are read in this order: int8 run status, info text
        (`info_txt_size` bytes), one float64 that is not used, `npar`
        float64 parameter values, `nobs` float64 observation values and
        the int8 buffer status.

        Args:
            f (file): the open file handle, positioned at the start of the run
            npar (int): number of parameters
            nobs (int): number of observations

        Returns:
            r_status (np.ndarray): length-1 int8 array holding the run status
            info_txt (str): run information
            buf_status (int): status of the write buffer (not really used...)
            par_vals (np.ndarray): the parameter values for the run
            obs_vals (np.ndarray): the observation values for the run

        Raises:
            Exception: if the file ends before the run is complete

        """
        r_status = np.fromfile(f, dtype=np.int8, count=1)
        raw_txt = f.read(self.info_txt_size)
        if r_status.size != 1 or len(raw_txt) != self.info_txt_size:
            raise Exception("unexpected end of file reading run status/info text")
        info_txt = raw_txt.strip().lower().decode().replace("\x00", "")
        # a single float64 that is not used sits in front of the parameter values
        info_val = np.fromfile(f, dtype=np.float64, count=1)
        par_vals = np.fromfile(f, dtype=np.float64, count=npar)
        obs_vals = np.fromfile(f, dtype=np.float64, count=nobs)
        buf_status = np.fromfile(f, dtype=np.int8, count=1)
        if (
            info_val.size != 1
            or par_vals.size != npar
            or obs_vals.size != nobs
            or buf_status.size != 1
        ):
            raise Exception("unexpected end of file reading run values")
        return r_status, info_txt, buf_status[0], par_vals, obs_vals

    def get_data(self):
        """read the contents of the file into a dataframe

        Returns:
            df (pd.DataFrame): the file contents, one row per run.  The
            columns are run_status, run_pos (byte position of the run in the
            file), info_txt, buffer_status and run_status_label, followed by
            one column per parameter and one per observation

        Raises:
            Exception: if a run cannot be read

        """
        header, par_names, obs_names = RunStor.file_info(self.filename)
        n_runs = int(header["n_runs"])
        npar, nobs = len(par_names), len(obs_names)
        rstats, infos, run_poss, bstats = [], [], [], []
        par_rows, obs_rows = [], []
        with open(self.filename, "rb") as f:
            for irun in range(n_runs):
                run_pos = header["run_start"] + (irun * header["run_size"])
                f.seek(run_pos)
                try:
                    r_status, info_txt, buf_status, par_val, obs_val = self._read_run(
                        f, npar, nobs
                    )
                except Exception as e:
                    raise Exception(
                        "error reading run {0}: {1}".format(irun, str(e))
                    ) from e
                rstats.append(r_status[0])
                infos.append(info_txt)
                run_poss.append(run_pos)
                bstats.append(buf_status)
                par_rows.append(par_val)
                obs_rows.append(obs_val)
        df = pd.DataFrame(
            {
                "run_status": rstats,
                "run_pos": run_poss,
                "info_txt": infos,
                "buffer_status": bstats,
            }
        )
        df["run_status"] = df.run_status.astype(int)
        df["run_status_label"] = df.run_status.apply(RunStor.status_str)
        # explicit 2-D shapes so a file without runs still yields named columns
        par_block = np.array(par_rows, dtype=np.float64).reshape(n_runs, npar)
        obs_block = np.array(obs_rows, dtype=np.float64).reshape(n_runs, nobs)
        return pd.concat(
            [
                df,
                pd.DataFrame(par_block, columns=par_names),
                pd.DataFrame(obs_block, columns=obs_names),
            ],
            axis=1,
        )

    def update(self, df):
        """update the parameter and observation values

        Args:
            df (pd.DataFrame) : file contents to update.  Should be derived from the get_data() method
                to maintain dtypes and required information.  The parameter and observation values for each
                run are updated "in place" in the file, as is the run_status int flag; this flag should be set to
                -99 for any runs that "failed".  The run_pos column gives the byte position each row is
                written to.  The info text is left untouched and the buffer status is always written as 0.

        Raises:
            Exception: if the number of rows in `df` differs from the number of runs in the file

        """
        header, par_names, obs_names = RunStor.file_info(self.filename)
        if header["n_runs"] != df.shape[0]:
            raise Exception(
                "number of runs implied by df nrows {0} != n_runs in file {1}".format(
                    df.shape[0], header["n_runs"]
                )
            )
        # force float64: integer-typed columns would otherwise be written as
        # raw int64 bytes and read back as meaningless doubles
        par_vals = np.asarray(df.loc[:, par_names].values, dtype=np.float64)
        obs_vals = np.asarray(df.loc[:, obs_names].values, dtype=np.float64)
        run_status = df.run_status.astype(np.int8).values
        run_pos = df.run_pos.values
        # the values start after the status byte and the info text
        offset = 1 + self.info_txt_size
        # the unused info val
        unused_val = np.float64(-999.0).tobytes()
        # the buffer status flag - 0 means the write was completed
        write_done = np.int8(0).tobytes()
        with open(self.filename, "r+b") as f:
            for irun, rpos in enumerate(run_pos):
                rpos = int(rpos)
                f.seek(rpos)
                f.write(run_status[irun].tobytes())
                f.seek(rpos + offset)
                f.write(unused_val)
                f.write(par_vals[irun, :].tobytes())
                f.write(obs_vals[irun, :].tobytes())
                f.write(write_done)