In [2]:
# ──────────────────────────────────────────────────────────────
# Jupyter test cell for bag_pipe.io helpers
# ──────────────────────────────────────────────────────────────
%load_ext autoreload
%autoreload 2

import importlib, types, textwrap, sys
import bag_pipe.io as bio

# ——————————————————————————————————————————————
# inline sanity-check tests
# ——————————————————————————————————————————————
def test_parse_cigar_single_match():
    starts, ends, j1, j2 = bio.parse_cigar(100, "6M")
    assert starts == [100] and ends == [105] and j1 == [] and j2 == []

def test_parse_cigar_soft_clip():
    starts, ends, *_ = bio.parse_cigar(100, "5S95M")
    assert starts == [100] and ends == [194]      # 95 bp match → 100-194

def test_extract_alignment_features():
    # synthetic alignment row mimicking your read-name scheme
    aln = bio.SamAlignment("read1_BC01_VT17", 0, "chr1", 100, 42, "6M")
    row = bio.extract_alignment_features(aln, min_mapq=1)
    assert row and row[0] == "read1" and row[1] == "BC01" and row[2] == "VT17"

# ——————————————————————————————————————————————
# simple runner
# ——————————————————————————————————————————————
def run_inline_tests():
    for name, fn in globals().items():
        if name.startswith("test_") and isinstance(fn, types.FunctionType):
            fn()
    print("✅  inline sanity checks passed")

run_inline_tests()

# ——————————————————————————————————————————————
# optional: run your on-disk pytest suite
# (uncomment ↓ if you have tests/ directory)
# ——————————————————————————————————————————————
# import pytest, pathlib
# !pytest -q


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
CIGAR: 6M → [6] ['M']
CIGAR: 5S95M → [5, 95] ['S', 'M']
CIGAR: 6M → [6] ['M']
✅  inline sanity checks passed


In [4]:
# ─────────────────────────────────────────────────────────────
#  Run feature extraction on data/test_data/hisat2pe_sorted.sam
# ─────────────────────────────────────────────────────────────
%load_ext autoreload
%autoreload 2

from pathlib import Path
import bag_pipe.io as bio

# ❶  Path to the test SAM.  Adjust if your notebook’s CWD differs.
sam_path = Path("../data/test_data/hisat2pe_sorted.sam")
outfile = Path("../data/output/hisat2pe_sorted.tsv")

rows = []
with bio.SAMReader(sam_path) as rdr:
    for aln in rdr:
        row = bio.extract_alignment_features(aln, min_mapq=1)
        if row:
            rows.append(row)

print(f"✅  processed {len(rows):,} alignments that passed filters")

# ❷  Peek at the first five tab-delimited rows
for r in rows:
    print(*r, sep="\t")

# ❸  Quick junction/exon breakdown (optional sanity check)
junction_reads = sum(1 for r in rows if r[11] and r[12])   # columns 11–12 hold junction coords
print(f"Reads containing ≥1 splice junction: {junction_reads:,}")


CIGAR: 49M → [49] ['M']
CIGAR: 28M1D46M → [28, 1, 46] ['M', 'D', 'M']
CIGAR: 27M1D43M → [27, 1, 43] ['M', 'D', 'M']
CIGAR: 38M → [38] ['M']
CIGAR: 45M1I24M → [45, 1, 24] ['M', 'I', 'M']
CIGAR: 33M1I116M → [33, 1, 116] ['M', 'I', 'M']
CIGAR: 150M → [150] ['M']
CIGAR: 6S38M → [6, 38] ['S', 'M']
CIGAR: 26S118M927N6M → [26, 118, 927, 6] ['S', 'M', 'N', 'M']
CIGAR: 16M83N54M → [16, 83, 54] ['M', 'N', 'M']
CIGAR: 1M1493N140M9S → [1, 1493, 140, 9] ['M', 'N', 'M', 'S']
CIGAR: 70M → [70] ['M']
CIGAR: 48M58N38M194N64M → [48, 58, 38, 194, 64] ['M', 'N', 'M', 'N', 'M']
CIGAR: 8M194N62M → [8, 194, 62] ['M', 'N', 'M']
CIGAR: 70M → [70] ['M']
CIGAR: 70M → [70] ['M']
CIGAR: 150M → [150] ['M']
CIGAR: 150M → [150] ['M']
CIGAR: 43M → [43] ['M']
CIGAR: 70M → [70] ['M']
CIGAR: 80M → [80] ['M']
CIGAR: 70M → [70] ['M']
CIGAR: 43M3S → [43, 3] ['M', 'S']
CIGAR: 84M → [84] ['M']
CIGAR: 70M → [70] ['M']
CIGAR: 70M → [70] ['M']
CIGAR: 8S142M → [8, 142] ['S', 'M']
CIGAR: 70M → [70] ['M']
CIGAR: 70M → [70] ['M']
CI

In [4]:
# ------------------------------------------------------------
# 1. Imports from the Bagpipe code base
# ------------------------------------------------------------
from bag_pipe.io import load_refflat
from bag_pipe.gene_models import GeneModel

