##### Step1 Filter the interaction data from NPInter and separately obtain the LPI for human and mouse.

In [19]:
import pandas as pd

input_file = "../../data/raw/lncRNA_interaction.txt"
output_human = "../../data/LPI/human/npinter_lpi.csv"
output_mouse = "../../data/LPI/mouse/npinter_lpi.csv"

# indices to keep: (gene_name, gene_id, protein_name, uniprot_id, tissue_or_cellline)
keep_indices = [1, 2, 4, 5, 11]

# final output header
header = ['gene_name', 'gene_id', 'identifier', 'protein', 'uniprot_id', 'tissue_or_cellline']

# read input file
df = pd.read_csv(input_file, sep="\t", header=None, dtype=str)

# filter rows: lncRNA - protein - binding
df = df[(df[3] == "lncRNA") & (df[6] == "protein") & (df[13] == "binding")]

# select needed columns
filtered = df.iloc[:, keep_indices].copy()
filtered.columns = ['gene_name', 'gene_id', 'protein', 'uniprot_id', 'tissue_or_cellline']

# construct identifier column
filtered["identifier"] = filtered.apply(
    lambda x: x["gene_id"] if x["gene_id"] != "-" else x["gene_name"], axis=1
)

# reorder columns
filtered = filtered[['gene_name', 'gene_id', 'identifier', 'protein', 'uniprot_id', 'tissue_or_cellline']]

# remove self-loops (gene_name == protein)
filtered = filtered[~((filtered['gene_name'] != "") & (filtered['protein'] != "") & 
                     (filtered['gene_name'] == filtered['protein']))]

# split by species
human_lpi = filtered[df[10] == "Homo sapiens"].drop_duplicates()
mouse_lpi = filtered[df[10] == "Mus musculus"].drop_duplicates()

# save to CSV
human_lpi.to_csv(output_human, index=False, encoding='utf-8')
mouse_lpi.to_csv(output_mouse, index=False, encoding='utf-8')

print("Done. Generated npinter_lpi.csv for human and mouse with columns: "
      "gene_name, gene_id, identifier, protein_name, uniprot_id, tissue_or_cellline.")


  human_lpi = filtered[df[10] == "Homo sapiens"].drop_duplicates()
  mouse_lpi = filtered[df[10] == "Mus musculus"].drop_duplicates()


Done. Generated npinter_lpi.csv for human and mouse with columns: gene_name, gene_id, identifier, protein_name, uniprot_id, tissue_or_cellline.


##### Step2：修复LPI数据

Step 2.1: 修正gene id列。

Step 2.1.1 替换转录本id为对应的基因id

In [20]:
# Human
# -----------------------------
# Paths
# -----------------------------
mapping_file_noncode6 = "../../reference_lncRNA/human/transcript/NONCODEv6_human_hg38_lncRNA_trans.txt"
mapping_file_noncode5 = "../../reference_lncRNA/human/transcript/NONCODEv5_human_hg38_lncRNA_trans.txt"
out_file = "human_lpi_id_fixed.csv"

# Transcript-like ID prefixes to repair
transcript_prefixes = ("NONHSAT",)

# -----------------------------
# Helper: load transcript->gene mapping
# Assumes each line has at least two columns:
#   col0 = gene_id, col1 = transcript_id
# Auto-detects delimiter (comma/tab/whitespace) via engine='python'.
# -----------------------------
def load_mapping(path: str) -> dict:
    df = pd.read_csv(
        path,
        sep=None,                # auto-detect delimiter
        engine="python",
        header=None,
        usecols=[0, 1],          # [gene_id, transcript_id]
        names=["gene_id", "transcript_id"],
        dtype=str
    )
    # normalize strings
    df = df.dropna(subset=["gene_id", "transcript_id"])
    df["gene_id"] = df["gene_id"].str.strip()
    df["transcript_id"] = df["transcript_id"].str.strip()
    # build transcript -> gene mapping (v6/v5 priority handled outside)
    return dict(zip(df["transcript_id"], df["gene_id"]))

# Load mappings: v6 has higher priority than v5
map_v6 = load_mapping(mapping_file_noncode6)
map_v5 = load_mapping(mapping_file_noncode5)

