Skip to content

Commit

Permalink
Merge pull request #29 from kinow/streamflow
Browse files Browse the repository at this point in the history
Add StreamFlow (CWL compatible)
  • Loading branch information
inutano committed Apr 15, 2022
2 parents 4913167 + 0a22d5d commit 0735bd9
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 8 deletions.
16 changes: 15 additions & 1 deletion sapporo/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def parse_workflows(parse_request: ParseRequest) -> ParseResult:
return parse_result


WF_TYPES = Literal["CWL", "WDL", "NFL", "SMK", "unknown"]
WF_TYPES = Literal["CWL", "WDL", "NFL", "SMK", "StreamFlow", "unknown"]


def inspect_wf_type(wf_content: str, wf_location: str) -> WF_TYPES:
Expand Down Expand Up @@ -93,6 +93,8 @@ def check_by_shebang(wf_content: str) -> WF_TYPES:
return "SMK"
elif "cromwell" in first_line:
return "WDL"
elif "streamflow" in first_line:
return "StreamFlow"

return "unknown"

Expand Down Expand Up @@ -132,6 +134,8 @@ def inspect_wf_version(wf_content: str, wf_type: WF_TYPES) -> str:
wf_version = inspect_nfl_version(wf_content)
elif wf_type == "SMK":
wf_version = inspect_smk_version()
elif wf_type == "StreamFlow":
wf_version = inspect_streamflow_version(wf_content)

return wf_version

Expand Down Expand Up @@ -177,6 +181,16 @@ def inspect_smk_version() -> str:
return default_smk_version


def inspect_streamflow_version(wf_content: str) -> str:
default_streamflow_version = "v1.0"

yaml = yaml_no_ts()
yaml_obj = yaml.load(wf_content)

return yaml_obj['version'] or default_streamflow_version



def parse_cwl_inputs(wf_content: str, wf_location: str) -> List[Dict[str, Any]]:
if is_remote_url(wf_location):
inputs = wf_location_to_inputs(wf_location)
Expand Down
21 changes: 20 additions & 1 deletion sapporo/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function run_snakemake() {
fi
fi
local wf_basedir="$(dirname ${wf_url_local})"
# NOTE this are common conventions but not hard requirements for Snakemake Standardized Usage.
# NOTE these are common conventions but not hard requirements for Snakemake Standardized Usage.
local wf_schemas_dir="${wf_basedir}/schemas"
local wf_scripts_dir="${wf_basedir}/scripts"
local wf_results_dir="${wf_basedir}/results"
Expand Down Expand Up @@ -109,6 +109,25 @@ function run_ep3() {
eval ${cmd_txt} || executor_error
}

function run_streamflow() {
if [[ "${wf_url}" == http://* ]] || [[ "${wf_url}" == https://* ]]; then
# It is a remote file.
local wf_url_local="${exe_dir}/$(basename ${wf_url})"
curl -fsSL -o ${wf_url_local} ${wf_url} || executor_error
else
# It is a local file.
if [[ "${wf_url}" == /* ]]; then
local wf_url_local="${wf_url}"
else
local wf_url_local="${exe_dir}/${wf_url}"
fi
fi
local container="alphaunito/streamflow:0.1.3-base"
local cmd_txt="docker run --mount type=bind,source=${run_dir},target=/streamflow/project --mount type=bind,source=${outputs_dir},target=/streamflow/results ${container} run /streamflow/project/exe/$(basename ${wf_url_local}) 1>${stdout} 2>${stderr}"
echo ${cmd_txt} >${cmd}
eval ${cmd_txt} || executor_error
}

function cancel() {
# Pre-cancellation procedures
if [[ ${wf_engine_name} == "cwltool" ]]; then
Expand Down
6 changes: 5 additions & 1 deletion sapporo/service-info.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
},
"SMK": {
"workflow_type_version": ["1.0"]
},
"StreamFlow": {
"workflow_type_version": ["v1.0"]
}
},
"supported_filesystem_protocols": ["http", "https", "file", "s3"],
Expand All @@ -34,7 +37,8 @@
"toil (experimental)": "4.1.0",
"cromwell": "72",
"snakemake": "v6.9.1",
"ep3 (experimental)": "v1.7.0"
"ep3 (experimental)": "v1.7.0",
"streamflow": "0.1.3"
},
"auth_instructions_url": "https://github.com/sapporo-wes/sapporo-service",
"contact_info_url": "https://github.com/sapporo-wes/sapporo-service",
Expand Down
10 changes: 5 additions & 5 deletions sapporo/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ def validate_run_request(run_id: str) -> RunRequest:
validate_wf_type(wf_type, wf_type_version)
wf_type = cast(WorkflowTypes, wf_type)

validate_meta_charactors("workflow_url", wf_url)
validate_meta_characters("workflow_url", wf_url)
if wf_engine_params is not None:
try:
wf_engine_params_obj = json.loads(wf_engine_params)
if isinstance(wf_engine_params_obj, list):
for val in wf_engine_params_obj:
validate_meta_charactors("workflow_engine_parameters", val)
validate_meta_characters("workflow_engine_parameters", val)
elif isinstance(wf_engine_params_obj, dict):
for key, val in wf_engine_params_obj.items():
validate_meta_charactors("workflow_engine_parameters", key)
validate_meta_charactors("workflow_engine_parameters", val)
validate_meta_characters("workflow_engine_parameters", key)
validate_meta_characters("workflow_engine_parameters", val)
else:
abort(400, "The `workflow_engine_parameters` must be a list or a dict.")
except json.JSONDecodeError:
Expand Down Expand Up @@ -209,7 +209,7 @@ def validate_run_id(run_id: str) -> None:
abort(404, f"The run ID `{run_id}` does not exist.")


def validate_meta_charactors(_type: str, content: str) -> None:
def validate_meta_characters(_type: str, content: str) -> None:
"""
This function validates the string that will actually be evaluated in eval
in run.sh. The possible types of strings are 'workflow_url',
Expand Down

0 comments on commit 0735bd9

Please sign in to comment.