In [1]:
import abc
import functools
import itertools as itt
from collections.abc import Sequence
from dataclasses import dataclass, field
from functools import cached_property, total_ordering
from numbers import Number
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd
from pydantic import (
    BaseModel,
    Field,
    PositiveInt,
    PrivateAttr,
    field_validator,
    model_validator,
)


- http://www.ensembl.org/info/website/upload/wig.html
  - Basic definition of the types of blocks.

- https://genome.ucsc.edu/goldenPath/help/wiggle
  - More complex track definition parameters.
  - Provides with an example that shows the flexibility of the Wiggle format, mixing different block types.
  - "track" line is not only at the top of the file, but can be also at the beginning of each block.
  - ⇒ A single Wiggle File can contain multiple tracks with different parameters for visualization...
  - **⇒ ... But also with different values for the same genomic positions!**
  - And also: a wiggle file might be composed of heterogeneous blocks.


Questions:
 
- Bedgraph-type blocks?
- Heterogeneous blocks: can they be for the same track, or a track is always homogeneous?

In [13]:
(300 * 79_250 / 100) / 60 / 60 / 24

2.751736111111111

## Blocks

### ABC

In [2]:
@functools.total_ordering
class WiggleBlock(BaseModel, abc.ABC):
    """Abstract base class for Wiggle blocks (fixedStep / variableStep)."""

    class Config:
        frozen = True  # immutable like a frozen dataclass

    chrom: str
    values: tuple[int | float, ...]
    span: int = 1

    # --- abstract properties/methods ---
    @property
    @abc.abstractmethod
    def start(self) -> int:
        """Start position of the block (1-based, inclusive)."""
        ...

    @property
    @abc.abstractmethod
    def stop(self) -> int:
        """Last position covered in the block (1-based, inclusive)."""
        ...

    @property
    @abc.abstractmethod
    def positions(self) -> tuple[int, ...]:
        """Returns the positions with values, not all positions in the range."""
        ...

    @property
    @abc.abstractmethod
    def indexed_values(self) -> tuple[tuple[int, int | float], ...]:
        """Return a tuple of (position, value) pairs for each position with a value."""
        ...

    @property
    @abc.abstractmethod
    def header(self) -> str: ...

    # --- common methods ---
    def as_series(
        self, full_range: bool = True, fill_value: float | int = np.nan
    ) -> pd.Series:
        """Return the block as a pandas Series.

        If `full_range` is True, the series will include all positions from
        `start` to `stop` (included), filling missing values with `fill_value`.
        If `full_range` is False, only the positions with values will be included.
        """
        series = pd.Series(dict(self.indexed_values))
        if full_range:
            return series.reindex(
                pd.RangeIndex(start=self.start, stop=self.stop + 1),
                fill_value=fill_value,
            )
        return series

    def intersect(self, other: "WiggleBlock") -> tuple[int, int] | None:
        """Return the intersection with another WiggleBlock.

        Returns a tuple (start, stop) both included) if there is an intersection,
        or None if not.
        """
        if not isinstance(other, WiggleBlock):
            raise TypeError("Other must be a WiggleBlock instance")

        if self.chrom != other.chrom:
            return None

        start = max(self.start, other.start)
        stop = min(self.stop, other.stop)

        if start > stop:
            return None
        return (start, stop)

    def overlaps(self, other: "WiggleBlock") -> bool:
        """Check if this block overlaps with another WiggleBlock."""
        return self.intersect(other) is not None

    def __len__(self) -> int:
        """Length of the genomic interval (from start to last covered position)."""
        return self.stop - self.start + 1

    def __eq__(self, other: "WiggleBlock") -> bool:
        if not isinstance(other, WiggleBlock):
            return NotImplemented
        return (self.chrom, self.start) == (other.chrom, other.start)

    def __lt__(self, other: "WiggleBlock") -> bool:
        if not isinstance(other, WiggleBlock):
            return NotImplemented
        if self.chrom != other.chrom:
            return self.chrom < other.chrom
        return self.start < other.start

    # TODO: some of the positions within the [start,end] interval may not be covered.
    # ⇒ should "indexing" correspond only to covered positions, or to all positions?
    # NOTE: the `indexed_values` method returns only covered positions.
    # This thus suggests that indexing should only correspond to covered positions.
    # **BUT** : does slicing make sense at all return only covered positions?
    # Also: is slicing expected to be done with genomic or relative positions?

    # def index_of(self, position: int) -> int:
    #    """Return the index of the position in the block."""
    #    return self.as_series(full_range=True).index.get_loc(position)

    # def __getitem__(
    #    self, key: Union[int, slice]
    # ) -> Union[tuple[int, Union[int, float]], list[tuple[int, Union[int, float]]]]:
    #    """Return single or multiple (position, value) tuples."""
    #    indexed = list(self.indexed_values)
    #    return indexed[key]

