Skip to content

Commit

Permalink
Add tests and documentation for date-based restart pruning
Browse files Browse the repository at this point in the history
- remove logic for Y,M frequency offsets as YS/MS may be sufficient in this case
- rewrite/parametrize calendar tests
- add tests for mom-driver parsing restart files
- add integration tests for prune_restarts and access-om2 model driver
- add documentation on date-based restart freq and restart_history
  • Loading branch information
jo-basevi authored and Jo Basevi committed Sep 27, 2023
1 parent e3b2281 commit 27fa74d
Show file tree
Hide file tree
Showing 9 changed files with 606 additions and 255 deletions.
24 changes: 17 additions & 7 deletions docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,23 @@ configuration.
Specifies the rate of saved restart files. For the default rate of 5, we
keep the restart files for every fifth run (``restart004``, ``restart009``,
``restart014``, etc.).

Intermediate restarts are not deleted until a permanently archived restart
has been produced. For example, if we have just completed run ``11``, then
we keep ``restart004``, ``restart009``, ``restart010``, and ``restart011``.
Restarts 10 through 13 are not deleted until ``restart014`` has been saved.

``restart_freq: 1`` saves all restart files.
Using ``restart_freq: 1`` will save all restart files.

For both integer and date-based restart frequency, the first restart and,
by default, the 5 latest restarts are saved.

To use a date-based restart frequency, specify a number with a time unit.
The supported time units are ``YS`` - year-start, ``MS`` - month-start,
``W`` - week, ``D`` - day, ``H`` - hour, ``T`` - minute and ``S`` - second.
For example, ``restart_freq: 10YS`` would save the earliest restart of the year,
10 years from the last permanently saved restart's datetime.

Please note that currently, only ACCESS-OM2 and MOM models support
date-based restart frequency, as it depends on the payu model driver being
able to parse restart files for a datetime.

``restart_history`` (*Default:* ``5``)
Specifies the number of latest restart files to save.

*The following model-based tags are typically not configured*

Expand Down
171 changes: 86 additions & 85 deletions payu/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ def int_to_date(date):


def date_to_int(date):

return (date.year * 10**4 + date.month * 10**2 + date.day)
return date.year * 10**4 + date.month * 10**2 + date.day


