Skip to content

Commit

Permalink
feat(io.vcf): Add parsing of genotypes.
Browse files Browse the repository at this point in the history
  • Loading branch information
natir committed Apr 27, 2023
1 parent ab950b5 commit 98f0ea6
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/variantplaner/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ class NotAVCFError(Exception):
"""Exception raise if file read seems not be a vcf."""

def __init__(self, path: pathlib.Path):
"""Initilize not a vcf error."""
"""Initialize not a vcf error."""
super().__init__(f"File {path} seems not be a valid vcf file.")


class NoGenotypeError(Exception):
"""Exception raise if file read seems not be a vcf."""

def __init__(self):
"""Intilize no genotype error."""
"""Initialize no genotype error."""
super().__init__("LazyFrame seems not contains genotypes.")
119 changes: 117 additions & 2 deletions src/variantplaner/io/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@
import logging
import re
import typing
from typing import Callable

if typing.TYPE_CHECKING: # pragma: no cover
import pathlib
import sys

if sys.version_info >= (3, 11):
from typing import Concatenate, ParamSpec
else:
from typing_extensions import Concatenate, ParamSpec

T = typing.TypeVar("T")
P = ParamSpec("P")


# 3rd party import
import polars
Expand Down Expand Up @@ -52,18 +63,19 @@ def extract_header(input_path: pathlib.Path) -> list[str]:


def info2expr(header: list[str], input_path: pathlib.Path, select_info: set[str] | None = None) -> list[polars.Expr]:
"""Read vcf header to generate a list of polars.Expr to extract info.
"""Read vcf header to generate a list of polars.Expr to extract variants informations.
Args:
header: Line of vcf header
input_path: Path to vcf file.
select_info: List of info field
select_info: List of target info field
Returns:
List of polars expr to parse info columns.
Raises:
NotAVCFError: If all line not start by '#CHR'
NotSupportType: If header line indicate a not support type
"""
info_re = re.compile(
r"ID=(?P<id>([A-Za-z_][0-9A-Za-z_.]*|1000G)),Number=(?P<number>[ARG0-9\.]+),Type=(?P<type>Integer|Float|String)",
Expand All @@ -88,18 +100,121 @@ def info2expr(header: list[str], input_path: pathlib.Path, select_info: set[str]
local_expr = local_expr.cast(polars.Int64)
elif search["type"] == "Float":
local_expr = local_expr.cast(polars.Float64)
elif search["type"] == "String":
pass # Not do anything on string
else:
pass # Not reachable

else:
local_expr = local_expr.str.split(",")
if search["type"] == "Integer":
local_expr = local_expr.cast(polars.List(polars.Int64))
elif search["type"] == "Float":
local_expr = local_expr.cast(polars.List(polars.Float64))
elif search["type"] == "String":
pass # Not do anything on string
else:
pass # Not reachable

expressions.append(local_expr.alias(search["id"]))

raise NotAVCFError(input_path)


def __format_gt(expr: polars.Expr, /, *_args: typing.Any, **kargs: typing.Any) -> polars.Expr:
"""Manage gt field."""
return expr.str.count_match("1").cast(polars.UInt8).alias(kargs["name"].lower())


def __format_one_int(expr: polars.Expr, /, *_args: typing.Any, **kargs: typing.Any) -> polars.Expr:
"""Manage integer field."""
return expr.str.parse_int(10, strict=False).cast(polars.UInt16).alias(kargs["name"].lower())


def __format_one_str(expr: polars.Expr, /, *_args: typing.Any, **kargs: typing.Any) -> polars.Expr:
"""Manage string field."""
return expr.alias(kargs["name"].lower())


def __format_list_int(expr: polars.Expr, /, *_args: typing.Any, **kargs: typing.Any) -> polars.Expr:
"""Manage list of integer field."""
return (
expr.str.split(",")
.arr.eval(polars.element().str.parse_int(10, strict=False).cast(polars.UInt16))
.alias(kargs["name"].lower())
)


def __format_list_str(expr: polars.Expr, /, *_args: typing.Any, **kargs: typing.Any) -> polars.Expr:
"""Manag list string field."""
return expr.str.split(",").alias(kargs["name"].lower())


def format2expr(
header: list[str],
input_path: pathlib.Path,
select_format: set[str] | None = None,
) -> dict[str, Callable[Concatenate[polars.Expr, P], polars.Expr]]:
"""Read vcf header to generate a list of polars.Expr to extract genotypes informations.
**Warning**: Float values can't be converted for the moment they are stored as String to keep the information
Args:
header: Line of vcf header
input_path: Path to vcf file.
select_format: List of target format field
Returns:
A dict to link format id to pipeable function with Polars.Expr
Raises:
NotAVCFError: If all line not start by '#CHR'
"""
format_re = re.compile(
"ID=(?P<id>[A-Za-z_][0-9A-Za-z_.]*),Number=(?P<number>[ARG0-9\\.]+),Type=(?P<type>Integer|Float|String|Character)",
)

expressions: dict[str, Callable[Concatenate[polars.Expr, P], polars.Expr]] = {}

for line in header:
if line.startswith("#CHROM"):
return expressions

if not line.startswith("##FORMAT"):
continue

if (search := format_re.search(line)) and (not select_format or search["id"] in select_format):
name = search["id"]
number = search["number"]
format_type = search["type"]

if name == "GT":
expressions["GT"] = __format_gt
continue

if number == "1":
if format_type == "Integer":
expressions[name] = __format_one_int
elif format_type == "Float": # noqa: SIM114 Float isn't already support but in future
expressions[name] = __format_one_str
elif format_type == "String" or format_type == "Character":
expressions[name] = __format_one_str
else:
pass # Not reachable

else:
if format_type == "Integer": # noqa: PLR5501 All other number are consider as list
expressions[name] = __format_list_int
elif format_type == "Float": # noqa: SIM114 Float isn't already support but in future
expressions[name] = __format_list_str
elif format_type == "String" or format_type == "Character":
expressions[name] = __format_list_str
else:
pass # Not reachable

raise NotAVCFError(input_path)


def sample_index(header: list[str], input_path: pathlib.Path) -> dict[str, int] | None:
"""Read vcf header to generate an association map between sample name and index.
Expand Down
39 changes: 39 additions & 0 deletions tests/test_io_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
from __future__ import annotations

