Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamFlow (CWL compatible) #29

Merged
merged 2 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion sapporo/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def parse_workflows(parse_request: ParseRequest) -> ParseResult:
return parse_result


WF_TYPES = Literal["CWL", "WDL", "NFL", "SMK", "unknown"]
WF_TYPES = Literal["CWL", "WDL", "NFL", "SMK", "StreamFlow", "unknown"]


def inspect_wf_type(wf_content: str, wf_location: str) -> WF_TYPES:
Expand Down Expand Up @@ -93,6 +93,8 @@ def check_by_shebang(wf_content: str) -> WF_TYPES:
return "SMK"
elif "cromwell" in first_line:
return "WDL"
elif "streamflow" in first_line:
return "StreamFlow"

return "unknown"

Expand Down Expand Up @@ -132,6 +134,8 @@ def inspect_wf_version(wf_content: str, wf_type: WF_TYPES) -> str:
wf_version = inspect_nfl_version(wf_content)
elif wf_type == "SMK":
wf_version = inspect_smk_version()
elif wf_type == "StreamFlow":
wf_version = inspect_streamflow_version(wf_content)

return wf_version

Expand Down Expand Up @@ -177,6 +181,16 @@ def inspect_smk_version() -> str:
return default_smk_version


def inspect_streamflow_version(wf_content: str) -> str:
default_streamflow_version = "v1.0"

yaml = yaml_no_ts()
yaml_obj = yaml.load(wf_content)

return yaml_obj['version'] or default_streamflow_version



def parse_cwl_inputs(wf_content: str, wf_location: str) -> List[Dict[str, Any]]:
if is_remote_url(wf_location):
inputs = wf_location_to_inputs(wf_location)
Expand Down
21 changes: 20 additions & 1 deletion sapporo/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function run_snakemake() {
fi
fi
local wf_basedir="$(dirname ${wf_url_local})"
# NOTE this are common conventions but not hard requirements for Snakemake Standardized Usage.
# NOTE these are common conventions but not hard requirements for Snakemake Standardized Usage.
local wf_schemas_dir="${wf_basedir}/schemas"
local wf_scripts_dir="${wf_basedir}/scripts"
local wf_results_dir="${wf_basedir}/results"
Expand Down Expand Up @@ -109,6 +109,25 @@ function run_ep3() {
eval ${cmd_txt} || executor_error
}

function run_streamflow() {
if [[ "${wf_url}" == http://* ]] || [[ "${wf_url}" == https://* ]]; then
# It is a remote file.
local wf_url_local="${exe_dir}/$(basename ${wf_url})"
curl -fsSL -o ${wf_url_local} ${wf_url} || executor_error
else
# It is a local file.
if [[ "${wf_url}" == /* ]]; then
local wf_url_local="${wf_url}"
else
local wf_url_local="${exe_dir}/${wf_url}"
fi
fi
local container="alphaunito/streamflow:0.1.3-base"
local cmd_txt="docker run --mount type=bind,source=${run_dir},target=/streamflow/project --mount type=bind,source=${outputs_dir},target=/streamflow/results ${container} run /streamflow/project/exe/$(basename ${wf_url_local}) 1>${stdout} 2>${stderr}"
echo ${cmd_txt} >${cmd}
eval ${cmd_txt} || executor_error
}

function cancel() {
# Pre-cancellation procedures
if [[ ${wf_engine_name} == "cwltool" ]]; then
Expand Down
6 changes: 5 additions & 1 deletion sapporo/service-info.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
},
"SMK": {
"workflow_type_version": ["1.0"]
},
"StreamFlow": {
"workflow_type_version": ["v1.0"]
}
},
"supported_filesystem_protocols": ["http", "https", "file", "s3"],
Expand All @@ -34,7 +37,8 @@
"toil (experimental)": "4.1.0",
"cromwell": "72",
"snakemake": "v6.9.1",
"ep3 (experimental)": "v1.7.0"
"ep3 (experimental)": "v1.7.0",
"streamflow": "0.1.3"
},
"auth_instructions_url": "https://github.com/sapporo-wes/sapporo-service",
"contact_info_url": "https://github.com/sapporo-wes/sapporo-service",
Expand Down
10 changes: 5 additions & 5 deletions sapporo/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ def validate_run_request(run_id: str) -> RunRequest:
validate_wf_type(wf_type, wf_type_version)
wf_type = cast(WorkflowTypes, wf_type)

validate_meta_charactors("workflow_url", wf_url)
validate_meta_characters("workflow_url", wf_url)
if wf_engine_params is not None:
try:
wf_engine_params_obj = json.loads(wf_engine_params)
if isinstance(wf_engine_params_obj, list):
for val in wf_engine_params_obj:
validate_meta_charactors("workflow_engine_parameters", val)
validate_meta_characters("workflow_engine_parameters", val)
elif isinstance(wf_engine_params_obj, dict):
for key, val in wf_engine_params_obj.items():
validate_meta_charactors("workflow_engine_parameters", key)
validate_meta_charactors("workflow_engine_parameters", val)
validate_meta_characters("workflow_engine_parameters", key)
validate_meta_characters("workflow_engine_parameters", val)
else:
abort(400, "The `workflow_engine_parameters` must be a list or a dict.")
except json.JSONDecodeError:
Expand Down Expand Up @@ -209,7 +209,7 @@ def validate_run_id(run_id: str) -> None:
abort(404, f"The run ID `{run_id}` does not exist.")


def validate_meta_charactors(_type: str, content: str) -> None:
def validate_meta_characters(_type: str, content: str) -> None:
"""
This function validates the string that will actually be evaluated in eval
in run.sh. The possible types of strings are 'workflow_url',
Expand Down