Skip to content

Commit

Permalink
refactor: Move many variant operations into objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
natir committed Mar 11, 2024
1 parent c51c932 commit df36fa0
Show file tree
Hide file tree
Showing 31 changed files with 951 additions and 1,051 deletions.
19 changes: 13 additions & 6 deletions benchmark/parse_vcf.py
Expand Up @@ -10,16 +10,27 @@
import pytest

if typing.TYPE_CHECKING: # pragma: no cover
import polars
import pytest_benchmark


# project import
from variantplaner import io
from variantplaner import Vcf, VcfParsingBehavior

from benchmark import __generate_vcf

DATA_DIR = pathlib.Path(__file__).parent.parent / "tests" / "data"


def __worker(input_path: pathlib.Path) -> polars.LazyFrame:
    """Parse the VCF at *input_path* and return the variants LazyFrame.

    Uses the repository's grch38.92 contig-length table and enables
    structural-variant management during parsing.
    """
    vcf = Vcf()
    vcf.from_path(
        input_path,
        DATA_DIR / "grch38.92.csv",
        behavior=VcfParsingBehavior.MANAGE_SV,
    )

    return vcf.variants().lf


def __generate_parse_vcf(
number_of_line: int,
) -> typing.Callable[[pathlib.Path, pytest_benchmark.BenchmarkSession], None]:
Expand All @@ -34,11 +45,7 @@ def inner(
__generate_vcf(input_path, number_of_line)

benchmark(
lambda: io.vcf.into_lazyframe(
input_path,
DATA_DIR / "grch38.92.csv",
extension=io.vcf.IntoLazyFrameExtension.MANAGE_SV,
).collect(),
lambda: __worker(input_path).collect(),
)

inner.__doc__ = f"""Parsing a vcf of {number_of_line} variant"""
Expand Down
5 changes: 1 addition & 4 deletions scripts/gen_benchmark_plot.py
Expand Up @@ -191,10 +191,7 @@ def render_plot() -> str:
if df.shape[0] == 0:
return ""

name2func = {
name: globals().get(f"{name}_func", nothing)
for name in df.get_column("benchmark").unique().to_list()
}
name2func = {name: globals().get(f"{name}_func", nothing) for name in df.get_column("benchmark").unique().to_list()}

bench2plot = {}
for name, data in df.group_by("benchmark"):
Expand Down
10 changes: 9 additions & 1 deletion src/variantplaner/__init__.py
Expand Up @@ -7,9 +7,17 @@

from __future__ import annotations

from variantplaner import exception, extract, generate, io, normalization, struct
from variantplaner import extract, generate, normalization, struct
from variantplaner.objects import Annotations, ContigsLength, Genotypes, Variants, Vcf, VcfHeader, VcfParsingBehavior

__all__: list[str] = [
"Annotations",
"ContigsLength",
"Genotypes",
"Variants",
"Vcf",
"VcfHeader",
"VcfParsingBehavior",
"exception",
"extract",
"generate",
Expand Down
1 change: 0 additions & 1 deletion src/variantplaner/__main__.py
Expand Up @@ -6,7 +6,6 @@
- https://docs.python.org/3/using/cmdline.html#cmdoption-m
"""


from variantplaner.cli import main

if __name__ == "__main__":
Expand Down
12 changes: 6 additions & 6 deletions src/variantplaner/cli/__init__.py
Expand Up @@ -21,7 +21,7 @@ class MultipleValueOption(click.Option):

def __init__(self, *args: list[typing.Any], **kwargs: dict[typing.Any, typing.Any]):
"""Intialise click option parser."""
super(MultipleValueOption, self).__init__(*args,**kwargs) # type: ignore[arg-type] # noqa: UP008 false positive and complexe type
super(MultipleValueOption, self).__init__(*args, **kwargs) # type: ignore[arg-type] # noqa: UP008 false positive and complexe type
self._previous_parser_process = None
self._eat_all_parser = None

Expand Down Expand Up @@ -109,8 +109,8 @@ def main(ctx: click.Context, *, threads: int = 1, verbose: int = 0, debug_info:


# module import required after main definition
from variantplaner.cli import metadata # noqa: E402 F401 I001 these import should be here
from variantplaner.cli import parquet2vcf # noqa: E402 F401 these import should be here
from variantplaner.cli import struct # noqa: E402 F401 these import should be here
from variantplaner.cli import transmission # noqa: E402 F401 these import should be here
from variantplaner.cli import vcf2parquet # noqa: E402 F401 these import should be here
from variantplaner.cli import metadata # noqa: E402 F401 I001 these import should be here
from variantplaner.cli import parquet2vcf # noqa: E402 F401 these import should be here
from variantplaner.cli import struct # noqa: E402 F401 these import should be here
from variantplaner.cli import transmission # noqa: E402 F401 these import should be here
from variantplaner.cli import vcf2parquet # noqa: E402 F401 these import should be here
66 changes: 35 additions & 31 deletions src/variantplaner/cli/parquet2vcf.py
Expand Up @@ -11,7 +11,7 @@
import polars

# project import
from variantplaner import cli, extract, io
from variantplaner import VcfHeader, cli, extract, io


@cli.main.command("parquet2vcf") # type: ignore[has-type]
Expand Down Expand Up @@ -138,45 +138,49 @@ def parquet2vcf(
f"parameter: {variants_path} {output_path} {genotypes_path} {headers_path} {chromosome} {position} {identifier} {reference} {alternative} {quality} {filter_col} {format_str}"
)

variants_lf = polars.scan_parquet(variants_path)
lf = polars.scan_parquet(variants_path)

if headers_path:
headers = VcfHeader()
headers.from_files(headers_path)
else:
headers = None

if genotypes_path and format_str:
genotypes_lf = polars.scan_parquet(genotypes_path)
sample_name = genotypes_lf.select("sample").collect().get_column("sample").to_list()
merge_lf = extract.merge_variants_genotypes(variants_lf, genotypes_lf, sample_name)
lf = extract.merge_variants_genotypes(lf, genotypes_lf, sample_name)
sample2vcf_col2polars_col: dict[str, dict[str, str]] = {}
for sample in sample_name:
sample2vcf_col2polars_col[sample] = {}
for format_col in format_str.split(":"):
sample2vcf_col2polars_col[sample][format_col] = f"{sample}_{format_col.lower()}"

io.vcf.from_lazyframe(
merge_lf,
output_path,
io.vcf.build_rename_column(
chromosome,
position,
identifier,
reference,
alternative,
quality,
filter_col,
[],
format_str,
sample2vcf_col2polars_col,
),
rename_column = io.vcf.build_rename_column(
chromosome,
position,
identifier,
reference,
alternative,
quality,
filter_col,
[],
format_str,
sample2vcf_col2polars_col,
)
else:
io.vcf.from_lazyframe(
variants_lf,
output_path,
io.vcf.build_rename_column(
chromosome,
position,
identifier,
reference,
alternative,
quality,
filter_col,
),
rename_column = io.vcf.build_rename_column(
chromosome,
position,
identifier,
reference,
alternative,
quality,
filter_col,
)

io.vcf.lazyframe_in_vcf(
lf,
output_path,
vcf_header=headers,
renaming=rename_column,
)
1 change: 0 additions & 1 deletion src/variantplaner/cli/parquet2vcf.rs

This file was deleted.

94 changes: 64 additions & 30 deletions src/variantplaner/cli/vcf2parquet.py
Expand Up @@ -9,12 +9,15 @@

# 3rd party import
import click
import polars

# project import
from variantplaner import cli, exception, extract, io
from variantplaner import Vcf, VcfParsingBehavior, cli, exception

logger = logging.getLogger("__name__")

@cli.main.group("vcf2parquet", chain=True) # type: ignore[has-type]

@cli.main.group("vcf2parquet", chain=True) # type: ignore[has-type]
@click.pass_context
@click.option(
"-i",
Expand Down Expand Up @@ -54,26 +57,27 @@ def vcf2parquet(

logger.debug(f"parameter: {input_path=} {chrom2length_path=} {append=}")

if not chrom2length_path:
logger.error("--chrom2length-path argument is required")

try:
logger.debug("Start extract header")
headers = io.vcf.extract_header(input_path)
logger.debug("End extract header")
except exception.NotAVCFError:
logger.error("Input file seems no be a vcf") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(11)
lf = Vcf()

# Read vcf and manage structural variant
logger.debug("Start read vcf")
lf = io.vcf.into_lazyframe(input_path, chrom2length_path, extension=io.vcf.IntoLazyFrameExtension.MANAGE_SV)
try:
lf.from_path(input_path, chrom2length_path, behavior=VcfParsingBehavior.MANAGE_SV)
except exception.NotVcfHeaderError:
logging.error(f"Path {input_path} seems not contains Vcf.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(11)
except exception.NotAVCFError:
logging.error(f"Path {input_path} seems not contains Vcf.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(12)
except exception.NoContigsLengthInformationError:
logging.error("Vcf didn't contains contigs length information you could use chrom2length-path argument.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(13)
logger.debug("End read vcf")

ctx.obj["vcf_path"] = input_path
ctx.obj["lazyframe"] = lf
ctx.obj["append"] = append
ctx.obj["headers"] = headers
ctx.obj["headers"] = lf.header


@vcf2parquet.command("variants")
Expand All @@ -93,12 +97,20 @@ def variants(
logger = logging.getLogger("vcf2parquet.variants")

lf = ctx.obj["lazyframe"]
append = ctx.obj["append"] # noqa: F841 not used now
append = ctx.obj["append"] # not used now

logger.debug(f"parameter: {output_path=}")

logger.info(f"Start write variants in {output_path}")
extract.variants(lf).sink_parquet(output_path, maintain_order=False)
variants = lf.variants()

if append:
variants = __append(output_path, variants)

try:
variants.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
variants.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write variants in {output_path}")


Expand Down Expand Up @@ -128,20 +140,24 @@ def genotypes(
logger = logging.getLogger("vcf2parquet.genotypes")

lf = ctx.obj["lazyframe"]
append = ctx.obj["append"] # noqa: F841 not used now
headers = ctx.obj["headers"]
input_path = ctx.obj["vcf_path"]
append = ctx.obj["append"] # not used now

logger.debug(f"parameter: {output_path=} {format_string=}")

try:
genotypes_lf = extract.genotypes(lf, io.vcf.format2expr(headers, input_path), format_string)
genotypes_data = lf.genotypes(format_string)
except exception.NoGenotypeError:
logger.error("It's seems vcf not contains genotypes information.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(12)

if append:
genotypes_data = __append(output_path, genotypes_data)

logger.info(f"Start write genotypes in {output_path}")
genotypes_lf.sink_parquet(output_path, maintain_order=False)
try:
genotypes_data.lf.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
genotypes_data.lf.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write genotypes in {output_path}")


Expand Down Expand Up @@ -177,23 +193,28 @@ def annotations_subcommand(
logger = logging.getLogger("vcf2parquet.annotations")

lf = ctx.obj["lazyframe"]
append = ctx.obj["append"] # noqa: F841 not used now
headers = ctx.obj["headers"]
input_path = ctx.obj["vcf_path"]
append = ctx.obj["append"] # not used now
headers_obj = ctx.obj["headers"]

logger.debug(f"parameter: {output_path=}")

logger.info("Start extract annotations")
annotations_lf = lf.with_columns(io.vcf.info2expr(headers, input_path, info))
annotations_lf = annotations_lf.drop(["chr", "pos", "ref", "alt", "filter", "qual", "info"])
annotations_data = lf.lf.with_columns(headers_obj.info_parser(info))
annotations_data = annotations_data.drop(["chr", "pos", "ref", "alt", "filter", "qual", "info"])

if rename_id:
logger.info(f"Rename vcf variant id in {rename_id}")
annotations_lf = annotations_lf.rename({"vid": rename_id})
annotations_data = annotations_data.rename({"vid": rename_id})
logger.info("End extract annotations")

if append:
annotations_data = __append(output_path, annotations_data)

logger.info(f"Start write annotations in {output_path}")
annotations_lf.sink_parquet(output_path, maintain_order=False)
try:
annotations_data.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
annotations_data.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write annotations in {output_path}")


Expand All @@ -213,12 +234,25 @@ def headers(
"""Write vcf headers."""
logger = logging.getLogger("vcf2parquet.headers")

headers = ctx.obj["headers"]
headers_obj = ctx.obj["headers"]

logger.debug(f"parameter: {output_path=}")

logger.info(f"Start write headers in {output_path}")
with open(output_path, "w") as fh_out:
for line in headers:
for line in headers_obj:
print(line, file=fh_out)
logger.info(f"End write headers in {output_path}")


def __append(output_path: pathlib.Path, lf: polars.LazyFrame) -> polars.LazyFrame:
    """Concatenate the content of ``output_path`` with ``lf`` if ``output_path`` exists.

    If the parquet schemas do not match, polars will raise an error when the
    resulting LazyFrame is evaluated.

    Args:
        output_path: Path of an existing parquet file whose content is placed
            before ``lf`` in the result.
        lf: LazyFrame to append after the on-disk data.

    Returns:
        The concatenated LazyFrame, or ``lf`` unchanged when ``output_path``
        is not an existing file.
    """
    if not output_path.is_file():
        # Best-effort append: log the problem and return lf unchanged rather
        # than failing the whole command.
        logger.error("Target file does not exist, append mode is not applied.")
    else:
        lf = polars.concat([polars.scan_parquet(output_path), lf])

    return lf

0 comments on commit df36fa0

Please sign in to comment.