### FixedStep

In [3]:
class FixedStepBlock(WiggleBlock):
    """A class representing a fixed-step Wiggle block."""

    chrom: str
    start_: PositiveInt = Field(alias="start")
    values: tuple[int | float, ...]
    step: PositiveInt
    span: PositiveInt = 1

    # --- validations ---
    @model_validator(mode="after")
    def validate_step_span(self):
        if self.span > self.step:
            raise ValueError(f"Span {self.span} cannot be larger than step {self.step}")
        return self

    @field_validator("values")
    @classmethod
    def check_non_empty(cls, v: tuple[int | float, ...]):
        if len(v) == 0:
            raise ValueError("values must contain at least one element.")
        return v

    # --- required implementations ---
    @property
    def start(self) -> int:
        return self.start_

    @property
    def stop(self) -> int:
        return self.start + self.step * (len(self.values) - 1) + self.span - 1

    @property
    def indexed_values(self) -> tuple[tuple[int, int | float], ...]:
        """Return indexed values as (position, value) tuples."""
        i = np.arange(len(self.values))
        j = np.arange(self.span)
        idx = self.start + i[:, None] * self.step + j[None, :]
        idx = idx.ravel()
        vals = np.repeat(np.array(self.values), self.span)
        return tuple(zip(idx, vals))

    @property
    def positions(self) -> tuple[int, ...]:
        return tuple(v[0] for v in self.indexed_values())

    @property
    def header(self) -> str:
        base = f"fixedStep chrom={self.chrom} start={self.start} step={self.step}"
        return f"{base} span={self.span}"

### VariableStep

In [4]:
class VariableStepBlock(WiggleBlock):
    """A class representing a variable-step Wiggle block."""

    chrom: str
    values: tuple[int | float, ...]
    positions_: tuple[PositiveInt, ...] = Field(alias="positions")
    span: PositiveInt = 1

    # --- validations ---
    @model_validator(mode="after")
    def validate_positions_span(self):
        if not all(
            self.positions_[i] < self.positions_[i + 1]
            for i in range(len(self.positions_) - 1)
        ):
            raise ValueError("Positions must be strictly increasing.")

        diffs = [b - a for a, b in zip(self.positions_[:-1], self.positions_[1:])]
        if any(diff < self.span for diff in diffs):
            raise ValueError(
                "Span value leads to overlap of some of the provided positions."
            )
        return self

    # @field_validator("values", "positions")
    # @classmethod
    # def check_non_empty(cls, v, info):
    #    if len(v) == 0:
    #        raise ValueError(f"{info.field_name} must not be empty")
    #    return v

    # --- required implementations ---

    @property
    def start(self) -> int:
        return self.positions_[0]

    @property
    def stop(self) -> int:
        return self.positions_[-1] + self.span - 1

    @property
    def positions(self) -> tuple[int, ...]:
        return tuple(
            np.repeat(self.positions_, self.span)
            + np.tile(range(self.span), len(self.positions_))
        )

    @property
    def indexed_values(self) -> tuple[tuple[int, int | float], ...]:
        pos = np.array(self.positions_)
        vals = np.array(self.values)
        idx = np.repeat(pos, self.span) + np.tile(range(self.span), len(vals))
        return tuple(zip(idx, np.repeat(vals, self.span)))

    @property
    def header(self) -> str:
        return f"variableStep chrom={self.chrom} span={self.span}"

### Tests

In [5]:
wfb = FixedStepBlock(chrom="chr1", start=101, values=(0.1, 0.2, 0.3), step=3, span=2)
display(wfb.as_series())

wfb2 = FixedStepBlock(chrom="chr1", start=104, values=(0.1, 0.2, 0.3), step=3, span=2)
display(wfb2.as_series())

101    0.1
102    0.1
103    NaN
104    0.2
105    0.2
106    NaN
107    0.3
108    0.3
dtype: float64

