[WIP] Create first class
natir committed Feb 29, 2024
1 parent c51c932 commit 07ce149
Showing 23 changed files with 808 additions and 41 deletions.
5 changes: 1 addition & 4 deletions scripts/gen_benchmark_plot.py
@@ -191,10 +191,7 @@ def render_plot() -> str:
if df.shape[0] == 0:
return ""

name2func = {
name: globals().get(f"{name}_func", nothing)
for name in df.get_column("benchmark").unique().to_list()
}
name2func = {name: globals().get(f"{name}_func", nothing) for name in df.get_column("benchmark").unique().to_list()}

bench2plot = {}
for name, data in df.group_by("benchmark"):
3 changes: 2 additions & 1 deletion src/variantplaner/__init__.py
@@ -7,7 +7,8 @@

from __future__ import annotations

from variantplaner import exception, extract, generate, io, normalization, struct
from variantplaner import extract, generate, io, normalization, struct
from variantplaner.objects import Vcf, VcfParsingBehavior

__all__: list[str] = [
"exception",
12 changes: 6 additions & 6 deletions src/variantplaner/cli/__init__.py
@@ -21,7 +21,7 @@ class MultipleValueOption(click.Option):

def __init__(self, *args: list[typing.Any], **kwargs: dict[typing.Any, typing.Any]):
"""Intialise click option parser."""
super(MultipleValueOption, self).__init__(*args,**kwargs) # type: ignore[arg-type] # noqa: UP008 false positive and complexe type
super(MultipleValueOption, self).__init__(*args, **kwargs) # type: ignore[arg-type] # noqa: UP008 false positive and complexe type
self._previous_parser_process = None
self._eat_all_parser = None

@@ -109,8 +109,8 @@ def main(ctx: click.Context, *, threads: int = 1, verbose: int = 0, debug_info:


# module import required after main definition
from variantplaner.cli import metadata # noqa: E402 F401 I001 these imports should be here
from variantplaner.cli import parquet2vcf # noqa: E402 F401 these imports should be here
from variantplaner.cli import struct # noqa: E402 F401 these imports should be here
from variantplaner.cli import transmission # noqa: E402 F401 these imports should be here
from variantplaner.cli import vcf2parquet # noqa: E402 F401 these imports should be here
from variantplaner.cli import metadata # noqa: E402 F401 I001 these imports should be here
from variantplaner.cli import parquet2vcf # noqa: E402 F401 these imports should be here
from variantplaner.cli import struct # noqa: E402 F401 these imports should be here
from variantplaner.cli import transmission # noqa: E402 F401 these imports should be here
from variantplaner.cli import vcf2parquet # noqa: E402 F401 these imports should be here
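
Note (not part of the diff): MultipleValueOption, shown earlier in this file's diff, is a click.Option subclass that lets a single flag consume several values. A minimal usage sketch, assuming it is wired into a command through click's standard cls parameter; the option name, command name, and sample values below are hypothetical:

import click

from variantplaner.cli import MultipleValueOption


@click.command()
@click.option("-s", "--samples", cls=MultipleValueOption, type=str, help="One or more sample names.")
def demo(samples: list[str]) -> None:
    """Echo the values captured by the multiple-value option."""
    click.echo(f"samples: {samples}")


if __name__ == "__main__":
    demo()  # e.g. `python demo.py -s sample1 sample2 sample3`
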
63 changes: 38 additions & 25 deletions src/variantplaner/cli/vcf2parquet.py
@@ -9,12 +9,13 @@

# 3rd party import
import click
import polars

# project import
from variantplaner import cli, exception, extract, io
from variantplaner import Vcf, VcfParsingBehavior, cli, exception, extract, io


@cli.main.group("vcf2parquet", chain=True) # type: ignore[has-type]
@cli.main.group("vcf2parquet", chain=True) # type: ignore[has-type]
@click.pass_context
@click.option(
"-i",
@@ -54,26 +55,27 @@ def vcf2parquet(

logger.debug(f"parameter: {input_path=} {chrom2length_path=} {append=}")

if not chrom2length_path:
logger.error("--chrom2length-path argument is required")

try:
logger.debug("Start extract header")
headers = io.vcf.extract_header(input_path)
logger.debug("End extract header")
except exception.NotAVCFError:
logger.error("Input file seems no be a vcf") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(11)
lf = Vcf()

# Read vcf and manage structural variant
logger.debug("Start read vcf")
lf = io.vcf.into_lazyframe(input_path, chrom2length_path, extension=io.vcf.IntoLazyFrameExtension.MANAGE_SV)
try:
lf.from_path(input_path, chrom2length_path, behavior=VcfParsingBehavior)
except exception.NotVcfHeaderError:
logging.error(f"Path {input_path} seems not contains Vcf.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(11)
except exception.NotAVCFError:
logging.error(f"Path {input_path} seems not contains Vcf.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(12)
except exception.NoContigsLengthInformationError:
logging.error("Vcf didn't contains contigs length information you could use chrom2length-path argument.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(13)
logger.debug("End read vcf")

ctx.obj["vcf_path"] = input_path
ctx.obj["lazyframe"] = lf
ctx.obj["append"] = append
ctx.obj["headers"] = headers
ctx.obj["headers"] = lf.header


@vcf2parquet.command("variants")
@@ -98,7 +100,12 @@ def variants(
logger.debug(f"parameter: {output_path=}")

logger.info(f"Start write variants in {output_path}")
extract.variants(lf).sink_parquet(output_path, maintain_order=False)
variants = lf.variants()

try:
variants.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
variants.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write variants in {output_path}")


@@ -129,19 +136,22 @@ def genotypes(

lf = ctx.obj["lazyframe"]
append = ctx.obj["append"] # noqa: F841 not used now
headers = ctx.obj["headers"]
headers_obj = ctx.obj["headers"]
input_path = ctx.obj["vcf_path"]

logger.debug(f"parameter: {output_path=} {format_string=}")

try:
genotypes_lf = extract.genotypes(lf, io.vcf.format2expr(headers, input_path), format_string)
genotypes_data = lf.genotypes(format_string)
except exception.NoGenotypeError:
logger.error("It's seems vcf not contains genotypes information.") # noqa: TRY400 we are in cli exception isn't readable
sys.exit(12)

logger.info(f"Start write genotypes in {output_path}")
genotypes_lf.sink_parquet(output_path, maintain_order=False)
try:
genotypes_data.lf.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
genotypes_data.lf.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write genotypes in {output_path}")


@@ -178,22 +188,25 @@ def annotations_subcommand(

lf = ctx.obj["lazyframe"]
append = ctx.obj["append"] # noqa: F841 not used now
headers = ctx.obj["headers"]
headers_obj = ctx.obj["headers"]
input_path = ctx.obj["vcf_path"]

logger.debug(f"parameter: {output_path=}")

logger.info("Start extract annotations")
annotations_lf = lf.with_columns(io.vcf.info2expr(headers, input_path, info))
annotations_lf = annotations_lf.drop(["chr", "pos", "ref", "alt", "filter", "qual", "info"])
annotations_data = lf.lf.with_columns(headers_obj.info_parser(info))
annotations_data = annotations_data.drop(["chr", "pos", "ref", "alt", "filter", "qual", "info"])

if rename_id:
logger.info(f"Rename vcf variant id in {rename_id}")
annotations_lf = annotations_lf.rename({"vid": rename_id})
annotations_data = annotations_data.rename({"vid": rename_id})
logger.info("End extract annotations")

logger.info(f"Start write annotations in {output_path}")
annotations_lf.sink_parquet(output_path, maintain_order=False)
try:
annotations_data.sink_parquet(output_path, maintain_order=False)
except polars.exceptions.InvalidOperationError:
annotations_data.collect(streaming=True).write_parquet(output_path)
logger.info(f"End write annotations in {output_path}")


@@ -213,12 +226,12 @@ def headers(
"""Write vcf headers."""
logger = logging.getLogger("vcf2parquet.headers")

headers = ctx.obj["headers"]
headers_obj = ctx.obj["headers"]

logger.debug(f"parameter: {output_path=}")

logger.info(f"Start write headers in {output_path}")
with open(output_path, "w") as fh_out:
for line in headers:
for line in headers_obj:
print(line, file=fh_out)
logger.info(f"End write headers in {output_path}")
24 changes: 24 additions & 0 deletions src/variantplaner/exception.py
@@ -13,6 +13,30 @@
# project import


class NoContigsLengthInformationError(Exception):
"""Exception raise if we didn't get Contigs Length information in vcf or in compagnion file."""

def __init__(self):
"""Initize no contigs length information error."""
super().__init__("Contigs length information is required in vcf header of in compagnion file.")


class NotAVariantCsvError(Exception):
"""Exception raise if file is a csv should contains variants info but columns name not match minimal requirement."""

def __init__(self, path: pathlib.Path):
"""Initialize not a variant csv error."""
super().__init__(f"{path} seems not be a csv variant.")


class NotVcfHeaderError(Exception):
"""Exception raise if header isn't compatible with vcf."""

def __init__(self):
"""Initialize not a vcf header error."""
super().__init__("Not a vcf header")


class NotAVCFError(Exception):
"""Exception raise if file read seems not be a vcf, generally not contains a line starts with '#CHROM'."""

3 changes: 2 additions & 1 deletion src/variantplaner/normalization.py
@@ -33,6 +33,7 @@ def add_variant_id(lf: polars.LazyFrame, chrom2length: polars.LazyFrame) -> pola
Returns:
[polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) with chr column normalized
"""

real_pos_max = chrom2length.select([polars.col("length").sum()]).collect().get_column("length").max()

if "SVTYPE" in lf.columns and "SVLEN" in lf.columns:
@@ -56,7 +57,7 @@ def add_variant_id(lf: polars.LazyFrame, chrom2length: polars.LazyFrame) -> pola
),
)

lf = lf.join(chrom2length, on="chr", how="left")
lf = lf.join(chrom2length, right_on="contig", left_on="chr", how="left")
lf = lf.with_columns(real_pos=polars.col("pos") + polars.col("offset"))

lf = lf.with_columns(
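
Note (not part of the diff): after this change the chromosome-length frame is joined on its contig column against the variant chr column, and every variant gets a genome-wide position real_pos = pos + offset, where offset is the cumulative length of all preceding contigs. A self-contained sketch of that computation with made-up contigs and positions:

import polars

chrom2length = polars.LazyFrame(
    {"contig": ["chr1", "chr2", "chr3"], "length": [1000, 500, 200]},
    schema={"contig": polars.String, "length": polars.UInt64},
).with_columns(offset=polars.col("length").cum_sum() - polars.col("length"))

variants = polars.LazyFrame(
    {"chr": ["chr2", "chr3"], "pos": [10, 42]},
    schema={"chr": polars.String, "pos": polars.UInt64},
)

# Same join as add_variant_id: chr matches contig, then the genome-wide
# position is the local position plus the contig offset.
result = (
    variants.join(chrom2length, left_on="chr", right_on="contig", how="left")
    .with_columns(real_pos=polars.col("pos") + polars.col("offset"))
    .collect()
)
print(result)  # chr2 -> real_pos 1010, chr3 -> real_pos 1542
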
8 changes: 8 additions & 0 deletions src/variantplaner/objects/__init__.py
@@ -0,0 +1,8 @@
"""Module to store variantplaner object."""

# std import
from __future__ import annotations

# 3rd party import
# project import
from variantplaner.objects.vcf import Vcf, VcfParsingBehavior
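
Note (not part of the diff): the new Vcf object is what the vcf2parquet command now builds instead of calling io.vcf directly. A usage sketch assembled only from the calls visible in this commit; the file path and format string are placeholders, and the exact signatures (in particular the behavior argument and any VcfParsingBehavior members, which the diff does not show) should be treated as assumptions:

import pathlib

from variantplaner import Vcf, VcfParsingBehavior

vcf = Vcf()
# Mirrors src/variantplaner/cli/vcf2parquet.py: parse the file, taking contig
# lengths from the header or from an optional companion file.
vcf.from_path(pathlib.Path("sample.vcf"), None, behavior=VcfParsingBehavior)

variants_lf = vcf.variants()            # LazyFrame of normalized variants
genotypes_obj = vcf.genotypes("GT:AD")  # placeholder format string; .lf holds the LazyFrame
header_lines = list(vcf.header)         # iterate the raw vcf header lines
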
24 changes: 24 additions & 0 deletions src/variantplaner/objects/annotations.py
@@ -0,0 +1,24 @@
"""Declare Genotypes object."""

# std import
from __future__ import annotations

# 3rd party import
import polars

# project import


class Annotations(polars.LazyFrame):
"""Object to manage lazyframe as Annotations."""

def __init__(self):
"""Initialize a Annotations object."""
self.lf = polars.LazyFrame(schema=Annotations.minimal_schema())

@classmethod
def minimal_schema(cls) -> dict[str, type]:
"""Get minimal schema of genotypes polars.LazyFrame."""
return {
"id": polars.UInt64,
}
84 changes: 84 additions & 0 deletions src/variantplaner/objects/contigs_length.py
@@ -0,0 +1,84 @@
"""Declare Vcf object."""

# std import
from __future__ import annotations

import re
import typing

# 3rd party import
import polars

# project import
from variantplaner.objects.csv import Csv

if typing.TYPE_CHECKING:
import pathlib
import sys

from variantplaner.objects.csv import ScanCsv
from variantplaner.objects.vcf_header import VcfHeader

if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack


class ContigsLength:
"""Store contigs -> length information."""

def __init__(self):
"""Initialise a contigs length."""
self.lf = polars.LazyFrame(
schema={
"contig": polars.String,
"length": polars.UInt64,
"offset": polars.UInt64,
}
)

def from_vcf_header(self, header: VcfHeader) -> int:
"""Fill a object with VcfHeader.
Argument:
header: VcfHeader
Returns: Number of contigs line view
"""
contigs_id = re.compile(r"ID=(?P<id>[^,]+)")
contigs_len = re.compile(r"length=(?P<length>[^,>]+)")

count = 0
contigs2len = {"contig": list(), "length": list()}
for contig_line in header.contigs:
if (len_match := contigs_len.search(contig_line)) and (id_match := contigs_id.search(contig_line)):
contigs2len["contig"].append(id_match.groupdict()["id"])
contigs2len["length"].append(int(len_match.groupdict()["length"]))
count += 1

self.lf = polars.LazyFrame(contigs2len, schema={"contig": polars.String, "length": polars.UInt64})

self.__compute_offset()

return count

def from_path(self, path: pathlib.Path, /, **scan_csv_args: Unpack[ScanCsv]) -> int:
"""Fill object with file point by pathlib.Path.
Argument:
path: path of input file
Returns: Number of contigs line view
"""
csv = Csv()
csv.from_path(path, **scan_csv_args)
self.lf = csv.lf

self.__compute_offset()

return self.lf.collect().shape[0]

def __compute_offset(self):
self.lf = self.lf.with_columns(offset=polars.col("length").cum_sum() - polars.col("length"))
self.lf = self.lf.cast({"offset": polars.UInt64})
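
Note (not part of the diff): from_vcf_header above pulls the ID and length fields out of ##contig header lines, then __compute_offset derives a cumulative offset per contig. A small sketch of the same regex-and-offset logic on made-up header lines:

import re

import polars

contig_lines = [
    "##contig=<ID=chr1,length=1000>",
    "##contig=<ID=chr2,length=500>",
]

contigs_id = re.compile(r"ID=(?P<id>[^,]+)")
contigs_len = re.compile(r"length=(?P<length>[^,>]+)")

contigs2len: dict[str, list] = {"contig": [], "length": []}
for line in contig_lines:
    if (len_match := contigs_len.search(line)) and (id_match := contigs_id.search(line)):
        contigs2len["contig"].append(id_match.groupdict()["id"])
        contigs2len["length"].append(int(len_match.groupdict()["length"]))

lf = polars.LazyFrame(contigs2len, schema={"contig": polars.String, "length": polars.UInt64})
# Offset of a contig = total length of every contig before it (chr1 -> 0, chr2 -> 1000).
lf = lf.with_columns(offset=polars.col("length").cum_sum() - polars.col("length"))
print(lf.collect())
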