# -----------------------------
# Load LPI table
# Keep strings to avoid unintended type casting
# -----------------------------

# Normalize gene_id string for testing and lookups
gid_norm = human_lpi["gene_id"].fillna("").astype(str).str.strip()

# Identify rows that look like transcript IDs (to be repaired)
mask_tx_like = gid_norm.str.startswith(transcript_prefixes)

# Map transcript -> gene via v6 (priority) and v5 (fallback) only on those rows
mapped_v6 = gid_norm.where(mask_tx_like).map(map_v6)
mapped_v5 = gid_norm.where(mask_tx_like).map(map_v5)

# Start with original gene_id and apply repairs
new_gene_id = human_lpi["gene_id"].copy()

# Apply v6 where available
mask_v6_hit = mapped_v6.notna()
new_gene_id.loc[mask_v6_hit] = mapped_v6.loc[mask_v6_hit].values

# Apply v5 where v6 missed but v5 hit
mask_v5_hit = (~mask_v6_hit) & mapped_v5.notna()
new_gene_id.loc[mask_v5_hit] = mapped_v5.loc[mask_v5_hit].values

# Update gene_id in the dataframe
human_lpi["gene_id"] = new_gene_id

# Rows actually repaired (either v6 or v5 hit)
mask_repaired = mask_v6_hit | mask_v5_hit

# Keep identifier in sync with repaired gene_id (same behavior as original script)
human_lpi.loc[mask_repaired, "identifier"] = human_lpi.loc[mask_repaired, "gene_id"]

# Save
human_lpi.to_csv(out_file, index=False)


In [21]:
# Mouse
# -----------------------------
# Paths
# -----------------------------
mapping_file_noncode5 = "../../reference_lncRNA/mouse/transcript/NONCODEv5_mouse_mm10_lncRNA_trans.txt"
out_file = "mouse_lpi_id_fixed.csv"

transcript_prefixes = ("NONMMUT",)

# -----------------------------
# Helper: load transcript->gene mapping
# Assumes each line has at least two columns:
#   col0 = gene_id, col1 = transcript_id
# Auto-detects delimiter (comma/tab/whitespace) via engine='python'.
# -----------------------------
def load_mapping(path: str) -> dict:
    df = pd.read_csv(
        path,
        sep=None,                # auto-detect delimiter
        engine="python",
        header=None,
        usecols=[0, 1],          # [gene_id, transcript_id]
        names=["gene_id", "transcript_id"],
        dtype=str
    )
    # normalize strings
    df = df.dropna(subset=["gene_id", "transcript_id"])
    df["gene_id"] = df["gene_id"].str.strip()
    df["transcript_id"] = df["transcript_id"].str.strip()
    # build transcript -> gene mapping (v6/v5 priority handled outside)
    return dict(zip(df["transcript_id"], df["gene_id"]))

# Load mappings
map_v5 = load_mapping(mapping_file_noncode5)

# -----------------------------
# Load LPI table
# Keep strings to avoid unintended type casting
# -----------------------------

# Normalize gene_id string for testing and lookups
gid_norm = mouse_lpi["gene_id"].fillna("").astype(str).str.strip()

# Identify rows that look like transcript IDs (to be repaired)
mask_tx_like = gid_norm.str.startswith(transcript_prefixes)

# Map transcript -> gene via v5
mapped_v5 = gid_norm.where(mask_tx_like).map(map_v5)

# Start with original gene_id and apply repairs
new_gene_id = mouse_lpi["gene_id"].copy()

# Apply v5
mask_v5_hit = mapped_v5.notna()
new_gene_id.loc[mask_v5_hit] = mapped_v5.loc[mask_v5_hit].values

# Update gene_id in the dataframe
mouse_lpi["gene_id"] = new_gene_id

# Rows actually repaired
mask_repaired = mask_v5_hit

# Keep identifier in sync with repaired gene_id (same behavior as original script)
mouse_lpi.loc[mask_repaired, "identifier"] = mouse_lpi.loc[mask_repaired, "gene_id"]

# Save
mouse_lpi.to_csv(out_file, index=False)


Step2.2 修正tissue_or_cellline列。