def runtime_from_date(start_date, years, months, days, seconds, caltype):
Expand All @@ -31,8 +30,9 @@ def runtime_from_date(start_date, years, months, days, seconds, caltype):
Ignores Feb 29 for caltype == NOLEAP.
"""

end_date = start_date + relativedelta(years=years, months=months,
days=days)
end_date = start_date + relativedelta(
years=years, months=months, days=days
)
runtime = end_date - start_date

if caltype == NOLEAP:
Expand Down Expand Up @@ -70,7 +70,6 @@ def get_leapdays(init_date, final_date):
leap_days = 0

while curr_date != final_date:

if curr_date.month == 2 and curr_date.day == 29:
leap_days += 1

Expand All @@ -91,107 +90,109 @@ def calculate_leapdays(init_date, final_date):
return datetime.timedelta(days=leap_days)


def add_year_offset_to_datetime(initial_dt, n):
"""Return a datetime n years from the initial datetime"""
if isinstance(initial_dt, datetime.datetime): # Standard datetime Calendar
return initial_dt + relativedelta(years=n)

if isinstance(initial_dt, cftime.datetime): # Non-standard Calendars
return initial_dt.replace(year=initial_dt.year + n)


def add_year_start_offset_to_datetime(initial_dt, n):
"""Return a datetime at the start of the year - n years from the initial datetime"""
if isinstance(initial_dt, datetime.datetime):
return initial_dt + relativedelta(years=n, month=1, day=1, hour=0, minute=0, second=0)

if isinstance(initial_dt, cftime.datetime):
return initial_dt.replace(year=initial_dt.year + n, month=1, day=1, hour=0, minute=0, second=0)
"""Return a cftime datetime at the start of the year, that is n years
from the initial datetime"""
return cftime.datetime(
year=initial_dt.year + n,
month=1,
day=1,
hour=0,
minute=0,
second=0,
calendar=initial_dt.calendar,
)


def add_month_start_offset_to_datetime(initial_dt, n):
"""Return a datetime of the start of the month - n months from the initial datetime"""
if isinstance(initial_dt, datetime.datetime):
return initial_dt + relativedelta(months=n, day=1, hour=0, minute=0, second=0)

if isinstance(initial_dt, cftime.datetime):
year = initial_dt.year + ((initial_dt.month + n - 1) // 12)
month = (initial_dt.month + n - 1) % 12 + 1

return initial_dt.replace(year=year, month=month, day=1, hour=0, minute=0, second=0)


def add_month_offset_to_datetime(initial_dt, n):
"""Return a datetime n months from the initial datetime"""
if isinstance(initial_dt, datetime.datetime):
return initial_dt + relativedelta(months=n)

if isinstance(initial_dt, cftime.datetime):
year = initial_dt.year + ((initial_dt.month + n - 1) // 12)
month = (initial_dt.month + n - 1) % 12 + 1
day = initial_dt.day

max_day_in_month = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
if initial_dt.calendar == "noleap":
day = initial_dt.day if initial_dt.day <= max_day_in_month[month] else max_day_in_month[month]

if initial_dt.calendar == "all_leap":
max_day_in_month[2] = 29 # Every year is a leap year
day = initial_dt.day if initial_dt.day <= max_day_in_month[month] else max_day_in_month[month]

return initial_dt.replace(year=year, month=month, day=day)

"""Return a cftime datetime of the start of the month, that is n months
from the initial datetime"""
years_to_add = (initial_dt.month + n - 1) // 12
months_to_add = n - years_to_add * 12

return cftime.datetime(
year=initial_dt.year + years_to_add,
month=initial_dt.month + months_to_add,
day=1,
hour=0,
minute=0,
second=0,
calendar=initial_dt.calendar,
)


def add_timedelta_fn(timedelta):
"""Returns a function that adds a timedelta - n times to an initial datetime"""
# Standard and cftime datetimes supports timedelta operations
"""Returns a function that takes initial datetime and multiplier n,
and returns a datetime that is n * offset from the initial datetime"""
# cftime datetimes supports timedelta operations
return lambda initial_dt, n: initial_dt + n * timedelta


class DatetimeOffset:

"""A utility class for adding various time offsets to cftime datetimes.
Parameters:
unit (str): The unit of the time offset. Supported units are:
- "YS" for years (start of the year)
- "MS" for months (start of the month)
- "W" for weeks
- "D" for days
- "H" for hours
- "T" for minutes
- "S" for seconds
magnitude (int): The magnitude of the time offset.
Methods:
- `add_to_datetime(initial_dt: cftime.datetime) -> cftime.datetime`:
Adds the specified time offset to the given cftime datetime and
returns the resulting datetime.
Attributes:
- unit (str): The unit of the time offset.
- magnitude (int): The magnitude of the time offset.
"""

def __init__(self, unit, magnitude):
# Dictionary of 'offset units' to functions which takes an initial_dt (Standard or cftime datetime)
# and n (multiplier of the offset unit), and returns the next datetime with the offset added
supported_datetime_offsets = {
'Y': add_year_offset_to_datetime,
'YS': add_year_start_offset_to_datetime,
'M': add_month_offset_to_datetime,
'MS': add_month_start_offset_to_datetime,
'W': add_timedelta_fn(datetime.timedelta(weeks=1)),
'D': add_timedelta_fn(datetime.timedelta(days=1)),
'H': add_timedelta_fn(datetime.timedelta(hours=1)),
'T': add_timedelta_fn(datetime.timedelta(minutes=1)),
'S': add_timedelta_fn(datetime.timedelta(seconds=1))
"YS": add_year_start_offset_to_datetime,
"MS": add_month_start_offset_to_datetime,
"W": add_timedelta_fn(datetime.timedelta(weeks=1)),
"D": add_timedelta_fn(datetime.timedelta(days=1)),
"H": add_timedelta_fn(datetime.timedelta(hours=1)),
"T": add_timedelta_fn(datetime.timedelta(minutes=1)),
"S": add_timedelta_fn(datetime.timedelta(seconds=1)),
}
assert unit in supported_datetime_offsets, f"payu: error: unsupported datetime offset: {unit}"
if unit not in supported_datetime_offsets:
raise ValueError(
f"Unsupported datetime offset: {unit}. "
"Supported offsets: YS, MS, W, D, H, T, S"
)
self.unit = unit
self.magnitude = magnitude
self.add_offset_to_datetime = supported_datetime_offsets[unit]

self._add_offset_to_datetime = supported_datetime_offsets[unit]

def add_to_datetime(self, initial_dt):
"""Takes a datetime object (standard or cftime datetime),
and returns a datetime with the offset added if possible, returns None otherwise"""
"""Takes an initial cftime datetime,
and returns a datetime with the offset added"""

if self.unit in ['M', 'Y'] and isinstance(initial_dt, cftime.datetime):
if initial_dt.datetime_compatible:
# Transform cftime datetime to standard datetime
initial_dt = datetime.datetime(initial_dt.year, initial_dt.month, initial_dt.day,
initial_dt.hour, initial_dt.minute, initial_dt.second)
elif initial_dt.calendar not in ["360_day", "noleap", "all_leap"]:
raise ValueError(f"Calendar type {initial_dt.calendar} is unsupported for given date offset {self.unit}")

if not (isinstance(initial_dt, cftime.datetime) or isinstance(initial_dt, datetime.datetime)):
raise TypeError(f"Invalid initial datetime type: {type(initial_dt)}. Expected types: cftime.datetime or datetime.datetime")
if not (isinstance(initial_dt, cftime.datetime)):
raise TypeError(
f"Invalid initial datetime type: {type(initial_dt)}. "
"Expected type: cftime.datetime"
)

return self.add_offset_to_datetime(initial_dt=initial_dt, n=self.magnitude)
return self._add_offset_to_datetime(
initial_dt=initial_dt, n=self.magnitude
)


def parse_date_offset(offset):
"""Parse a given string date offset string, and returns an DatetimeOffset"""
match = re.search('[0-9]+', offset)
assert match is not None, f"payu: error: no numerical value given for offset: {offset}"
"""Parse a given string date offset string and return an DatetimeOffset"""
match = re.search("[0-9]+", offset)
if match is None:
raise ValueError(
f"No numerical value given for offset: {offset}"
)
n = match.group()
unit = offset.lstrip(n)
return DatetimeOffset(unit=unit, magnitude=int(n))
return DatetimeOffset(unit=unit, magnitude=int(n))
61 changes: 35 additions & 26 deletions payu/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Local
from payu import envmod
from payu.fsops import mkdir_p, make_symlink, read_config, movetree, required_libs
from payu.fsops import mkdir_p, make_symlink, read_config, movetree
from payu.schedulers.pbs import get_job_info, pbs_env_init, get_job_id
from payu.models import index as model_index
import payu.profilers
Expand Down Expand Up @@ -345,7 +345,7 @@ def set_output_paths(self):
user_restart_dir = self.config.get('restart')
if (self.counter == 0 or self.repeat_run) and user_restart_dir:
# TODO: Some user friendliness needed...
assert(os.path.isdir(user_restart_dir))
assert (os.path.isdir(user_restart_dir))
self.prior_restart_path = user_restart_dir
else:
prior_restart_dir = 'restart{0:03}'.format(self.counter - 1)
Expand Down Expand Up @@ -450,10 +450,10 @@ def run(self, *user_flags):

# XXX: This was previously done in reversion
envmod.setup()

# Add any user-defined module dir(s) to MODULEPATH
for module_dir in self.config.get('modules', {}).get('use', []):
envmod.module('use', module_dir)
envmod.module('use', module_dir)

self.load_modules()

Expand Down Expand Up @@ -488,7 +488,7 @@ def run(self, *user_flags):
mpi_flags = self.config.get('mpirun', [])
# TODO: Legacy config removal warning

if type(mpi_flags) != list:
if not isinstance(mpi_flags, list):
mpi_flags = [mpi_flags]

# TODO: More uniform support needed here
Expand Down Expand Up @@ -747,7 +747,8 @@ def archive(self):
restarts_to_prune = self.prune_restarts()
for restart_path in restarts_to_prune:
# Only delete real directories; ignore symbolic restart links
if (os.path.isdir(restart_path) and not os.path.islink(restart_path)):
if (os.path.isdir(restart_path) and
not os.path.islink(restart_path)):
shutil.rmtree(restart_path)

# Ensure dynamic library support for subsequent python calls
Expand Down Expand Up @@ -980,30 +981,32 @@ def sweep(self, hard_sweep=False):
if os.path.islink(self.work_sym_path):
print('Removing symlink {0}'.format(self.work_sym_path))
os.remove(self.work_sym_path)

def prune_restarts(self, from_n_restart=0, to_n_restart=None):
"""Returns a list of restart directories that can be pruned"""
restart_freq = self.config.get('restart_freq', 5)
restart_history = self.config.get('restart_history', default_restart_history)
restart_freq = self.config.get('restart_freq', default_restart_freq)
restart_history = self.config.get('restart_history',
default_restart_history)

# All restarts directories
restarts = [d for d in os.listdir(self.archive_path)
if d.startswith('restart')]
if d.startswith('restart')]
# Sort restarts based on counter - in increasing date order
restarts.sort(key=lambda d: int(d.lstrip('restart')))

# Note: from_n_restart and to_end_restart could be useful for inspecting only the more recent restarts
if to_n_restart is None:
# Keep restart_history n restarts
# Keep restart_history n restarts
to_n_restart = -restart_history
restarts = restarts[from_n_restart:to_n_restart]
restarts = restarts[from_n_restart:to_n_restart]

restarts_to_prune = []
if self.repeat_run:
# TODO: Previous logic was to prune all restarts if self.repeat_run - is that still the case?
restarts_to_prune = [os.path.join(self.archive_path, restart) for restart in restarts]
# TODO: Previous logic was to prune all restarts if
# self.repeat_run - is that still the case?
restarts_to_prune = [os.path.join(self.archive_path, restart)
for restart in restarts]
elif isinstance(restart_freq, int):
# Using integer frequency to prune restarts
# Using integer frequency to prune restarts
for restart in restarts:
restart_idx = int(restart.lstrip('restart'))
if not restart_idx % restart_freq == 0:
Expand All @@ -1014,21 +1017,27 @@ def prune_restarts(self, from_n_restart=0, to_n_restart=None):
try:
date_offset = parse_date_offset(restart_freq)

next_datetime = None
next_dt = None
for restart in restarts:
restart_path = os.path.join(self.archive_path, restart)

# Use model-driver to parse restart files for a datetime
restart_datetime = self.model.get_restart_datetime(restart_path)
if next_datetime is not None and restart_datetime < next_datetime:
restart_dt = self.model.get_restart_datetime(restart_path)

if (next_dt is not None and restart_dt < next_dt):
restarts_to_prune.append(restart_path)
else:
# Keep the earliest datetime and use last kept datetime as point of reference when adding the next time interval
next_datetime = date_offset.add_to_datetime(restart_datetime)
except Exception as e:
print("payu: error occured during date-based restart pruning:", e)

# Keep the earliest datetime and use last kept datetime
# as point of reference when adding the next time
# interval
next_dt = date_offset.add_to_datetime(restart_dt)

except Exception as error:
print(
"payu: error occured during date-based restart pruning:",
error
)

return restarts_to_prune


Expand Down
8 changes: 6 additions & 2 deletions payu/models/accessom2.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ def collate(self):
pass

def get_restart_datetime(self, restart_path):
"""Given a restart path, parse the restart files and
return a cftime datetime (for date-based restart pruning)"""
for model in self.expt.models:
if model.model_type == 'mom':
mom_restart_path = os.path.join(restart_path, model.name)
return model.get_restart_datetime(mom_restart_path)

raise NotImplementedError('access-om2 date-based restart pruning currently only uses the mom sub-model to find restart dates')

raise NotImplementedError(
'Cannot find mom sub-model: access-om2 date-based restart pruning '
'requires the mom sub-model to determine restart dates')

0 comments on commit 27fa74d

Please sign in to comment.