Skip to content

Commit

Permalink
Fix tool stdio on pulsar and implement job stdio
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Apr 6, 2023
1 parent 0698979 commit b89bf9e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
7 changes: 3 additions & 4 deletions lib/galaxy/jobs/command_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ def build_command(
else:
commands_builder = CommandsBuilder(externalized_commands)

wrap_stdio = externalized or not for_pulsar
wrap_stdio = externalized and not for_pulsar
if wrap_stdio:
# Galaxy writes I/O files to outputs, Pulsar uses metadata. metadata seems like
# it should be preferred - at least if the directory exists.
io_directory = "../metadata" if for_pulsar else "../outputs"
# Galaxy writes I/O files to outputs, Pulsar instruments own I/O redirection
io_directory = "../outputs"
commands_builder.capture_stdout_stderr(
f"{io_directory}/tool_stdout", f"{io_directory}/tool_stderr", stream_stdout_stderr=stream_stdout_stderr
)
Expand Down
15 changes: 9 additions & 6 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,15 +619,16 @@ def finish_job(self, job_state: JobState):
assert isinstance(
job_state, AsynchronousJobState
), f"job_state type is '{type(job_state)}', expected AsynchronousJobState"
stderr = stdout = ""
job_wrapper = job_state.job_wrapper
try:
client = self.get_client_from_state(job_state)
run_results = client.full_status()
remote_metadata_directory = run_results.get("metadata_directory", None)
stdout = run_results.get("stdout", "")
stderr = run_results.get("stderr", "")
exit_code = run_results.get("returncode", None)
tool_stdout = run_results.get("stdout", "")
tool_stderr = run_results.get("stderr", "")
job_stdout = run_results.get("job_stdout")
job_stderr = run_results.get("job_stderr")
exit_code = run_results.get("returncode")
pulsar_outputs = PulsarOutputs.from_status_response(run_results)
state = job_wrapper.get_state()
# Use Pulsar client code to transfer/copy files back
Expand Down Expand Up @@ -668,9 +669,11 @@ def finish_job(self, job_state: JobState):
):
job_metrics_directory = job_wrapper.working_directory
job_wrapper.finish(
stdout,
stderr,
tool_stdout,
tool_stderr,
exit_code,
job_stdout=job_stdout,
job_stderr=job_stderr,
remote_metadata_directory=remote_metadata_directory,
job_metrics_directory=job_metrics_directory,
)
Expand Down
1 change: 1 addition & 0 deletions test/integration/test_pulsar_embedded.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def handle_galaxy_config_kwds(cls, config):
"vcf_bgzip_test",
"environment_variables",
"multi_output_assign_primary_ext_dbkey",
"job_properties",
"strict_shell",
"tool_provided_metadata_9",
]
Expand Down
2 changes: 1 addition & 1 deletion test/integration/test_pulsar_embedded_extended_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def handle_galaxy_config_kwds(cls, config):
"version_command_tool_dir",
"simple_constructs",
"metadata_bam",
# "job_properties", # https://github.com/galaxyproject/galaxy/issues/11813
"job_properties",
"from_work_dir_glob",
]
)
2 changes: 1 addition & 1 deletion test/integration/test_pulsar_embedded_remote_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ def handle_galaxy_config_kwds(cls, config):
[
"simple_constructs",
"metadata_bam",
# "job_properties", # https://github.com/galaxyproject/galaxy/issues/11813
"job_properties",
]
)

0 comments on commit b89bf9e

Please sign in to comment.