Step2.2.1 提取组织与细胞系信息并进行初步清洗，用于生成修正文件。

In [22]:
import re
import pandas as pd
from typing import List

SPLITERS = [";", ",", "and"]
SPLIT_RE = re.compile(r"\s*(?:;|,|\band\b)\s*", flags=re.IGNORECASE)
PROTECTED_COMMA = "§COMMA§"

def normalize_commas(text: str) -> str:
    if pd.isna(text):
        return text
    return str(text).replace("，", ",")

def protect_commas_before_gestation_week(text: str) -> str:
    if not text:
        return text
    return re.sub(r",(\s*)(?=Gestation\s+Week\b)", PROTECTED_COMMA + r"\1", text, flags=re.IGNORECASE)

def restore_protected_commas(text: str) -> str:
    return text.replace(PROTECTED_COMMA, ",")

def smart_split(text: str) -> List[str]:
    """
    Split by ; , and "and", but ignore any of them that are inside (), [], or {}.
    """
    results = []
    buf = []
    level = 0  # parenthesis nesting
    i = 0
    n = len(text)
    while i < n:
        ch = text[i]
        # Handle entering/exiting parentheses
        if ch in "([{":
            level += 1
            buf.append(ch)
        elif ch in ")]}":
            level = max(level - 1, 0)
            buf.append(ch)
        elif level == 0:
            # Only split at top-level
            if text[i:i+3].lower() == "and" and \
                (i == 0 or not text[i-1].isalpha()) and \
                (i+3 == n or not text[i+3].isalpha()):
                # Split at "and"
                if buf:
                    results.append(''.join(buf).strip())
                    buf = []
                i += 3
                continue
            elif ch in ";,":
                if buf:
                    results.append(''.join(buf).strip())
                    buf = []
                i += 1
                continue
            else:
                buf.append(ch)
        else:
            buf.append(ch)
        i += 1
    # Last buffer
    if buf:
        results.append(''.join(buf).strip())
    # Clean empty
    return [x for x in results if x]

def split_cell_value(cell: str) -> List[str]:
    if pd.isna(cell) or str(cell).strip() == "":
        return []
    s = normalize_commas(str(cell))
    s = protect_commas_before_gestation_week(s)
    # Use smart_split instead of SPLIT_RE
    parts = smart_split(s)
    tokens = []
    for p in parts:
        if not p:
            continue
        p = restore_protected_commas(p).strip().strip('\"“”\'')
        if p:
            tokens.append(p)
    return tokens

def extract_split_unique(input_csv: pd.DataFrame) -> pd.DataFrame:
    df = input_csv[["tissue_or_cellline"]]
    contexts = (
        df["tissue_or_cellline"]
        .map(split_cell_value)
        .explode()
        .dropna()
        .astype(str)
        .str.strip()
    )
    out_df = (
        pd.DataFrame({"raw": contexts})
        .drop_duplicates()
        .sort_values("raw", kind="mergesort")
        .reset_index(drop=True)
    )
    return out_df

def main():

    human_out = "tissue_or_cellline_human_split.csv"
    mouse_out = "tissue_or_cellline_mouse_split.csv"

    # ---- Human ----
    human_ctx = extract_split_unique(human_lpi)
    human_ctx.to_csv(human_out, index=False)

    # ---- Mouse ----
    mouse_ctx = extract_split_unique(mouse_lpi)
    mouse_ctx.to_csv(mouse_out, index=False)

    print(f"Human: {len(human_ctx)} -> saved to {human_out}")
    print(f"Mouse: {len(mouse_ctx)} -> saved to {mouse_out}")


if __name__ == "__main__":
    main()


Human: 769 -> saved to tissue_or_cellline_human_split.csv
Mouse: 121 -> saved to tissue_or_cellline_mouse_split.csv


Step2.2.2 修复tissue_or_cellline列，并将每一个组织或细胞系划分为一行

In [23]:
# Human
import pandas as pd
import math
from typing import List, Dict

