Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion autotest/utils_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3279,15 +3279,72 @@ def gpr_zdt1_ppw():
os.chdir("..")


def pestpp_runstorage_file_test(tmp_path):
    """Round-trip test of ``pyemu.helpers.RunStor`` on a copy of ``utils/runstor.rns``.

    The run storage file is copied into ``tmp_path``, read into a dataframe,
    overwritten with sentinel values, re-read to verify the sentinels landed,
    then restored and compared against the reference parameter csv.

    Args:
        tmp_path (str): directory that receives the working copy of the file
    """
    import os
    import numpy as np
    import pandas as pd
    import pyemu

    source_file = os.path.join("utils", "runstor.rns")
    work_file = os.path.join(tmp_path, "runstor.rns")
    # always start from a fresh copy of the reference file
    if os.path.exists(work_file):
        os.remove(work_file)
    shutil.copy2(source_file, work_file)

    store = pyemu.helpers.RunStor(work_file)
    header, par_names, obs_names = store.file_info(work_file)
    for key in ("n_runs", "run_size", "p_name_size", "o_name_size", "run_start"):
        assert key in header
        assert header[key] > 0

    df = store.get_data()
    assert "run_status_label" in df.columns
    assert np.all(df.run_pos.values > 0)

    for txt in df.info_txt:
        assert len(str(txt)) > 0
        assert "realization:" in txt
        assert "da_cycle:-9999" in txt
    assert df.run_status.sum() == 0

    # keep a pristine copy so the file can be restored afterwards
    pristine = df.copy()
    first_par, last_obs = par_names[0], obs_names[-1]
    df["run_status"] = -100
    df.loc[:, first_par] = -111
    df.loc[:, last_obs] = -222
    df["buffer_status"] = 1
    store.update(df)

    # re-open the file and confirm the sentinel values were written
    reopened = pyemu.helpers.RunStor(work_file)
    header, par_names, obs_names = pyemu.helpers.RunStor.file_info(work_file)
    df2 = reopened.get_data()
    assert df.shape == df2.shape
    assert np.all(df2.run_status.values == -100)

    print(df2.loc[:, par_names[0]])
    assert np.all(df2.loc[:, par_names[0]].values == -111)
    print(df2.loc[:, obs_names[-1]])
    assert np.all(df2.loc[:, obs_names[-1]].values == -222)
    print(df2.buffer_status)
    # buffer status should always be 0 no matter what values are put in the dataframe
    assert df2.buffer_status.sum() == 0

    # restore the original contents and compare with the reference csv
    reopened.update(pristine)
    p1, o1, meta = pyemu.helpers.read_pestpp_runstorage(
        work_file, irun="all", with_metadata=True
    )
    p2 = pd.read_csv(os.path.join("utils", "runstor.0.par.csv"), index_col=0)

    max_diff = np.abs(p1.loc[:, p2.columns].values - p2.values).max()
    print(max_diff)
    assert max_diff < 1.0e-7


if __name__ == "__main__":
    # Ad-hoc driver for running selected tests directly (outside of pytest).
    # Tests taking a working directory are passed "." here.
    pestpp_runstorage_file_test(".")
    #geostat_draws_test('.')
    #fac2real_wrapped_test('.')
    #maha_pdc_test('.')
    #ppu_geostats_test(".")
    # NOTE(review): the same call appears both active and commented out on the
    # next two lines - confirm which form is intended.
    pypestworker_test()
    #pypestworker_test()
    #gpr_zdt1_test()
    #gpr_compare_invest()
    #gpr_constr_test()
Expand Down
194 changes: 194 additions & 0 deletions pyemu/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,200 @@ def pst_from_parnames_obsnames(
)


