# Second protein mapping implementation

Main idea:
- get from CATH domain_ids + cath_domain_coordinates
- get from RSCB protein_seq
- search in RSCB for  rscb_domain_coordinates
  - exact match of domain_id
  - cath_domain_coordinates
  - length
  - seq alignment
- cut domain_seq from protein using rscb_domain_coordinates

Problem:
- still some of domain_ids should be mapped manually (4pvaA00 &)

TODO:
- Check seq alignment

In [10]:
import pandas as pd
import requests
from tqdm import tqdm
from Bio import SeqIO
from Bio.Align import PairwiseAligner

df = pd.read_csv("../data/subset.csv")
df["protein_sequence"] = ""
df["domain_start"] = None
df["domain_end"] = None
df["domain_sequence"] = ""

cath_fasta = {
    record.id.split("|")[-1].split("/")[0]: record
    for record in SeqIO.parse("../data/cath-domain-seqs.fa", "fasta")
}

pdb_overrides = {
    "1vw4": {"pdb": "3j6b", "chain": "8"},
    "4gns": {"pdb": "4yg8", "chain": "A"},
    "1vs9": {"pdb": "4v4i", "chain": "S"},
    "3p9d": {"pdb": "4v81", "chain": "A"},
    "4d8q": {"pdb": "4v94", "chain": "F"},
    "4a17": {"pdb": "4v8p", "chain": "BE"},
}

def get_fasta_sequence_with_label(pdb_id, auth_chain_id):
    url = f"https://www.rcsb.org/fasta/entry/{pdb_id}"
    try:
        response = requests.get(url)
        if response.status_code != 200:
            return "", None

        fasta_blocks = response.text.strip().split(">")
        for block in fasta_blocks:
            lines = block.strip().splitlines()
            if not lines:
                continue
            header = lines[0]
            sequence = "".join(lines[1:])

            if "|Chain " in header or "|Chains " in header:
                chain_field = header.split("|")[1]
                parts = chain_field.replace("Chains ", "").replace("Chain ", "").split(",")
                for part in parts:
                    part = part.strip()
                    if "[auth " in part:
                        model_id = part.split("[auth")[0].strip().upper()
                        auth_id = part.split("[auth")[1].replace("]", "").strip().upper()
                        if auth_chain_id.upper() == auth_id:
                            return sequence, model_id
                    else:
                        if auth_chain_id.upper() == part.upper():
                            return sequence, auth_chain_id.upper()
        return "", None
    except Exception:
        return "", None

def get_cath_coordinates(pdb_id, chain_id, domain_id, expected_length, domain_start=None, domain_end=None, reference_seq=None, protein_seq=None):
    url = f"https://data.rcsb.org/rest/v1/core/polymer_entity_instance/{pdb_id}/{chain_id}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        features = data.get("rcsb_polymer_instance_feature", [])
        matched_coords = []

        # Step 1: match exact domain_id
        for feature in features:
            if feature.get("type") != "CATH":
                continue
            for prop in feature.get("additional_properties", []):
                if prop.get("name") == "CATH_DOMAIN_ID":
                    ids = [v.strip().lower() for v in prop.get("values", [])]
                    if domain_id.strip().lower() in ids:
                        coords = feature.get("feature_positions", [])
                        matched_coords.extend([(c["beg_seq_id"], c["end_seq_id"]) for c in coords])

        if matched_coords:
            return matched_coords

        # Step 2: match by domain start/end
        if domain_start and domain_end:
            for feature in features:
                if feature.get("type") != "CATH":
                    continue
                coords = feature.get("feature_positions", [])
                for c in coords:
                    if abs(c["beg_seq_id"] - domain_start) <= 2 and abs(c["end_seq_id"] - domain_end) <= 2:
                        matched_coords.append((c["beg_seq_id"], c["end_seq_id"]))
            if matched_coords:
                return matched_coords

        # Step 3: match by length
        for feature in features:
            if feature.get("type") != "CATH":
                continue
            coords = feature.get("feature_positions", [])
            for c in coords:
                length = c["end_seq_id"] - c["beg_seq_id"] + 1
                if expected_length - 2 <= length <= expected_length + 2:
                    matched_coords.append((c["beg_seq_id"], c["end_seq_id"]))
        if matched_coords:
            return matched_coords

        # Step 4: global alignment to protein sequence
        if reference_seq and protein_seq:
            aligner = PairwiseAligner()
            best_score = -1
            best_start = None
            for i in range(0, len(protein_seq) - expected_length + 1):
                window = protein_seq[i:i+expected_length]
                score = aligner.score(reference_seq[:expected_length], window)
                if score > best_score:
                    best_score = score
                    best_start = i
            if best_start is not None:
                return [(best_start + 1, best_start + expected_length)]

        print(f"No match for {domain_id}")
        return []

    except Exception as e:
        print(f"[Error] Failed to fetch CATH data for {pdb_id} {chain_id}: {e}")
        return []