import filecmp
import typing

if typing.TYPE_CHECKING: # pragma: no cover
import sys

if sys.version_info >= (3, 11):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec

T = typing.TypeVar("T")
P = ParamSpec("P")

# 3rd party import
import pathlib
Expand Down Expand Up @@ -87,6 +99,33 @@ def test_info2expr_exception() -> None:
io.vcf.info2expr(header[:-1], DATA_DIR / "no_genotypes.vcf")


def test_format2expr_no_format_vcf() -> None:
"""Check format2expr."""
header = io.vcf.extract_header(DATA_DIR / "no_genotypes.vcf")
assert (
io.vcf.format2expr(
header,
DATA_DIR / "no_genotypes.vcf",
)
== {}
)


def test_format2expr() -> None:
"""Check format2expr."""
header = io.vcf.extract_header(DATA_DIR / "no_info.vcf")

assert set(io.vcf.format2expr(header, DATA_DIR / "no_info.vcf").keys()) == {"AD", "DP", "GQ", "GT"}


def test_format2expr_exception() -> None:
"""Check format2expr."""
header = io.vcf.extract_header(DATA_DIR / "no_info.vcf")

with pytest.raises(exception.NotAVCFError):
io.vcf.format2expr(header[:-1], DATA_DIR / "no_info.vcf")


def test_sample_index() -> None:
"""Check sample index."""
truth = {"sample_1": 0, "sample_2": 1, "sample_3": 2}
Expand Down

0 comments on commit 98f0ea6

Please sign in to comment.