# ------------------------------------------------------------
# 2. Point to your refFlat and chromosome of interest
# ------------------------------------------------------------
REF_PATH   = "../data/databases/hg19.refFlat.20200301.igh.txt"   # ← edit me
CHROM      = "chrX"                                       # or any other chrom

# ------------------------------------------------------------
# 3. Load & normalise the refFlat
# ------------------------------------------------------------
ref_df = load_refflat(REF_PATH, chrom=CHROM)
print(f"Loaded {len(ref_df):,} transcripts on {CHROM}")

## build gene model
model = GeneModel(ref_df)


Loaded 2,897 transcripts on chrX


In [15]:
# ---------------------------------------------------------------------
# imports
# ---------------------------------------------------------------------
from itertools import groupby
from collections import Counter
from bag_pipe.io          import tag_and_map_stream, TagMapRow
from bag_pipe.gene_models import GeneModel, summarise_vt
from bag_pipe.io          import load_refflat
from bag_pipe.tags        import rollup_tags

REF_PATH   = "/data/databases/hg19.refFlat.20200301.igh.txt"   # ← edit me
chrom = "chr21"
# ---------------------------------------------------------------------
# prep once
# ---------------------------------------------------------------------
tsv_path = Path(f"../output/bag.bam.extract.{chrom}.sorted.head.txt")
rows_iter = tag_and_map_stream(tsv_path)
model     = GeneModel(
    load_refflat(REF_PATH, chrom=chrom)
)

# threshold + k for roll-up
K          = 1        # max Hamming distance
THRESHOLD  = 0.10     # 10 % abundance cutoff

# ---------------------------------------------------------------------
# outer loop: barcode (cell)
# ---------------------------------------------------------------------
summary_rows = []        # hold VTSummary objects for the first 100 barcodes
for bc_index, (bc, bc_iter) in enumerate(groupby(rows_iter, key=lambda r: r.barcode), 1):
    print(f"Processing barcode {bc_index}: {bc}")

    # materialise this barcode's rows and bucket by VT
    vt_dict: dict[str, list[TagMapRow]] = {}
    for vt, vt_iter in groupby(bc_iter, key=lambda r: r.vt):
        vt_dict[vt] = list(vt_iter)

    # -----------------------------------------------------------------
    # roll-up: decide which VTs survive
    # -----------------------------------------------------------------
    tag_count_pairs = [(vt, len(rows)) for vt, rows in vt_dict.items()]
    keepers, mapping, rejects = rollup_tags(
        tag_count_pairs,
        k=K,
        threshold=THRESHOLD,
    )

    # -----------------------------------------------------------------
    # summarise keeper VTs
    # -----------------------------------------------------------------
    for vt in keepers:
        vt_rows = vt_dict[vt]
        summary = summarise_vt(vt_rows, model)   # returns VTSummary dataclass
        print(summary)

    # break after the first 100 barcodes to keep the test small
    if bc_index >= 100:
        break


FileNotFoundError: [Errno 2] No such file or directory: '/data/databases/hg19.refFlat.20200301.igh.txt'

In [11]:
# ---------------------------------------------------------------------
# notebook cell – stream VT summarisation barcode-by-barcode
# ---------------------------------------------------------------------
from pathlib import Path
from itertools import groupby
from dataclasses import asdict                 # 🔄 NEW
from bag_pipe.io          import tag_and_map_stream, load_refflat
from bag_pipe.gene_models import GeneModel, summarise_vt
from bag_pipe.tags        import rollup_tags

# ------------------------  paths & constants  ------------------------
CHROM      = "chr21"
TSV_IN     = Path(f"../output/bag.bam.extract.{CHROM}.sorted.head.txt")
REF_FLAT   = Path("../data/databases/hg19.refFlat.20200301.igh.txt")
TSV_OUT    = Path(f"../output/{CHROM}.vt_summary.tsv")

K          = 1        # max Hamming distance for VT roll-up
THRESHOLD  = 0.10     # 10 % abundance cutoff

# ------------------------  initialise helpers  -----------------------
rows_iter = tag_and_map_stream(TSV_IN)
model     = GeneModel(load_refflat(REF_FLAT, chrom=CHROM))

# ------------------------  streaming loop  ---------------------------
with TSV_OUT.open("w") as fout:
    header_written = False

    for barcode, bc_iter in groupby(rows_iter, key=lambda r: r.barcode):

        vt_buckets = {vt: list(vt_iter)
                      for vt, vt_iter in groupby(bc_iter, key=lambda r: r.vt)}

        tag_counts = [(vt, len(rows)) for vt, rows in vt_buckets.items()]
        keepers, _map, _rej = rollup_tags(tag_counts, k=K, threshold=THRESHOLD)

        for vt in keepers:
            summary = summarise_vt(vt_buckets[vt], model)   # VTSummary dataclass
            d = asdict(summary)                             # 🔄 turn into dict

            if not header_written:
                fout.write("\t".join(d.keys()) + "\n")      # 🔄
                header_written = True

            fout.write("\t".join(map(str, d.values())) + "\n")  # 🔄

