In [1]:
from collections import namedtuple

import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

import util
import xgboost as xgb

from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

print(f"pandas-version: {pd.__version__}")
print(f"polars-version: {pl.__version__}")

pandas-version: 2.2.0
polars-version: 0.20.6


In [2]:
READ_ORIGINS="/storage2/tbrekalo/HG002-simulated/chr19-read-origins.csv"
REF_MAP_PATH="/storage2/tbrekalo/HG002-simulated/chr19-ref.paf"
CHAINS_PATH="/home/tbrekalo/dev/tb-ram/dev-data/HG002/chr19-sim-sample-chain.tsv"

In [None]:
df_ovlps = util.load_paf_df(REF_MAP_PATH)
df_chains = util.load_chains_df(CHAINS_PATH)
df_origins = util.load_origins_df(READ_ORIGINS)

In [None]:
df_chain_ovlps = util.create_annotated_ref_overlaps_from_chains(
    df_chains,
    df_origins,
)

In [None]:
df_chain_ovlps

In [None]:
df_chains_detail = df_chains.join(
    df_chain_ovlps.select(
        "chain-id",
        "query-start",
        "query-end",
        "target-start",
        "target-end",
        "ratio",
    ),
    on="chain-id",
    how="left",
)

df_chains_detail.head(1)

In [None]:
df_features = df_chains_detail.group_by(
    "chain-id",
    maintain_order=True,
).agg(
    pl.col("query-match").diff().min().alias("query-match-diff-min"),
    pl.col("query-match").diff().mean().alias("query-match-diff-mean"),
    pl.col("query-match").diff().median().alias("query-match-diff-median"),
    pl.col("query-match").diff().max().alias("query-match-diff-max"),

    pl.col("query-length").first(),
    pl.col("query-start").first(),
    pl.col("query-end").first(),
    pl.col("query-matches").first(),

    (
        pl.when(
            pl.col("strand") == "+"
        ).then(
            1
        ).otherwise(
            0
        ).cast(pl.Int64)
    ).first().alias("strand"),

    pl.col("chain-length").first(),

    pl.col("target-length").first(),
    pl.col("target-start").first(),
    pl.col("target-end").first(),
    pl.col("target-matches").first(),

    pl.col("ratio").first(),
)

In [None]:
df_features

In [None]:
FEATURE_SETS = {
    "overlap": [
        "strand",
        "query-start", "query-end", "query-matches",
        "target-start", "target-end", "target-matches",
    ],
    "matches": [
        "strand",
        *[
            f"query-match-diff-{stat}" for stat in [
                "min", "mean", "median", "max",
            ]
        ],

        "query-start", "query-end", "query-matches",
        "target-start", "target-end", "target-matches",
    ]
}

In [None]:
print(FEATURE_SETS)

In [None]:
TrainTestData = namedtuple(
    "TrainTestData", [
        "X_train", "X_test", "y_train", "y_test",
    ],
)

def make_model_data(
    df_features: pl.DataFrame,
    feature_subset: list[str],
    top_k: int = 4_000,
    random_state: int = 42,
) -> TrainTestData:
    df_features = df_features.select(
        *feature_subset, 
        "ratio",
        (
            pl.col("ratio") >= 0.99
        ).cast(pl.Int64).alias("label")
    ).top_k(
        top_k,
        by="ratio",
    )

    X = df_features.select(*feature_subset).to_pandas()
    Y = df_features.select("label").to_pandas()
    return TrainTestData(*train_test_split(
        X, Y, test_size=0.3, random_state=random_state,
    ))
    

data = make_model_data(df_features, FEATURE_SETS["overlap"])

In [None]:
models = {
    "logistic-regression": LogisticRegression(),
    "std-svc": make_pipeline(StandardScaler(), SVC(gamma="auto")),
    "xgb-classifier": xgb.XGBClassifier(
        objective="binary:logistic",
        grow_policy="lossguide",
        random_state=42,
    ),
}

In [None]:
for model in models.values():
    model.fit(data.X_train, data.y_train)

In [None]:
df_report = pl.DataFrame(schema={"model": str, "class": pl.Int64, "metric": str, "value": pl.Float64})
for name, model in models.items():
    y_pred = model.predict(data.X_test)
    report = metrics.classification_report(data.y_test, y_pred, output_dict=True)
    df_report = pl.concat([
        df_report,
        pl.DataFrame(
            pd.melt(
                pd.DataFrame(report)[["0", "1"]].transpose()[["precision", "recall", "f1-score"]].reset_index(), 
                id_vars=["index"],
                var_name="metric",
            )[["index", "metric", "value"]]
        ).select(
            pl.lit(name).alias("model"),
            pl.col("index").alias("class").cast(pl.Int64),
            pl.col("metric"),
            pl.col("value"),
        ),
    ])

df_report

In [None]:
plt.tight_layout()
g = sns.FacetGrid(df_report.to_pandas(), row="class", col="metric")
g.map_dataframe(
    sns.barplot, 
    x="model",
    y="value",
    hue="model",
    palette="deep",
    legend=True,
).set(yscale = "log")

g.set_xlabels()
g.set_xticklabels([])

min_val = np.round(df_report["value"].min() - 0.01, 2)
max_val = np.round(df_report["value"].max() + 0.01, 2)
g.set(
    yticks=np.arange(min_val, max_val, 0.01),
    yticklabels=[np.round(x, 2) for x in np.arange(min_val, max_val, 0.01)],
)
g.add_legend()

In [None]:
xgb.plot_importance(models["xgb-classifier"])

In [None]:
print(models["logistic-regression"].intercept_)
print(models["logistic-regression"].coef_)

print(models["logistic-regression"].feature_names_in_)