diff --git a/setup.py b/setup.py index 06ea1c020..0ce92ef47 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ packages=find_packages(), url='https://github.com/NCAR/wrf_hydro_py', license='MIT', - install_requires=['pandas','f90nml','deepdiff','pathlib','xarray','datetime','pytest','pytest-datadir-ng'], + install_requires=['pandas','f90nml','deepdiff','pathlib','xarray','datetime','pytest','pytest-datadir-ng','boltons'], author='Joe Mills', author_email='jmills@ucar.edu', description='Crude API for the WRF-Hydro model', diff --git a/wrfhydro/ensemble.py b/wrfhydro/ensemble.py new file mode 100644 index 000000000..838f96d8c --- /dev/null +++ b/wrfhydro/ensemble.py @@ -0,0 +1,223 @@ +from wrf_hydro_model import * +from deepdiff import DeepDiff +from boltons.iterutils import remap +import copy + + +# ######################## +class DeepDiffEq(DeepDiff): + + def __init__(self, + t1, + t2, + eq_types, + ignore_order=False, + report_repetition=False, + significant_digits=None, + exclude_paths=set(), + exclude_regex_paths=set(), + exclude_types=set(), + include_string_type_changes=False, + verbose_level=1, + view='text', + **kwargs): + + # Must set this first for some reason. + self.eq_types = set(eq_types) + + super().__init__(t1, + t2, + ignore_order=False, + report_repetition=False, + significant_digits=None, + exclude_paths=set(), + exclude_regex_paths=set(), + exclude_types=set(), + include_string_type_changes=False, + verbose_level=1, + view='text', + **kwargs) + + # Have to force override __diff_obj. 
+    def _DeepDiff__diff_obj(self, level, parents_ids=frozenset({}),
+                            is_namedtuple=False):
+        """Difference of 2 objects using their __eq__ if requested"""
+
+        if type(level.t1) in self.eq_types:
+            if level.t1 == level.t2:
+                return
+            else:
+                self._DeepDiff__report_result('values_changed', level)
+                return
+
+        super(DeepDiffEq, self)._DeepDiff__diff_obj(level, parents_ids=frozenset({}),
+                                                    is_namedtuple=False)
+
+
+
+
+
+def copy_member(member,
+                do_copy: bool):
+    if do_copy:
+        return(copy.deepcopy(member))
+    else:
+        return(member)
+
+# ########################
+# Classes for constructing and running a wrf_hydro simulation
+class WrfHydroEnsembleSim(object):
+    """Class for a WRF-Hydro model, which constitutes the model source code and compiled binary.
+    """
+    def __init__(self,
+                 members: list,
+                 ensemble_dir: str='' ):
+        """Instantiate a WrfHydroEnsembleSim object.
+        Args:
+            members:
+            ensemble_dir: Optional,
+        Returns:
+            A WrfHydroEnsembleSim object.
+        """
+        self.__members = []
+        self.members = members
+        self.__members_dict = {}
+        """list: of WrfHydroSim objects."""
+
+        # Several simulation properties are not specified
+        # until run time. Place them here
+        self.ens_dir = ''
+
+
+    # Data to store in the ensemble object
+    # 1) list of simulations = the ensemble
+    # 2) N = __len__(), @property
+    # 3) ensemble dir, the directory containing the ensemble member_dir run dirs
+
+
+    def __len__(self):
+        return( len(self.members) )
+
+
+    # The "canonical" name for len
+    @property
+    def N(self):
+        return(self.__len__())
+
+
+    # Data to store with the "member" simulations, conceptually this
+    # data belongs to the members:
+    # 1) member number
+    # 2) description
+    # 3) member_dir
+    # 4) forcing_source_dir
+    #
+    # Ensemblize the individual members.
+    # Except for changing the Class definition, why
+    # would I define a child class instead of just adding attributes?
+
+
+    @property
+    def members(self):
+        return(self.__members)
+
+    @members.setter
+    def members(self,
+                new_members: list,
+                copy_members: bool=True):
+
+        if( type(new_members) is not list ):
+            new_members = [ new_members ]
+
+        for nn in new_members:
+            self.__members.append(copy_member(nn, copy_members))
+            # If copying an existing ensemble member, nuke the metadata
+            # number is the detector for all ensemble metadata.
+            if hasattr(nn, 'number'):
+                delattr(self.__members[len(self.__members)-1], 'number')
+
+        # Put refs to these properties in the ensemble objects
+        for mm in range(len(self.__members)):
+            if not hasattr(self.__members[mm], 'number'):
+                self.__members[mm].number = -1
+                self.__members[mm].description = ''
+                self.__members[mm].run_dir = ''
+                self.__members[mm].forcing_source_dir = ''
+
+
+    # A quick way to setup a basic ensemble from a single sim.
+    def replicate_member(self,
+                         N: int,
+                         copy_members: bool=True):
+        if self.N > 1:
+            print('WTF mate?')
+        else:
+            self.members = [ self.members[0] for nn in range(N-1) ]
+
+
+    @property
+    def members_dict(self):
+        m_dict = self.__members_dict
+        for mm in range(len(self.members)):
+            self.members[mm].number = mm
+        m_dict['number'] = [ mm.number for mm in self.members ]
+        m_dict['description'] = [ mm.description for mm in self.members ]
+        m_dict['run_dir'] = [ mm.run_dir for mm in self.members ]
+        m_dict['forcing_source_dir'] = [ mm.forcing_source_dir for mm in self.members ]
+        return(m_dict)
+
+    @members_dict.setter
+    def members_dict(self,
+                     att_path_key: str,
+                     values: list):
+        m_dict = self.__members_dict
+
+        m_dict[att_path_key] =[]
+
+        att_path_key_tuple = tuple(map(str, att_path_key.split('/')))
+        att_key = att_path_key_tuple[len(att_path_key_tuple)-1]
+        att_path = att_path_key_tuple[0:(len(att_path_key_tuple)-1)]
+
+        def visit(path, key, value):
+            if path == att_path:
+                if key == att_key:
+                    m_dict[att_path_key].append(value)
+
+        for mm in self.members:
+            remap(mm.__dict__, visit=visit)
+
+
+ # Would want a method for detecting differences between ensemble members + # instead of just specifying them... + + + + # def get_ens_attributes(self, attribute, the_key): + + # # Parse up the attribute + # return_list = [] + + # def visit_path_key(path, key, value): + # if key == the_key: + # return_list.append(value) #print(path, key, value) + # return key, value + # return key, value + + # def remap_path_key(ll): + # return(remap(ll, visit_path_key)) + + # att_list = [remap_path_key(getattr(i, attribute)) for i in self.members ] + # #att_list = [ i.hydro_namelist['nudging_nlist']['nlastobs'] for i in self.members ] + # return(return_list) + + +#Ens: +#Run method checks run dir name differences +#Run dir names +#Print differences across all fields, incl namelists +#Job array submission +#Operations on data. +#Bulk edit of name lists: Run start and end times, etc. +#Forcing source and run dirs (preprocess the run forcings for the run period) + diff --git a/wrfhydro/one_off_scripts/example_1.py b/wrfhydro/one_off_scripts/example_1.py new file mode 100644 index 000000000..b08b2a133 --- /dev/null +++ b/wrfhydro/one_off_scripts/example_1.py @@ -0,0 +1,121 @@ +WRF_HYDRO_NWM_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_nwm_myFork +WRF_HYDRO_PY_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_py + +docker create --name croton wrfhydro/domains:croton_NY +## The complement when youre done with it: +## docker rm -v sixmile_channel-only_test + +docker run -it \ + -v ${WRF_HYDRO_NWM_PATH}:/wrf_hydro_nwm \ + -v ${WRF_HYDRO_PY_PATH}:/home/docker/wrf_hydro_py \ + --volumes-from croton \ + wrfhydro/dev:conda + +####################################################### +cp -r /wrf_hydro_nwm /home/docker/wrf_hydro_nwm +python + +####################################################### +import sys +from pprint import pprint +sys.path.insert(0, '/home/docker/wrf_hydro_py/wrfhydro') +from wrf_hydro_model import * +from utilities import * + +# ###################################################### +# 
Model Section
+# There are implied options here
+# What is the argument? Are there more arguments?
+theModel = WrfHydroModel('/home/docker/wrf_hydro_nwm/trunk/NDHMS')
+
+# The attributes of the model object.
+# Note: menus of both compile options and namelists (run-time options) are now in the
+# repository. These menus of namelists come in with the creation of the model
+# object. Note that while the compile time options can be used in the compile method
+# on the model object, the namelists are used only in simulation objects (where
+# the model is actually run on a domain).
+
+# Compile options are not yet version/configed in the json namelist
+pprint(theModel.compile_options)
+
+pprint(theModel.hrldas_namelists)
+
+pprint(theModel.hydro_namelists)
+
+pprint(theModel.source_dir)
+pprint(theModel.version)
+
+# The only method on the model (it is independent of domain).
+# Should be able to pass version/configuration to the compile. Currently not args.
+# What are other arguments here? Might just show help.
+theModel.compile('gfort')
+# The compilation results in the following new attributes/slots
+## {'__slotnames__', 'compile_dir', 'compiler', 'wrf_hydro_exe', 'table_files', 'configure_log', 'object_id', 'compile_log'}
+pprint(theModel.compiler)
+pprint(theModel.compile_dir)
+# Resulting binary
+pprint(theModel.wrf_hydro_exe)
+# The parameter table files which result from compiling.
+pprint(theModel.table_files)
+# Logs of config and compile
+print(theModel.configure_log.stdout.decode('utf-8'))
+print(theModel.configure_log.stderr.decode('utf-8'))
+print(theModel.compile_log.stdout.decode('utf-8'))
+print(theModel.compile_log.stderr.decode('utf-8'))
+# An object that needs some description......
+pprint(theModel.object_id)
+
+# ######################################################
+# Domain Section
+theDomain = WrfHydroDomain(domain_top_dir='/home/docker/domain/croton_NY',
+                           model_version='v1.2.1',
+                           domain_config='NWM')
+
+# Note: The domain has no methods!
+# Examine the attributes, skip the attributes set in the call. + +# Each domain has 2 kinds of files which are actually independent of +# version+configuration: Forcing and nudging files. +pprint(theDomain.forcing_dir) +pprint(theDomain.nudging_files) + +# The choice of domain+version+configuration specifices certain "patches" to the +# base namelists that were in the model object. Note that none of these are physics +# options, they are only domain-specific files, domain-specific times, and restart +# output frequencies. +# The patches are held in/with the individual domains. The patch files is +# specified here +pprint(theDomain.namelist_patch_file) +# The patches are contained here +pprint(theDomain.namelist_patches) + +# The specific hydro and lsm files found in the patches are listed in the following fields. +# These are patch fields which are files and can be opened with xarray. +pprint(theDomain.hydro_files) + +# WrfHydroStatic objects can be opened via xarray? +pprint(theDomain.lsm_files) + + +# ###################################################### +# Simulation Section +# simulation object = model object + domain object +# Note that CHANGING THE MODEL OR DOMAIN OBJECTS WILL CHANGE THE SIMULATION +# OBJECT ONLY BEFORE IT IS RUN. +theSim = WrfHydroSim(theModel, theDomain) + +pprint(theSim.hydro_namelist) +pprint(theSim.namelist_hrldas) + +# Edit an object in theDom +id1=theSim.model.object_id +# '3451646a-2cae-4b1f-9c38-bd8725e1c55f' + +# Dress up the example to show the object is copied. A small point. 
+theModel.compile('gfort', compile_options={'WRF_HYDRO_NUDGING':1}) +theModel.object_id +theSim.model.object_id + +## +theRun = theSim.run('/home/docker/testRun1', overwrite=True) +theRun.chanobs.open() diff --git a/wrfhydro/one_off_scripts/example_ens.py b/wrfhydro/one_off_scripts/example_ens.py new file mode 100644 index 000000000..0d34b90ca --- /dev/null +++ b/wrfhydro/one_off_scripts/example_ens.py @@ -0,0 +1,243 @@ +# WRF_HYDRO_NWM_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_nwm_myFork +# WRF_HYDRO_PY_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_py + +# docker create --name croton wrfhydro/domains:croton_NY +# ## The complement when youre done with it: +# ## docker rm -v croton + +# docker run -it \ +# -v /Users/${USER}/Downloads:/Downloads \ +# -v ${WRF_HYDRO_NWM_PATH}:/wrf_hydro_nwm \ +# -v ${WRF_HYDRO_PY_PATH}:/home/docker/wrf_hydro_py \ +# --volumes-from croton \ +# wrfhydro/dev:conda + +# ####################################################### +# cp -r /wrf_hydro_nwm /home/docker/wrf_hydro_nwm +# python + +####################################################### +import os +wrf_hydro_py_path = '/home/docker/wrf_hydro_py/wrfhydro' +USER = os.path.expanduser('~/') +wrf_hydro_py_path = USER + '/WRF_Hydro/wrf_hydro_py/wrfhydro' # osx +wrf_hydro_py_path = USER + '/wrf_hydro_py/wrfhydro' # docker + + +import sys +from pprint import pprint +sys.path.insert(0, wrf_hydro_py_path) +from wrf_hydro_model import * +from utilities import * +from ensemble import * + +# ###################################################### +# Model Section +# There are implied options here +# What is the argument? Are there more arguments? 
+ +#domain_top_path = '/home/docker/domain/croton_NY' +#source_path = '/home/docker/wrf_hydro_nwm/trunk/NDHMS' +domain_top_path = USER + '/Downloads/domain/croton_NY' +domain_top_path = USER + '/domain/croton_NY' +source_path = USER + '/WRF_Hydro/wrf_hydro_nwm_myFork/trunk/NDHMS' +source_path = '/wrf_hydro_nwm/trunk/NDHMS/' + +theDomain = WrfHydroDomain(domain_top_dir=domain_top_path, + model_version='v1.2.1', + domain_config='NWM') + +theModel = WrfHydroModel(source_path) + +theSim = WrfHydroSim(theModel, theDomain) + + +# Regular lists or numpy ndarrays +#print('ensemble 0') +#e0=WrfHydroEnsembleSim([]) +#print('len(e0): ',len(e0)) +#e0.add_members(theSim) +#print('len(e0): ',len(e0)) + +print('ensemble 1') +e1=WrfHydroEnsembleSim([theSim]) +print('len(e1): ',len(e1)) +e1.members[0].description='the primal member' +print(e1.members_dict) +e1.replicate_member(4) +print('len(e1): ',len(e1)) + +print('e1.N: ',e1.N) +print(e1.members_dict) + +e1.members[1].description='the first member' +print(e1.members_dict) + +e1.members[1].number=400 +print(e1.members_dict) + + +m1=e1.members[1] +m2=e1.members[2] +import pathlib + + + + +m1.domain.forcing_dir = \ + pathlib.PosixPath('/Users/jamesmcc/Downloads/domain/croton_NY/FORCING_FOO') +m1.model.source_dir = pathlib.PosixPath('foo') + +import pathlib +from deepdiff import DeepDiff + +DeepDiff(m1.domain, m2.domain, eq_types={pathlib.PosixPath}) +DeepDiff(m1.model, m2.model, eq_types={pathlib.PosixPath}) + + + +# ###################################################### +# ###################################################### +# ###################################################### + + +# Can I come up with a good visitor... 
+ +from boltons.iterutils import remap +import collections + +def build_mem_refs_dict_list(bad_self): + + def build_component_types(ii): + + def visit(path, key, value): + # r and super_path are from the calling scope + if isinstance(value, collections.Container) or type(value) in exclude_types: + return False + if super_path is None: + r[path + (key,)] = [ value ] + else: + r[(super_path,) + path + (key,)] = [ value ] + return False + + r={}; super_path = None + dum = remap(bad_self.members[ii].__dict__, visit=visit) + ref_dict = r + + r={}; super_path = 'model' + dum = remap(bad_self.members[ii].model.__dict__, visit=visit) + ref_dict = { **ref_dict, **r} + + r={}; super_path = 'domain' + dum = remap(bad_self.members[ii].domain.__dict__, visit=visit) + ref_dict = { **ref_dict, **r} + + return(ref_dict) + + exclude_types = [WrfHydroModel, WrfHydroDomain] + mems_refs_dict_list = [ build_component_types(mm) for mm, val in enumerate(bad_self.members) ] + return(mems_refs_dict_list) + +#e1.members[0].domain.forcing_dir = 'foobar' +#e1.members[1].domain.forcing_dir = 'barfoo' +mrdl = build_mem_refs_dict_list(e1) + +from deepdiff import DeepDiff +dd=DeepDiff(mrdl[1], mrdl[2]) + +#e1.members + +#print(e1.get_ens_attributes('hydro_namelist', 'nlastobs')) +#print(e1.get_ens_attributes('hydro_namelist', 'nlastobs')) + + + +# Edit the forcing + +# Edit the namelists + + + + +#theRun = theSim.run('/home/docker/testRun1', overwrite=True) + + + +# ###################################################### +# Install the jmccreight/dev PR. +import pathlib +from deepdiff import DeepDiff + +# Great... +p1='foobar' +p2='barfoo' +assert p1 != p2 +assert not p1.__eq__(p2) +assert bool(DeepDiff(p1, p2)) is True # True == not-empty + +# Huh?... 
+pp1=pathlib.PosixPath(p1) +pp2=pathlib.PosixPath(p2) +assert pp1 != pp2 +assert not pp1.__eq__(pp2) + +assert bool(DeepDiff(pp1, pp2)) is True # True == not-empty + +assert bool(DeepDiff(pp1, pp2, eq_types={pathlib.PosixPath})) is True # True == not-empty +DeepDiff(pp1, pp2, eq_types={pathlib.PosixPath}) + +# ###################################################### +# Subclass instead of PR - dont depend on distribution of deepdiff. +# Install the upstream/master. +import pathlib +from deepdiff import DeepDiff + +class DeepDiffEq(DeepDiff): + def __init__(self, + t1, + t2, + eq_types, + ignore_order=False, + report_repetition=False, + significant_digits=None, + exclude_paths=set(), + exclude_regex_paths=set(), + exclude_types=set(), + include_string_type_changes=False, + verbose_level=1, + view='text', + **kwargs): + self.eq_types = set(eq_types) + super().__init__(t1, + t2, + ignore_order=False, + report_repetition=False, + significant_digits=None, + exclude_paths=set(), + exclude_regex_paths=set(), + exclude_types=set(), + include_string_type_changes=False, + verbose_level=1, + view='text', + **kwargs) + def _DeepDiff__diff_obj(self, level, parents_ids=frozenset({}), + is_namedtuple=False): + """Difference of 2 objects using their __eq__ if requested""" + if type(level.t1) in self.eq_types: + if level.t1 == level.t2: + return + else: + self._DeepDiff__report_result('values_changed', level) + return + super(DeepDiffEq, self)._DeepDiff__diff_obj(level, parents_ids=frozenset({}), + is_namedtuple=False) + +p1='foobar' +p2='barfoo' +pp1=pathlib.PosixPath(p1) +pp2=pathlib.PosixPath(p2) +DeepDiffEq(pp1, pp2, eq_types={pathlib.PosixPath}) +DeepDiffEq(pp1, pp2, eq_types={DeepDiff}) + +####################################################### + diff --git a/wrfhydro/one_off_scripts/example_job_model_dates.py b/wrfhydro/one_off_scripts/example_job_model_dates.py new file mode 100644 index 000000000..9c2e5224c --- /dev/null +++ 
b/wrfhydro/one_off_scripts/example_job_model_dates.py @@ -0,0 +1,72 @@ +# add_job +## solve model_start_time, model_end_time, model_restart variables. +### handle nones +## copy namelist from setup object to job +## apply solved times/restart to job's namelists +### namelist.hrldas: start, khour, kday, RESTART +### hydro.namelist: HYDRO_RST +## if restart: check that the restart is available +## diff the setup namelists with the new one to ensure only certain fields are changed. + + +import datetime +import os +import re +from wrfhydropy import * + +home = os.path.expanduser("~/") +model_path = home + '/WRF_Hydro/' +the_model = WrfHydroModel( + os.path.expanduser(model_path + '/wrf_hydro_nwm_public/trunk/NDHMS'), + 'NWM' +) + +domain_path = '/Users/james/Downloads/croton_NY_domain/domain/croton_NY/' +the_domain = WrfHydroDomain( + domain_top_dir=domain_path, + model_version='v1.2.1', + domain_config='NWM' +) + +the_setup = WrfHydroSetup( + the_model, + the_domain +) + +solve_model_start_end_times = job_tools.solve_model_start_end_times + +# ################################# + +# All are 1 day and 2 hours. 
+def assert_start_end_soln(s,e): + assert s == datetime.datetime(2011, 8, 26, 0, 0) + assert e == datetime.datetime(2011, 9, 2, 0, 0) + + +s, e = solve_model_start_end_times(None, None, the_setup) +assert_start_end_soln(s, e) + +model_start_time = '2011-08-26 00' +model_end_time = '2011-09-02 00' +s, e = solve_model_start_end_times(model_start_time, model_end_time, the_setup) +assert_start_end_soln(s, e) + +model_start_time = '2011-08-26 00:00' +model_end_time = '2011-09-02 00' +s, e = solve_model_start_end_times(model_start_time, model_end_time, the_setup) +assert_start_end_soln(s, e) + +model_start_time = '2011-08-26 00:00' +model_end_time = datetime.timedelta(days=7) +s, e = solve_model_start_end_times(model_start_time, model_end_time, the_setup) +assert_start_end_soln(s, e) + +model_start_time = '2011-08-26 00:00' +model_end_time = {'hours': 24*7} +s, e = solve_model_start_end_times(model_start_time, model_end_time, the_setup) +assert_start_end_soln(s, e) + +model_start_time = '2011-08-26 00:00' +model_end_time = {'days': 6, 'hours': 24} +s, e = solve_model_start_end_times(model_start_time, model_end_time, the_setup) +assert_start_end_soln(s, e) diff --git a/wrfhydro/one_off_scripts/example_scheduler.py b/wrfhydro/one_off_scripts/example_scheduler.py new file mode 100644 index 000000000..04bd7646b --- /dev/null +++ b/wrfhydro/one_off_scripts/example_scheduler.py @@ -0,0 +1,125 @@ +from wrfhydropy import Job, Scheduler +import os, re +import sys +from pprint import pprint +home = os.path.expanduser("~/") +sys.path.insert(0, home + '/WRF_Hydro/wrf_hydro_tests/toolbox/') +from establish_specs import establish_spec +from establish_job import get_job_args_from_specs + +# Establish a job +# A job has a scheduler. Create a scheduler first. 
+ +machine_spec_file = home + '/WRF_Hydro/wrf_hydro_tests/machine_spec.yaml' +candidate_spec_file = home + '/WRF_Hydro/wrf_hydro_tests/template_candidate_spec.yaml' +user_spec_file = home + '/WRF_Hydro/wrf_hydro_tests/template_user_spec.yaml' + + +job_args = get_job_args_from_specs( + job_name='test_job', + nnodes=1, + nproc=72, + mode='w', + machine_spec_file=machine_spec_file, + user_spec_file=user_spec_file, + candidate_spec_file=candidate_spec_file +) +# pprint(job_args) + +# Test: set nproc +job_args['scheduler']['nnodes']=2 +job_args['scheduler']['nproc']=None +job_args['scheduler']['ppn']=36 +#sched = Scheduler( **job_args['scheduler'] ) +job = Job( **job_args ) + +#pprint(job) +#print('-------------------------------------------------------') +#pprint(job.__dict__) +#print('-------------------------------------------------------') +#pprint(job.scheduler.__dict__) + +print(job.nproc) + +assert job.scheduler.nnodes == 2 +assert job.scheduler.nproc == 2*36 +assert job.scheduler.ppn == 36 +assert job.scheduler.nproc_last_node == 0 +assert re.findall('select=2:ncpus=36:mpiprocs=36', job.scheduler.string()) + + +sys.exit() + + +# Test: set nnodes +job_args['nnodes']=None +job_args['nproc']=72 +job_args['ppn']=36 +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 72 +assert job.ppn == 36 +assert job.nproc_last_node == 0 +assert re.findall('select=2:ncpus=36:mpiprocs=36',job.string()) + +# Test: set nnodes with remainder on last node +job_args['nnodes']=None +job_args['nproc']=71 +job_args['ppn']=36 +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 71 +assert job.ppn == 36 +assert job.nproc_last_node == 35 +assert re.findall('select=1:ncpus=36:mpiprocs=36\+1:ncpus=35:mpiprocs=35',job.string()) + +# Test: set nnodes evenly distributed ppn < ppn_max +job_args['nnodes']=None +job_args['nproc']=48 +job_args['ppn']=24 +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 48 +assert job.ppn == 24 +assert 
job.nproc_last_node == 0 +assert re.findall('select=2:ncpus=24:mpiprocs=24',job.string()) + +# Test: set nnodes distributed ppn < ppn_max and remainder on last node +job_args['nnodes']=None +job_args['nproc']=47 +job_args['ppn']=24 +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 47 +assert job.ppn == 24 +assert job.nproc_last_node == 23 +assert re.findall('select=1:ncpus=24:mpiprocs=24\+1:ncpus=23:mpiprocs=23',job.string()) + +# Test: set ppn +job_args['nnodes']=2 +job_args['nproc']=72 +job_args['ppn']=None +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 72 +assert job.ppn == 36 +assert job.nproc_last_node == 0 +assert re.findall('select=2:ncpus=36:mpiprocs=36',job.string()) + +# Test: set ppn with remainder on last node. +job_args['nnodes']=2 +job_args['nproc']=71 +job_args['ppn']=None +job = Jobuler( **job_args ) +assert job.nnodes == 2 +assert job.nproc == 71 +assert job.ppn == 36 +assert job.nproc_last_node == 35 +assert re.findall('select=1:ncpus=36:mpiprocs=36\+1:ncpus=35:mpiprocs=35',job.string()) + +job.script() +print(job.string()) + +job.stdout_exe +job.submit() +job.stdout_exe diff --git a/wrfhydro/one_off_scripts/example_scheduler_submission.py b/wrfhydro/one_off_scripts/example_scheduler_submission.py new file mode 100644 index 000000000..dbdef3a6f --- /dev/null +++ b/wrfhydro/one_off_scripts/example_scheduler_submission.py @@ -0,0 +1,263 @@ +# docker pull wrfhydro/domains:croton_NY +# docker pull wrfhydro/dev:conda + +# docker create --name croton wrfhydro/domains:croton_NY +# # When done with the container: docker rm -v croton + +# export WRF_HYDRO_DIR=~/WRF_Hydro +# if [ $HOSTNAME == *"chimayo"* ]; then +# export PUBLIC_DIR=/Volumes/d1/chimayoSpace/git_repos/wrf_hydro_nwm_public:/home/docker/wrf_hydro_nwm_public +# else +# export PUBLIC_DIR=${WRF_HYDRO_DIR}/wrf_hydro_nwm_public:/wrf_hydro_nwm_public +# fi + +# docker run -it \ +# -e 
WRF_HYDRO_TESTS_USER_SPEC='/home/docker/WRF_Hydro/wrf_hydro_tests/template_user_spec.yaml' \ +# -v ${WRF_HYDRO_DIR}:/home/docker/WRF_Hydro \ +# -v ${PUBLIC_DIR} \ +# --volumes-from croton \ +# wrfhydro/dev:conda + +# # Inside docker +# if [ ! -e /home/docker/wrf_hydro_nwm_public ]; then +# cp -r /wrf_hydro_nwm_public /home/docker/wrf_hydro_nwm_public +# fi + +# cd ~/WRF_Hydro/wrf_hydro_py/ +# pip uninstall -y wrfhydropy +# python setup.py develop +# pip install boltons termcolor +# python + +from wrfhydropy import * + +import copy +import datetime +import os +from pprint import pprint +import re +from socket import gethostname +import sys + +home = os.path.expanduser("~/") +sys.path.insert(0, home + '/WRF_Hydro/wrf_hydro_tests/toolbox/') +from establish_specs import establish_spec +from establish_job import get_job_args_from_specs + +machine = job_tools.get_machine() +if machine == 'cheyenne': + model_path = '/glade/u/home/jamesmcc/WRF_Hydro/' + domain_path = '/glade/p/work/jamesmcc/DOMAINS/croton_NY' + run_dir = "/glade/scratch/jamesmcc/test_dir" +else: + model_path = '/home/docker' + domain_path = '/home/docker/domain/croton_NY' + run_dir = "/home/docker/test_dir" + + +# Establish the setup +the_model = WrfHydroModel( + os.path.expanduser(model_path + '/wrf_hydro_nwm_public/trunk/NDHMS'), + 'NWM' +) +# modules are an attribute of the model?!?! 
+the_model.compile("gfort") + +the_domain = WrfHydroDomain( + domain_top_dir=domain_path, + model_version='v1.2.1', + domain_config='NWM' +) + +the_setup = WrfHydroSetup( + the_model, + the_domain +) + +# ####################################################### +# Use these to build Jobs +machine_spec_file = home +'/WRF_Hydro/wrf_hydro_tests/machine_spec.yaml' +candidate_spec_file = home + '/WRF_Hydro/wrf_hydro_tests/template_candidate_spec.yaml' +user_spec_file = home + '/WRF_Hydro/wrf_hydro_tests/template_user_spec.yaml' + + +# ###################################################### +# ###################################################### +# ###################################################### + + +# ####################################################### +# CHEYENNE +# Add two scheduled jobs on cheyenne + +run_sched_dir = "/glade/scratch/jamesmcc/test_sched" +run_sched = WrfHydroRun( + the_setup, + run_sched_dir, + rm_existing_run_dir=True +) + +job_args = get_job_args_from_specs( + job_name='test_job', + nnodes=1, + nproc=2, + mode='w', + machine_spec_file=machine_spec_file, + user_spec_file=user_spec_file, + candidate_spec_file=candidate_spec_file +) +job_template = Job( **job_args ) +job_template.scheduler.walltime = '00:04:00' +job_template.scheduler.ppn = 1 + +time_0 = datetime.datetime(2011, 8, 26, 0, 0) +time_1 = time_0 + datetime.timedelta(days=2) +time_2 = time_1 + datetime.timedelta(days=5) + +job_template.model_start_time = time_0 +job_template.model_end_time = time_1 +job_0 = copy.deepcopy(job_template) + +job_template.model_start_time = time_1 +job_template.model_end_time = time_2 +job_1 = copy.deepcopy(job_template) + +run_sched.add_jobs([job_0, job_1]) + +run_sched.run_jobs() + +del run_sched + +import pickle +with open(run_sched_dir + '/WrfHydroRun.pkl', 'rb') as f: + r = pickle.load(f) +assert len(r.chanobs) == 168 + +sys.exit() + + +# ####################################################### +# CHEYENNE: interactive run +run_interactive_dir = 
"/glade/scratch/jamesmcc/test_dir" +run_interactive = WrfHydroRun( + the_setup, + run_interactive_dir, + rm_existing_run_dir = True +) + +job_args = get_job_args_from_specs( + job_name='test_job', + nnodes=1, + nproc=2, + mode='w', + scheduler_name = None, # Choice disables PBS + machine_spec_file=machine_spec_file, + user_spec_file=user_spec_file, + candidate_spec_file=candidate_spec_file +) +job_interactive_template = Job( **job_args ) + +time_0 = datetime.datetime(2011, 8, 26, 0, 0) +time_1 = time_0 + datetime.timedelta(days=2) +time_2 = time_1 + datetime.timedelta(days=5) + +job_interactive_template.model_start_time = time_0 +job_interactive_template.model_end_time = time_1 +job_0 = copy.deepcopy(job_interactive_template) + +job_interactive_template.model_start_time = time_1 +job_interactive_template.model_end_time = time_2 +job_1 = copy.deepcopy(job_interactive_template) + +run_interactive.add_jobs([job_0, job_1]) +run_interactive.run_jobs() + +## Verify that the run occurred. +assert len(run_interactive.chanobs) == 168 + + +# ###################################################### +# DOCKER +# a default docker Job +# Try a default job + +# run_interactive_2 = WrfHydroRun( +# the_setup, +# run_dir, +# rm_existing_run_dir = True, +# job=Job(nproc=2) +# ) + + +# run_interactive_2.run_jobs() + +# assert len(run_interactive_2.chanobs) == 168 + +# ###### + +run_interactive_3 = WrfHydroRun( + the_setup, + run_dir, + rm_existing_run_dir = True, +) + + +time_0 = datetime.datetime(2011, 8, 26, 0, 0) +time_1 = time_0 + datetime.timedelta(days=2) +time_2 = time_1 + datetime.timedelta(days=5) + +j0 = Job( + nproc=2, + model_start_time=time_0, + model_end_time=time_1 +) + +j1 = Job( + nproc=2, + model_start_time=time_1, + model_end_time=time_2 +) + +run_interactive_3.add_job([j0,j1]) +run_interactive_3.run_jobs() + +assert len(run_interactive_3.chanobs) == 7*24 + +sys.exit() + + + + +# ####################################################### +# Check setting of job.nproc vs 
that of job.scheduler.nproc + + + +# ################################# +# A bit less defaultish +run_interactive = WrfHydroRun( + the_setup, + run_dir, + rm_existing_run_dir = True +) + +job_args = get_job_args_from_specs( + job_name='test_job', + nproc=2, + mode='w', + machine_spec_file=machine_spec_file, + user_spec_file=user_spec_file, + candidate_spec_file=candidate_spec_file +) +job_interactive = Job( **job_args ) +run_interactive.add_job(job_interactive) + +assert len(run_interactive.chanobs) == 24 # croton_lite + +sys.exit() + + + + + + diff --git a/wrfhydro/one_off_scripts/example_test_fundamental.py b/wrfhydro/one_off_scripts/example_test_fundamental.py new file mode 100644 index 000000000..befbd45a3 --- /dev/null +++ b/wrfhydro/one_off_scripts/example_test_fundamental.py @@ -0,0 +1,68 @@ +####################################################### +# Before Docker +WRF_HYDRO_NWM_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_nwm_myFork +WRF_HYDRO_PY_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_py + +docker create --name croton wrfhydro/domains:croton_NY +## The complement when youre done with it: +## docker rm -v sixmile_channel-only_test + +docker run -it \ + -v ${WRF_HYDRO_NWM_PATH}:/wrf_hydro_nwm \ + -v ${WRF_HYDRO_PY_PATH}:/home/docker/wrf_hydro_py \ + --volumes-from croton \ + wrfhydro/dev:conda + +####################################################### +# Inside docker (before python) +cp -r /wrf_hydro_nwm /home/docker/wrf_hydro_nwm +python + + +####################################################### +# Python inside docker + +import sys +sys.path.insert(0, '/home/docker/wrf_hydro_py/wrfhydro') + +from wrf_hydro_model import * +from test_cases import * +from utilities import * +from pprint import pprint + +# Setup a domain +domain = WrfHydroDomain(domain_top_dir='/home/docker/domain/croton_NY', + domain_config='NWM', + model_version='v1.2.1') +# Setup a candidate model +candidate_model = WrfHydroModel('/home/docker/wrf_hydro_nwm/trunk/NDHMS') + +# Setup a reference 
model +reference_model = WrfHydroModel('/home/docker/wrf_hydro_nwm/trunk/NDHMS') + +# Setup a candidate simulation +candidate_sim = WrfHydroSim(candidate_model,domain) + +# Setup a reference simulation +reference_sim = WrfHydroSim(reference_model,domain) + +# Setup the test +testCase = FundamentalTest(candidate_sim,reference_sim,'/home/docker/test',overwrite=True) + +# Run the individual questions instead of just invoking run_test() + +testCase.test_compile_candidate('gfort', overwrite=True, compile_options={'WRF_HYDRO_NUDGING': 1}) + +testCase.test_run_candidate() + +testCase.test_ncores_candidate() + +testCase.test_perfrestart_candidate() +testCase.test_compile_reference('gfort',overwrite=True,compile_options={'WRF_HYDRO_NUDGING': 1}) + +testCase.test_run_reference() +testCase.test_regression() + +print(testCase.results) + +exit(testCase.exit_code) diff --git a/wrfhydro/one_off_scripts/example_wrf_hydro_tests.py b/wrfhydro/one_off_scripts/example_wrf_hydro_tests.py new file mode 100644 index 000000000..1d9c196a5 --- /dev/null +++ b/wrfhydro/one_off_scripts/example_wrf_hydro_tests.py @@ -0,0 +1,29 @@ +####################################################### +# Before Docker +WRF_HYDRO_NWM_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_nwm_myFork +WRF_HYDRO_PY_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_py +WRF_HYDRO_TESTS_PATH=/Users/${USER}/WRF_Hydro/wrf_hydro_tests + +docker create --name croton wrfhydro/domains:croton_NY +## The complement when youre done with it: +## docker rm -v sixmile_channel-only_test + +docker run -it \ + -e GITHUB_AUTHTOKEN=$GITHUB_AUTHTOKEN \ + -e GITHUB_USERNAME=$GITHUB_USERNAME \ + -e WRF_HYDRO_TESTS_USER_SPEC=/home/docker/wrf_hydro_tests/template_user_spec.yaml \ + -v ${WRF_HYDRO_NWM_PATH}:/wrf_hydro_nwm \ + -v ${WRF_HYDRO_PY_PATH}:/home/docker/wrf_hydro_py \ + -v ${WRF_HYDRO_TESTS_PATH}:/home/docker/wrf_hydro_tests \ + --volumes-from croton \ + wrfhydro/dev:conda + +####################################################### +# Inside docker 
(before python)
+cd ~/wrf_hydro_tests/
+pip install boltons
+
+## run and check the logs
+python3 take_test.py ; echo; echo ---------------------; echo; cat example.log ; rm example.log
+
+
diff --git a/wrfhydropy/__init__.py b/wrfhydropy/__init__.py
index 1634d5272..232773fec 100644
--- a/wrfhydropy/__init__.py
+++ b/wrfhydropy/__init__.py
@@ -1,4 +1,8 @@
 from .core import utilities
 from .core.wrfhydroclasses import WrfHydroTs, WrfHydroStatic, WrfHydroModel, WrfHydroDomain, \
-    WrfHydroSim, WrfHydroRun, DomainDirectory, RestartDiffs
\ No newline at end of file
+    WrfHydroSetup, WrfHydroRun, DomainDirectory, RestartDiffs
+
+from .core.job import Scheduler, Job
+
+from .core import job_tools
diff --git a/wrfhydropy/core/default_job_specs.yaml b/wrfhydropy/core/default_job_specs.yaml
new file mode 100644
index 000000000..98e1230ae
--- /dev/null
+++ b/wrfhydropy/core/default_job_specs.yaml
@@ -0,0 +1,33 @@
+# wrf_hydro_tests: machine configuration file.
+# Purpose: Log all the static information for each machine in this file. This file
+#          is sourced after the candidate specification file and may rely on
+#          variables defined therein.
+cheyenne:
+
+  modules:
+    base : nco/4.6.2 python/3.6.2
+    ifort: intel/16.0.3 ncarenv/1.2 ncarcompilers/0.4.1 mpt/2.15f netcdf/4.4.1
+    gfort: gnu/7.1.0 ncarenv/1.2 ncarcompilers/0.4.1 mpt/2.15 netcdf/4.4.1.1
+
+# This should Not be necessary.
+# netcdf: +# intel: /glade/u/apps/ch/opt/netcdf/4.4.1/intel/16.0.1 +# GNU : /glade/u/apps/ch/opt/netcdf/4.4.1.1/gnu/7.1.0 + + scheduler: + name: "PBS" + max_walltime: 12:00 + + cores_per_node: 36 + + # The executable invocation on the machine + exe_cmd: + PBS : 'mpiexec_mpt ./wrf_hydro.exe' + default: 'mpirun -np {nproc} ./wrf_hydro.exe' + +docker: + modules: + scheduler: + cores_per_node: + exe_cmd: + default: 'mpirun -ppn {nproc} ./wrf_hydro.exe' diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py new file mode 100644 index 000000000..ccc9138d6 --- /dev/null +++ b/wrfhydropy/core/job.py @@ -0,0 +1,623 @@ +import datetime +import f90nml +import math +import os +import pathlib +import shlex +import socket +import subprocess +import warnings + +from .job_tools import \ + touch, submit_scheduler, PBSError, \ + get_sched_name, get_machine, get_user, seconds, \ + core_dir, default_job_spec, \ + compose_scheduled_python_script, \ + compose_scheduled_bash_script,\ + check_file_exist_colon, \ + check_job_input_files + +from .job_tools import release as jt_release + + +class Scheduler(object): + """A PBS/torque or slurm scheduler Job object. + + Initialize either with all the parameters, or with 'qsubstr' a PBS submit script as a string. + If 'qsubstr' is given, all other arguments are ignored and set using Job.read(). + + Variables + On cheyenne, PBS attributes are described in `man qsub` and `man pbs_resources`. + See also: https://www2.cisl.ucar.edu/resources/computational-systems/cheyenne/running-jobs/submitting-jobs-pbs + A dictionary can be constructed in advance from specification files by the function + get_sched_args_from_specs(). 
+ + Var Name default example Notes + PBS usage on Chyenne + ------------------------------------------------------------------------------------------- + name "my_job" + -N + account "NRAL0017" + -A + email_when "a","abe" "a"bort, "b"efore, "e"nd + -m + email_who "${USER}@ucar.edu" "johndoe@ucar.edu" + -M + queue "regular" "regular" + -q + walltime "12:00" "10:00:00" Seconds coerced. Appropriate run times are best. + -l walltime= + afterok "12345:6789" Begin after successful completion of job1:job2:etc. + -W depends=afterok: + + array_size -J None 16 integer + grab_env -V None True logical + + Sepcify: nproc, nnodes, nproc + nnodes, nproc + ppn, nnodes + ppn + nproc 500 Number of procs + nodes 4 Number of nodes + ppn Default: 24 Number of procs/node + machine_spec_file.cores_per_node + + modules + + -*-*-*-*-*-*-*- FOLLOWING NOT TESTED ON CHEYENNE -*-*-*-*-*-*-*- + + pmem -l pmem= "2GB" Default is no restriction. + exetime -a "1100" Not tested + """ + + def __init__( + self, + job_name: str, + account: str, + nproc: int=None, + nnodes: int=None, + ppn: int=None, + email_when: str="a", + email_who: str="${USER}@ucar.edu", + queue: str='regular', + walltime: str="12:00", + wait_for_complete: bool=True, + monitor_freq_s: int=None, + afterok: str=None, + array_size: int=None, + pmem: str=None, + grab_env: str=False, + exetime: str=None, + job_date_id: str=None + ): + + # Declare attributes. + # Required + self.job_name = job_name + self.account = account + + # Defaults in arglist + self.email_when = email_when + self.queue = queue + self.afterok = afterok + self.array_size = array_size + self.grab_env = grab_env + # TODO JLM: Should probably stash the grabbed argument in this case. + # TODO JLM: is there a better variable name than grab_env? + + # Automagically set from environment + self.sched_name = get_sched_name() + # TODO(JLM): remove this testing comment/hack below when not testing it. 
+ #self.sched_version = int(re.split("[\+\ \.]", get_version())[2]) + + # Extra Coercion + self.email_who = os.path.expandvars(email_who) + self.walltime = ':'.join((walltime+':00').split(':')[0:3]) + + # Construction + self._nproc = nproc + self._nnodes = nnodes + self._ppn = ppn + + self._job_date_id = job_date_id + + self.wait_for_complete = wait_for_complete + self.monitor_freq_s = monitor_freq_s + + # Set attributes. + + # Try to get a default scheduler? + + # Check for required inputs + # TODO(JLM): Deal with setting ppn from machine_spec_file. + self.solve_nodes_cores() + + # Extra Coercion + self.email_who = os.path.expandvars(email_who) + self.walltime = ':'.join((walltime+':00').split(':')[0:3]) + + self.nproc_last_node = (self.nproc - (self.nnodes * self.ppn)) % self.ppn + if self.nproc_last_node > 0: + if self.nproc_last_node >= self.ppn: + raise ValueError('nproc - (nnodes * ppn) = {0} >= ppn'.format(self.nproc_last_node)) + + # Currently unsupported. + self.pmem = pmem + self.exetime = exetime + + # TODO(JLM): the term job here is a bit at odds with where I'm putting the attributes + # sched_id (this requires some refactoring with job_tools)? job_script seems ok, however. + # sched_job_id is set at submission + self.sched_job_id = None + + self.job_script = None + + # PBS has a silly stream buffer that 1) has a limit, 2) cant be seen until the job ends. + # Separate and standardize the stdout/stderr of the exe_cmd and the scheduler. + + # The path to the model stdout&stderr + self._stdout_exe = "{run_dir}/{job_date_id}.{sched_job_id}.stdout" + self._stderr_exe = "{run_dir}/{job_date_id}.{sched_job_id}.stderr" + + # Tracejob file which holds performance information + self._tracejob_file = "{run_dir}/{job_date_id}.{sched_job_id}." + self.sched_name + ".tracejob" + + # Dot files for the pbs stdout&stderr files, both temp and final. + # The initial path to the PBS stdout&stderr, during the job + self._stdout_pbs_tmp = "{run_dir}/.{job_date_id}." 
+ self.sched_name + ".stdout" + self._stderr_pbs_tmp = "{run_dir}/.{job_date_id}." + self.sched_name + ".stderr" + # The eventual path to the " + self.sched_name + " stdout&stderr, after the job + self._stdout_pbs = "{run_dir}/.{job_date_id}.{sched_job_id}." + self.sched_name + ".stdout" + self._stderr_pbs = "{run_dir}/.{job_date_id}.{sched_job_id}." + self.sched_name + ".stderr" + + + # A three state variable. If "None" then script() can be called. + # bool(None) is False so + # None = submitted = True while not_submitted = False + self.not_submitted = True + + # A status that depends on job being submitted and the .job_not_complete file + # not existing being missing. + self._sched_job_complete = False + + + def solve_nodes_cores(self): + if None not in [self._nproc, self._nnodes, self._ppn]: + warnings.warn("Not currently checking consistency of nproc, nnodes, ppn.") + return + + if not self._nproc and self._nnodes and self._ppn: + self._nproc = self._nnodes * self._ppn + if not self._nnodes and self._nproc and self._ppn: + self._nnodes = math.ceil(self._nproc / self._ppn) + if not self._ppn and self._nnodes and self._nproc: + self._ppn = math.ceil(self._nproc / self._nnodes) + + if None in [self._nproc, self._nnodes, self._ppn]: + raise ValueError("Not enough information to solve all of nproc, nnodes, ppn.") + + @property + def nproc(self): + self.solve_nodes_cores() + return self._nproc + @nproc.setter + def nproc(self, value): + self._nproc = value + + @property + def nnodes(self): + self.solve_nodes_cores() + return self._nnodes + @nnodes.setter + def nnodes(self, value): + self._nnodes = value + + @property + def ppn(self): + self.solve_nodes_cores() + return self._ppn + @ppn.setter + def ppn(self, value): + self._ppn = value + + +class Job(object): + def __init__( + self, + nproc: int, + exe_cmd: str=None, + modules: str=None, + scheduler: Scheduler = None, + model_start_time: str=None, + model_end_time: str=None, + model_restart: bool=True, + ): + + 
self.exe_cmd = exe_cmd + """str: The command to be executed. Python {}.format() evaluation available but + limited. Taken from the machine_spec.yaml file if not specified.""" + self._nproc = nproc + """int: Optional, the number of processors to use. If also supplied in the scheduler + then there will be ab error.""" + self.machine = get_machine() + """str: The name of the machine being used.""" + self.modules = modules + """str: The modules to be loaded prior to execution. Taken from machine_spec.yaml + if not present.""" + self.scheduler = scheduler + """Scheduler: Optional, scheduler object for the job.""" + + self.model_start_time = model_start_time + """str?: The model time at the start of the execution.""" + self.model_end_time = model_end_time + """str?: The model time at the end of the execution.""" + self.model_restart = model_restart + """bool: Look for restart files at modelstart_time?""" + + # These are only outputs/atts of the object. + self.namelist_hrldas = None + """dict: the HRLDAS namelist used for this job.""" + self.hydro_namelist = None + """dict: the hydro namelist used for this job.""" + + self.namelist_hrldas_file = None + """dict: the file containing the HRLDAS namelist used for this job.""" + self.hydro_namelist_file = None + """dict: the file containing the hydro namelist used for this job.""" + + + self.job_status = "created" + """str: The status of the job object: created/submitted/running/complete.""" + + # ################################# + # Attributes solved from the environment at run time or later (not now). + self.user = None + """str: Determine who the user is.""" + + # TODO(JLM): this is admittedly a bit dodgy because sensitive info + # might be in the environment (github authtoken?) + # Are there parts of the env we must have? 
+ # self.environment = None + + self.job_date_id = None + """str: The job date identifier at 'submission' time.""" + self.job_start_time = None + """str?: The time at the start of the execution.""" + self.job_end_time = None + """str?: The time at the end of the execution.""" + self.job_submission_time = None + """str?: The time the job object was created.""" + + self.exit_status = None + """int: The exit value of the model execution.""" + + """pathlib.PosixPath: The tracejob/performance/profiling file.""" + + """pathlib.PosixPath: The standard out file.""" + + """pathlib.PosixPath: The standard error file.""" + + self.diag_files = None + """pathlib.PosixPath: The diag files for the job.""" + + # ################################# + # Setting better defaults. + + # If there is no scheduler on the machine. Do not allow a scheduler object. + if get_sched_name() is None: + self.scheduler = None + else: + # Allow coercion from a dict to a scheduler object. + if type(self.scheduler) is dict: + self.scheduler = Scheduler(**self.scheduler) + + # ################################### + # Deal with the potential conflict between the job ncores and the scheduler ncores. + if self._nproc and self.scheduler: + if self.scheduler.nproc: + if self.scheduler.nproc != nproc: + error_msg = "The number of cores passed to the job does not match the " + error_msg += "number of cores specified in the job's scheduler." + raise ValueError(error_msg) + else: + self.scheduler.nproc = self.nproc + + # TODO(JLM): Maybe this should be done first and overwritten? + # TODO(JLM): For missing job/scheduler properties, attempt to get defaults. + # These are in a file in the package dir. Where? + # A method (used by django) about specifying the root dir of the project. 
+ # https://stackoverflow.com/questions/25389095/python-get-path-of-root-project-structure + #self.build_default_job(self) + default_job = default_job_spec() + for ii in default_job.keys(): + if ii in self.__dict__.keys(): + if self.__dict__[ii] is None and default_job[ii] is not None: + warnings.warn('Using docker default for missing Job argument ' + ii) + self.__dict__[ii] = default_job[ii] + + @property + def nproc(self): + if self.scheduler: + self._nproc = self.scheduler.nproc + return self.scheduler.nproc + return self._nproc + @nproc.setter + def nproc(self, value): + self._nproc = value + if self.scheduler: + self.scheduler.nproc = value + return self.scheduler.nproc + return self._nproc + + + def schedule( + self, + run_dir, + hold: bool=False + ) -> object: + """Scheulde a run of the wrf_hydro simulation + Args: + self: A Self object + """ + + # Deal with the shared queue here, since it affects both scripts. + if get_machine() == 'cheyenne' and \ + self.scheduler.nnodes-1 == 0 and \ + self.scheduler.nproc_last_node <= 18: + self.scheduler.queue = 'share' + warnings.warn("Less than 18 procesors requested, using the 'share' queue.") + find='mpiexec_mpt' + repl='mpirun {hostname} -np ' + str(self.nproc) + self.exe_cmd = self.exe_cmd.replace(find, repl) + + # Write python to be executed by the bash script given to the scheduler. + # Execute the model from python script and the python script from the bash script: + # swap their execution commands. + + model_exe_cmd = self.exe_cmd + py_script_name = str(run_dir / (self.job_date_id + ".wrfhydropy.py")) + py_run_cmd = "python " + py_script_name + \ + " --sched_job_id $sched_job_id --job_date_id $job_date_id" + + # This needs to happen before composing the scripts. 
+ self.scheduler.not_submitted = False + + # The python script + selfstr = compose_scheduled_python_script(py_run_cmd, model_exe_cmd) + with open(py_script_name, "w") as myfile: + myfile.write(selfstr) + + # The bash submission script which calls the python script. + self.exe_cmd = py_run_cmd + filename = run_dir / (self.job_date_id + '.' + self.scheduler.sched_name + '.job') + jobstr = compose_scheduled_bash_script(run_dir=run_dir, job=self) + with open(filename, "w") as myfile: + myfile.write(jobstr) + + try: + + self.scheduler.sched_job_id = submit_scheduler(substr=bytearray(jobstr, 'utf-8'), + sched_name=self.scheduler.sched_name, + hold=hold) + + except PBSError as e: + self.scheduler.not_submitted = True + raise e + + # TODO(JLM): should make this a helper method + touch(str(run_dir) + '/.job_not_complete') + + + def release(self): + return(jt_release(self.scheduler)) + + + def run(self, run_dir): + + # TODO(JLM): does the job['mode'] need checked at this point? + + # Create the namelists for the job and link to the generic namelists names. + print('\nRunning job ' + self.job_date_id + ': ') + print(' Wall start time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + print(' Model start time: ' + self.model_start_time.strftime('%Y-%m-%d %H:%M')) + print(' Model end time: ' + self.model_end_time.strftime('%Y-%m-%d %H:%M')) + + # Check for restart files both as specified and in the run dir.. + # Alias the mutables for some brevity + hydro_nlst = self.hydro_namelist['hydro_nlist'] + hydro_nlst['restart_file'] = check_file_exist_colon(run_dir, hydro_nlst['restart_file']) + nudging_nlst = self.hydro_namelist['nudging_nlist'] + nudging_nlst['nudginglastobsfile'] = \ + check_file_exist_colon(run_dir, nudging_nlst['nudginglastobsfile']) + + check_job_input_files(self, run_dir) + + self.write_namelists(run_dir) + + if self.scheduler: + + # This is for when the submitted script calls the run method. 
+ # Note that this exe_cmd is for a python script which executes and optionally + # waits for the model (it's NOT direct execution of the model). + exe_cmd = self.exe_cmd.format(**{'hostname': socket.gethostname()}) + " 2> {0} 1> {1}" + exe_cmd = exe_cmd.format(self.stderr_exe(run_dir), self.stdout_exe(run_dir)) + exe_cmd = shlex.split(exe_cmd) + subprocess.run(exe_cmd, cwd=run_dir) + + else: + + # These dont have the sched_job_id that the scheduled job output files have. + self.stderr_file = run_dir / ("{0}.stderr".format(self.job_date_id)) + self.stdout_file = run_dir / ("{0}.stdout".format(self.job_date_id)) + + # source the modules before execution. + exe_cmd = '/bin/bash -c "' + if self.modules: + exe_cmd += "module purge && module load " + self.modules + " && " + exe_cmd += self.exe_cmd.format(**{'nproc': self.nproc}) + exe_cmd += " 2> {0} 1> {1}".format(self.stderr_file, self.stdout_file) + exe_cmd += '"' + self.exe_cmd = exe_cmd + + self.job_status='running' + self.job_start_time = str(datetime.datetime.now()) + self.run_log = subprocess.run( + shlex.split(exe_cmd), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=run_dir + ) + self.job_end_time = str(datetime.datetime.now()) + self.job_status='completed' + + # TODO(JLM): The following be made a method which checks the run. + # The following should not be run if the scheduler is not waiting. + # Put this in collect_run? + try: + + # Get diag files + # TODO(JLM): diag_files should be scrapped orrenamed to no conflict between jobs. 
+ run_dir_posix = pathlib.PosixPath(run_dir) + self.diag_files = list(run_dir_posix.glob('diag_hydro.*')) + self.stdout_file = list(run_dir_posix.glob(self.job_date_id+'.*stdout'))[0] + self.stderr_file = list(run_dir_posix.glob(self.job_date_id+'.*stderr'))[0] + self.tracejob_file = list(run_dir_posix.glob(self.job_date_id+'.*tracejob')) + if len(self.tracejob_file): + self.tracejob_file = self.tracejob_file[0] + else: + self.tracejob_file = None + + self.exit_status = 1 + self.job_status='completed failure' + # String match diag files for successfull run + with open(run_dir.joinpath('diag_hydro.00000')) as f: + diag_file = f.read() + if 'The model finished successfully.......' in diag_file: + self.exit_status = 0 + self.job_status='completed success' + except Exception as e: + warnings.warn('Could not parse diag files') + print(e) + + + def write_namelists(self, run_dir): + + # TODO(JLM): make the setup namelists @properties without setter (protect them) + # write hydro.namelist for the job + self.hydro_namelist_file = run_dir.joinpath(self.job_date_id + '.hydro.namelist') + f90nml.write(self.hydro_namelist, self.hydro_namelist_file) + nlst_file = run_dir.joinpath('hydro.namelist') + if nlst_file.exists(): + nlst_file.unlink() + nlst_file.symlink_to(self.hydro_namelist_file) + + # write namelist.hrldas + self.namelist_hrldas_file = run_dir.joinpath(self.job_date_id + '.namelist.hrldas') + f90nml.write(self.namelist_hrldas, self.namelist_hrldas_file) + nlst_file = run_dir.joinpath('namelist.hrldas') + if nlst_file.exists(): + nlst_file.unlink() + nlst_file.symlink_to(self.namelist_hrldas_file) + + + ####################################################### + # These can probably be made properties that may set on job or job.scheduler. 
+ def stdout_exe(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stdout_exe, run_dir) + else: + None + def stderr_exe(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stderr_exe, run_dir) + else: + None + def tracejob_file(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._tracejob_file, run_dir) + else: + None + def stdout_pbs(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stdout_pbs, run_dir) + else: + None + def stderr_pbs(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stderr_pbs, run_dir) + else: + None + def stdout_pbs_tmp(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stdout_pbs_tmp, run_dir) + else: + None + def stderr_pbs_tmp(self, run_dir): + if self.scheduler: + return self.eval_std_file_vars(self.scheduler._stderr_pbs_tmp, run_dir) + else: + None + + def eval_std_file_vars(self, the_str, run_dir): + if self.scheduler.not_submitted: + return(the_str) + dict = {'run_dir': run_dir, + 'job_date_id': self.job_date_id, + 'sched_job_id': self.scheduler.sched_job_id} + if dict['sched_job_id'] is None: dict['sched_job_id'] = '${sched_job_id}' + return(the_str.format(**dict)) + + + def apply_model_start_end_job_namelists( + self + ): + + # Refs + noah_nlst = self.namelist_hrldas['noahlsm_offline'] + hydro_nlst = self.hydro_namelist['hydro_nlist'] + + # Duration + noah_nlst['kday'] = None + noah_nlst['khour'] = None + duration = self.model_end_time - self.model_start_time + if duration.seconds == 0: + noah_nlst['kday'] = int(duration.days) + else: + noah_nlst['khour'] = int(duration.days*60 + duration.seconds/3600) + + # Start + noah_nlst['start_year'] = int(self.model_start_time.year) + noah_nlst['start_month'] = int(self.model_start_time.month) + noah_nlst['start_day'] = int(self.model_start_time.day) + noah_nlst['start_hour'] = 
int(self.model_start_time.hour) + noah_nlst['start_min'] = int(self.model_start_time.minute) + + # Restart + if self.model_restart: + restart_time = self.model_start_time + else: + # Though it will be commented, make it obvious. + restart_time = datetime.datetime(9999, 9, 9, 9, 9) + + lsm_restart_dirname = '.' #os.path.dirname(noah_nlst['restart_filename_requested']) + hydro_restart_dirname = '.' #os.path.dirname(hydro_nlst['restart_file']) + + #2011082600 - no minutes + lsm_restart_basename = 'RESTART.' + \ + self.model_start_time.strftime('%Y%m%d%H') + '_DOMAIN1' + #2011-08-26_00_00 - minutes + hydro_restart_basename = 'HYDRO_RST.' + \ + self.model_start_time.strftime('%Y-%m-%d_%H:%M') + '_DOMAIN1' + + lsm_restart_file = lsm_restart_dirname + '/' + lsm_restart_basename + hydro_restart_file = hydro_restart_dirname + '/' + hydro_restart_basename + + if not self.model_restart: + lsm_restart_file = '!!! ' + lsm_restart_file + hydro_restart_file = '!!! ' + hydro_restart_file + + noah_nlst['restart_filename_requested'] = lsm_restart_file + hydro_nlst['restart_file'] = hydro_restart_file + + # @property + # def job_complete(self): + # if self.scheduler.not_submitted: + # return(False) + # return( not os.path.isfile(run_dir + '/.job_not_complete') ) diff --git a/wrfhydropy/core/job_tools.py b/wrfhydropy/core/job_tools.py new file mode 100644 index 000000000..660b61dfa --- /dev/null +++ b/wrfhydropy/core/job_tools.py @@ -0,0 +1,1139 @@ +from boltons.iterutils import remap, get_path +import datetime +from distutils.spawn import find_executable +import io +import os +import pathlib +import re +import shlex +import socket +import subprocess +import sys +import time +import warnings +import yaml + +# Where is wrfhydropy/core dir in the filesystem? +# A method (used by django) about specifying the root dir of the project. 
# https://stackoverflow.com/questions/25389095/python-get-path-of-root-project-structure
core_dir = pathlib.PosixPath(os.path.dirname(os.path.abspath(__file__)))


class PBSError(Exception):
    """A custom error class for PBS/scheduler errors.

    Attributes:
        jobid: The scheduler job id the error pertains to (string).
        msg:   A human-readable description of the failure.
    """

    def __init__(self, jobid, msg):
        self.jobid = jobid
        self.msg = msg
        super(PBSError, self).__init__()

    def __str__(self):
        return self.jobid + ": " + self.msg


def get_sched_name():
    """Detect which scheduler is available on this machine.

    Returns:
        "PBS" if qsub is found on PATH, "slurm" if sbatch is found,
        else None if neither is found.
        (Fixed docstring: the original claimed "other" was returned,
        but the code returns None.)
    """
    # shutil.which replaces distutils.spawn.find_executable, which is
    # deprecated (PEP 632) and removed in Python 3.12.
    from shutil import which
    if which("qsub") is not None:
        return "PBS"
    elif which("sbatch") is not None:
        return "slurm"
    else:
        return None


def in_docker():
    """Return True if this process appears to be running inside a docker
    container, judged from this process's /proc/<pid>/cgroup entries."""
    path = "/proc/" + str(os.getpid()) + "/cgroup"
    if not os.path.isfile(path):
        return False
    with open(path) as f:
        for line in f:
            # Raw string: the original non-raw pattern contained invalid
            # escape sequences (\d, \w) that warn on modern Pythons.
            if re.match(r"\d+:[\w=]+:/docker(-[ce]e)?/\w+", line):
                return True
    return False


def get_machine():
    """Classify the current host.

    Returns:
        'cheyenne' when the hostname matches, otherwise 'docker'
        (warning when the process is not actually inside docker).
    """
    hostname = socket.gethostname()
    if re.match('cheyenne', hostname):
        machine = 'cheyenne'
    else:
        machine = 'docker'
        if not in_docker():
            warnings.warn('This machine is not recognized, using docker defaults.')
    return machine


def get_user():
    """Return the user name/handle.

    Tries os.getlogin() first; without a controlling terminal (e.g. in
    docker or CI) that raises OSError, so fall back to $USER and then to
    the `whoami` command.
    """
    try:
        return os.getlogin()
    except OSError:
        if 'USER' in os.environ:
            return os.environ['USER']
        sp = subprocess.run(['whoami'], stdout=subprocess.PIPE)
        return sp.stdout.decode('utf-8').split('\n')[0]
    # Removed the original unreachable `else: return "?"`: the try suite
    # always returns, so the else clause could never execute.
def get_version(software=None):
    """Return the scheduler software version as a string.

    Args:
        software: "PBS" or "slurm"; defaults to the scheduler detected
            by get_sched_name().
    Returns:
        The version number string, or "0" when no scheduler is recognized.
    """
    if software is None:
        software = get_sched_name()
    # Fixed: compare strings with '==', not 'is'. Identity comparison of
    # string literals only worked by CPython interning accident and raises
    # a SyntaxWarning on modern Pythons.
    if software == "PBS":
        opt = ["qstat", "--version"]

        # call 'qstat' using subprocess
        p = subprocess.Popen(opt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = p.communicate()
        sout = io.StringIO(stdout.decode('utf-8'))

        # NOTE(review): lstrip strips a *character set*, not a prefix; this
        # is safe for purely numeric versions but would mangle letters that
        # happen to appear in "version: " — confirm against qstat output.
        return sout.read().rstrip("\n").lstrip("version: ")
    elif software == "slurm":
        opt = ["squeue", "--version"]

        # call 'squeue' using subprocess
        p = subprocess.Popen(opt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = p.communicate()
        sout = io.StringIO(stdout.decode('utf-8'))

        return sout.read().rstrip("\n").lstrip("slurm ")
    else:
        return "0"


def seconds(walltime):
    """Convert a "[[[DD:]HH:]MM:]SS" walltime string to seconds (float).

    (Fixed docstring: the original said "to hours".)
    """
    wtime = walltime.split(":")
    if len(wtime) == 1:
        return float(wtime[0])
    elif len(wtime) == 2:
        return float(wtime[0])*60.0 + float(wtime[1])
    elif len(wtime) == 3:
        return float(wtime[0])*3600.0 + float(wtime[1])*60.0 + float(wtime[2])
    elif len(wtime) == 4:
        # Fixed: the original counted wtime[0] as both days and hours and
        # dropped the final (seconds) field entirely.
        return (float(wtime[0])*24.0*3600.0 +
                float(wtime[1])*3600.0 +
                float(wtime[2])*60.0 +
                float(wtime[3]))
    else:
        print("Error in walltime format:", walltime)
        sys.exit()


def hours(walltime):
    """Convert a "[[[DD:]HH:]MM:]SS" walltime string to hours (float)."""
    wtime = walltime.split(":")
    if len(wtime) == 1:
        return float(wtime[0])/3600.0
    elif len(wtime) == 2:
        return float(wtime[0])/60.0 + float(wtime[1])/3600.0
    elif len(wtime) == 3:
        return float(wtime[0]) + float(wtime[1])/60.0 + float(wtime[2])/3600.0
    elif len(wtime) == 4:
        # Fixed: same double-counting/index bug as in seconds().
        return (float(wtime[0])*24.0 +
                float(wtime[1]) +
                float(wtime[2])/60.0 +
                float(wtime[3])/3600.0)
    else:
        print("Error in walltime format:", walltime)
        sys.exit()
def strftimedelta(seconds):  #pylint: disable=redefined-outer-name
    """Convert a duration in seconds to a 'D:HH:MM:SS' string.

    Args:
        seconds: duration in seconds (assumed non-negative; truncated to int).
    Returns:
        str formatted as days:hours:minutes:seconds, e.g. '1:01:01:01'.
    """
    seconds = int(seconds)
    day, seconds = divmod(seconds, 24 * 3600)
    hour, seconds = divmod(seconds, 3600)
    minute, seconds = divmod(seconds, 60)
    return "%d:%02d:%02d:%02d" % (day, hour, minute, seconds)


def exetime(deltatime):
    """Get the exetime string for the PBS '-a' option from a [[[DD:]MM:]HH:]SS string.

    exetime string format: YYYYmmddHHMM.SS
    NOTE(review): depends on a module-level ``hours()`` helper defined elsewhere
    in this module — confirm it accepts the '[[[DD:]MM:]HH:]SS' form.
    """
    return (datetime.datetime.now() +
            datetime.timedelta(hours=hours(deltatime))).strftime("%Y%m%d%H%M.%S")


def touch(filename, mode=0o666, dir_fd=None, **kwargs):
    """Unix-style 'touch': create the file if needed and update its timestamps."""
    flags = os.O_CREAT | os.O_APPEND
    with os.fdopen(os.open(filename, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if os.utime in os.supports_fd else filename,
                 dir_fd=None if os.supports_fd else dir_fd, **kwargs)


def _qstat(jobid=None,
           username=...,
           full=False,
           version=int(14)):
    """Return the stdout of qstat minus the header lines.

    'username' defaults to the current user (``...`` is a sentinel resolved
    lazily via get_user(), so nothing executes at import time; the original
    called get_user() in the default, which ran at import). Pass None to
    disable the '-u' filter.
    'full' is the '-f' option.
    'jobid' is a string or list of strings of job ids.

    Returns the text of qstat, minus the header lines.
    """
    if username is ...:
        username = get_user()

    # -u and -f contradict in earlier versions of PBS.
    if full and username is not None and (version < 5.0 and jobid is None):
        # First get all of the user's job ids via qselect.
        qopt = ["qselect", "-u", username]
        q = subprocess.Popen(qopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
        stdout, stderr = q.communicate()  #pylint: disable=unused-variable
        # Popen without text mode yields bytes; decode before wrapping in StringIO
        # (bug fix: the original passed bytes to io.StringIO).
        qsout = io.StringIO(stdout.decode('utf-8'))
        jobid = [line.rstrip("\n") for line in qsout]

    opt = ["qstat"]
    # If there are jobid(s), you don't need a username.
    if username is not None and jobid is None:
        opt += ["-u", username]
    # But if there are jobid(s) and a username, you need -a to get full output.
    elif username is not None and jobid is not None and not full:
        opt += ["-a"]
    # By this point we're guaranteed torque ver >= 5.0, so -u and -f are safe together.
    if full:
        opt += ["-f"]
    if jobid is not None:
        if isinstance(jobid, str):  # bug fix: 'unicode' does not exist on python 3
            jobid = [jobid]
        elif not isinstance(jobid, list):
            # bug fix: raise instead of print + sys.exit so callers can handle it
            raise TypeError("scheduler_misc.qstat(): jobid must be str or list, got "
                            + str(type(jobid)))
        opt += jobid

    p = subprocess.Popen(opt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
    stdout, stderr = p.communicate()  #pylint: disable=unused-variable
    sout = io.StringIO(stdout.decode('utf-8'))

    # Strip the header lines.
    if full is False:
        for line in sout:
            if line[0] == "-":
                break

    # Return the remaining text.
    return sout.read()


def job_id(all=False, name=None):  #pylint: disable=redefined-builtin
    """Return job id(s).

    If 'name' is given, return a list of all job ids with that name (via qstat).
    Else, if all=True, return a list of all job ids of the current user.
    Else, return this job's id from PBS_JOBID or SLURM_JOBID (numeric part only),
    or None if neither environment variable is set.
    """
    if all or name is not None:
        jobid = []
        stdout = _qstat()
        sout = io.StringIO(stdout)
        for line in sout:
            if name is not None:
                # Column 3 of qstat output is the job name.
                if line.split()[3] == name:
                    jobid.append((line.split()[0]).split(".")[0])
            else:
                jobid.append((line.split()[0]).split(".")[0])
        return jobid

    if 'PBS_JOBID' in os.environ:
        return os.environ['PBS_JOBID'].split(".")[0]
    elif 'SLURM_JOBID' in os.environ:
        return os.environ['SLURM_JOBID'].split(".")[0]
    return None


def job_rundir(jobid, sched_name):
    """Return the directory job(s) 'jobid' ran in, using qstat/scontrol.

    Returns a dict with jobid as key and rundir as value; None (with a warning)
    for an unrecognized scheduler name.
    """
    rundir = dict()

    if sched_name == 'PBS':
        if isinstance(jobid, list):
            for i in jobid:
                stdout = _qstat(jobid=i, full=True)
                match = re.search(",PWD=(.*),", stdout)
                rundir[i] = match.group(1)
        else:
            stdout = _qstat(jobid=jobid, full=True)
            match = re.search(",PWD=(.*),", stdout)
            rundir[jobid] = match.group(1)  # bug fix: was rundir[i] (undefined here)
        return rundir

    elif sched_name == 'slurm':
        if isinstance(jobid, list):
            for i in jobid:
                stdout = _squeue(jobid=i, full=True)
                match = re.search("WorkDir=(.*),", stdout)
                rundir[i] = match.group(1)
        else:
            stdout = _squeue(jobid=jobid, full=True)
            match = re.search("WorkDir=(.*),", stdout)
            rundir[jobid] = match.group(1)  # bug fix: was rundir[i] (undefined here)
        return rundir

    else:
        # TODO JLM: harden.
        warnings.warn("sched_name matches neither 'PBS' nor 'slurm': FIX THIS.")
        return None


def job_status_PBS(jobid=None):  #pylint: disable=invalid-name
    """Return job status using qstat.

    Returns a dict of dict, with jobid as key in the outer dict.
    The inner dict contains:
        "jobname", "nodes", "procs", "walltime",
        "jobstatus": status ("Q", "C", "R", etc.)
        "qstatstr": result of qstat -f jobid
        "elapsedtime": None if not started, else seconds as int
        "starttime": None if not started, else seconds since epoch as int
        "completiontime": None if not completed, else seconds since epoch as int

    *This should be edited to return job_status_dict()'s*
    """
    status = dict()

    stdout = _qstat(jobid=jobid, full=True)
    sout = io.StringIO(stdout)

    jobstatus = None

    for line in sout:

        # A "Job Id:" line starts a new record; flush the previous one.
        m = re.search(r"Job Id:\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            if jobstatus is not None:
                if jobstatus["jobstatus"] == "R":
                    jobstatus["elapsedtime"] = int(time.time()) - jobstatus["starttime"]
                status[jobstatus["jobid"]] = jobstatus
            jobstatus = dict()
            jobstatus["jobid"] = m.group(1).split(".")[0]
            jobstatus["qstatstr"] = line
            jobstatus["elapsedtime"] = None
            jobstatus["starttime"] = None
            jobstatus["completiontime"] = None
            continue

        # Robustness fix: skip any preamble before the first "Job Id:" header
        # (the original would have hit None["qstatstr"] here).
        if jobstatus is None:
            continue
        jobstatus["qstatstr"] += line

        m = re.match(r"\s*Job_Name\s*=\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["jobname"] = m.group(1)
            continue

        m = re.match(r"\s*Resource_List\.nodes\s*=\s*(.*):ppn=(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["nodes"] = m.group(1)
            jobstatus["procs"] = int(m.group(1)) * int(m.group(2))
            continue

        m = re.match(r"\s*Resource_List\.walltime\s*=\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            # NOTE(review): seconds() is a module-level helper defined elsewhere.
            jobstatus["walltime"] = int(seconds(m.group(1)))
            continue

        m = re.match(r"\s*job_state\s*=\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["jobstatus"] = m.group(1)
            continue

        m = re.match(r"\s*start_time\s*=\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["starttime"] = int(time.mktime(datetime.datetime.strptime(
                m.group(1), "%a %b %d %H:%M:%S %Y").timetuple()))
            continue

        m = re.search(r"\s*comp_time\s*=\s*(.*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["completiontime"] = int(time.mktime(datetime.datetime.strptime(
                m.group(1), "%a %b %d %H:%M:%S %Y").timetuple()))
            continue

    # Flush the final record.
    if jobstatus is not None:
        if jobstatus["jobstatus"] == "R":
            jobstatus["elapsedtime"] = int(time.time()) - jobstatus["starttime"]
        status[jobstatus["jobid"]] = jobstatus

    return status


def submit_scheduler(substr, sched_name, hold=False):
    """Submit a job script via qsub (PBS) or sbatch (slurm).

    substr: the submit script, as str, bytes, or bytearray.
    Returns the numeric job id string, or None (with a warning) for an
    unrecognized scheduler name.
    """
    # Normalize to both str (for regex searches) and bytes (for the
    # scheduler's stdin) — bug fix: the original passed a possibly-str object
    # to communicate() and ran regexes on bytes.
    if isinstance(substr, (bytes, bytearray)):
        substr_str = substr.decode('utf-8')
    else:
        substr_str = substr
    substr_bytes = substr_str.encode('utf-8')

    m = re.search(r"-N\s+(.*)\s", substr_str)  #pylint: disable=invalid-name
    if m:
        jobname = m.group(1)  #pylint: disable=unused-variable
    else:
        raise PBSError(
            None,
            r"Error in scheduler_misc.submit(). Jobname (\"-N\s+(.*)\s\") not found in submit string.")

    if sched_name == 'PBS':

        qsub_cmd = "qsub"
        if hold:
            qsub_cmd += " -h"

        p = subprocess.Popen(shlex.split(qsub_cmd),  #pylint: disable=invalid-name
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        stdout, stderr = p.communicate(input=substr_bytes)  #pylint: disable=unused-variable
        stdout_str = stdout.decode('utf-8')
        if re.search("error", stdout_str):
            # bug fix: stderr is None here (merged into stdout), and stdout was
            # bytes — both broke the original string concatenation.
            raise PBSError(0, "PBS Submission error.\n" + stdout_str)
        return stdout_str.split(".")[0]

    elif sched_name == 'slurm':

        p = subprocess.Popen("sbatch", stdin=subprocess.PIPE,  #pylint: disable=invalid-name
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        stdout, stderr = p.communicate(input=substr_bytes)  #pylint: disable=unused-variable
        stdout_str = stdout.decode('utf-8')  # bug fix: regex/split ran on bytes
        if re.search("error", stdout_str):
            raise PBSError(0, "PBS Submission error.\n" + stdout_str)
        return stdout_str.rstrip().split()[-1]

    else:

        # TODO JLM: harden.
        warnings.warn("sched_name matches neither 'PBS' nor 'slurm': FIX THIS.")
        return None


def generic_popen(cmd_list):
    """Run a command (output captured and discarded); return its exit code."""
    p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
    p.communicate()
    return p.returncode


def delete(jobid, sched_name):
    """qdel (PBS) or scancel (slurm) a job."""
    if sched_name == 'PBS':
        cmd = 'qdel'
    elif sched_name == 'slurm':
        cmd = 'scancel'
    else:
        # Robustness fix: the original fell through to an undefined 'cmd'.
        raise ValueError("Unknown scheduler name: " + str(sched_name))
    return generic_popen([cmd, jobid])


def hold(jobid, sched_name):
    """qhold (PBS) or scontrol (slurm) a job."""
    if sched_name == 'PBS':
        cmd = 'qhold'
    elif sched_name == 'slurm':
        cmd = 'scontrol'
    else:
        # Robustness fix: the original fell through to an undefined 'cmd'.
        raise ValueError("Unknown scheduler name: " + str(sched_name))
    return generic_popen([cmd, jobid])


def release(sched):
    """qrls (PBS) or scontrol un-delay (slurm) a job."""
    if sched.sched_name == 'PBS':
        cmd_list = ['qrls', sched.sched_job_id]
    if sched.sched_name == 'slurm':
        # bug fix: scontrol takes single 'key=value' tokens; the original split
        # "JobId=" and the id into separate argv elements.
        cmd_list = ["scontrol", "update",
                    "JobId=" + str(sched.sched_job_id), "StartTime=now"]
    return generic_popen(cmd_list)


def alter(jobid, arg, sched_name='PBS'):
    """qalter (PBS) or scontrol update (slurm) a job.

    'arg' is a scheduler command-option string. For instance, "-a 201403152300.19".
    bug fix: the original body referenced an undefined 'sched_name'; it is now a
    parameter with a 'PBS' default so existing two-argument calls keep working.
    """
    if sched_name == 'PBS':
        cmd_list = ["qalter"] + arg.split() + [jobid]
    if sched_name == 'slurm':
        # bug fix: single 'JobId=<id>' token, as scontrol requires.
        cmd_list = ["scontrol", "update", "JobId=" + str(jobid)] + arg.split()
    return generic_popen(cmd_list)


# #######################################################
# SLURM-only section follows

def _squeue(jobid=None,
            username=...,
            full=False,
            version=int(14),
            sformat=None):
    """Return the stdout of squeue (scontrol when full=True) minus header lines.

    'username' defaults to the current user (``...`` sentinel, resolved lazily
    via get_user() — see _qstat); pass None to disable the user filter.
    'full' is the '-f' option.
    'jobid' is a string or list of strings of job ids.
    'version' is a software version number, used to determine compatible ops.
    'sformat' is a squeue format string (e.g., "%A %i %j %c").
    """
    if username is ...:
        username = get_user()

    # If full is true, we need to use scontrol.
    if full is True:
        if jobid is None:
            if username is None:
                # Clearly we want ALL THE JOBS.
                sopt = ["scontrol", "show", "job"]
                p = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
                stdout, stderr = p.communicate()  #pylint: disable=unused-variable
                # Nothing to strip, as scontrol provides no headers.
                # (bug fix: decode bytes before returning text.)
                return stdout.decode('utf-8')
            else:
                # First, get jobids that belong to that username using
                # squeue (-h strips the header).
                sopt = ["squeue", "-h", "-u", username]
                q = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
                stdout, stderr = q.communicate()  #pylint: disable=unused-variable
                qsout = io.StringIO(stdout.decode('utf-8'))  # bug fix: bytes -> str
                jobid = [line.rstrip("\n") for line in qsout]

        # Ensure the jobids are a list, even if they're a list of 1...
        if not isinstance(jobid, list) and jobid is not None:
            jobid = [jobid]
        if isinstance(jobid, list):
            sreturn = ""
            for my_id in jobid:
                sopt = ["scontrol", "show", "job", str(my_id)]
                q = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
                stdout, stderr = q.communicate()  #pylint: disable=unused-variable
                # bug fix: the original concatenated bytes stdout with str.
                sreturn = sreturn + stdout.decode('utf-8') + "\n"
            return sreturn

    else:
        sopt = ["squeue", "-h"]
        if username is not None:
            sopt += ["-u", username]
        if jobid is not None:
            # bug fix: pass a single '--job=<ids>' token; the original appended
            # a dangling "--job=" plus shell-style quotes that squeue would
            # receive literally (no shell is involved here).
            if isinstance(jobid, list):
                sopt += ["--job=" + ",".join([str(i) for i in jobid])]
            else:
                sopt += ["--job=" + str(jobid)]
        if sformat is not None:
            sopt += ["-o", sformat]
        else:
            if jobid is None and username is None:
                sopt += ["-o", "%i %u %P %j %U %D %C %m %l %t %M"]
            else:
                sopt += ["-o", "%i %j %u %M %t %P"]

        q = subprocess.Popen(sopt, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  #pylint: disable=invalid-name
        stdout, stderr = q.communicate()  #pylint: disable=unused-variable
        # Return the remaining text (bug fix: decode bytes).
        return stdout.decode('utf-8')


def job_status_slurm(jobid=None):
    """Return job status using squeue/scontrol.

    Returns a dict of dict, with jobid as key in the outer dict.
    The inner dict contains:
        "name", "nodes", "procs", "walltime",
        "jobstatus": status ("Q", "C", "R", etc.)
        "qstatstr": result of scontrol show job jobid, None if not found
        "elapsedtime": None if not started, else seconds as int
        "starttime": None if not started, else seconds since epoch as int
        "completiontime": None if not completed, else seconds since epoch as int

    *This should be edited to return job_status_dict()'s*
    """
    status = dict()

    stdout = _squeue(jobid=jobid, full=True)
    sout = io.StringIO(stdout)

    # Template for a fresh (empty) record.
    empty = {"jobid": None, "name": None, "nodes": None, "procs": None,
             "walltime": None, "qstatstr": None, "elapsedtime": None,
             "starttime": None, "completiontime": None, "jobstatus": None,
             "cluster": None}
    jobstatus = dict(empty)

    for line in sout:
        # Check for if we're at a new job header line.
        m = re.search(r"JobId=\s*(\S*)\s*", line)  #pylint: disable=invalid-name
        if m:
            if jobstatus["jobstatus"] is not None:
                status[jobstatus["jobid"]] = jobstatus
                jobstatus = dict(empty)
            jobstatus["jobid"] = m.group(1)

            # Grab the job name (same line as JobId in scontrol output).
            m = re.match(r"\S*\s*Name=\s*(.*)\s?", line)  #pylint: disable=invalid-name
            if m:
                jobstatus["jobname"] = m.group(1)

            # Save the full output.
            jobstatus["qstatstr"] = line
            continue

        # Robustness fix: ignore anything before the first JobId= header
        # (the original would have concatenated onto None).
        if jobstatus["qstatstr"] is None:
            continue
        jobstatus["qstatstr"] += line

        # Look for the Nodes/PPN info.
        m = re.search(r"NumNodes=\s*([0-9]*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["nodes"] = int(m.group(1))
        m = re.match(r"\S*\s*NumCPUs=\s*([0-9]*)\s", line)  #pylint: disable=invalid-name
        if m:
            jobstatus["procs"] = int(m.group(1))
            continue

        # Look for timing info.
        m = re.search(r"RunTime=\s*([0-9]*:[0-9]*:[0-9]*)\s", line)  #pylint: disable=invalid-name
        if m:
            if m.group(1) == "Unknown":
                continue
            hrs, mns, scs = m.group(1).split(":")
            runtime = datetime.timedelta(hours=int(hrs), minutes=int(mns), seconds=int(scs))
            jobstatus["elapsedtime"] = runtime.seconds

        m = re.match(r"\S*\s*TimeLimit=\s*([0-9]*:[0-9]*:[0-9]*)\s", line)  #pylint: disable=invalid-name
        if m:
            # bug fix: parse the TimeLimit value itself — the original reused
            # hrs/mns/scs left over from the RunTime match above.
            hrs, mns, scs = m.group(1).split(":")
            walltime = datetime.timedelta(hours=int(hrs), minutes=int(mns), seconds=int(scs))
            jobstatus["walltime"] = walltime.seconds
            continue

        # Grab the job start time.
        m = re.search(r"StartTime=\s*([0-9]*\-[0-9]*\-[0-9]*T[0-9]*:[0-9]*:[0-9]*)\s", line)  #pylint: disable=invalid-name
        if m:
            if m.group(1) == "Unknown":
                continue
            year, month, day = m.group(1).split("T")[0].split("-")
            hrs, mns, scs = m.group(1).split("T")[1].split(":")
            starttime = datetime.datetime(year=int(year), month=int(month), day=int(day),
                                          hour=int(hrs), minute=int(mns), second=int(scs))
            jobstatus["starttime"] = time.mktime(starttime.timetuple())
            continue

        # Grab the job status and map it onto the PBS-style single letters.
        m = re.search(r"JobState=\s*([a-zA-Z]*)\s", line)  #pylint: disable=invalid-name
        if m:
            my_status = m.group(1)
            if my_status in ("RUNNING", "CONFIGURING"):
                jobstatus["jobstatus"] = "R"
            elif my_status in ("BOOT_FAIL", "FAILED", "NODE_FAIL", "CANCELLED",
                               "COMPLETED", "PREEMPTED", "TIMEOUT"):
                jobstatus["jobstatus"] = "C"
            elif my_status in ("COMPLETING", "STOPPED"):
                jobstatus["jobstatus"] = "E"
            elif my_status in ("PENDING", "SPECIAL_EXIT"):
                jobstatus["jobstatus"] = "Q"
            elif my_status == "SUSPENDED":
                jobstatus["jobstatus"] = "S"
            else:
                jobstatus["jobstatus"] = "?"
            continue

        # Grab the cluster/allocating node.
        m = re.search(r"AllocNode:\s*.*=(.*):.*", line)  #pylint: disable=invalid-name
        if m:
            raw_str = m.group(1)
            m = re.search(r"(.*?)(?=[^a-zA-Z0-9]*login.*)", raw_str)  #pylint: disable=invalid-name
            if m:
                jobstatus["cluster"] = m.group(1)
            else:
                jobstatus["cluster"] = raw_str

    # Flush the final record.
    if jobstatus["jobstatus"] is not None:
        status[jobstatus["jobid"]] = jobstatus

    return status


def default_job_spec(machine='docker'):
    """Load the default job spec for 'machine' from core_dir/default_job_specs.yaml.

    NOTE(review): 'core_dir' and 'yaml' must be provided at module level
    elsewhere in this module — confirm both are imported/defined.
    """
    if machine != 'docker':
        warnings.warn("Default job specs do not currently make sense except for docker.")
    default_job_specs_file = core_dir / 'default_job_specs.yaml'
    with open(default_job_specs_file) as ff:
        default_job_specs = yaml.safe_load(ff)
    # Renamed local (was 'default_job_spec'): avoid shadowing this function.
    job_spec = default_job_specs[machine]
    job_spec['exe_cmd'] = job_spec['exe_cmd']['default']
    return job_spec


def compose_scheduled_python_script(
    py_run_cmd: str,
    model_exe_cmd: str
):
    """Compose the python driver script run by the scheduler for a job.

    Args:
        py_run_cmd: command used to (re)invoke python for the run.
        model_exe_cmd: command used to execute the model binary.
    Returns:
        The script as a single string.
    """
    jobstr = "#!/usr/bin/env python\n"
    jobstr += "\n"

    jobstr += "import argparse\n"
    jobstr += "import datetime\n"
    jobstr += "import os\n"
    jobstr += "import pickle\n"
    jobstr += "import sys\n"
    jobstr += "import wrfhydropy\n"
    jobstr += "\n"

    jobstr += "parser = argparse.ArgumentParser()\n"
    jobstr += "parser.add_argument('--sched_job_id',\n"
    jobstr += "                    help='The numeric part of the scheduler job ID.')\n"
    jobstr += "parser.add_argument('--job_date_id',\n"
    jobstr += "                    help='The date-time identifier created by Schduler obj.')\n"
    jobstr += "args = parser.parse_args()\n"
    jobstr += "\n"

    jobstr += "print('sched_job_id: ', args.sched_job_id)\n"
    jobstr += "print('job_date_id: ', args.job_date_id)\n"
    jobstr += "\n"

    jobstr += "run_object = pickle.load(open('WrfHydroRun.pkl', 'rb'))\n"
    jobstr += "\n"

    jobstr += "# The lowest index jobs_pending should be the job to run. Verify that it \n"
    jobstr += "# has the same job_date_id as passed first, then set job to active.\n"
    jobstr += "if run_object.job_active:\n"
    jobstr += "    msg = 'There is an active job conflicting with this scheduled job.'\n"
    jobstr += "    raise ValueError(msg)\n"
    jobstr += "if not run_object.jobs_pending[0].job_date_id == args.job_date_id:\n"
    jobstr += "    msg = 'The first pending job does not match the passed job_date_id.'\n"
    jobstr += "    raise ValueError(msg)\n"
    jobstr += "\n"

    jobstr += "# Promote the job to active.\n"
    jobstr += "run_object.job_active = run_object.jobs_pending.pop(0)\n"

    jobstr += "# Set some run-time attributes of the job.\n"
    jobstr += "run_object.job_active.py_exe_cmd = \"" + py_run_cmd + "\"\n"
    jobstr += "run_object.job_active.scheduler.sched_job_id = args.sched_job_id\n"
    jobstr += "run_object.job_active.exe_cmd = \"" + model_exe_cmd + "\"\n"
    jobstr += "# Pickle before running the job.\n"
    jobstr += "run_object.pickle()\n"
    jobstr += "\n"

    jobstr += "print(\"Running the model.\")\n"
    jobstr += "run_object.job_active.job_start_time = str(datetime.datetime.now())\n"
    jobstr += "run_object.job_active.run(run_object.run_dir)\n"
    # NOTE(review): the next line re-assigns job_start_time after the run; it
    # looks like it should be job_end_time — confirm before changing.
    jobstr += "run_object.job_active.job_start_time = str(datetime.datetime.now())\n"
    jobstr += "\n"

    jobstr += "print(\"Collecting model output.\")\n"
    jobstr += "run_object.collect_output()\n"
    jobstr += "print(\"Job completed.\")\n"
    jobstr += "\n"

    jobstr += "run_object.job_active._sched_job_complete = True\n"
    jobstr += "run_object.jobs_completed.append(run_object.job_active)\n"
    jobstr += "run_object.job_active = None\n"
    jobstr += "run_object.pickle()\n"
    jobstr += "\n"
    jobstr += "sys.exit(0)\n"

    return jobstr


def compose_scheduled_bash_script(
    run_dir: str,
    job: object
):
    """Write a Job as a batch-script string suitable for job.scheduler.sched_name."""

    if job.scheduler.sched_name.lower() == "slurm":
        # Write this Job as a string suitable for slurm.
        # NOT USED: exetime, priority, auto.
        jobstr = "#!/bin/bash\n"

        ## FROM DART for job arrays.
        #JOBNAME=$SLURM_JOB_NAME
        #JOBID=$SLURM_JOBID
        #ARRAY_INDEX=$SLURM_ARRAY_TASK_ID
        #NODELIST=$SLURM_NODELIST
        #LAUNCHCMD="mpirun -np $SLURM_NTASKS -bind-to core"

        jobstr += "#SBATCH -J {0}\n".format(job.scheduler.job_name)
        if job.scheduler.account is not None:
            jobstr += "#SBATCH -A {0}\n".format(job.scheduler.account)
        jobstr += "#SBATCH -t {0}\n".format(job.scheduler.walltime)
        jobstr += "#SBATCH -n {0}\n".format(job.scheduler.nnodes*job.scheduler.ppn)
        if job.scheduler.pmem is not None:
            jobstr += "#SBATCH --mem-per-cpu={0}\n".format(job.scheduler.pmem)
        if job.scheduler.qos is not None:
            jobstr += "#SBATCH --qos={0}\n".format(job.scheduler.qos)
        # Idiom fix: 'is not None' instead of '!= None'.
        if job.scheduler.email is not None and job.scheduler.message is not None:
            jobstr += "#SBATCH --mail-user={0}\n".format(job.scheduler.email)
            if 'b' in job.scheduler.message:
                jobstr += "#SBATCH --mail-type=BEGIN\n"
            if 'e' in job.scheduler.message:
                jobstr += "#SBATCH --mail-type=END\n"
            if 'a' in job.scheduler.message:
                jobstr += "#SBATCH --mail-type=FAIL\n"
        # SLURM does assignment to no. of nodes automatically.
        # jobstr += "#SBATCH -N {0}\n".format(job.scheduler.nodes)
        if job.scheduler.queue is not None:
            jobstr += "#SBATCH -p {0}\n".format(job.scheduler.queue)
        jobstr += "{0}\n".format(job.exe_cmd)

        return jobstr

    else:

        # Write this Job as a string suitable for PBS #

        jobstr = ""
        jobstr += "#!/bin/sh\n"
        jobstr += "#PBS -N {0}\n".format(job.scheduler.job_name)
        jobstr += "#PBS -A {0}\n".format(job.scheduler.account)
        jobstr += "#PBS -q {0}\n".format(job.scheduler.queue)
        jobstr += "#PBS -M {0}\n".format(job.scheduler.email_who)
        jobstr += "#PBS -m {0}\n".format(job.scheduler.email_when)
        jobstr += "\n"

        jobstr += "#PBS -l walltime={0}\n".format(job.scheduler.walltime)
        jobstr += "\n"

        if job.scheduler.nproc_last_node == 0:
            prcstr = "select={0}:ncpus={1}:mpiprocs={1}\n"
            prcstr = prcstr.format(job.scheduler.nnodes, job.scheduler.ppn)
        else:
            prcstr = "select={0}:ncpus={1}:mpiprocs={1}+1:ncpus={2}:mpiprocs={2}\n"
            prcstr = prcstr.format(job.scheduler.nnodes-1,
                                   job.scheduler.ppn,
                                   job.scheduler.nproc_last_node)

        jobstr += "#PBS -l " + prcstr
        jobstr += "\n"

        jobstr += "# Not using PBS standard error and out files to capture model output\n"
        jobstr += "# but these hidden files might catch output and errors from the scheduler.\n"
        jobstr += "#PBS -o {0}\n".format(job.stdout_pbs_tmp(run_dir))
        jobstr += "#PBS -e {0}\n".format(job.stderr_pbs_tmp(run_dir))
        jobstr += "\n"

        if job.scheduler.afterok:
            if job.machine == 'cheyenne':
                cheyenne_afterok = get_cheyenne_job_dependency_id(job.scheduler.afterok)
                jobstr += "#PBS -W depend=afterok:{0}\n".format(cheyenne_afterok)
            else:
                jobstr += "#PBS -W depend=afterok:{0}\n".format(job.scheduler.afterok)

        if job.scheduler.array_size:
            jobstr += "#PBS -J 1-{0}\n".format(job.scheduler.array_size)
        if job.scheduler.exetime:
            jobstr += "#PBS -a {0}\n".format(job.scheduler.exetime)
        if job.scheduler.pmem:
            jobstr += "#PBS -l pmem={0}\n".format(job.scheduler.pmem)
        if job.scheduler.grab_env:
            jobstr += "#PBS -V\n"
        if job.scheduler.array_size or \
           job.scheduler.exetime or \
           job.scheduler.pmem or \
           job.scheduler.grab_env:
            jobstr += "\n"

        # End PBS Header

        if job.modules:
            jobstr += 'module purge\n'
            jobstr += 'module load {0}\n'.format(job.modules)
            jobstr += "\n"

        jobstr += "echo PBS_JOBID: $PBS_JOBID\n"
        jobstr += "sched_job_id=`echo ${PBS_JOBID} | cut -d'.' -f1`\n"
        jobstr += "echo sched_job_id: $sched_job_id\n"
        jobstr += "job_date_id={0}\n".format(job.job_date_id)
        jobstr += "echo job_date_id: $job_date_id\n"
        jobstr += "\n"

        jobstr += "export TMPDIR=/glade/scratch/$USER/temp\n"
        jobstr += "mkdir -p $TMPDIR\n"

        if job.scheduler.queue == 'share':
            jobstr += "export MPI_USE_ARRAY=false\n"

        jobstr += "cd {0}\n".format(run_dir)
        jobstr += "echo \"pwd:\" `pwd`\n"
        jobstr += "\n"

        jobstr += "# DART job variables for future reference\n"
        jobstr += "# JOBNAME=$PBS_JOBNAME\n"
        jobstr += "# JOBID=\"$PBS_JOBID\"\n"
        jobstr += "# ARRAY_INDEX=$PBS_ARRAY_INDEX\n"
        jobstr += "# NODELIST=`cat \"${PBS_NODEFILE}\"`\n"
        jobstr += "# LAUNCHCMD=\"mpiexec_mpt\"\n"
        jobstr += "# \n"

        jobstr += "# CISL suggests users set TMPDIR when running batch jobs on Cheyenne.\n"
        # NOTE(review): this duplicates the TMPDIR export above; kept to
        # preserve the generated script byte-for-byte — confirm and de-dupe.
        jobstr += "export TMPDIR=/glade/scratch/$USER/temp\n"
        jobstr += "mkdir -p $TMPDIR\n"
        jobstr += "\n"

        exestr = "{0} ".format(job.exe_cmd)
        exestr += "2> {0} 1> {1}".format(job.stderr_exe(run_dir), job.stdout_exe(run_dir))
        jobstr += "echo \"" + exestr + "\"\n"
        jobstr += exestr + "\n"
        jobstr += "\n"

        jobstr += "cmd_status=$?\n"
        jobstr += "echo \"cmd_status: $cmd_status\"\n"
        jobstr += "\n"

        jobstr += "# Touch these files just to get the job_date_id in their file names.\n"
        jobstr += "# Can identify the files by sched_job_id and replace contents...\n"
        jobstr += "touch {0}\n".format(job.tracejob_file(run_dir))
        jobstr += "touch {0}\n".format(job.stdout_pbs(run_dir))
        jobstr += "touch {0}\n".format(job.stderr_pbs(run_dir))
        jobstr += "\n"

        jobstr += "# Simple, file-based method for checking if the job is done.\n"
        jobstr += "# qstat is a bad way of doing this, apparently.\n"
        jobstr += "rm .job_not_complete\n"
        jobstr += "\n"

        ## TODO(JLM): the tracejob execution gets called by the waiting process.
        jobstr += "exit $cmd_status\n"

        return jobstr


def get_cheyenne_job_dependency_id(numeric_job_id):
    """Lovely bug in cheyenne's PBS that requires the full name on the job id."""
    # The pipeline requires a shell; the command is built from a numeric id only.
    cmd = 'qstat -w ' + str(numeric_job_id) + '| grep ' + str(numeric_job_id) + ' | cut -d" " -f1'
    cmd = "/bin/bash -c '" + cmd + "'"
    cmd_run = subprocess.run(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    return cmd_run.stdout.decode("utf-8").rstrip()


def solve_model_start_end_times(model_start_time, model_end_time, setup_obj):
    """Resolve model start/end times from flexible inputs.

    model_start_time: None (read from setup_obj's namelist), 'Y-m-d H[:M]' str,
        or datetime.datetime.
    model_end_time: datetime.datetime, None (from khour/kday in the namelist),
        'Y-m-d H[:M]' str, datetime.timedelta, or a dict of timedelta kwargs.
    Returns:
        (model_start_time, model_end_time) as datetime.datetime objects.
    """
    # model_start_time
    if model_start_time is None:
        # Get the namelist from the setup object.
        nlst_noah = setup_obj.namelist_hrldas['noahlsm_offline']
        start_noah_keys = {'year': 'start_year', 'month': 'start_month',
                           'day': 'start_day', 'hour': 'start_hour', 'minute': 'start_min'}
        start_noah_times = {kk: nlst_noah[vv] for (kk, vv) in start_noah_keys.items()}
        model_start_time = datetime.datetime(**start_noah_times)

    elif type(model_start_time) is str:
        # Allow minutes to be optional.
        if not bool(re.match('.*:.*', model_start_time)):
            model_start_time += ':00'
        model_start_time = datetime.datetime.strptime(model_start_time, '%Y-%m-%d %H:%M')

    elif type(model_start_time) is not datetime.datetime:
        raise TypeError('model_start_time is NOT one of type: None, str, or datetime.datetime')

    # model_end_time
    if type(model_end_time) is datetime.datetime:
        pass

    elif model_end_time is None:
        # Get one of kday or khour, convert it to timedelta.
        nlst_noah = setup_obj.namelist_hrldas['noahlsm_offline']
        if 'khour' in nlst_noah.keys():
            duration = {'hours': nlst_noah['khour']}
        elif 'kday' in nlst_noah.keys():
            duration = {'days': nlst_noah['kday']}
        else:
            raise ValueError("Neither KDAY nor KHOUR in the setup's namelist.hrldas.")
        model_end_time = model_start_time + datetime.timedelta(**duration)

    elif type(model_end_time) is str:
        # Allow minutes to be optional.
        if not bool(re.match('.*:.*', model_end_time)):
            model_end_time += ':00'
        model_end_time = datetime.datetime.strptime(model_end_time, '%Y-%m-%d %H:%M')

    elif type(model_end_time) is datetime.timedelta:
        model_end_time = model_start_time + model_end_time

    elif type(model_end_time) is dict:
        model_end_time = model_start_time + datetime.timedelta(**model_end_time)

    else:
        raise TypeError('model_end_time is NOT one of type: datetime.datetime, ' +
                        'None, str, datetime.timedelta, dict.')

    return model_start_time, model_end_time


def check_file_exist_colon(run_dir, file_str):
    """Takes a file WITH A COLON (not without).

    Returns './<file>' for whichever of the colon or colon-replaced-by-underscore
    variants exists under run_dir, else None.
    """
    if type(file_str) is not str:
        file_str = str(file_str)
    file_colon = pathlib.PosixPath(file_str)
    file_no_colon = pathlib.PosixPath(file_str.replace(':', '_'))
    if (run_dir / file_colon).exists():
        return './' + str(file_colon)
    if (run_dir / file_no_colon).exists():
        return './' + str(file_no_colon)
    return None


def check_job_input_files(job_obj, run_dir):
    """Check that all input files named in a job's namelists exist in run_dir.

    Raises ValueError for a missing non-exempt file; warns for exempt ones.
    NOTE(review): 'remap' (and presumably 'get_path') come from
    boltons.iterutils and must be imported at module level — confirm
    'get_path' is actually imported.
    """
    # A run object: check its next (first pending) job for all the dependencies.
    # This is after this job's namelists are established.

    def visit_is_file(path, key, value):
        # Keep only string (file-name) leaves and nested dicts.
        if value is None:
            return False
        return type(value) is str or type(value) is dict

    def visit_not_none(path, key, value):
        return bool(value)

    def visit_str_posix_exists(path, key, value):
        # Map each file-name leaf to (key, exists?) relative to run_dir.
        if type(value) is dict:
            return True
        return key, (run_dir / pathlib.PosixPath(value)).exists()

    def remap_nlst(nlst):
        # The outer remap removes empty dicts.
        files = remap(nlst, visit=visit_is_file)
        files = remap(files, visit=visit_not_none)
        exists = remap(files, visit=visit_str_posix_exists)
        return exists

    hrldas_file_dict = remap_nlst(job_obj.namelist_hrldas)
    hydro_file_dict = remap_nlst(job_obj.hydro_namelist)

    # INDIR is a special case: do some regex magic and counting.

    # What are the colon cases? Hydro/nudging restart files.
    hydro_file_dict['hydro_nlist']['restart_file'] = \
        bool(check_file_exist_colon(run_dir,
                                    job_obj.hydro_namelist['hydro_nlist']['restart_file']))
    hydro_file_dict['nudging_nlist']['nudginglastobsfile'] = \
        bool(check_file_exist_colon(run_dir,
                                    job_obj.hydro_namelist['nudging_nlist']['nudginglastobsfile']))

    hrldas_exempt_list = []
    hydro_exempt_list = ['nudginglastobsfile', 'timeslicepath']

    def check_nlst(nlst, file_dict):
        # Scan the dicts for False, exempting certain keys for certain configs.
        def visit_missing_file(path, key, value):
            if type(value) is dict:
                return True
            if not value:
                message = 'The namelist file ' + key + ' = ' + \
                          str(get_path(nlst, (path))[key]) + ' does not exist'
                if key not in [*hrldas_exempt_list, *hydro_exempt_list]:
                    raise ValueError(message)
                else:
                    warnings.warn(message)
            return False

        remap(file_dict, visit=visit_missing_file)
        return None

    check_nlst(job_obj.namelist_hrldas, hrldas_file_dict)
    check_nlst(job_obj.hydro_namelist, hydro_file_dict)

    # Check the parameter table files: do the ones in the model match the ones
    # in the rundir? Will this be by construction?
    return None
+ + return None diff --git a/wrfhydropy/core/utilities.py b/wrfhydropy/core/utilities.py index 049efe1ed..fba4a2ac0 100644 --- a/wrfhydropy/core/utilities.py +++ b/wrfhydropy/core/utilities.py @@ -1,10 +1,13 @@ -import subprocess +import deepdiff +import f90nml import io +import os import pandas as pd +import pathlib +import subprocess import warnings -import f90nml -import deepdiff import xarray as xr +from .job_tools import touch import pathlib import numpy as np @@ -211,4 +214,34 @@ def __make_relative__(run_object, basepath=None): if attr == 'simulation': __make_relative__(run_object.simulation.domain, - basepath=run_object.simulation.domain.domain_top_dir) \ No newline at end of file + basepath=run_object.simulation.domain.domain_top_dir) + + +def get_pickle_lock_file(run_obj): + return run_obj.run_dir / 'pickle_locked' + + +def lock_pickle(run_obj): + if is_pickle_locked(run_obj): + raise ValueError('The pickle file, ' + run_obj.run_dir + ', is already locked') + pickle_lock_file = get_pickle_lock_file(run_obj) + touch(pickle_lock_file) + run_obj._pickle_lock_file = pickle_lock_file + + +def unlock_pickle(run_obj): + if not is_pickle_locked(run_obj): + raise ValueError('The pickle file, ' + run_obj.run_dir + ', is already unlocked') + pickle_lock_file = get_pickle_lock_file(run_obj) + os.remove(pickle_lock_file) + run_obj._pickle_lock_file = None + + +def is_pickle_locked(run_obj): + internal_lock = run_obj._pickle_lock_file is not None + pickle_lock_file = get_pickle_lock_file(run_obj) + external_lock = pickle_lock_file.exists() + total_lock = internal_lock + external_lock + if total_lock == 1: + raise ValueError('The internal_lock must match external_lock.') + return bool(total_lock) diff --git a/wrfhydropy/core/wrfhydroclasses.py b/wrfhydropy/core/wrfhydroclasses.py index cbe51d5d6..4427afbdc 100644 --- a/wrfhydropy/core/wrfhydroclasses.py +++ b/wrfhydropy/core/wrfhydroclasses.py @@ -1,20 +1,29 @@ -import subprocess -import pathlib -import shutil -import 
xarray as xr -import f90nml -import json import copy +import datetime +import json import os -import uuid +import pathlib import pickle +import re +import shutil +import subprocess +import uuid import warnings +import xarray as xr -from .utilities import compare_ncfiles, open_nwmdataset, __make_relative__ +from .utilities import \ + compare_ncfiles, open_nwmdataset, \ + __make_relative__ , lock_pickle, \ + unlock_pickle, is_pickle_locked +from .job_tools import \ + get_user, \ + solve_model_start_end_times +from .job import Job ######################### # netcdf file object classes + class WrfHydroTs(list): def open(self, chunks: dict = None): """Open a WrfHydroTs object @@ -26,6 +35,7 @@ def open(self, chunks: dict = None): """ return open_nwmdataset(self,chunks=chunks) + class WrfHydroStatic(pathlib.PosixPath): def open(self): """Open a WrfHydroStatic object @@ -38,7 +48,7 @@ def open(self): ######################### -# Classes for constructing and running a wrf_hydro simulation +# Classes for constructing and running a wrf_hydro setup class WrfHydroModel(object): """Class for a WRF-Hydro model, which consitutes the model source code and compiled binary. """ @@ -206,7 +216,7 @@ def compile(self, compiler: str, # WRF-Hydro Domain object class WrfHydroDomain(object): """Class for a WRF-Hydro domain, which consitutes all domain-specific files needed for a - simulation. + setup. """ def __init__(self, domain_top_dir: str, @@ -293,17 +303,18 @@ def __init__(self, self.forcing_data = WrfHydroTs(self.forcing_dir.glob('*')) -class WrfHydroSim(object): - """Class for a WRF-Hydro simulation, which is comprised of a WrfHydroModel and a WrfHydroDomain. +class WrfHydroSetup(object): + """Class for a WRF-Hydro setup object, which is comprised of a WrfHydroModel and a WrfHydroDomain. 
""" - def __init__(self, wrf_hydro_model: object, + def __init__(self, + wrf_hydro_model: object, wrf_hydro_domain: object): - """Instantiates a WrfHydroSim object + """Instantiates a WrfHydroSetup object Args: wrf_hydro_model: A WrfHydroModel object wrf_hydro_domain: A WrfHydroDomain object Returns: - A WrfHydroSim object + A WrfHydroSetup object """ # Validate that hte domain and model are compatible @@ -320,10 +331,10 @@ def __init__(self, wrf_hydro_model: object, # assign objects to self self.model = copy.deepcopy(wrf_hydro_model) - """WrfHydroModel: A copy of the WrfHydroModel object used for the simulation""" + """WrfHydroModel: A copy of the WrfHydroModel object used for the setup""" self.domain = copy.deepcopy(wrf_hydro_domain) - """WrfHydroDomain: A copy of the WrfHydroDomain object used for the simulation""" + """WrfHydroDomain: A copy of the WrfHydroDomain object used for the setup""" # Create namelists self.hydro_namelist = \ @@ -360,64 +371,42 @@ def __init__(self, wrf_hydro_model: object, ['namelist_hrldas'] ['wrf_hydro_offline']) - def run(self, - simulation_dir: str, - num_cores: int = 2, - mode: str = 'r') -> object: - """Run the wrf_hydro simulation - Args: - simulation_dir: The path to the directory to use for run - num_cores: Optional, the number of cores to using default run_command - mode: Write mode, 'w' for overwrite if directory exists, and 'r' for fail if - directory exists - Returns: - A model run object - TODO: - Add option for custom run commands to deal with job schedulers - """ - #Make copy of simulation object to alter and return - simulation = copy.deepcopy(self) - run_object = WrfHydroRun(wrf_hydro_simulation=simulation, - simulation_dir=simulation_dir, - num_cores=num_cores, - mode=mode) - return run_object - class WrfHydroRun(object): - def __init__(self, - wrf_hydro_simulation: WrfHydroSim, - simulation_dir: str, - num_cores: int = 2, - mode: str = 'r' - ): - """Instantiate a WrfHydroRun object, including running the simulation 
+ def __init__( + self, + wrf_hydro_setup: WrfHydroSetup, + run_dir: str, + rm_existing_run_dir = False, + job: Job=None + ): + """Instantiate a WrfHydroRun object. A run is a WrfHydroSetup with multiple jobs. Args: - wrf_hydro_simulation: A WrfHydroSim object to run - simulation_dir: The path to the directory to use for run - num_cores: Optional, the number of cores to using default run_command - mode: Write mode, 'w' for overwrite if directory exists, and 'r' for fail if - directory exists + wrf_hydro_setup: A setup object. + run_dir: str, where to execute the job. This is an attribute of the Run object. + job: Optional, Job object Returns: - A WrfHydroRun object - TODO: - Add option for custom run commands to deal with job schedulers + A WrfHydroRun object. """ + # TODO(JLM): Accept a list of Jobs in the job argument? # Initialize all attributes and methods - self.simulation = wrf_hydro_simulation - """WrfHydroSim: The WrfHydroSim object used for the run""" - self.num_cores = num_cores - """int: The number of cores used for the run""" - self.simulation_dir = pathlib.Path(simulation_dir) - """pathlib.Path: pathlib.Path to the directory used for the run""" - self.run_log = None - """CompletedProcess: The subprocess returned from the run call""" - self.run_status = None - """int: exit status of the run""" - self.diag = list() - """list: pathlib.Paths to diag files generated at run time""" + self.setup = wrf_hydro_setup + """WrfHydroSetup: The WrfHydroSetup object used for the run""" + + self.run_dir = pathlib.PosixPath(run_dir) + """pathlib.PosixPath: The location of where the jobs will be executed.""" + + self.jobs_completed = [] + """Job: A list of previously executed jobs for this run.""" + self.jobs_pending = [] + """Job: A list of jobs *scheduled* to be executed for this run + with prior job dependence.""" + self.job_active = None + """Job: The job currently executing.""" + + # TODO(JLM): these are properties of the run. 
self.channel_rt = list() """WrfHydroTs: Timeseries dataset of CHRTOUT files""" self.chanobs = list() @@ -432,176 +421,318 @@ def __init__(self, """list: List of RESTART WrfHydroStatic objects""" self.restart_nudging = list() """list: List of nudgingLastObs WrfHydroStatic objects""" + self.object_id = None """str: A unique id to join object to run directory.""" - - - # Make directory if it does not exists - if self.simulation_dir.is_dir() is False: - self.simulation_dir.mkdir(parents=True) - else: - if self.simulation_dir.is_dir() is True and mode == 'w': - shutil.rmtree(str(self.simulation_dir)) - self.simulation_dir.mkdir(parents=True) - elif self.simulation_dir.is_dir() is True and mode == 'r': - raise PermissionError('Run directory already exists and mode = r') - else: - warnings.warn('Existing run directory will be used for simulation') - - ### Check that compile object uid matches compile directory uid - ### This is to ensure that a new model has not been compiled into that directory unknowingly - with open(self.simulation.model.compile_dir.joinpath('.uid')) as f: + self._pickle_lock_file = None + """pathlib.PosixPath: The pickle lock file path.""" + + # Establish the values. + + # TODO(JLM): Check that the setup object is "complete". + # TODO(JLM): What constitutes a complete sim object? + # 1) compiler specified, 2) domain_config specified. + # 3) A compiled model? + + # TODO(JLM): If adding a job to an existing run, enforce that only + # start times and khour/kday and associated restart file + # times are different? Anything else that's flexible across + # jobs of a single run? + + + # Make run_dir directory if it does not exist. 
+ if self.run_dir.is_dir() and not rm_existing_run_dir: + raise ValueError("Run directory already exists and rm_existing_run_dir is False.") + + if self.run_dir.exists(): + shutil.rmtree(str(self.run_dir)) + self.run_dir.mkdir(parents=True) + + # Check that compile object uid matches compile directory uid + # This is to ensure that a new model has not been compiled into that directory unknowingly + with open(self.setup.model.compile_dir.joinpath('.uid')) as f: compile_uid = f.read() - if self.simulation.model.object_id != compile_uid: + if self.setup.model.object_id != compile_uid: raise PermissionError('object id mismatch between WrfHydroModel object and' 'WrfHydroModel.compile_dir directory. Directory may have been' 'used for another compile') - ########################################################################### - # MAKE RUN DIRECTORIES - # Construct all file/dir paths - # Convert strings to pathlib.Path objects + # Build the inputs into the run_dir. + # Construct all file/dir paths. + # Convert strings to pathlib.Path objects. + + # TODO(JLM): Make symlinks the default option? Also allow copying? + # Loop to make symlinks for each TBL file - for from_file in self.simulation.model.table_files: + for from_file in self.setup.model.table_files: # Create file paths to symlink - to_file = self.simulation_dir.joinpath(from_file.name) + to_file = self.run_dir.joinpath(from_file.name) # Create symlinks to_file.symlink_to(from_file) # Symlink in exe - wrf_hydro_exe = self.simulation.model.wrf_hydro_exe - self.simulation_dir.joinpath(wrf_hydro_exe.name).symlink_to(wrf_hydro_exe) + wrf_hydro_exe = self.setup.model.wrf_hydro_exe + self.run_dir.joinpath(wrf_hydro_exe.name).symlink_to(wrf_hydro_exe) # Symlink in forcing - forcing_dir = self.simulation.domain.forcing_dir - self.simulation_dir.joinpath(forcing_dir.name). \ + forcing_dir = self.setup.domain.forcing_dir + self.run_dir.joinpath(forcing_dir.name). 
\ symlink_to(forcing_dir, target_is_directory=True) # create DOMAIN directory and symlink in files # Symlink in hydro_files - for file_path in self.simulation.domain.hydro_files: + for file_path in self.setup.domain.hydro_files: # Get new file path for run directory, relative to the top-level domain directory # This is needed to ensure the path matches the domain namelist - relative_path = file_path.relative_to(self.simulation.domain.domain_top_dir) - symlink_path = self.simulation_dir.joinpath(relative_path) + relative_path = file_path.relative_to(self.setup.domain.domain_top_dir) + symlink_path = self.run_dir.joinpath(relative_path) if symlink_path.parent.is_dir() is False: symlink_path.parent.mkdir(parents=True) symlink_path.symlink_to(file_path) # Symlink in nudging files - for file_path in self.simulation.domain.nudging_files: + for file_path in self.setup.domain.nudging_files: # Get new file path for run directory, relative to the top-level domain directory # This is needed to ensure the path matches the domain namelist - relative_path = file_path.relative_to(self.simulation.domain.domain_top_dir) - symlink_path = self.simulation_dir.joinpath(relative_path) + relative_path = file_path.relative_to(self.setup.domain.domain_top_dir) + symlink_path = self.run_dir.joinpath(relative_path) if symlink_path.parent.is_dir() is False: symlink_path.parent.mkdir(parents=True) symlink_path.symlink_to(file_path) # Symlink in lsm files - for file_path in self.simulation.domain.lsm_files: + for file_path in self.setup.domain.lsm_files: # Get new file path for run directory, relative to the top-level domain directory # This is needed to ensure the path matches the domain namelist - relative_path = file_path.relative_to(self.simulation.domain.domain_top_dir) - symlink_path = self.simulation_dir.joinpath(relative_path) + relative_path = file_path.relative_to(self.setup.domain.domain_top_dir) + symlink_path = self.run_dir.joinpath(relative_path) if symlink_path.parent.is_dir() is 
False: symlink_path.parent.mkdir(parents=True) symlink_path.symlink_to(file_path) - # write hydro.namelist - f90nml.write(self.simulation.hydro_namelist, - self.simulation_dir.joinpath('hydro.namelist')) - # write namelist.hrldas - f90nml.write(self.simulation.namelist_hrldas, - self.simulation_dir.joinpath('namelist.hrldas')) - - # Run the model - self.run_log = subprocess.run(['mpiexec', '-np', str(num_cores), './wrf_hydro.exe'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=self.simulation_dir) - - try: - self.run_status = 1 - # String match diag files for successfull run - with open(self.simulation_dir.joinpath('diag_hydro.00000')) as f: - diag_file = f.read() - if 'The model finished successfully.......' in diag_file: - self.run_status = 0 - except Exception as e: - warnings.warn('Could not parse diag files') - print(e) - - if self.run_status == 0: - - ##################### - # Grab outputs as WrfHydroXX classes of file paths - - ## Get diag files - self.diag = list(self.simulation_dir.glob('diag_hydro.*')) - - ## Get channel files - if len(list(self.simulation_dir.glob('*CHRTOUT*'))) > 0: - self.channel_rt = WrfHydroTs(list(self.simulation_dir.glob('*CHRTOUT*'))) - - if len(list(self.simulation_dir.glob('*CHANOBS*'))) > 0: - self.chanobs = WrfHydroTs(list(self.simulation_dir.glob('*CHANOBS*'))) - - #Get Lakeout files - if len(list(self.simulation_dir.glob('*LAKEOUT*'))) > 0: - self.lakeout = WrfHydroTs(list(self.simulation_dir.glob('*LAKEOUT*'))) - - #Get gwout files - if len(list(self.simulation_dir.glob('*GWOUT*'))) > 0: - self.gwout = WrfHydroTs(list(self.simulation_dir.glob('*GWOUT*'))) - - ## Get restart files and sort by modified time - ### Hydro restarts - for file in self.simulation_dir.glob('HYDRO_RST*'): - file = WrfHydroStatic(file) - self.restart_hydro.append(file) - - if len(self.restart_hydro) > 0: - self.restart_hydro = sorted(self.restart_hydro, - key=lambda file: file.stat().st_mtime_ns) - - ### LSM Restarts - for file in 
self.simulation_dir.glob('RESTART*'): - file = WrfHydroStatic(file) - self.restart_lsm.append(file) - - if len(self.restart_lsm) > 0: - self.restart_lsm = sorted(self.restart_lsm, - key=lambda file: file.stat().st_mtime_ns) + # Restart files are symlinked in to the run dir at run init. + model_files = [*self.setup.domain.hydro_files, + *self.setup.domain.nudging_files, + *self.setup.domain.lsm_files] + for ff in model_files: + if re.match('.*/RESTART/.*',str(ff)): + symlink_path = self.run_dir.joinpath(os.path.basename(ff)) + symlink_path.symlink_to(ff) + + # The jobs now add the namelists at run time. + if job: + self.add_jobs(job) + + + def add_jobs( + self, + jobs: list + ): + """Dispatch a run the wrf_hydro setup: either run() or schedule_run() + If a scheduler is passed, then that run is scheduled. + Args: + As for run and schedule_run(). + Returns: + A WrfHydroRun object + """ + # Dont tamper with the passed object, let it remain a template in the calling level. + jobs = copy.deepcopy(jobs) - ### Nudging restarts - for file in self.simulation_dir.glob('nudgingLastObs*'): - file = WrfHydroStatic(file) - self.restart_nudging.append(file) + if type(jobs) is not list: + jobs = [jobs] - if len(self.restart_nudging) > 0: - self.restart_nudging = sorted(self.restart_nudging, - key=lambda file: file.stat().st_mtime_ns) + for jj in jobs: + # Attempt to add the job + if jj.scheduler: - ##################### + # A scheduled job can be appended to the jobs.pending list if + # 1) there are no active or pending jobs + # 2) if it is (made) dependent on the last active or pending job. - # create a UID for the simulation and save in file - self.object_id = str(uuid.uuid4()) - with open(self.simulation_dir.joinpath('.uid'), 'w') as f: - f.write(self.object_id) + # Get the job id of the last active or pending job. 
+ last_job_id = None + if self.job_active: + last_job_id = self.job_active.sched_job_id + if len(self.jobs_pending): + last_job_id = self.jobs_pending[-1].scheduler.sched_job_id - # Save object to simulation directory - # Save the object out to the compile directory - with open(self.simulation_dir.joinpath('WrfHydroRun.pkl'), 'wb') as f: - pickle.dump(self, f, 2) + # Check the dependency on a previous job + if last_job_id is not None: + if jj.scheduler.afterok is not None and jj.scheduler.afterok != last_job_id: + raise ValueError("The job's dependency/afterok conflicts with reality.") + jj.scheduler.afterok = last_job_id + else: + if jj.scheduler.afterok is not None: + raise ValueError("The job's dependency/afterok conflicts with reality.") + + # Set submission-time job variables here. + jj.user = get_user() + job_submission_time = datetime.datetime.now() + jj.job_submission_time = str(job_submission_time) + jj.job_date_id = '{date:%Y-%m-%d-%H-%M-%S-%f}'.format(date=job_submission_time) + + jj.model_start_time, jj.model_end_time = solve_model_start_end_times( + jj.model_start_time, + jj.model_end_time, + self.setup + ) + + # Add a namelist to each job + jj.namelist_hrldas = copy.deepcopy(self.setup.namelist_hrldas) + jj.hydro_namelist = copy.deepcopy(self.setup.hydro_namelist) + + # Satisfying the model start/end times and restart options + jj.apply_model_start_end_job_namelists() + + # Check the the resulting namelists are + + # in the job object? + # Determine a different job_name? + # Tag the namelists with the job name and symlink? When is the namelist written? + + # TODO(JLM): + # Edit the namelists with model start/end times and if restarting. + # Stash the namelists in the job. + # This begs for consistency check across jobs: start previous job = end current job + + self.jobs_pending.append(jj) + + + def run_jobs(self): + + # Make sure there are no active jobs? + # make sure all jobs are either scheduled or interactive? 
+ + if self.jobs_pending[0].scheduler: + + # submit the jobs_pending. + lock_pickle(self) + job_afterok = None + hold = True + + for jj in self.jobs_pending: + + jj.scheduler.afterok = job_afterok + # TODO(JLM): why not make hold an attribute? + jj.schedule(self.run_dir, hold=hold) + job_afterok = jj.scheduler.sched_job_id + hold = False + + self.pickle() + unlock_pickle(self) + self.jobs_pending[0].release() + self.destruct() - print('Model run succeeded') else: - warnings.warn('Model run failed') + + for jj in range(0, len(self.jobs_pending)): + + self.job_active = self.jobs_pending.pop(0) + self.job_active.run(self.run_dir) + self.collect_output() + self.jobs_completed.append(self.job_active) + self.job_active = None + self.pickle() + + + def collect_output(self): + + if self.job_active.exit_status != 0: + warnings.warn('Model run failed.') + return(None) + + print('Model run succeeded.\n') + ##################### + # Grab outputs as WrfHydroXX classes of file paths + + # Get channel files + if len(list(self.run_dir.glob('*CHRTOUT*'))) > 0: + self.channel_rt = WrfHydroTs(list(self.run_dir.glob('*CHRTOUT*'))) + # Make relative to run dir + # for file in self.channel_rt: + # file.relative_to(file.parent) + + if len(list(self.run_dir.glob('*CHANOBS*'))) > 0: + self.chanobs = WrfHydroTs(list(self.run_dir.glob('*CHANOBS*'))) + # Make relative to run dir + # for file in self.chanobs: + # file.relative_to(file.parent) + + #Get Lakeout files + if len(list(self.run_dir.glob('*LAKEOUT*'))) > 0: + self.lakeout = WrfHydroTs(list(self.run_dir.glob('*LAKEOUT*'))) + + #Get gwout files + if len(list(self.run_dir.glob('*GWOUT*'))) > 0: + self.gwout = WrfHydroTs(list(self.run_dir.glob('*GWOUT*'))) + + # Get restart files and sort by modified time + # Hydro restarts + self.restart_hydro = [] + for file in self.run_dir.glob('HYDRO_RST*'): + file = WrfHydroStatic(file) + self.restart_hydro.append(file) + + if len(self.restart_hydro) > 0: + self.restart_hydro = 
sorted(self.restart_hydro, + key=lambda file: file.stat().st_mtime_ns) + else: + self.restart_hydro = None + + ### LSM Restarts + self.restart_lsm = [] + for file in self.run_dir.glob('RESTART*'): + file = WrfHydroStatic(file) + self.restart_lsm.append(file) + + if len(self.restart_lsm) > 0: + self.restart_lsm = sorted(self.restart_lsm, + key=lambda file: file.stat().st_mtime_ns) + else: + self.restart_lsm = None + + ### Nudging restarts + self.restart_nudging = [] + for file in self.run_dir.glob('nudgingLastObs*'): + file = WrfHydroStatic(file) + self.restart_nudging.append(file) + + if len(self.restart_nudging) > 0: + self.restart_nudging = sorted(self.restart_nudging, + key=lambda file: file.stat().st_mtime_ns) + else: + self.restart_nudging = None + + self.pickle() + + + def pickle(self): + # create a UID for the run and save in file + self.object_id = str(uuid.uuid4()) + with open(self.run_dir.joinpath('.uid'), 'w') as f: + f.write(self.object_id) + + # Save object to run directory + # Save the object out to the compile directory + with open(self.run_dir.joinpath('WrfHydroRun.pkl'), 'wb') as f: + pickle.dump(self, f, 2) + + + def unpickle(self): + # Load run object from run directory after a scheduler job + with open(self.run_dir.joinpath('WrfHydroRun.pkl'), 'rb') as f: + return(pickle.load(f)) + + + def destruct(self): + # This gets rid of everything but the methods. + print("Jobs have been submitted to the scheduler: This run object will now self destruct.") + self.__dict__ = {} + def make_relative(self,basepath = None): """Make all file paths relative to a given directory, useful for opening file @@ -615,9 +746,10 @@ def make_relative(self,basepath = None): """ __make_relative__(run_object=self,basepath=basepath) + class DomainDirectory(object): """An object that represents a WRF-Hydro domain directory. 
Primarily used as a utility class - for WrfHydroDomain""" + for WrfHydroDomain""" def __init__(self, domain_top_dir: str, domain_config: str, @@ -752,4 +884,4 @@ def __init__(self, self.diff_counts.update({'nudging':diff_counts}) else: warnings.warn('length of candidate_sim.restart_nudging or ' - 'reference_sim.restart_nudging is 0') \ No newline at end of file + 'reference_sim.restart_nudging is 0')