Skip to content

Commit

Permalink
Start working on rrfs workflow with yaml.
Browse files Browse the repository at this point in the history
  • Loading branch information
danielabdi-noaa committed Mar 28, 2023
1 parent dd4749a commit a6bde34
Show file tree
Hide file tree
Showing 2 changed files with 278 additions and 0 deletions.
114 changes: 114 additions & 0 deletions parm/wflow/default_workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#
# The defaults that cover a majority of our workflow test experiments.
#
rocoto:
entities:
ACCOUNT: '{{ user.ACCOUNT }}'
SERVICE_ACCOUNT: '{{ user.SERVICE_ACCOUNT }}'
# obs dirs
CCPA_OBS_DIR: '{{ platform.CCPA_OBS_DIR }}'
MRMS_OBS_DIR: '{{ platform.MRMS_OBS_DIR }}'
NDAS_OBS_DIR: '{{ platform.NDAS_OBS_DIR }}'
# comin dirs
COMIN_DIR: '{% if user.RUN_ENVIR == "nco" %}{{ nco.COMIN_BASEDIR }}/{{ nco.RUN }}.@Y@m@d/@H {% else %}{{ nco.COMIN_BASEDIR }}/@Y@m@d@H{% endif %}'
COMINgfs: '{{ platform.get("COMINgfs") }}'
# others
GLOBAL_VAR_DEFNS_FP: '{{ workflow.GLOBAL_VAR_DEFNS_FP }}'
LOAD_MODULES_RUN_TASK_FP: '{{ workflow.LOAD_MODULES_RUN_TASK_FP }}'
LOGDIR: '{% if user.RUN_ENVIR == "nco" %}{{"{}/@Y@m@d".format(nco.LOGBASEDIR)}}{% else %}{{nco.LOGBASEDIR }}{% endif %}'
LOGEXT: '{% if user.RUN_ENVIR == "nco" %}{{".{}.log".format(workflow.WORKFLOW_ID)}}{% else %}{{".log"}}{% endif %}'
NET: '{{ nco.NET }}'
RUN: '{{ nco.RUN }}'
SLASH_ENSMEM_SUBDIR: '{% if global.DO_ENSEMBLE %}{{ "/mem#mem#" }}{% else %}{{ "/" }}{% endif %}'
FCST_DIR: '{{ nco.DATAROOT }}/run_fcst_prod.{{ workflow.WORKFLOW_ID }}_@Y@m@d@H'
WARMSTART_CYCLE_DIR: '{{ workflow.WARMSTART_CYCLE_DIR }}'
# reservations
NCORES_PER_NODE: '{{ platform.NCORES_PER_NODE }}'
PARTITION_DEFAULT: '{{ platform.get("PARTITION_DEFAULT") }}'
PARTITION_FCST: '{{ platform.get("PARTITION_FCST") }}'
PARTITION_HPSS: '{{ platform.get("PARTITION_HPSS") }}'
QUEUE_DEFAULT: '{{ platform.get("QUEUE_DEFAULT") }}'
QUEUE_FCST: '{{ platform.get("QUEUE_FCST") }}'
QUEUE_HPSS: '{{ platform.get("QUEUE_HPSS") }}'
QUEUE_POST: '{{ platform.get("QUEUE_POST") }}'
QUEUE_PRDGEN: '{{ platform.get("QUEUE_PRDGEN") }}'
QUEUE_ANALYSIS: '{{ platform.get("QUEUE_ANALYSIS") }}'
QUEUE_GRAPHICS: '{{ platform.get("QUEUE_GRAPHICS") }}'
RRFS_RESERVE: '{% if platform.get("reservation") %}"--reservation={{ platform.get("reservation") }}"{% else %}""{% endif %}'
RRFS_POST_RESERVE: '{% if platform.get("reservation_post") %}"--reservation={{ platform.get("reservation_post") }}"{% else %}""{% endif %}'
# directories
USHdir: '{{ user.USHdir }}'
SCRIPTSdir: '{{ user.SCRIPTSdir }}'
JOBSDIR: '{{ user.JOBSdir }}'
# ids
WORKFLOW_ID: '{{ workflow.WORKFLOW_ID }}'
TAG: '{% if workflow.TAG %}{{ workflow.TAG }}{% else %}""{% endif %}'
# rrfs
OBSPATH: '{% platform.get("OBSPATH") %}'
NWGES_BASEDIR: '{% nco.get("NWEGES_BASEDIR") %}'
ENSCTRL_COMIN_BASEDIR: '{% nco.get("ENSCTRL_COMIN_BASEDIR") %}'
ENSCTRL_NWGES_BASEDIR: '{% nco.get("ENSCTRL_NWGES_BASEDIR") %}'
ENSCTRL_COMOUT_BASEDIR: '{% nco.get("ENSCTRL_COMOUT_BASEDIR") %}'
ENSCTRL_COMOUT_DIR: '{% nco.get("ENSCTRL_COMOUT_DIR") %}'
RRFSE_FG_ROOT: '{% nco.get("RRFSE_NWGES_BASEDIR") %}'
FG_ROOT: '{% nco.get("NWGES_BASEDIR") %}'
{% if workflow.get("DO_REAL_TIME") %}
WALL_LIMIT_PRE: '<deadline><cyclestr offset=\"16:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_ANAL: '<deadline><cyclestr offset=\"16:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_FCST: '<deadline><cyclestr offset=\"23:30:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_POST: '<deadline><cyclestr offset=\"24:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_GRAPHICS: '<deadline><cyclestr offset=\"24:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_RECENTER: '<deadline><cyclestr offset=\"24:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
WALL_LIMIT_SAVE_RECENTER: '<deadline><cyclestr offset=\"24:00:00\">@Y@m@d@H@M</cyclestr></deadline>'
{% else %}
WALL_LIMIT_PRE: ''
WALL_LIMIT_ANAL: ''
WALL_LIMIT_FCST: ''
WALL_LIMIT_POST: ''
WALL_LIMIT_GRAPHICS: ''
WALL_LIMIT_RECENTER: ''
WALL_LIMIT_SAVE_RECENTER: ''
{% endif %}
START_TIME_SPINUP: '{{ start_time.get("START_TIME_SPINUP") }}'
START_TIME_PROD: '{{ start_time.get("START_TIME_PROD") }}'
START_TIME_CONVENTIONAL_SPINUP: '{{ start_time.get("START_TIME_CONVENTIONAL_SPINUP") }}'
START_TIME_LATE_ANALYSIS: '{{ start_time.get("START_TIME_LATE_ANALYSIS") }}'
START_TIME_CONVENTIONAL: '{{ start_time.get("START_TIME_CONVENTIONAL") }}'
START_TIME_NSSLMOSIAC: '{{ start_time.get("START_TIME_NSSLMOSIAC") }}'
START_TIME_LIGHTNINGNC: '{{ start_time.get("START_TIME_LIGHTNINGNC") }}'
START_TIME_PROCSMOKE: '{{ start_time.get("START_TIME_PROCSMOKE") }}'
attrs:
cyclethrottle: "20"
realtime: "F"
scheduler: '{{ platform.SCHED }}'
cycledefs:
{% if not rrfs.get("DO_RRFS_DEV") %}
at_start: &DEFCYC
- !startstopfreq ['{{workflow.DATE_FIRST_CYCL}}', '{{workflow.DATE_FIRST_CYCL}}', '{{workflow.INCR_CYCL_FREQ}}']
forecast: *DEFCYC
initial: *DEFCYC
boundary: *DEFCYC
boundary_long: *DEFCYC
spinupcyc: *DEFCYC
prodcyc: *DEFCYC
prodcyc_long: *DEFCYC
saveinputcyc: *DEFCYC
recentercyc: *DEFCYC
archivecyc: *DEFCYC
{% else %}
at_start: '{{ cycledefs.get("AT_START_CYCLEDEF") }}'
initial: '{{ cycledefs.get("INITIAL_CYCLEDEF") }}'
boundary: '{{ cycledefs.get("BOUNDARY_CYCLEDEF") }}'
boundary_long: '{{ cycledefs.get("BOUNDARY_LONG_CYCLEDEF") }}'
spinupcyc: '{{ cycledefs.get("SPINUPCYC_CYCLEDEF") }}'
prodcyc: '{{ cycledefs.get("PRODCYC_CYCLEDEF") }}'
prodcyc_long: '{{ cycledefs.get("PRODCYC_LONG_CYCLEDEF") }}'
{% if rrfs.get("DO_SAVE_INPUT") %}
saveinputcyc: '{{ cycledefs.get("SAVEINPUT_CYCLEDEF") }}'
{% endif %}
recentercyc: '{{ cycledefs.get("RECENTER_CYCLEDEF") }}'
archivecyc: '{{ cycledefs.get("ARCHIVE_CYCLEDEF") }}'
{% endif %}
log: !cycstr '&LOGDIR;/FV3LAM_wflow.{% if user.RUN_ENVIR == "nco" %}{{ workflow.WORKFLOW_ID + "." }}{% endif %}log'
tasks:
taskgroups: '{{ ["parm/wflow/prep.yaml", "parm/wflow/coldstart.yaml", "parm/wflow/post.yaml"]|include }}'
164 changes: 164 additions & 0 deletions parm/wflow/rrfs_data_prep.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
{% if not rrfs.get("DO_ENSFCST") %}
{% if rrfs.get("DO_RRFS_DEV") %}
{%- if rrfs.get("DO_SMOKE_DUST") or rrfs.get("DO_NONVAR_CLDANAL") or rrfs.get("DO_REFL2TTEN") or rrfs.get("DO_ENKF_RADAR_REF") or rrfs.get("DO_ENVAR_RADAR_REF") %}
#################################################################
# metatask for data preprocessing tasks
#################################################################
metatask_data_prep:
{% if rrfs.get("DO_SPINUP") %}
var:
cycletype: spinupcyc prodcyc,prodcyc_long
type: spinup prod
{% else %}
var:
cycletype: prodcyc,prodcyc_long
type: prod
{% endif %}
#################################################################
# smoke and dust
#################################################################
{% if rrfs.get("DO_SMOKE_DUST") %}
task_process_smoke_#type#:
account: '&ACCOUNT;'
attrs:
cycledefs: '#cycletype#'
maxtries: 1
command: '&LOAD_MODULES_RUN_TASK_FP; "process_smoke" "&JOBSDIR;/JREGIONAL_PROCESS_SMOKE"'
envars:
GLOBAL_VAR_DEFNS_FP: '&GLOBAL_VAR_DEFNS_FP;'
USHdir: '&USHdir;'
PDY: !cycstr '@Y@m@d'
cyc: !cycstr '@H'
subcyc: !cycstr '@M'
nprocs: '{{ parent.nnodes * parent.ppn }}'
CYCLE_TYPE: '#type#'
CYCLE_SUBTYPE: ''
NWGES_BASEDIR: '&NWGES_BASEDIR'
NWGES_DIR: '&NWGES_BASEDIR'
join: !cycstr '&LOGDIR;/{{ jobname }}_#type#_@Y@m@d@H.log'
native: '{{ platform.SCHED_NATIVE_CMD }} &RRFS_RESERVE;'
{%- if machine not in ["GAEA", "NOAACLOUD"] %}
memory: 80G
{%- endif %}
nnodes: 1
ppn: 1
nodes: '{{ nnodes }}:ppn={{ ppn }}'
nodesize: "&NCORES_PER_NODE;"
partition: '{% if platform.get("PARTITION_DEFAULT") %}&PARTITION_DEFAULT;{% else %}None{% endif %}'
queue: '&QUEUE_DEFAULT;'
walltime: 00:30:00
dependency:
and:
timedep:
text: '<cyclestr offset="&START_TIME_PROCSMOKE">@Y@m@d@H@M00</cyclestr>'
{% endif %}
#################################################################
# Groups of tasks
#################################################################
{%- if rrfs.get("DO_NONVAR_CLDANAL") or rrfs.get("DO_REFL2TTEN") or rrfs.get("DO_ENKF_RADAR_REF") or rrfs.get("DO_ENVAR_RADAR_REF") %}
#################################################################
# radar refelectivity
#################################################################
task_process_radarref_#type#:
account: '&ACCOUNT;'
attrs:
cycledefs: '#cycletype#'
maxtries: 1
command: '&LOAD_MODULES_RUN_TASK_FP; "process_radarref" "&JOBSDIR;/JREGIONAL_PROCESS_RADARREF"'
envars:
GLOBAL_VAR_DEFNS_FP: '&GLOBAL_VAR_DEFNS_FP;'
USHdir: '&USHdir;'
PDY: !cycstr '@Y@m@d'
cyc: !cycstr '@H'
subcyc: !cycstr '@M'
nprocs: '{{ parent.nnodes * parent.ppn }}'
CYCLE_TYPE: '#type#'
CYCLE_SUBTYPE: ''
NWGES_BASEDIR: '&NWGES_BASEDIR'
RADAR_REF_THINNING: '&RADAR_REF_THINNING;'
join: !cycstr '&LOGDIR;/{{ jobname }}_#type#_@Y@m@d@H.log'
native: '{{ platform.SCHED_NATIVE_CMD }} &RRFS_RESERVE;'
nnodes: 2
ppn: 24
nodes: '{{ nnodes }}:ppn={{ ppn }}'
nodesize: "&NCORES_PER_NODE;"
partition: '{% if platform.get("PARTITION_DEFAULT") %}&PARTITION_DEFAULT;{% else %}None{% endif %}'
queue: '&QUEUE_DEFAULT;'
walltime: 00:30:00
dependency:
and:
timedep:
text: '<cyclestr offset="&START_TIME_NSSLMOSIAC">@Y@m@d@H@M00</cyclestr>'
#################################################################
# lightning process
#################################################################
{% if rrfs.get("DO_NLDN_LGHT") %}
task_process_lightning_#type#:
account: '&ACCOUNT;'
attrs:
cycledefs: '#cycletype#'
maxtries: 1
command: '&LOAD_MODULES_RUN_TASK_FP; "process_lightning" "&JOBSDIR;/JREGIONAL_PROCESS_LIGHTNING"'
envars:
GLOBAL_VAR_DEFNS_FP: '&GLOBAL_VAR_DEFNS_FP;'
USHdir: '&USHdir;'
PDY: !cycstr '@Y@m@d'
cyc: !cycstr '@H'
subcyc: !cycstr '@M'
nprocs: '{{ parent.nnodes * parent.ppn }}'
CYCLE_TYPE: '#type#'
CYCLE_SUBTYPE: ''
join: !cycstr '&LOGDIR;/{{ jobname }}_#type#_@Y@m@d@H.log'
native: '{{ platform.SCHED_NATIVE_CMD }} &RRFS_RESERVE;'
nnodes: 1
ppn: 1
nodes: '{{ nnodes }}:ppn={{ ppn }}'
nodesize: "&NCORES_PER_NODE;"
partition: '{% if platform.get("PARTITION_DEFAULT") %}&PARTITION_DEFAULT;{% else %}None{% endif %}'
queue: '&QUEUE_DEFAULT;'
walltime: 00:30:00
dependency:
and:
timedep:
text: '<cyclestr offset="&START_TIME_LIGHTNINGNC">@Y@m@d@H@M00</cyclestr>'
{% endif %}
#################################################################
# process bufr
#################################################################
task_process_bufr_#type#:
account: '&ACCOUNT;'
attrs:
cycledefs: '#cycletype#'
maxtries: 1
command: '&LOAD_MODULES_RUN_TASK_FP; "process_bufr" "&JOBSDIR;/JREGIONAL_PROCESS_BUFR"'
envars:
GLOBAL_VAR_DEFNS_FP: '&GLOBAL_VAR_DEFNS_FP;'
USHdir: '&USHdir;'
PDY: !cycstr '@Y@m@d'
cyc: !cycstr '@H'
subcyc: !cycstr '@M'
nprocs: '{{ parent.nnodes * parent.ppn }}'
CYCLE_TYPE: '#type#'
CYCLE_SUBTYPE: ''
join: !cycstr '&LOGDIR;/{{ jobname }}_#type#_@Y@m@d@H.log'
native: '{{ platform.SCHED_NATIVE_CMD }} &RRFS_RESERVE;'
{%- if machine not in ["GAEA", "NOAACLOUD"] %}
memory: 20G
{%- endif %}
nnodes: 1
ppn: 1
nodes: '{{ nnodes }}:ppn={{ ppn }}'
nodesize: "&NCORES_PER_NODE;"
partition: '{% if platform.get("PARTITION_DEFAULT") %}&PARTITION_DEFAULT;{% else %}None{% endif %}'
queue: '&QUEUE_DEFAULT;'
walltime: 00:30:00
dependency:
and:
timedep:
text: '<cyclestr offset="&START_TIME_CONVENTIONAL">@Y@m@d@H@M00</cyclestr>'
#################################################################
# end data prep metatask
#################################################################
{% endif %}
{% endif %}
{% endif %}

0 comments on commit a6bde34

Please sign in to comment.