# __init__.py

In [None]:
# `from .module import name` is a relative import: the names are loaded from modules of the current package when __init__.py runs
from .constants import * # define constants
from .gene_info import GeneInfo
from .read import Read
from .feature import Feature
from .transcript_model import TranscriptModel
from .segment_match import SegmentMatch
from .indexes import FeatureIndex, TransciptsIndex
from .molitem import Molitem
from .logic import *
from .counter import ExInCounter
from .metadata import MetadataCollection, Metadata
from .neighbors import BalancedKNN, convolve_by_sparse_weights
from .estimation import fit_slope, _fit1_slope, clusters_stats
from .serialization import dump_hdf5, load_hdf5
from .analysis import VelocytoLoom, scatter_viz, ixs_thatsort_a2b, load_velocyto_hdf5
from ._version import __version__  # version update; __version__ = "0.17.16"


# Protect users from a nasty bug in Anaconda
# See https://github.com/velocyto-team/velocyto.py/issues/104
# and https://github.com/ContinuumIO/anaconda-issues/issues/10089

import math
import numpy as np

# Text of the RuntimeError raised below when the numpy sanity check fails.
MKL_BUG_ERROR_MSG = """
Your current Python installation is affected by a critical bug in numpy and
MKL, and is going to return wrong results in velocyto and potentially other
scientific packages.
Please try updating your `numpy` version.
For more information, see
https://github.com/velocyto-team/velocyto.py/issues/104
and
https://github.com/ContinuumIO/anaconda-issues/issues/10089
"""

# Sanity check: let numpy compute the standard deviation of the integers
# 0..999999 and compare it with a hard-coded reference value.
std_check = np.arange(1000000).std()
# Reference result; for 0..N-1 the population standard deviation is
# sqrt((N**2 - 1) / 12), which is ~288675.1346 for N = 1000000.
expected = 288675.1345946685

# math.isclose is called with its default tolerances. Because this runs at
# module level, a mismatch raises while the package is being imported.
if not math.isclose(std_check, expected):
    raise RuntimeError(MKL_BUG_ERROR_MSG)

# gene_info package

In [None]:
from typing import * # making new data types
from collections import defaultdict # dictionary
import logging # tracking events during program running
import velocyto as vcy


class GeneInfo:
    """Minimal record holding the basic description of one gene.

    Instances are parsed from the .gtf file and used to build the row_attrs
    of the loom file.

    Args:
        genename: name of the gene, stored unchanged.
        geneid: identifier of the gene, stored unchanged.
        chromstrand: chromosome name immediately followed by a one-character
            strand symbol; it is split into ``chrom`` and ``strand``.
        start: start coordinate, stored unchanged.
        end: end coordinate, stored unchanged.
    """
    __slots__ = ["genename", "geneid", "chrom", "strand", "start", "end"]

    def __init__(self, genename: str, geneid: str, chromstrand: str, start: int, end: int) -> None:
        self.genename, self.geneid = genename, geneid
        # The last character is the strand; everything before it is the chromosome.
        self.chrom, self.strand = chromstrand[:-1], chromstrand[-1]
        self.start, self.end = start, end
        

(Note that the return type of __init__ ought to be annotated with -> None. The reason for this is subtle. If __init__ assumed a return annotation of -> None, would that mean that an argument-less, un-annotated __init__ method should still be type-checked? Rather than leaving this ambiguous or introducing an exception to the exception, we simply say that __init__ ought to have a return annotation; the default behavior is thus the same as for other methods.)

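The main reason to annotate `__init__` with `-> None` is to allow static type checking: by default, mypy ignores unannotated functions and methods. Independently of any annotation, `__init__` must return `None` at runtime; returning any other value raises a `TypeError`, as in the traceback below.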

Traceback (most recent call last):
  File "tmp.py", line 5, in <module>
    f = Foo()
TypeError: __init__() should return None, not 'int'

# Read package

In [None]:
from typing import *
import inspect


class Read:
    """Lightweight record for one read taken from a sam alignment file.

    The constructor arguments are stored unchanged. ``start``, ``end`` and
    ``span`` are computed from ``segments`` every time they are accessed.
    """
    __slots__ = ["bc", "umi", "chrom", "strand", "pos", "segments", "clip5", "clip3", "ref_skipped"]

    def __init__(self, bc: str, umi: str, chrom: str, strand: str, pos: int, segments: List, clip5: Any, clip3: Any, ref_skipped: bool) -> None:
        self.bc = bc
        self.umi = umi
        self.chrom = chrom
        self.strand = strand
        self.pos = pos
        self.segments = segments
        self.clip5 = clip5
        self.clip3 = clip3
        self.ref_skipped = ref_skipped

    @property
    def is_spliced(self) -> bool:
        """Whether the read is spliced; this is the stored ``ref_skipped`` flag."""
        return self.ref_skipped  # len(self.segments) > 1

    @property
    def start(self) -> int:
        """First coordinate of the first segment."""
        first_segment = self.segments[0]
        return first_segment[0]

    @property
    def end(self) -> int:
        """Second coordinate of the last segment."""
        last_segment = self.segments[-1]
        return last_segment[1]

    @property
    def span(self) -> int:
        """Number of positions from ``start`` to ``end``, both ends included."""
        return self.end - self.start + 1

    def __lt__(self, other: Any) -> bool:
        """Order by chromosome, then by start, then by end."""
        if self.chrom != other.chrom:
            return self.chrom < other.chrom
        if self.start != other.start:
            return self.start < other.start
        return self.end < other.end

    def __gt__(self, other: Any) -> bool:
        """Mirror image of ``__lt__``: chromosome, then start, then end."""
        if self.chrom != other.chrom:
            return self.chrom > other.chrom
        if self.start != other.start:
            return self.start > other.start
        return self.end > other.end

    def __str__(self) -> str:
        """Return all slots as ``name: type=value, `` pieces joined in slot order."""
        pieces = []
        for slot_name in self.__slots__:
            value = getattr(self, slot_name)
            pieces.append(f"{slot_name}: {type(value).__name__}={value}, ")
        return "".join(pieces)

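Accessing an attribute directly through the object makes it easy to change the underlying data by accident.
The `@property` decorator exposes a value through a getter method (and, optionally, a setter method), which protects the data.
It also lets a value computed from the stored data (for example `span`, derived from `start` and `end`) be read as if it were a plain attribute.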

# Feature package

In [None]:
from typing import *
from collections import defaultdict
import logging
import velocyto as vcy


class Feature:
    """An annotated genomic interval (e.g. exon, intron, masked repeat).

    ``kind`` holds the character code of the feature type (``ord("e")``,
    ``ord("i")``, ``ord("m")``, ...) and ``exin_no`` the exon/intron number.
    ``transcript_model`` links back to the owning transcript model and may be
    ``None`` for a feature that is not attached to one.
    """
    __slots__ = ["start", "end", "kind", "exin_no", "is_validated", "transcript_model"]

    def __init__(self, start: int, end: int, kind: int, exin_no: str, transcript_model: Any=None) -> None:
        self.start, self.end = start, end
        self.transcript_model = transcript_model
        # Expected to be a character code such as ord("e"), ord("i"), ord("m"), ...
        self.kind = kind
        # The number may arrive as text, so it is always converted to int.
        self.exin_no = int(exin_no)
        # Every feature starts out as not validated.
        self.is_validated = False

    def __lt__(self, other: Any) -> bool:
        """Order by start coordinate; ties are resolved by end coordinate."""
        if self.start != other.start:
            return self.start < other.start
        return self.end < other.end

    def __gt__(self, other: Any) -> bool:
        """Mirror image of ``__lt__``: start first, then end."""
        if self.start != other.start:
            return self.start > other.start
        return self.end > other.end

    def __len__(self) -> int:
        """Length of the interval, counting both ``start`` and ``end``."""
        return self.end - self.start + 1

    def __repr__(self) -> str:
        """Describe the feature, including transcript and gene info when linked."""
        if self.transcript_model is not None:
            return f"Feature: chr{self.transcript_model.chromstrand}:{self.start}-{self.end} {self.transcript_model.trname}\
    ({self.transcript_model.trid}) {chr(self.kind)}{self.exin_no} {self.transcript_model.genename}({self.transcript_model.geneid})"
        return f"Feature not linked to Transcript Model: {self.start}-{self.end} {chr(self.kind)}{self.exin_no}"

    @property
    def is_last_3prime(self) -> bool:
        """Whether this feature sits at the 3' end of its transcript model.

        On strand "+" that is the last entry of ``list_features``; on any
        other strand it is the first entry.
        """
        on_plus_strand = self.transcript_model.chromstrand[-1] == "+"
        features = self.transcript_model.list_features
        terminal = features[-1] if on_plus_strand else features[0]
        return self == terminal

    def get_downstream_exon(self) -> Any:
        """Return the neighbouring exon downstream of this intron.

        Meant to be called on introns only.

        Note
        ----
        In a 15 exons transcript model:
        for intron10 on strand "+" this is exon11, the interval with index `20`;
        for intron10 on strand "-" this is exon10, the interval with index `10`.
        """
        model = self.transcript_model
        if model.chromstrand[-1] == "+":
            return model.list_features[self.exin_no * 2]
        # Strand "-": the index is counted back from the length of the list.
        return model.list_features[len(model.list_features) - 2 * self.exin_no + 1]

    def get_upstream_exon(self) -> Any:
        """Return the neighbouring exon upstream of this intron.

        Meant to be called on introns only.

        Note
        ----
        In a 15 exons transcript model:
        for intron10 on strand "+" this is the interval with index `18`
        (presumably exon10, since index `20` is exon11 -- TODO confirm);
        for intron10 on strand "-" this is exon11, the interval with index `8`.
        """
        model = self.transcript_model
        if model.chromstrand[-1] == "+":
            return model.list_features[(self.exin_no * 2) - 2]
        # Strand "-": the index is counted back from the length of the list.
        return model.list_features[len(model.list_features) - 2 * self.exin_no - 1]

    def ends_upstream_of(self, read: vcy.Read) -> bool:
        """Tell whether the feature ends before the position of ``read``.

        Sketch of the situation detected::

                                           Read
                              *|||segment|||-?-||segment|||????????
            ??????|||Ivl|||*
        """
        # NOTE: read.pos is not the same thing as read.start; consider changing
        return self.end < read.pos

    def doesnt_start_after(self, segment: Tuple[int, int]) -> bool:
        """Tell whether the feature starts before the last coordinate of ``segment``.

        This holds whenever the feature lies entirely before the segment,
        overlaps it, or begins anywhere inside it ahead of its final position.
        """
        segment_end = segment[-1]
        return self.start < segment_end

    def intersects(self, segment: Tuple[int, int], minimum_flanking: int=vcy.MIN_FLANK) -> bool:
        """Tell whether ``segment`` overlaps the feature beyond the flanking margin.

        The segment end pulled in by ``minimum_flanking`` must lie after the
        feature start, and the segment start pushed out by ``minimum_flanking``
        must lie before the feature end.
        """
        trimmed_end = segment[-1] - minimum_flanking
        if not trimmed_end > self.start:
            return trimmed_end > self.start
        return segment[0] + minimum_flanking < self.end

    def contains(self, segment: Tuple[int, int], minimum_flanking: int=vcy.MIN_FLANK) -> bool:
        """Tell whether the feature contains ``segment``, allowing a flanking margin.

        All three conditions must hold::

            segment[0] + minimum_flanking  >= feature start
            segment[-1] - minimum_flanking <= feature end
            segment[-1] - segment[0]       >  minimum_flanking

        so the segment may stick out of the feature by at most
        ``minimum_flanking`` on either side, and must be longer than that margin.
        """
        return (
            (segment[0] + minimum_flanking >= self.start)
            and (segment[-1] - minimum_flanking <= self.end)
            and ((segment[-1] - segment[0]) > minimum_flanking)
        )

    def start_overlaps_with_part_of(self, segment: Tuple[int, int], minimum_flanking: int=vcy.MIN_FLANK) -> bool:
        """Tell whether ``segment`` straddles the start of the feature.

        ::

              *---|||segment||---*
                    *|||||||||||||Ivl||||||||||||||||*

        where `---` is the minimum flanking: the feature start must fall
        strictly between ``segment[0] + minimum_flanking`` and
        ``segment[-1] - minimum_flanking``.
        """
        boundary = self.start
        return (segment[0] + minimum_flanking < boundary) and (segment[-1] - minimum_flanking > boundary)

    def end_overlaps_with_part_of(self, segment: Tuple[int, int], minimum_flanking: int=vcy.MIN_FLANK) -> bool:
        """Tell whether ``segment`` straddles the end of the feature.

        ::

                                      *---|||segment||---*
                *|||||||||||||Ivl||||||||||||||||*

        where `---` is the minimum flanking: the feature end must fall
        strictly between ``segment[0] + minimum_flanking`` and
        ``segment[-1] - minimum_flanking``.
        """
        boundary = self.end
        return (segment[0] + minimum_flanking < boundary) and (segment[-1] - minimum_flanking > boundary)