## **Overview**
This notebook demonstrates a scalable bioinformatics workflow for analyzing **rare loss-of-function (LoF) variants** across samples and genes.  
It uses [**Narwhals**](https://github.com/narwhals-dev/narwhals/) — a **unified dataframe API** — to write code once and run seamlessly on either **Polars** (fast, efficient) or **Pandas** (flexible, widely used).

The goal of this pipeline is to:
1. **Read** variant call data from `.tsv` or `.parquet` files.
2. **Filter** variants to identify rare LoF events based on allele frequency (`AF`) thresholds.
3. **Annotate** variants into frequency buckets (`ultra_rare`, `rare`, `common`).
4. **Aggregate** per-sample, per-gene burden counts.
5. **Pivot** the data into a **sample × gene burden matrix**.
6. **Join** with sample metadata to generate a final **design matrix** for downstream modeling.


In [None]:
import os
os.chdir("/home/katwre/projects/Data_engineering-projects/narwhals_data_pipeline/")

import pandas as pd
import numpy as np
import random

import narwhals as nw

### Create sample input data

In [None]:

# Set seed for reproducibility
np.random.seed(42)
random.seed(42)

# Parameters
n_samples = 50
n_genes = 30
n_variants = 1000

# Define possible genes and impacts
genes = [f"GENE{i}" for i in range(1, n_genes + 1)]
impacts = ["stop_gained", "frameshift_variant", "splice_donor_variant",
           "missense_variant", "synonymous_variant"]
consequences = {"stop_gained": "high", "frameshift_variant": "high",
                "splice_donor_variant": "high", "missense_variant": "moderate",
                "synonymous_variant": "low"}

# Generate sample IDs
sample_ids = [f"S{i:03d}" for i in range(1, n_samples + 1)]

# Generate variants table
variants = pd.DataFrame({
    "sample_id": np.random.choice(sample_ids, n_variants),
    "gene": np.random.choice(genes, n_variants),
    "impact": np.random.choice(impacts, n_variants, p=[0.1, 0.15, 0.1, 0.5, 0.15]),
    "AF": np.round(np.random.beta(0.5, 10, n_variants), 5)  # skewed towards rare variants
})
variants["consequence"] = variants["impact"].map(consequences)

# Save variants file
variants.to_csv("./input_data/variants.tsv", sep="\t", index=False)

# Generate sample metadata
metadata = pd.DataFrame({
    "sample_id": sample_ids,
    "cohort": np.random.choice(["Discovery", "Validation", "Replication"], n_samples, p=[0.5, 0.3, 0.2]),
    "sex": np.random.choice(["F", "M"], n_samples),
    "age": np.random.randint(35, 80, n_samples)
})
metadata.to_csv("./input_data/sample_metadata.tsv", sep="\t", index=False)

print("✅ Generated variants.tsv (~1,000 rows) and sample_metadata.tsv (~50 rows).")


✅ Generated variants.tsv (~1,000 rows) and sample_metadata.tsv (~50 rows).


In [None]:
# Generate cohort_variants.parquet

# Create random variants DataFrame
np.random.seed(42)
variants_df = pd.DataFrame({
    "sample_id": np.random.choice(sample_ids, n_variants),
    "gene": np.random.choice(genes, n_variants),
    "impact": np.random.choice(impacts, n_variants, p=[0.1, 0.15, 0.1, 0.5, 0.15]),
    "AF": np.round(np.random.beta(0.5, 10, n_variants), 5),
})
variants_df["consequence"] = variants_df["impact"].map(consequences)


# Convert to Arrow Table
table = pa.Table.from_pandas(variants_df)

# Save as Parquet
pq.write_table(table, "./input_data/cohort_variants.parquet")

print("✅ cohort_variants.parquet created successfully!")
#print(variants_df.head())

✅ cohort_variants.parquet created successfully!
  sample_id    gene              impact       AF consequence
0      S039  GENE28    missense_variant  0.07361    moderate
1      S029  GENE18  frameshift_variant  0.00102        high
2      S015   GENE4         stop_gained  0.00097        high
3      S043  GENE13    missense_variant  0.04071    moderate
4      S008   GENE4    missense_variant  0.05094    moderate


## One codebase, multiple engines (pandas ↔ polars)
What this shows: a single import path and IO that target your chosen engine without branching logic.

In [None]:
# Flip this to "pandas" or "polars" — no other changes needed
BACKEND = "polars"   # try "pandas" to compare

# Load input files
variants = nw.read_csv("./input_data/variants.tsv", separator="\t", backend=BACKEND)
samples  = nw.read_csv("./input_data/sample_metadata.tsv",  separator="\t", backend=BACKEND)

print("Variants shape:", variants.shape)
print("Samples shape:", samples.shape)
print("Variants head:")
print(variants.head())
print("Samples head:")
print(samples.head())

Variants shape: (1000, 5)
Samples shape: (50, 4)
Variants head:
┌───────────────────────────────────────────────────────────────────┐
|                        Narwhals DataFrame                         |
|-------------------------------------------------------------------|
|shape: (5, 5)                                                      |
|┌───────────┬────────┬────────────────────┬─────────┬─────────────┐|
|│ sample_id ┆ gene   ┆ impact             ┆ AF      ┆ consequence │|
|│ ---       ┆ ---    ┆ ---                ┆ ---     ┆ ---         │|
|│ str       ┆ str    ┆ str                ┆ f64     ┆ str         │|
|╞═══════════╪════════╪════════════════════╪═════════╪═════════════╡|
|│ S039      ┆ GENE25 ┆ stop_gained        ┆ 0.0     ┆ high        │|
|│ S029      ┆ GENE1  ┆ stop_gained        ┆ 0.0282  ┆ high        │|
|│ S015      ┆ GENE21 ┆ synonymous_variant ┆ 0.00003 ┆ low         │|
|│ S043      ┆ GENE22 ┆ synonymous_variant ┆ 0.12123 ┆ low         │|
|│ S008      ┆ GENE21 ┆ mi

## Expression API that composes (portable feature engineering)
Why unique: The same declarative expressions work across both pandas and polars through Narwhals, avoiding two codepaths or brittle .apply UDFs.

In [None]:
# ✅ cast using Narwhals dtype
variants = variants.with_columns([nw.col("AF").cast(nw.Float64).alias("AF")])

is_lof  = nw.col("impact").is_in(["stop_gained","frameshift_variant","splice_acceptor_variant","splice_donor_variant"])
is_rare = nw.col("AF") < 0.01

af_bucket = (
    nw.when(nw.col("AF") < 1e-4).then(nw.lit("ultra_rare"))
      .otherwise(
        nw.when(nw.col("AF") < 1e-2).then(nw.lit("rare"))
          .otherwise(nw.lit("common"))
      )
)

variants_fe = (
    variants
    .with_columns([
        (is_lof & is_rare).alias("is_rare_lof"),
        af_bucket.alias("af_bucket"),
    ])
    .filter(nw.col("is_rare_lof"))
)

print(variants_fe.head())
print(variants_fe.to_native())


┌───────────────────────────────────────┐
|          Narwhals DataFrame           |
| Use `.to_native` to see native output |
└───────────────────────────────────────┘
shape: (117, 7)
┌───────────┬────────┬──────────────────────┬─────────┬─────────────┬─────────────┬────────────┐
│ sample_id ┆ gene   ┆ impact               ┆ AF      ┆ consequence ┆ is_rare_lof ┆ af_bucket  │
│ ---       ┆ ---    ┆ ---                  ┆ ---     ┆ ---         ┆ ---         ┆ ---        │
│ str       ┆ str    ┆ str                  ┆ f64     ┆ str         ┆ bool        ┆ str        │
╞═══════════╪════════╪══════════════════════╪═════════╪═════════════╪═════════════╪════════════╡
│ S039      ┆ GENE25 ┆ stop_gained          ┆ 0.0     ┆ high        ┆ true        ┆ ultra_rare │
│ S003      ┆ GENE24 ┆ frameshift_variant   ┆ 0.00125 ┆ high        ┆ true        ┆ rare       │
│ S024      ┆ GENE23 ┆ frameshift_variant   ┆ 0.00958 ┆ high        ┆ true        ┆ rare       │
│ S002      ┆ GENE5  ┆ splice_donor_vari

## GroupBy + Agg that’s backend-agnostic
Why unique: Identical groupby semantics and naming regardless of engine; you don’t re-learn two flavors of grouping.


In [52]:
# Per-sample, per-gene burden = count of rare LoF variants
burden = (
    variants_fe
    .group_by(["sample_id", "gene"])
    .agg(nw.len().alias("burden"))
)
print(burden)

┌───────────────────────────────┐
|      Narwhals DataFrame       |
|-------------------------------|
|shape: (111, 3)                |
|┌───────────┬────────┬────────┐|
|│ sample_id ┆ gene   ┆ burden │|
|│ ---       ┆ ---    ┆ ---    │|
|│ str       ┆ str    ┆ u32    │|
|╞═══════════╪════════╪════════╡|
|│ S036      ┆ GENE21 ┆ 2      │|
|│ S038      ┆ GENE24 ┆ 1      │|
|│ S039      ┆ GENE8  ┆ 1      │|
|│ S035      ┆ GENE30 ┆ 1      │|
|│ S012      ┆ GENE5  ┆ 1      │|
|│ …         ┆ …      ┆ …      │|
|│ S037      ┆ GENE24 ┆ 1      │|
|│ S013      ┆ GENE25 ┆ 1      │|
|│ S004      ┆ GENE19 ┆ 1      │|
|│ S045      ┆ GENE27 ┆ 1      │|
|│ S024      ┆ GENE23 ┆ 1      │|
|└───────────┴────────┴────────┘|
└───────────────────────────────┘


## Pivot wider without engine-specific quirks
Why unique: Polars and pandas pivot APIs differ; Narwhals smooths that over.

In [None]:
# Wide sample × gene burden matrix
#burden_wide = (
#    burden
#    .pivot(index="sample_id", columns="gene", values="burden", fill_value=0)
#    .sort("sample_id")
#)

# Convert burden Narwhals DF to native polars
burden_native = burden.to_native()

# Use Polars pivot
burden_wide_native = burden_native.pivot(
    index="sample_id",
    columns="gene",
    values="burden"
).fill_null(0)  # fill missing cells with 0

# Wrap back into Narwhals if you want to continue with Narwhals API
burden_wide = nw.from_native(burden_wide_native)

print(burden_wide.head())

┌────────────────────────────────────────────────────────────────────────────┐
|                             Narwhals DataFrame                             |
|----------------------------------------------------------------------------|
|shape: (5, 30)                                                              |
|┌───────────┬───────┬───────┬────────┬───┬────────┬───────┬───────┬────────┐|
|│ sample_id ┆ GENE3 ┆ GENE8 ┆ GENE12 ┆ … ┆ GENE11 ┆ GENE2 ┆ GENE6 ┆ GENE18 │|
|│ ---       ┆ ---   ┆ ---   ┆ ---    ┆   ┆ ---    ┆ ---   ┆ ---   ┆ ---    │|
|│ str       ┆ u32   ┆ u32   ┆ u32    ┆   ┆ u32    ┆ u32   ┆ u32   ┆ u32    │|
|╞═══════════╪═══════╪═══════╪════════╪═══╪════════╪═══════╪═══════╪════════╡|
|│ S026      ┆ 1     ┆ 1     ┆ 0      ┆ … ┆ 0      ┆ 1     ┆ 1     ┆ 0      │|
|│ S009      ┆ 0     ┆ 1     ┆ 0      ┆ … ┆ 0      ┆ 0     ┆ 0     ┆ 0      │|
|│ S043      ┆ 0     ┆ 0     ┆ 1      ┆ … ┆ 0      ┆ 0     ┆ 0     ┆ 0      │|
|│ S045      ┆ 0     ┆ 1     ┆ 0      ┆ … ┆ 0      ┆

  burden_wide_native = burden_native.pivot(


In [65]:
# genes is your Python list of gene names

# build aggregations
burden_wide = (
    burden_expanded
    .group_by("sample_id")
    .agg(**{g: nw.col(g).sum() for g in genes})   # kwargs is OK
    .sort("sample_id")
)

print("has 'burden'?", "burden" in burden.columns)
print("n genes:", len(genes), "; sample of genes:", genes[:5])
print("expanded has first gene col?", genes[0] in burden_expanded.columns)




has 'burden'? True
n genes: 29 ; sample of genes: ['GENE18', 'GENE10', 'GENE30', 'GENE11', 'GENE25']
expanded has first gene col? True


## Typed joins that “just work”
Why unique: Joins often bite when dtypes (categorical vs string) differ across engines; Narwhals normalizes these.

In [69]:
# Join sample metadata for downstream modeling
# Convert to native Polars DataFrame first
design_native = design.to_native()

# Save as CSV
design_native.write_csv("./input_data/design_matrix.csv")

print("✅ Saved design_matrix.csv")


✅ Saved design_matrix.csv


## Optional: Lazy when available, eager when not (same code)
If your backend is polars, Narwhals can route to lazy execution and push filters/aggregations down; with pandas, it executes eagerly—but your code doesn’t change.

Why unique: You can prototype on pandas, then flip to polars for scale to get query planning and parallelism without touching the transformation code.


In [70]:
# (Conceptual toggle—Narwhals can map to a lazy plan where supported)
variants_lazy = variants.lazy()   # becomes a no-op/eager facade on pandas
pipeline = (
    variants_lazy
    .with_columns([is_rare_lof])
    .filter(nw.col("is_rare_lof"))
    .group_by(["sample_id", "gene"])
    .agg(nw.len().alias("burden"))
)
result = pipeline.collect()       # polars: executes the optimized plan; pandas: returns materialized result


## Arrow/Parquet zero-copy interop (great for genomics)
Why unique: In genomics, Parquet/Arrow are common for big tables (VCF→Parquet). Narwhals lets you stay in one API, avoiding manual conversions and engine-specific nuances.

In [None]:
def read_parquet_narwhals(path: str, backend: str = "polars"):
    if backend == "polars":
        import polars as pl
        native = pl.read_parquet(path)
    elif backend == "pandas":
        import pandas as pd
        native = pd.read_parquet(path)  # engine='pyarrow' if your setup requires it
    else:
        raise ValueError("backend must be 'polars' or 'pandas'")
    return nw.from_native(native)

variants = read_parquet_narwhals("./input_data/cohort_variants.parquet", backend="polars")
variants = variants.with_columns([nw.col("AF").cast(nw.Float64)])
print(variants.to_native().head())


shape: (5, 5)
┌───────────┬────────┬────────────────────┬─────────┬─────────────┐
│ sample_id ┆ gene   ┆ impact             ┆ AF      ┆ consequence │
│ ---       ┆ ---    ┆ ---                ┆ ---     ┆ ---         │
│ str       ┆ str    ┆ str                ┆ f64     ┆ str         │
╞═══════════╪════════╪════════════════════╪═════════╪═════════════╡
│ S039      ┆ GENE28 ┆ missense_variant   ┆ 0.07361 ┆ moderate    │
│ S029      ┆ GENE18 ┆ frameshift_variant ┆ 0.00102 ┆ high        │
│ S015      ┆ GENE4  ┆ stop_gained        ┆ 0.00097 ┆ high        │
│ S043      ┆ GENE13 ┆ missense_variant   ┆ 0.04071 ┆ moderate    │
│ S008      ┆ GENE4  ┆ missense_variant   ┆ 0.05094 ┆ moderate    │
└───────────┴────────┴────────────────────┴─────────┴─────────────┘


## Window ops without rewriting (per-sample QC example)
Why unique: Uniform window/over semantics; no need to learn/set up two different rolling/grouped transforms.

In [80]:
qc = (
    variants
    .group_by("sample_id")
    .agg(nw.len().alias("n_variants_sample"))
    .sort("sample_id")
)

print(qc.to_native().head())


shape: (5, 2)
┌───────────┬───────────────────┐
│ sample_id ┆ n_variants_sample │
│ ---       ┆ ---               │
│ str       ┆ u32               │
╞═══════════╪═══════════════════╡
│ S001      ┆ 24                │
│ S002      ┆ 21                │
│ S003      ┆ 22                │
│ S004      ┆ 16                │
│ S005      ┆ 23                │
└───────────┴───────────────────┘


Why this shows Narwhals’ uniqueness 

- Write once, scale later: Prototype on a laptop with pandas; switch to polars for large cohorts—all transformations stay identical.

- Declarative expressions: Clear, composable feature engineering without backend-specific .apply functions.

- Robust IO & interop: CSV/TSV now, Arrow/Parquet later—same API. Great fit for variant tables, gene counts, cell metadata, etc.

- Consistent wide-table shaping: Pivots, joins, groupbys, and window ops behave uniformly—critical in pipelines that hop between wide (sample×gene) and long (tidy) layouts.

- Optional laziness: When the backend supports it (polars), you get optimization and pushdown “for free.”