# -----------------------------
# 1) Load data
# -----------------------------
#human_lpi = pd.read_csv("human_lpi_id_fixed.csv")  # must contain column 'tissue_or_cellline'
# Mapping file: first column = raw term to fix; second column = standardized name
mapping_df = pd.read_csv("normalized_tissue_cellline_human.csv", header=None, names=["raw", "std"], dtype=str)
# Build a case-insensitive mapping (trimmed and lowercased on the left key)
mapping_dict: Dict[str, str] = { (r or "").strip().lower(): (s or "").strip() for r, s in mapping_df.values }

# -----------------------------
# 2) Split function (ignore separators inside parentheses)
# -----------------------------
def split_outside_parentheses(text: str) -> List[str]:
    """
    Split by comma, semicolon, and the word 'and' while ignoring any separators inside parentheses.
    - Treat 'and' as a word separator only when depth == 0 and flanked by word boundaries.
    - Returns a list of raw tokens (not yet mapped).
    """
    if text is None or (isinstance(text, float) and math.isnan(text)):
        return []
    s = str(text)

    tokens = []
    buf = []
    depth = 0  # parentheses depth
    i = 0
    L = len(s)

    def flush():
        tok = "".join(buf).strip()
        if tok:
            tokens.append(tok)
        buf.clear()

    def is_word_boundary(ch: str) -> bool:
        # Boundary if start/end or a non-alphanumeric char
        return (not ch) or (not ch.isalnum())

    while i < L:
        ch = s[i]

        # Track parentheses depth
        if ch == '(':
            depth += 1
            buf.append(ch)
            i += 1
            continue
        elif ch == ')':
            depth = max(0, depth - 1)
            buf.append(ch)
            i += 1
            continue

        if depth == 0:
            # Check hard separators: comma or semicolon
            if ch in {',', ';'}:
                flush()
                i += 1
                continue

            # Check the word 'and' as a separator (case-insensitive) with word boundaries
            if s[i:i+3].lower() == 'and':
                prev_char = s[i-1] if i-1 >= 0 else ""
                next_char = s[i+3] if i+3 < L else ""
                if is_word_boundary(prev_char) and is_word_boundary(next_char):
                    # finalize current token and skip 'and'
                    flush()
                    i += 3
                    continue

        # Default: keep character
        buf.append(ch)
        i += 1

    # flush remainder
    flush()

    # Drop empty tokens after stripping
    tokens = [t for t in (tok.strip() for tok in tokens) if t]
    return tokens

# -----------------------------
# 3) Normalize & map tokens
# -----------------------------
def map_token(token: str) -> str:
    """
    Map a raw token to standardized name using mapping_dict.
    - Case-insensitive lookup (lowercased key).
    - If not found, return stripped original token.
    """
    if token is None:
        return ""
    key = token.strip().lower()
    return mapping_dict.get(key, token.strip())

def split_and_map_cellline_field(text: str) -> List[str]:
    """
    Split the field outside parentheses and apply mapping to each token.
    Returns a list of standardized tokens.
    """
    raw_tokens = split_outside_parentheses(text)
    mapped = [map_token(t) for t in raw_tokens]
    # Optionally de-duplicate while preserving order
    seen = set()
    out = []
    for m in mapped:
        if m not in seen and m != "":
            seen.add(m)
            out.append(m)
    return out

# -----------------------------
# 4) Explode rows: each token -> one row, other columns unchanged
# -----------------------------
# Create an intermediate list column with standardized tokens
# Pre-clean: normalize fullwidth comma (U+FF0C) to ASCII comma
human_lpi["tissue_or_cellline"] = (
    human_lpi["tissue_or_cellline"]
    .astype(str)
    .str.replace("，", ",", regex=False)
)

human_lpi["_tissues_std"] = human_lpi["tissue_or_cellline"].apply(split_and_map_cellline_field)

# Explode into multiple rows (one per standardized tissue/cell line)
human_lpi = human_lpi.explode("_tissues_std", ignore_index=True)

# Replace original column
human_lpi["tissue_or_cellline"] = human_lpi["_tissues_std"]
human_lpi = human_lpi.drop(columns=["_tissues_std"])

# Optionally drop rows where the split produced no tokens
human_lpi = human_lpi[human_lpi["tissue_or_cellline"].notna() & (human_lpi["tissue_or_cellline"].str.strip() != "")]