class RunStor(object):
    """Reader/updater for a pest++ binary run storage file."""

    def __init__(self, filename):
        """access to the pest++ run storage file.  Can be used to support
        usage of the pest++ external run manager

        Args:
            filename (str): the name of a pest++ run storage file (ie pest.rns)

        Example::

            rns = pyemu.helpers.RunStor("pest.rns")
            # get a dataframe of both parameter and observation
            # values for all runs in the file.
            df = rns.get_data()
            # a function that processes the runs stored
            # in df; the observation values in df should
            # be updated "in place"
            failed_idxs = process_my_model_runs(df)
            # mark the failed runs (single indexing call, no chained assignment)
            df.iloc[failed_idxs, df.columns.get_loc("run_status")] = -99
            # update the parameter and observation values
            # stored in the rns file
            rns.update(df)

        """
        assert os.path.exists(filename), "run storage file not found: {0}".format(
            filename
        )
        self.filename = filename
        # number of bytes read for the info text field of every run record
        self.info_txt_size = 1001

    @staticmethod
    def header_dtype():
        """the numpy header dtype of the file

        Returns:
            dtype (np.dtype): structured dtype with the four int64 header fields
        """
        return np.dtype(
            [
                ("n_runs", np.int64),
                ("run_size", np.int64),
                ("p_name_size", np.int64),
                ("o_name_size", np.int64),
            ]
        )

    @staticmethod
    def _read_names(f, nbytes):
        """private helper: read `nbytes` bytes and split them into lower-case,
        null-terminated names."""
        raw = struct.unpack("{0}s".format(nbytes), f.read(nbytes))[0]
        # every name is followed by "\0", so the last split item is empty
        return raw.strip().lower().decode().split("\0")[:-1]

    @staticmethod
    def file_info(filename):
        """get information about whats stored in the file

        Args:
            filename (str): the run storage file name

        Returns:
            header (dict): the file header.  In addition to the fields of
                `header_dtype()`, "run_start" holds the byte offset of the first run
            par_names (list): parameter names ordered as they occur in the file
            obs_names (list): observation names ordered as they occur in the file

        Raises:
            Exception: if the file is too short to contain a header
        """
        hdr_dtype = RunStor.header_dtype()
        with open(filename, "rb") as f:
            raw = np.fromfile(f, dtype=hdr_dtype, count=1)
            if raw.size != 1:
                raise Exception(
                    "file '{0}' is too short to hold a run storage header".format(
                        filename
                    )
                )
            header = {name: raw[name][0] for name in hdr_dtype.names}
            par_names = RunStor._read_names(f, header["p_name_size"])
            obs_names = RunStor._read_names(f, header["o_name_size"])
            # the run records begin right after the two name blocks
            header["run_start"] = f.tell()
        return header, par_names, obs_names

    @staticmethod
    def status_str(r_status):
        """convert the int run status to a txt label

        Args:
            r_status (int): the int run status from the file

        Returns:
            status (str): run status label.  Any value other than 0, 1 or
                -100 (including -99) is labeled "failed"

        """
        if r_status == 0:
            return "not completed"
        if r_status == 1:
            return "completed"
        if r_status == -100:
            return "canceled"
        if r_status == -99:
            return "failed"
        return "failed"

    def _read_run(self, f, npar, nobs):
        """private method to read a run from the file

        Args:
            f (file): the open file handle, positioned at the start of a run
            npar (int): number of parameters
            nobs (int): number of observations

        Returns:
            r_status (np.ndarray): length-1 int8 array holding the run status
            info_txt (str): run information
            buf_status (int): status of the write buffer (not really used...)
            par_vals (np.ndarray): the parameter values for the run
            obs_vals (np.ndarray): the observation values for the run

        Raises:
            Exception: if the run record is truncated

        """
        r_status = np.fromfile(f, dtype=np.int8, count=1)
        raw_txt = struct.unpack(
            "{0}s".format(self.info_txt_size), f.read(self.info_txt_size)
        )[0]
        info_txt = raw_txt.strip().lower().decode().replace("\x00", "")
        # the float64 info value is not used; read it only to advance the file
        np.fromfile(f, dtype=np.float64, count=1)
        par_vals = np.fromfile(f, dtype=np.float64, count=npar)
        obs_vals = np.fromfile(f, dtype=np.float64, count=nobs)
        buf_status = np.fromfile(f, dtype=np.int8, count=1)
        # np.fromfile silently returns fewer items at end-of-file
        if (
            r_status.size != 1
            or par_vals.size != npar
            or obs_vals.size != nobs
            or buf_status.size != 1
        ):
            raise Exception("unexpected end of file while reading run record")
        return r_status, info_txt, buf_status[0], par_vals, obs_vals

    def get_data(self):
        """read the contents of the file into a dataframe

        Returns:
            df (pd.DataFrame): the file contents, one row per run.  Columns are
                "run_status", "run_pos" (byte offset of the run in the file),
                "info_txt", "buffer_status", "run_status_label", followed by one
                column per parameter and one per observation

        Raises:
            Exception: if a run cannot be read

        """
        header, par_names, obs_names = RunStor.file_info(self.filename)
        n_runs = int(header["n_runs"])
        run_start = int(header["run_start"])
        run_size = int(header["run_size"])
        npar, nobs = len(par_names), len(obs_names)

        rstats, infos, par_vals, obs_vals = [], [], [], []
        run_poss, bstats = [], []
        with open(self.filename, "rb") as f:
            for irun in range(n_runs):
                run_pos = run_start + (irun * run_size)
                f.seek(run_pos)
                try:
                    r_status, info_txt, buf_status, par_val, obs_val = self._read_run(
                        f, npar, nobs
                    )
                    rstat = r_status[0]
                except Exception as e:
                    raise Exception(
                        "error reading run {0}: {1}".format(irun, str(e))
                    ) from e
                rstats.append(rstat)
                infos.append(info_txt)
                par_vals.append(par_val)
                obs_vals.append(obs_val)
                run_poss.append(run_pos)
                bstats.append(buf_status)

        df = pd.DataFrame(
            {
                "run_status": rstats,
                "run_pos": run_poss,
                "info_txt": infos,
                "buffer_status": bstats,
            }
        )
        df["run_status"] = df.run_status.astype(int)
        df["run_status_label"] = df.run_status.apply(RunStor.status_str)
        # explicit 2-D shapes so a file with zero runs still yields named columns
        par_arr = np.array(par_vals, dtype=np.float64).reshape(n_runs, npar)
        obs_arr = np.array(obs_vals, dtype=np.float64).reshape(n_runs, nobs)
        df = pd.concat(
            [
                df,
                pd.DataFrame(par_arr, columns=par_names),
                pd.DataFrame(obs_arr, columns=obs_names),
            ],
            axis=1,
        )
        return df

    def update(self, df):
        """update the parameter and observation values

        Args:
            df (pd.DataFrame) : file contents to update.  Should be derived from the get_data() method
                to maintain dtypes and required information.  The parameter and observation values for each
                run are updated "in place" in the file, as is the run_status int flag; this flag should be set to
                -99 for any runs that "failed".  The info text of each run is left untouched and
                the buffer status flag is always written as 0.

        Raises:
            Exception: if the number of rows in `df` differs from the number of runs in the file

        """
        header, par_names, obs_names = RunStor.file_info(self.filename)
        if header["n_runs"] != df.shape[0]:
            raise Exception(
                "number of runs implied by df nrows {0} != n_runs in file {1}".format(
                    df.shape[0], header["n_runs"]
                )
            )
        # the file stores float64 values; force the dtype so that integer-typed
        # columns are not dumped as raw int bytes
        par_vals = df.loc[:, par_names].values.astype(np.float64)
        obs_vals = df.loc[:, obs_names].values.astype(np.float64)
        run_status = df.run_status.astype(np.int8).values
        run_pos = df.run_pos.values
        # skip the 1-byte status flag and the info text to reach the numeric data
        offset = 1 + self.info_txt_size
        with open(self.filename, "r+b") as f:
            for irun, rpos in enumerate(run_pos):
                rpos = int(rpos)
                f.seek(rpos)
                run_status[irun].tofile(f, sep="")

                f.seek(rpos + offset)
                # write the unused info val
                np.float64(-999.0).tofile(f, sep="")
                par_vals[irun, :].tofile(f, sep="")
                obs_vals[irun, :].tofile(f, sep="")
                # the buffer status flag - 0 means the write was completed
                np.int8(0).tofile(f, sep="")




def read_pestpp_runstorage(filename, irun=0, with_metadata=False):
"""read pars and obs from a specific run in a pest++ serialized
run storage file (e.g. .rns/.rnj) into dataframes.
Expand Down
Loading