Skip to content

Commit

Permalink
Pipestat polish (#412)
Browse files Browse the repository at this point in the history
* fix pep-config issue

* pass output_dir to pipestat #411 and # 390

* adjust building looper namespace

* revert to using pipestat config path, tests broken

* fix tests by reverting some changes

* allow sample name to be passed during config check, raise value error if no pipestat config

* pass sample name only

* resolve schema path based on pipestat config

* clean test and allow pipestat namespace to include config_file path

* remove unnecessary pipestat namespace key value pairs

* Attempt constructing a pipestat config file from looper config and piface and then writing to file. Tests broken.

* fix tests

* general clean up

* remove sample name during pipestat creation

* remove redundancy

* lint

* clean up comments

* fix runp for pipestat and add to pytest

* add information to looper's pipestat documentation.

* Update changelog
  • Loading branch information
donaldcampbelljr committed Sep 6, 2023
1 parent bff4f34 commit cd31351
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 86 deletions.
9 changes: 8 additions & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format.

## [1.6.0] -- 2023-09-XX

### Changed
- looper now works with pipestat v0.6.0 and greater
- looper table and check now use pipestat and therefore require pipestat configuration. [#390](https://github.com/pepkit/looper/issues/390)
- changed how looper configures pipestat [#411](https://github.com/pepkit/looper/issues/411)

## [1.5.1] -- 2023-08-14

### Fixed
Expand Down Expand Up @@ -68,7 +75,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [1.3.1] -- 2021-06-18

### Changed
- If remote schemas are not accessbile, the job submission doesn't fail anymore
- If remote schemas are not accessible, the job submission doesn't fail anymore
- Fixed a bug where looper stated "No failed flag found" when a failed flag was found

### Deprecated
Expand Down
56 changes: 56 additions & 0 deletions docs/pipestat.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,62 @@ Starting with version 1.4.0, looper supports additional functionality for [pipes
For non-pipestat-compatible pipelines, you can still use looper to run pipelines, but you won't be able to use `looper report` or `looper status` to manage their output.

## Pipestat configuration overview
Starting with version 1.6.0 configuring looper to work with pipestat has changed.

Now, Looper will obtain pipestat configurations data from two sources:
1. pipeline interface
2. looper_config file

Looper will combine the necessary configuration data and write a new pipestat configuration file named `looper_pipestat_config.yaml` which looper will place in its output directory. Pipestat then uses this configuration file to create the required pipestatManager objects. See [Hello_Looper](https://github.com/pepkit/hello_looper) for a specific example.

Briefly, the Looper config file must contain a pipestat field. A project name must be supplied if running a project level pipeline. The user must also supply a file path for a results file if using a local file backend or database credentials if using a postgresql database backend.

```yaml
pep_config: project_config_pipestat.yaml # pephub registry path or local path
output_dir: output
sample_table: annotation_sheet.csv
pipeline_interfaces:
sample: ./pipeline_interface1_sample_pipestat.yaml
project: ./pipeline_interface1_project_pipestat.yaml
looper:
all:
output_dir: output
sample_modifiers:
append:
attr: "val"
derive:
attributes: [read1, read2]
sources:
SRA_1: "{SRR}_1.fastq.gz"
SRA_2: "{SRR}_2.fastq.gz"
pipestat:
project_name: TEST_PROJECT_NAME
results_file_path: tmp_pipestat_results.yaml
flag_file_dir: output/results_pipeline
database:
dialect: postgresql
driver: psycopg2
name: pipestat-test
user: postgres
password: pipestat-password
host: 127.0.0.1
port: 5432
```
And the pipeline interface must include information required by pipestat such as pipeline_name, pipeline_type, and an output schema path:
```yaml
pipeline_name: example_pipestat_pipeline
pipeline_type: sample
schema_path: pipeline_pipestat/pipestat_output_schema.yaml
command_template: >
python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file}
```




### Pipestat Configuration for Looper Versions 1.4.0-1.5.0
Note: The instructions below are for older versions of Looper.

Generally, pipestat configuration comes from 3 sources, with the following priority:

Expand Down
41 changes: 14 additions & 27 deletions looper/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import subprocess
import time
import yaml
from copy import copy, deepcopy
from json import loads
from subprocess import check_output
Expand Down Expand Up @@ -101,31 +102,15 @@ def write_sample_yaml(namespaces):
return {"sample": sample}


def write_pipestat_config(namespaces):
def write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict):
"""
This is run a the project level, not at the sample level like the other plugins
This is run at the project level, not at the sample level like the other plugins
"""
with open(looper_pipestat_config_path, "w") as f:
yaml.dump(pipestat_config_dict, f)
print(f"Initialized looper config file: {looper_pipestat_config_path}")

if "pipestat" not in namespaces["looper"]:
return {}

# pipestat config contains information from 2 sources: pipeline-author, and pipeline-runner
# start with the information provided by the pipeline-runner via looper config
pipestat_config_data = namespaces["looper"]["pipestat"]

# add information re: pipestat provided by pipeline-author in the piface.
pipestat_config_data["pipeline_type"] = namespaces["pipeline"]["pipeline_type"]
pipestat_config_data["pipestat_flag_dir"] = namespaces["pipeline"]["pipestat_flag_dir"]
pipestat_config_data["output_schema"] = namespaces["pipeline"]["output_schema"]

# where to save this?
pipestat_config_path = f"{namespaces['looper']['output_dir']}/pipestat_config.yaml"

# write pipestat config file.
with open(pipestat_config_path, "w") as yamlfile:
dump(pipestat_config_data, yamlfile)

return {"pipestat": {"config_path": pipestat_config_path}}
return True


def write_sample_yaml_prj(namespaces):
Expand Down Expand Up @@ -444,7 +429,9 @@ def add_sample(self, sample, rerun=False):
)
if self.prj.pipestat_configured:
psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name)
sample_statuses = psms[self.pl_name].get_status()
sample_statuses = psms[self.pl_name].get_status(
sample_name=sample.sample_name
)
sample_statuses = [sample_statuses] if sample_statuses else []
else:
sample_statuses = fetch_sample_flags(self.prj, sample, self.pl_name)
Expand Down Expand Up @@ -691,11 +678,10 @@ def _set_pipestat_namespace(
return YAMLConfigManager()
else:
full_namespace = {
"schema": psm.schema_path,
"results_file": psm.file,
"record_id": psm.sample_name,
"namespace": psm.project_name,
"config": psm.config_path,
"sample_name": psm.sample_name,
"project_name": psm.project_name,
"config_file": psm._config_path,
}
filtered_namespace = {k: v for k, v in full_namespace.items() if v}
return YAMLConfigManager(filtered_namespace)
Expand Down Expand Up @@ -772,6 +758,7 @@ def write_script(self, pool, size):
argstring = jinja_render_template_strictly(
template=templ, namespaces=namespaces
)
print(argstring)
except UndefinedError as jinja_exception:
_LOGGER.warning(NOT_SUB_MSG.format(str(jinja_exception)))
except KeyError as e:
Expand Down
1 change: 1 addition & 0 deletions looper/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def _get_apperance_dict(type, templ=APPEARANCE_BY_FLAG):
FILE_CHECKS_KEY,
SAMPLE_PL_ARG,
PIPESTAT_KEY,
DEFAULT_PIPESTAT_CONFIG_ATTR,
]

# resource package TSV-related consts
Expand Down
76 changes: 37 additions & 39 deletions looper/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pipestat import PipestatError, PipestatManager
from ubiquerg import expandpath, is_command_callable
from yacman import YAMLConfigManager
from .conductor import write_pipestat_config

from .exceptions import *
from .pipeline_interface import PipelineInterface
Expand Down Expand Up @@ -488,82 +489,79 @@ def _get_pipestat_configuration(self, sample_name=None, project_level=False):
Get all required pipestat configuration variables from looper_config file
"""

def _get_val_from_attr(pipestat_sect, object, attr_name, default, no_err=False):
"""
Get configuration value from an object's attribute or return default
:param dict pipestat_sect: pipestat section for sample or project
:param peppy.Sample | peppy.Project object: object to get the
configuration values for
:param str attr_name: attribute name with the value to retrieve
:param str default: default attribute name
:param bool no_err: do not raise error in case the attribute is missing,
in order to use the values specified in a different way, e.g. in pipestat config
:return str: retrieved configuration value
"""
if pipestat_sect is not None and attr_name in pipestat_sect:
return pipestat_sect[attr_name]
try:
return object[default]
except KeyError:
if no_err:
return None
raise AttributeError(f"'{default}' attribute is missing")

ret = {}
if not project_level and sample_name is None:
raise ValueError(
"Must provide the sample_name to determine the "
"sample to get the PipestatManagers for"
)
key = "project" if project_level else "sample"
# self[EXTRA_KEY] pipestat is stored here on the project if added to looper config file.
if PIPESTAT_KEY in self[EXTRA_KEY] and key in self[EXTRA_KEY][PIPESTAT_KEY]:
pipestat_config_dict = self[EXTRA_KEY][PIPESTAT_KEY][key]

if PIPESTAT_KEY in self[EXTRA_KEY]:
pipestat_config_dict = self[EXTRA_KEY][PIPESTAT_KEY]
else:
_LOGGER.debug(
f"'{PIPESTAT_KEY}' not found in '{LOOPER_KEY}' section of the "
f"project configuration file."
)
pipestat_config_dict = None
# We cannot use pipestat without it being defined in the looper config file.
raise ValueError

pipestat_config = YAMLConfigManager(entries=pipestat_config_dict)
# Get looper user configured items first and update the pipestat_config_dict
try:
results_file_path = pipestat_config.data["results_file_path"]
results_file_path = pipestat_config_dict["results_file_path"]
if not os.path.exists(os.path.dirname(results_file_path)):
results_file_path = os.path.join(
os.path.dirname(self.output_dir), results_file_path
)
pipestat_config_dict.update({"results_file_path": results_file_path})
except KeyError:
results_file_path = None

try:
flag_file_dir = pipestat_config.data["flag_file_dir"]
flag_file_dir = pipestat_config_dict["flag_file_dir"]
if not os.path.isabs(flag_file_dir):
flag_file_dir = os.path.join(
os.path.dirname(self.output_dir), flag_file_dir
)
pipestat_config_dict.update({"flag_file_dir": flag_file_dir})
except KeyError:
flag_file_dir = None

if project_level and "project_name" in pipestat_config_dict:
pipestat_config_dict.update(
{"project_name": pipestat_config_dict["project_name"]}
)

pipestat_config_dict.update({"output_dir": self.output_dir})

pifaces = (
self.project_pipeline_interfaces
if project_level
else self._interfaces_by_sample[sample_name]
)

for piface in pifaces:
rec_id = (
pipestat_config.data["project_name"]
if project_level
else pipestat_config.data["sample_name"]
# We must also obtain additional pipestat items from the pipeline author's piface
if "schema_path" in piface.data:
pipestat_config_dict.update({"schema_path": piface.data["schema_path"]})
if "pipeline_name" in piface.data:
pipestat_config_dict.update(
{"pipeline_name": piface.data["pipeline_name"]}
)
if "pipeline_type" in piface.data:
pipestat_config_dict.update(
{"pipeline_type": piface.data["pipeline_type"]}
)

# Pipestat_dict_ is now updated from all sources and can be written to a yaml.
looper_pipestat_config_path = os.path.join(
os.path.dirname(self.output_dir), "looper_pipestat_config.yaml"
)
if not os.path.exists(looper_pipestat_config_path):
write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict)

ret[piface.pipeline_name] = {
"config_dict": pipestat_config_dict,
"results_file_path": results_file_path,
"flag_file_dir": flag_file_dir,
"sample_name": rec_id,
"schema_path": piface.get_pipeline_schemas(OUTPUT_SCHEMA_KEY),
"config_file": looper_pipestat_config_path,
}
return ret

Expand Down
19 changes: 14 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from looper.const import *

CFG = "project_config.yaml"
PIPESTAT_CONFIG = "global_pipestat_config.yaml"
PROJECT_CFG_PIPESTAT = "project_config_pipestat.yaml"
LOOPER_CFG = "looper_config_pipestat.yaml"
PRJ_CFG_W_PIPESTAT = "project_config_pipestat.yaml"
PIPESTAT_OS = "pipestat_output_schema.yaml"
PIPESTAT_PI = "pipeline_interface1_sample_pipestat.yaml"
PIPESTAT_PI_PRJ = "pipeline_interface1_project_pipestat.yaml"
ST = "annotation_sheet.csv"
PIP = "pipeline_interface{}_project.yaml"
PIS = "pipeline_interface{}_sample.yaml"
Expand Down Expand Up @@ -285,7 +287,6 @@ def prepare_pep_with_dot_file(prep_temp_pep):
return dot_file_path


#
@pytest.fixture
def prep_temp_pep_pipestat(example_pep_piface_path):
# TODO this should be combined with the other prep_temp_pep
Expand All @@ -295,29 +296,34 @@ def prep_temp_pep_pipestat(example_pep_piface_path):
# ori paths

cfg_path = os.path.join(example_pep_piface_path, LOOPER_CFG)
pipestat_config_path = os.path.join(example_pep_piface_path, PRJ_CFG_W_PIPESTAT)
project_cfg_pipestat_path = os.path.join(
example_pep_piface_path, PROJECT_CFG_PIPESTAT
)
output_schema_path = os.path.join(example_pep_piface_path, PIPESTAT_OS)

sample_table_path = os.path.join(example_pep_piface_path, ST)
piface1s_path = os.path.join(example_pep_piface_path, PIPESTAT_PI)
piface1p_path = os.path.join(example_pep_piface_path, PIPESTAT_PI_PRJ)

res_proj_path = os.path.join(example_pep_piface_path, RES.format("project"))
res_samp_path = os.path.join(example_pep_piface_path, RES.format("sample"))
# temp copies
temp_path_cfg = os.path.join(td, LOOPER_CFG)
temp_path_project_cfg_pipestat = os.path.join(td, PROJECT_CFG_PIPESTAT)
temp_path_output_schema = os.path.join(td, PIPESTAT_OS)
temp_path_pipestat_config = os.path.join(td, PRJ_CFG_W_PIPESTAT)

temp_path_sample_table = os.path.join(td, ST)
temp_path_piface1s = os.path.join(td, PIPESTAT_PI)
temp_path_piface1p = os.path.join(td, PIPESTAT_PI_PRJ)
temp_path_res_proj = os.path.join(td, RES.format("project"))
temp_path_res_samp = os.path.join(td, RES.format("sample"))
# copying
copyfile(cfg_path, temp_path_cfg)
copyfile(project_cfg_pipestat_path, temp_path_project_cfg_pipestat)

copyfile(pipestat_config_path, temp_path_pipestat_config)
copyfile(sample_table_path, temp_path_sample_table)
copyfile(piface1s_path, temp_path_piface1s)
copyfile(piface1p_path, temp_path_piface1p)
copyfile(output_schema_path, temp_path_output_schema)
copyfile(res_proj_path, temp_path_res_proj)
copyfile(res_samp_path, temp_path_res_samp)
Expand All @@ -329,6 +335,9 @@ def prep_temp_pep_pipestat(example_pep_piface_path):
piface_data[LOOPER_KEY][OUTDIR_KEY] = out_td
piface_data[LOOPER_KEY][CLI_KEY] = {}
piface_data[LOOPER_KEY][CLI_KEY]["runp"] = {}
piface_data[LOOPER_KEY][CLI_KEY]["runp"][PIPELINE_INTERFACES_KEY] = [
temp_path_piface1p,
]
piface_data[SAMPLE_MODS_KEY][CONSTANT_KEY][PIPELINE_INTERFACES_KEY] = [
temp_path_piface1s,
]
Expand Down

0 comments on commit cd31351

Please sign in to comment.