# -----------------------------
# 5) Save
# -----------------------------
human_lpi.to_csv("human_lpi_tissue_fixed.csv", index=False)


In [24]:
# Mouse

# -----------------------------
# 1) Load data
# -----------------------------
#mouse_lpi = pd.read_csv("mouse_lpi_id_fixed.csv")  # must contain column 'tissue_or_cellline'

# Mapping file: first column = raw term to fix; second column = standardized name
mapping_df = pd.read_csv("normalized_tissue_cellline_mouse.csv", header=None, names=["raw", "std"], dtype=str)
# Build a case-insensitive mapping (trimmed and lowercased on the left key)
mapping_dict: Dict[str, str] = { (r or "").strip().lower(): (s or "").strip() for r, s in mapping_df.values }

# -----------------------------
# 2) Split function (ignore separators inside parentheses)
# -----------------------------
def split_outside_parentheses(text: str) -> List[str]:
    """
    Split by comma, semicolon, and the word 'and' while ignoring any separators inside parentheses.
    - Treat 'and' as a word separator only when depth == 0 and flanked by word boundaries.
    - Returns a list of raw tokens (not yet mapped).
    """
    if text is None or (isinstance(text, float) and math.isnan(text)):
        return []
    s = str(text)

    tokens = []
    buf = []
    depth = 0  # parentheses depth
    i = 0
    L = len(s)

    def flush():
        tok = "".join(buf).strip()
        if tok:
            tokens.append(tok)
        buf.clear()

    def is_word_boundary(ch: str) -> bool:
        # Boundary if start/end or a non-alphanumeric char
        return (not ch) or (not ch.isalnum())

    while i < L:
        ch = s[i]

        # Track parentheses depth
        if ch == '(':
            depth += 1
            buf.append(ch)
            i += 1
            continue
        elif ch == ')':
            depth = max(0, depth - 1)
            buf.append(ch)
            i += 1
            continue

        if depth == 0:
            # Check hard separators: comma or semicolon
            if ch in {',', ';'}:
                flush()
                i += 1
                continue

            # Check the word 'and' as a separator (case-insensitive) with word boundaries
            if s[i:i+3].lower() == 'and':
                prev_char = s[i-1] if i-1 >= 0 else ""
                next_char = s[i+3] if i+3 < L else ""
                if is_word_boundary(prev_char) and is_word_boundary(next_char):
                    # finalize current token and skip 'and'
                    flush()
                    i += 3
                    continue

        # Default: keep character
        buf.append(ch)
        i += 1

    # flush remainder
    flush()

    # Drop empty tokens after stripping
    tokens = [t for t in (tok.strip() for tok in tokens) if t]
    return tokens

# -----------------------------
# 3) Normalize & map tokens
# -----------------------------
def map_token(token: str) -> str:
    """
    Map a raw token to standardized name using mapping_dict.
    - Case-insensitive lookup (lowercased key).
    - If not found, return stripped original token.
    """
    if token is None:
        return ""
    key = token.strip().lower()
    return mapping_dict.get(key, token.strip())

def split_and_map_cellline_field(text: str) -> List[str]:
    """
    Split the field outside parentheses and apply mapping to each token.
    Returns a list of standardized tokens.
    """
    raw_tokens = split_outside_parentheses(text)
    mapped = [map_token(t) for t in raw_tokens]
    # Optionally de-duplicate while preserving order
    seen = set()
    out = []
    for m in mapped:
        if m not in seen and m != "":
            seen.add(m)
            out.append(m)
    return out

# -----------------------------
# 4) Explode rows: each token -> one row, other columns unchanged
# -----------------------------
# Create an intermediate list column with standardized tokens
# Pre-clean: normalize fullwidth comma (U+FF0C) to ASCII comma
mouse_lpi["tissue_or_cellline"] = (
    mouse_lpi["tissue_or_cellline"]
    .astype(str)
    .str.replace("，", ",", regex=False)
)
mouse_lpi["_tissues_std"] = mouse_lpi["tissue_or_cellline"].apply(split_and_map_cellline_field)