print(f"✅  Finished. Wrote summaries to {TSV_OUT}")


✅  Finished. Wrote summaries to ..\output\chr21.vt_summary.tsv


In [12]:
from itertools import groupby
from bag_pipe.io          import tag_and_map_stream
from bag_pipe.gene_models import GeneModel, summarise_vt
from bag_pipe.tags        import rollup_tags

chrom          = "chr21"                           # adjust as required
tsv_path = Path(f"../output/bag.bam.extract.{chrom}.sorted.head.txt")
rows_iter      = tag_and_map_stream(tsv_path)
model          = GeneModel(load_refflat(REF_PATH, chrom=chrom))
K, THRESHOLD   = 1, 0.10                           # roll-up parameters
header_emitted = False

with open(f"../output/{chrom}.vt_summary.tsv", "w") as fout:
    # ── group by barcode ───────────────────────────────────────────────
    for bc, bc_iter in groupby(rows_iter, key=lambda r: r.barcode):
        # bucket rows by varietal-tag (VT)
        vt_buckets = {
            vt: list(vt_iter)
            for vt, vt_iter in groupby(bc_iter, key=lambda r: r.vt)
        }

        # decide which VTs survive the roll-up
        tag_counts        = [(vt, len(rows)) for vt, rows in vt_buckets.items()]
        keepers, _, _     = rollup_tags(tag_counts, k=K, threshold=THRESHOLD)

        # summarise surviving VTs
        for vt in keepers:
            summary = summarise_vt(vt_buckets[vt], model)

            # ---- header (once) ----
            if not header_emitted:
                fout.write("\t".join(summary.header()) + "\n")  # header() already tab-safe
                header_emitted = True

            # ---- row ----
            #  1. replace empty/None gene with NO_GENE
            if not summary.gene:
                summary.gene = "NO_GENE"

            #  2. ensure list → comma-sep str  (e.g. exon_starts, exon_ends, etc.)
            row_cells = [
                ",".join(map(str, cell)) if isinstance(cell, list) else cell
                for cell in summary.to_row()                     # -> List[Any]
            ]

            fout.write("\t".join(map(str, row_cells)) + "\n")


NameError: name 'REF_PATH' is not defined

In [19]:
from itertools import groupby
from bag_pipe.io          import tag_and_map_stream
from bag_pipe.gene_models import GeneModel, summarise_vt
from bag_pipe.tags        import rollup_tags
header_emitted = False

CHROM      = "chr21"
TSV_IN     = Path(f"../output/bag.bam.extract.{CHROM}.sorted.head.txt")
REF_FLAT   = Path("../data/databases/hg19.refFlat.20200301.igh.txt")
TSV_OUT    = Path(f"../output/{CHROM}.vt_summary.tsv")

K          = 1        # max Hamming distance for VT roll-up
THRESHOLD  = 0.10     # 10 % abundance cutoff

# ------------------------  initialise helpers  -----------------------
rows_iter = tag_and_map_stream(TSV_IN)
model     = GeneModel(load_refflat(REF_FLAT, chrom=CHROM))

# ------------------------  streaming loop  ---------------------------
with TSV_OUT.open("w") as fout:
    header_written = False
# ── group by barcode ───────────────────────────────────────────────
    for bc, bc_iter in groupby(rows_iter, key=lambda r: r.barcode):
        # bucket rows by varietal-tag (VT)
        vt_buckets = {
            vt: list(vt_iter)
            for vt, vt_iter in groupby(bc_iter, key=lambda r: r.vt)
        }

        # decide which VTs survive the roll-up
        tag_counts        = [(vt, len(rows)) for vt, rows in vt_buckets.items()]
        keepers, _, _     = rollup_tags(tag_counts, k=K, threshold=THRESHOLD)

        # summarise surviving VTs
        for vt in keepers:
            summary = summarise_vt(vt_buckets[vt], model)

            # ---- header (once) ----
            if not header_emitted:
                fout.write("\t".join(summary.header()) + "\n")  # header() already tab-safe
                header_emitted = True

            #  2. ensure list → comma-sep str  (e.g. exon_starts, exon_ends, etc.)
            row_cells = [
                ",".join(map(str, cell)) if isinstance(cell, list) else cell
                for cell in summary.to_row()                     # -> List[Any]
            ]

            fout.write(str(summary) + "\n")


In [22]:
from pathlib import Path

def show_non_cp1252_bytes(path, context=50):
    raw = Path(path).read_bytes()
    for i, b in enumerate(raw):
        if b > 0x7F:                       # non-ASCII
            try:
                b.to_bytes(1).decode('cp1252')
            except UnicodeDecodeError:
                left  = raw[max(i-context, 0): i].decode('utf-8', 'ignore')
                right = raw[i+1: i+1+context].decode('utf-8', 'ignore')
                print(f"offset {i}: 0x{b:02X}\n…{left}▶◀{right}…\n")

show_non_cp1252_bytes("../src/bagpipe/io.py")


offset 14553: 0x90
…       int
    union_starts:  List[int]      # ▶◀ real lists
    union_ends:    List[int]
    spa…