104    0.1
105    0.1
106    NaN
107    0.2
108    0.2
109    NaN
110    0.3
111    0.3
dtype: float64

In [6]:
variable_step_content = (
    (300701, 12.5),
    (300718, 0.3),
    (300804, 1.5),
    (300845, 0.8),
)

vsb = VariableStepBlock(
    chrom="chr2",
    positions=tuple(pos for pos, _ in variable_step_content),
    values=tuple(val for _, val in variable_step_content),
    span=3,
)

In [7]:
vsb.as_series(full_range=True)

300701    12.5
300702    12.5
300703    12.5
300704     NaN
300705     NaN
          ... 
300843     NaN
300844     NaN
300845     0.8
300846     0.8
300847     0.8
Length: 147, dtype: float64

## Track definition

In [8]:
class WigTrackDefinition(BaseModel):
    """A class representing a Wiggle track definition."""

    type_: str = Field(
        alias="type",
        description="Type of the track, mandatory 'wiggle_0'",
        default="wiggle_0",
    )
    name: str = Field(
        description="Unique name to identify this track when parsing the file",
        default="wiggle_track",
    )
    description: str = Field(
        description="Label to be displayed under the track in Region in Detail",
        default="Wiggle Track",
    )

    priority: int = Field(
        description="Integer defining the order in which to display tracks, if multiple tracks are defined.",
        default=0,
    )
    color: str = Field(
        description="Color as RGB, hex or X11 named color",
        default="#000000",
    )
    graphType: str = Field(
        description="Graph type, either 'bar' or 'points'.",
        choices=["bar", "points"],
    )

    @model_validator(mode="after")
    def validate_type(self):
        if self.type_ != "wiggle_0":
            raise ValueError(f"Invalid type: {self.type_}. Expected 'wiggle_0'.")
        return self


## Block collection

In [394]:
@dataclass(slots=True)
class WigBlockCollection(BaseModel):
    blocks: Sequence[WiggleBlock] = field(default_factory=list)
    header: WigTrackDefinition | None = field(
        default=None,
        metadata={
            "description": "Optional header for the collection, defining track properties."
        },
    )

    @field_validator("blocks")
    @classmethod
    def check_homogeneous(cls, v, info):
        if len(v) > 0 and not all(isinstance(item, v[0].__class__) for item in v):
            raise ValueError("All blocks must be of the same type.")
        return v

    def add(self, block):
        """Add a new WigBlock at the correct location into the collection."""
        if len(self.blocks) > 0 and not isinstance(
            self.block, self.blocks[0].__class__
        ):
            raise ValueError("All blocks must be of the same type.")

        starts = [b.start for b in self.blocks]
        # Find the index where the new block should be inserted
        index_insert = next(
            (i for i, start in enumerate(starts) if start > block.start),
            len(self.blocks),
        )
        # Verify that the new block does not overlap with the previous or next block.
        prev_block = self.blocks[index_insert - 1] if index_insert > 0 else None
        next_block = (
            self.blocks[index_insert] if index_insert < len(self.blocks) else None
        )

        if prev_block and prev_block.overlaps(block):
            raise ValueError("New block overlaps with the previous block.")

        if next_block and next_block.overlaps(block):
            raise ValueError("New block overlaps with the next block.")

        # Insert the new block at the correct position
        self.blocks.insert(index_insert, block)

    def __iter__(self):
        """Iterate over the blocks in the collection."""
        return iter(self.blocks)

    def __len__(self):
        """Return the number of blocks in the collection."""
        return len(self.blocks)

    def __getitem__(self, index):
        """Get a block by index."""
        return self.blocks[index]

    def as_bed3(self) -> tuple[tuple[str, int, int]]:
        if not self.blocks:
            return tuple()

        return tuple(
            (block.chrom, block.start - 1, block.stop) for block in self.blocks
        )


In [None]:
# Exmaple data tuples


@dataclass
class ExampleWigFiwBlockData:
    values = (0.1, 0.2, 0.3, 0.4, 0.5)
    start = 1
    span = 3
    step = 3

    tuple_data = (
        (1, 0.1),
        (2, 0.1),
        (3, 0.1),
        (4, 0.4),
        (5, 0.5),
    )


## Wiggle reader

## Function : from BED to WigFix

Most basic function : produces a WigFix collection from regions in BED file.

More elaborated:

- strand aware : but needs to assume whether values were reversed or not.
- 