# Explode into multiple rows (one per standardized tissue/cell line)
mouse_lpi = mouse_lpi.explode("_tissues_std", ignore_index=True)

# Replace original column
mouse_lpi["tissue_or_cellline"] = mouse_lpi["_tissues_std"]
mouse_lpi = mouse_lpi.drop(columns=["_tissues_std"])

# Optionally drop rows where the split produced no tokens
mouse_lpi = mouse_lpi[mouse_lpi["tissue_or_cellline"].notna() & (mouse_lpi["tissue_or_cellline"].str.strip() != "")]

# -----------------------------
# 5) Save
# -----------------------------
#mouse_lpi.to_csv("mouse_lpi_tissue.csv", index=False)


Step2.3 Replace invalid identifier with gene_name

In [None]:
# Human
import pandas as pd
import os
import re

# ----------------------------
# Paths (adjust as needed)
# ----------------------------
ensembl_dir = "../../reference_lncRNA/human/bed/ensembl/"
# human_lpi = pd.read_csv('human_lpi_tissue_fixed.csv')

# ----------------------------
# Load NONCODE gene_id lists (only gene_id column is needed)
# NONCODE BED columns: chr, start, end, gene_id, score, strand
# ----------------------------
noncodev5_ids = pd.read_csv(
    '../../reference_lncRNA/human/bed/NONCODEv5_hg38.lncRNAGene.bed',
    sep='\t', header=None, usecols=[3], names=['gene_id']
)['gene_id']

noncodev6_ids = pd.read_csv(
    '../../reference_lncRNA/human/bed/NONCODEv6_hg38.lncRNAGene.bed',
    sep='\t', header=None, usecols=[3], names=['gene_id']
)['gene_id']

# ----------------------------
# Collect Ensembl gene_id from all BED files in the directory
# Ensembl BED columns: chr, start, end, gene_name, gene_id, strand
# ----------------------------
def extract_version(filename: str) -> int:
    """
    Extract Ensembl GRCh38 version number from filename (e.g., '...GRCh38.<n>.bed').
    Returns -1 if not matched.
    """
    m = re.search(r'GRCh38\.(\d+)\.bed', filename)
    return int(m.group(1)) if m else -1

bed_files = [f for f in os.listdir(ensembl_dir) if f.endswith(".bed")]
# Sorting not strictly required for validity checking, but kept for consistency
bed_files_sorted = sorted(bed_files, key=extract_version, reverse=True)

ensembl_ids_list = []
for bed_file in bed_files_sorted:
    bed_path = os.path.join(ensembl_dir, bed_file)
    # Read only the gene_id column (index 4)
    gid = pd.read_csv(bed_path, sep='\t', header=None, usecols=[4], names=['gene_id'])['gene_id']
    ensembl_ids_list.append(gid)

ensembl_ids = pd.concat(ensembl_ids_list, ignore_index=True) if ensembl_ids_list else pd.Series([], dtype=object)

# ----------------------------
# Build a set of valid gene IDs across all sources
# ----------------------------
def to_id_set(series: pd.Series) -> set:
    """
    Normalize a Series to a set of non-empty string IDs:
    - drop NaN
    - strip spaces
    - drop empty strings
    """
    s = series.dropna().astype(str).str.strip()
    s = s[s != ""]
    return set(s)

valid_ids = to_id_set(noncodev6_ids) | to_id_set(noncodev5_ids) | to_id_set(ensembl_ids)

# ----------------------------
# Determine invalid IDs and update 'identifier' accordingly
# Rules:
# - If gene_id is NOT in valid_ids (or is null/empty) -> treat as invalid
# - For invalid rows: identifier := gene_name (gene_name!=-,gene_name is valid)
# - For valid rows: identifier remains unchanged
# - Filter out rows with invalid gene_id and invalid gene_name
# ----------------------------
# ----------------------------
# Determine valid and invalid gene_id
# ----------------------------
gene_id_raw = human_lpi['gene_id']
gene_id_norm = gene_id_raw.astype(str).str.strip()

mask_valid_id = gene_id_norm.isin(valid_ids)
mask_invalid_id = gene_id_raw.isna() | (gene_id_norm == "") | (~gene_id_norm.isin(valid_ids))