for idx, row in tqdm(df.iterrows(), total=len(df)):
    domain_id = row.get("domain_id", "")
    if not isinstance(domain_id, str) or len(domain_id) < 6:
        continue

    base_id = domain_id[:4].lower()
    if base_id in pdb_overrides:
        pdb_id = pdb_overrides[base_id]["pdb"]
        auth_chain_id = pdb_overrides[base_id]["chain"]
    else:
        pdb_id = base_id
        auth_chain_id = domain_id[4]

    cath_seq = str(row.get("sequence", "")).strip()
    domain_start = row.get("cath_domain_start")
    domain_end = row.get("cath_domain_end")
    expected_length = int(row.get("length", 0))

    ref_seq_record = cath_fasta.get(domain_id)
    reference_seq = str(ref_seq_record.seq) if ref_seq_record else None

    protein_seq, label_asym_id = get_fasta_sequence_with_label(pdb_id, auth_chain_id)
    if not protein_seq or not label_asym_id:
        continue

    df.at[idx, "protein_sequence"] = protein_seq

    coordinates = get_cath_coordinates(
        pdb_id, label_asym_id, domain_id, expected_length,
        domain_start, domain_end, reference_seq, protein_seq
    )

    if not coordinates:
        continue

    full_seq = ""
    starts, ends = [], []

    for beg, end in coordinates:
        starts.append(beg)
        ends.append(end)
        full_seq += protein_seq[beg - 1:end]

    df.at[idx, "domain_start"] = min(starts)
    df.at[idx, "domain_end"] = max(ends)
    df.at[idx, "domain_sequence"] = full_seq

df.to_csv("../data/subset_protein-mapped.csv", index=False)

 54%|█████▎    | 537/1000 [08:45<08:40,  1.13s/it]

No match for 4pvaA00


 62%|██████▏   | 616/1000 [10:06<06:01,  1.06it/s]

No match for 6zvhC01


100%|██████████| 1000/1000 [16:25<00:00,  1.01it/s]


In [11]:
import pandas as pd
from Bio import SeqIO
from Bio.Align import PairwiseAligner
import requests

df = pd.read_csv("../data/subset_protein-mapped.csv")
target_domains = ["4pvaA00", "6zvhC01"]

cath_fasta = {
    record.id.split("|")[-1].split("/")[0]: record
    for record in SeqIO.parse("../data/cath-domain-seqs.fa", "fasta")
}

def get_fasta_sequence_case_sensitive(pdb_id, exact_chain_id):
    url = f"https://www.rcsb.org/fasta/entry/{pdb_id}"
    try:
        response = requests.get(url)
        if response.status_code != 200:
            return ""
        fasta_blocks = response.text.strip().split(">")
        for block in fasta_blocks:
            lines = block.strip().splitlines()
            if not lines:
                continue
            header = lines[0]
            sequence = "".join(lines[1:])
            if "|Chain " in header or "|Chains " in header:
                chain_field = header.split("|")[1]
                parts = chain_field.replace("Chains ", "").replace("Chain ", "").split(",")
                for part in parts:
                    part = part.strip()
                    if "[auth " in part:
                        auth_id = part.split("[auth")[1].replace("]", "").strip()
                        if auth_id == exact_chain_id:
                            return sequence
                    else:
                        if part == exact_chain_id:
                            return sequence
        return ""
    except Exception:
        return ""

for domain_id in target_domains:
    idx = df[df["domain_id"] == domain_id].index
    if len(idx) == 0:
        print(f"{domain_id} not found in the table.")
        continue

    idx = idx[0]
    pdb_id = domain_id[:4].lower()
    chain_id = domain_id[4]

    protein_seq = get_fasta_sequence_case_sensitive(pdb_id, chain_id)
    if not protein_seq:
        print(f"Failed to fetch protein sequence for {domain_id} (chain: {chain_id})")
        continue
    df.at[idx, "protein_sequence"] = protein_seq

    ref_seq_record = cath_fasta.get(domain_id)
    if not ref_seq_record:
        print(f"No CATH sequence found for {domain_id}")
        continue
    reference_seq = str(ref_seq_record.seq)
    expected_length = len(reference_seq)

    aligner = PairwiseAligner()
    best_score = -1
    best_start = None
    for i in range(len(protein_seq) - expected_length + 1):
        window = protein_seq[i:i + expected_length]
        score = aligner.score(reference_seq, window)
        if score > best_score:
            best_score = score
            best_start = i

    if best_start is None:
        print(f"Failed to align {domain_id}")
        continue

    domain_start = best_start + 1
    domain_end = best_start + expected_length
    domain_seq = protein_seq[best_start:domain_end]

    df.at[idx, "domain_start"] = domain_start
    df.at[idx, "domain_end"] = domain_end
    df.at[idx, "domain_sequence"] = domain_seq

df.to_csv("../data/subset_protein-mapped.csv", index=False)