# ----------------------------
# For invalid gene_id, check gene_name validity
# ----------------------------
gene_name_norm = human_lpi['gene_name'].astype(str).str.strip()
mask_valid_name = (gene_name_norm != "") & (gene_name_norm != "-")

# Rows to replace identifier with gene_name
mask_replace = mask_invalid_id & mask_valid_name

# Rows to drop: both ID invalid and name invalid
mask_drop = mask_invalid_id & (~mask_valid_name)

# Apply replacement
human_lpi.loc[mask_replace, 'identifier'] = human_lpi.loc[mask_replace, 'gene_name']

# Drop completely invalid rows
human_lpi = human_lpi.loc[~mask_drop].copy()

# save the updated table
human_lpi.to_csv('./human_lpi_fixed.csv', index=False)


In [None]:
# Mouse
import pandas as pd
import os
import re

# ----------------------------
# Paths (adjust as needed)
# ----------------------------
ensembl_dir = "../../reference_lncRNA/mouse/bed/ensembl/"
#mouse_lpi = pd.read_csv('mouse_lpi_tissue_fixed.csv')

# ----------------------------
# Load NONCODE gene_id lists (only gene_id column is needed)
# NONCODE BED columns: chr, start, end, gene_id, score, strand
# ----------------------------
noncodev5_ids = pd.read_csv(
    '../../reference_lncRNA/mouse/bed/NONCODEv5_mm10.lncRNAGene.bed',
    sep='\t', header=None, usecols=[3], names=['gene_id']
)['gene_id']

noncodev6_ids = pd.read_csv(
    '../../reference_lncRNA/mouse/bed/NONCODEv6_mm10.lncRNAGene.bed',
    sep='\t', header=None, usecols=[3], names=['gene_id']
)['gene_id']

# ----------------------------
# Collect Ensembl gene_id from all BED files in the directory
# Ensembl BED columns: chr, start, end, gene_name, gene_id, strand
# ----------------------------
def extract_version(filename: str) -> int:
    """
    Extract Ensembl GRCm38 version number from filename (e.g., '...GRCm38.<n>.bed').
    Returns -1 if not matched.
    """
    m = re.search(r'GRCm38\.(\d+)\.bed', filename)
    return int(m.group(1)) if m else -1

bed_files = [f for f in os.listdir(ensembl_dir) if f.endswith(".bed")]
# Sorting not strictly required for validity checking, but kept for consistency
bed_files_sorted = sorted(bed_files, key=extract_version, reverse=True)

ensembl_ids_list = []
for bed_file in bed_files_sorted:
    bed_path = os.path.join(ensembl_dir, bed_file)
    # Read only the gene_id column (index 4)
    gid = pd.read_csv(bed_path, sep='\t', header=None, usecols=[4], names=['gene_id'])['gene_id']
    ensembl_ids_list.append(gid)

ensembl_ids = pd.concat(ensembl_ids_list, ignore_index=True) if ensembl_ids_list else pd.Series([], dtype=object)

# ----------------------------
# Build a set of valid gene IDs across all sources
# ----------------------------
def to_id_set(series: pd.Series) -> set:
    """
    Normalize a Series to a set of non-empty string IDs:
    - drop NaN
    - strip spaces
    - drop empty strings
    """
    s = series.dropna().astype(str).str.strip()
    s = s[s != ""]
    return set(s)

valid_ids = to_id_set(noncodev6_ids) | to_id_set(noncodev5_ids) | to_id_set(ensembl_ids)

# ----------------------------
# Determine invalid IDs and update 'identifier' accordingly
# Rules:
# - If gene_id is NOT in valid_ids (or is null/empty) -> treat as invalid
# - For invalid rows: identifier := gene_name (gene_name!=-,gene_name is valid)
# - For valid rows: identifier remains unchanged
# - Filter out rows with invalid gene_id and invalid gene_name
# ----------------------------
# ----------------------------
# Determine valid and invalid gene_id
# ----------------------------
gene_id_raw = mouse_lpi['gene_id']
gene_id_norm = gene_id_raw.astype(str).str.strip()

mask_valid_id = gene_id_norm.isin(valid_ids)
mask_invalid_id = gene_id_raw.isna() | (gene_id_norm == "") | (~gene_id_norm.isin(valid_ids))

# ----------------------------
# For invalid gene_id, check gene_name validity
# ----------------------------
gene_name_norm = mouse_lpi['gene_name'].astype(str).str.strip()
mask_valid_name = (gene_name_norm != "") & (gene_name_norm != "-")

# Rows to replace identifier with gene_name
mask_replace = mask_invalid_id & mask_valid_name

# Rows to drop: both ID invalid and name invalid
mask_drop = mask_invalid_id & (~mask_valid_name)

# Apply replacement
mouse_lpi.loc[mask_replace, 'identifier'] = mouse_lpi.loc[mask_replace, 'gene_name']

# save the updated table
mouse_lpi.to_csv('./mouse_lpi_fixed.csv', index=False)

##### 构建LPI的无向有权图

In [36]:
# Human

human_lpi = pd.read_csv("human_lpi_fixed.csv")

human_lpi['lncRNA_id'] = ["l" + str(x) for x in human_lpi['identifier']]
human_lpi["gene_name"] = (
    human_lpi["gene_name"]
    .astype(str)
    .str.replace("‐", "-", regex=False)
)

def concat_ignore_dash(series):
    vals = [str(v).strip() for v in series if pd.notna(v) and str(v).strip() != "-"]
    seen = set()
    out = []
    for v in vals:
        if v not in seen:
            seen.add(v)
            out.append(v)
    return ";".join(out)

lncRNA = human_lpi[['identifier','gene_name','gene_id','lncRNA_id']].drop_duplicates()
lncRNA = (
    lncRNA.groupby(["lncRNA_id","identifier"], as_index=False)
      .agg({
          "gene_name": concat_ignore_dash,
          "gene_id": concat_ignore_dash,
      })
)
lncRNA.to_csv("../../data/LPI/human/lncRNA.csv", index=False)

protein = human_lpi[['protein', 'uniprot_id']].drop_duplicates()
protein.to_csv("../../data/LPI/human/protein.csv", index=False)

human_lpi = human_lpi.drop(['gene_id', 'gene_name', 'identifier', 'uniprot_id'], axis=1)

human_lpi_weighted = (
    human_lpi
    .groupby(["lncRNA_id", "protein"])
    .size()
    .reset_index(name="edge_weight")
)
human_lpi_weighted.to_csv("../../data/LPI/human/lpi_weighted.csv",index=False)


In [None]:
# Mouse

mouse_lpi = pd.read_csv("mouse_lpi_fixed.csv")

mouse_lpi['lncRNA_id'] = ["l" + str(x) for x in mouse_lpi['identifier']]
mouse_lpi["gene_name"] = (
    mouse_lpi["gene_name"]
    .astype(str)
    .str.replace("‐", "-", regex=False)
)

def concat_ignore_dash(series):
    vals = [str(v).strip() for v in series if pd.notna(v) and str(v).strip() != "-"]
    seen = set()
    out = []
    for v in vals:
        if v not in seen:
            seen.add(v)
            out.append(v)
    return ";".join(out)

lncRNA = mouse_lpi[['identifier', 'gene_name','gene_id','lncRNA_id']].drop_duplicates()
lncRNA = (
    lncRNA.groupby(["lncRNA_id","identifier"], as_index=False)
      .agg({
          "gene_name": concat_ignore_dash,
          "gene_id": concat_ignore_dash
      })
)
lncRNA.to_csv("../../data/LPI/mouse/lncRNA.csv", index=False)

protein = mouse_lpi[['protein', 'uniprot_id']].drop_duplicates()
protein.to_csv("../../data/LPI/mouse/protein.csv", index=False)

mouse_lpi = mouse_lpi.drop(['gene_id', 'gene_name', 'identifier', 'uniprot_id'], axis=1)

mouse_lpi_weighted = (
    mouse_lpi
    .groupby(["lncRNA_id", "protein"])
    .size()
    .reset_index(name="edge_weight")
)
mouse_lpi_weighted.to_csv("../../data/LPI/mouse/lpi_weighted.csv",index=False)


: 