## Constants and Imports

In [None]:
from functions_analysis import *
import glob
import pandas as pd
from DockQ.DockQ import load_PDB, run_on_all_native_interfaces
from download_functions import *
import os
import itertools

In [None]:
# Gather every manual-review CSV and stack them into a single frame.
review_files = glob.glob('/home/markus/MPI_local/production1/structure_reviews/*.csv')
# review_files = glob.glob('/home/markus/MPI_local/production1/structure_reviews/intersect_df - set3.csv')
reviews_df = (
    pd.concat([*map(pd.read_csv, review_files)], ignore_index=True)
    # The same structure/chain assignment can be present in several review
    # files; keep only its first occurrence.
    .drop_duplicates(subset=['pdb_id', 'query_tf', 'query_arm', 'chain_tf', 'chain_arm'])
)


## Download PDBs

In [None]:
# Keep only the reviews whose comment contains "looks good" or "in complex"
# (case-insensitive; rows without a comment are dropped).
# .copy() makes good_structs an independent frame: later cells add columns to
# it, which on a filtered selection of reviews_df triggers pandas'
# SettingWithCopyWarning.
good_structs = reviews_df[
    reviews_df['comment'].str.contains('looks good|in complex', case=False, na=False)
].copy()

# Get unique PDB IDs from good_structs (NaN, if present, is counted here too)
unique_pdb_ids = good_structs['pdb_id'].unique()
print(f"Found {len(unique_pdb_ids)} unique PDB IDs to download")

# Define download directory
pdb_download_dir = "/home/markus/MPI_local/data/PDB"

# Download each PDB structure; a truthy return value of
# download_pdb_structure is counted as success, a falsy one as failure.
downloaded_count = 0
failed_count = 0

for pdb_id in unique_pdb_ids:
    if pd.isna(pdb_id):  # Skip NaN values
        continue

    if download_pdb_structure(pdb_id, pdb_download_dir):
        downloaded_count += 1
    else:
        failed_count += 1

print("\nSummary:")
print(f"Successfully processed: {downloaded_count}")
print(f"Failed downloads: {failed_count}")
# Every non-NaN ID increments exactly one of the two counters above, so their
# sum is the number of IDs that were actually processed.
print(f"Total processed: {downloaded_count + failed_count}")

In [None]:
from typing import List

def get_job_name(id_tf: str, id_arm: str, df: pd.DataFrame):
    """Build the lower-cased job name for an arm/TF pair.

    Args:
        id_tf (str): Value to look up in the 'Entry' column for the TF.
        id_arm (str): Value to look up in the 'Entry' column for the arm.
        df (pd.DataFrame): Table providing the 'Entry' and 'Length' columns.

    Returns:
        str: "<id_arm>_1-<arm length>_<id_tf>_1-<tf length>", lower-cased.
        If an entry matches several rows, the first row's length is used.

    Raises:
        Exception: If either ID has no row in df (the arm is checked first).
    """
    hits_by_role = {}
    # Order matters: the arm is validated before the TF.
    for role, entry_id in (('arm', id_arm), ('tf', id_tf)):
        hits = df[df['Entry'] == entry_id]
        if hits.empty:
            raise Exception(f"Not found in df: {entry_id}")
        hits_by_role[role] = hits

    length_arm = hits_by_role['arm']['Length'].iloc[0]
    length_tf = hits_by_role['tf']['Length'].iloc[0]
    return f"{id_arm}_1-{length_arm}_{id_tf}_1-{length_tf}".lower()

In [None]:

def get_all_chain_mappings(native_chains, model_chains) -> List:
    """Enumerate every one-to-one assignment of native chains to model chains.

    When the two collections differ in size, every subset of the larger one
    that has the size of the smaller one is paired with the smaller one.

    Args:
        native_chains: Chain IDs used as dictionary keys.
        model_chains: Chain IDs used as dictionary values.

    Returns:
        List: Dictionaries of the form {native_chain: model_chain}.
    """
    n_native = len(native_chains)
    n_model = len(model_chains)

    if n_native == n_model:
        return all_bijective_mappings(native_chains, model_chains)

    if n_native > n_model:
        # Too many native chains: try every native subset of model size.
        return [
            mapping
            for native_subset in itertools.combinations(native_chains, n_model)
            for mapping in all_bijective_mappings(native_subset, model_chains)
        ]

    # Too many model chains: try every model subset of native size.
    return [
        mapping
        for model_subset in itertools.combinations(model_chains, n_native)
        for mapping in all_bijective_mappings(native_chains, model_subset)
    ]

def all_bijective_mappings(A, B):
    """
    Build one dictionary per ordering of B, pairing the elements of A with
    that ordering position by position.  A and B must be the same length.
    """
    mappings = []
    for ordering in itertools.permutations(B):
        mappings.append(dict(zip(A, ordering)))
    return mappings

# Sanity checks for get_all_chain_mappings(); one case per branch.
# Case 1: native_chains and model_chains have the same length
native_chains = ['A', 'B']
model_chains = ['X', 'Y']
mappings = get_all_chain_mappings(native_chains, model_chains)
assert mappings == [{'A': 'X', 'B': 'Y'}, {'A': 'Y', 'B': 'X'}]
# Case 2: more native chains than model chains
# -> every 2-element subset of the native chains is mapped onto the model
native_chains = ['A', 'B', 'C']
model_chains = ['X', 'Y']
mappings = get_all_chain_mappings(native_chains, model_chains)
assert mappings == [{'A': 'X', 'B': 'Y'}, {'A': 'Y', 'B': 'X'}, {'A': 'X', 'C': 'Y'}, {'A': 'Y', 'C': 'X'}, {'B': 'X', 'C': 'Y'}, {'B': 'Y', 'C': 'X'}]
# Case 3: more model chains than native chains
# -> the native chain is mapped onto every 1-element subset of the model
native_chains = ['A']
model_chains = ['X', 'Y']
mappings = get_all_chain_mappings(native_chains, model_chains)
assert mappings == [{'A': 'X'}, {'A': 'Y'}]

## Calculate DockQ

In [None]:
# Tab-separated UniProt exports that are stacked into one lookup table.
PROTEOME_PATHS = [
    '/home/markus/MPI_local/data/Proteome/uniprotkb_proteome_UP000005640_2025_05_28.tsv',
    '/home/markus/MPI_local/data/full_UP/uniprotkb_AND_reviewed_true_2025_07_10.tsv'
]

# Rows of all files are appended in list order and re-indexed from 0.
all_uniprot = pd.concat(
    [*(pd.read_csv(tsv_path, sep='\t', low_memory=False) for tsv_path in PROTEOME_PATHS)],
    ignore_index=True,
)

In [None]:
from pathlib import Path
def get_file_path(filename: str, search_dir: str):
    """Recursively look for filename below search_dir.

    Args:
        filename (str): Name (or glob pattern) handed to Path.rglob.
        search_dir (str): Directory at which the recursive search starts.

    Returns:
        The first match as a Path, or False when nothing matches.
    """
    # next() stops at the first hit; False is the "nothing found" sentinel.
    return next(Path(search_dir).rglob(filename), False)



In [None]:
NATIVE_PATH_PREFIX = "/home/markus/MPI_local/data/PDB/"
HPC_FULL_RESULTS_DIR = "/home/markus/MPI_local/HPC_results_full"

# Result columns; rows that cannot be scored keep None.
good_structs['chain_map'] = None
good_structs['dockq_score'] = None

# (path, job_name) tuples of the rows skipped because a file was missing
no_model = []
no_native = []

# For every structure, calculate the DockQ score for all possible chain
# mappings and store the best score/mapping.
for index, row in good_structs.iterrows():
    pdb_id = row['pdb_id']

    job_name = get_job_name(row['query_tf'].split('|')[0], row['query_arm'].split('|')[0], all_uniprot)
    model_path = get_file_path(f'{job_name}_model.cif', HPC_FULL_RESULTS_DIR)
    native_path_cif = f'{NATIVE_PATH_PREFIX}{pdb_id.lower()}.cif'

    # Check if both paths exist before loading
    if not model_path:
        no_model.append((model_path, job_name))
        continue

    if not os.path.exists(native_path_cif):
        no_native.append((native_path_cif, job_name))
        continue

    model = load_PDB(model_path)
    native = load_PDB(native_path_cif)
    model_chains = [chain.id for chain in model]
    native_chains = [chain.id for chain in native]

    # str(chain_map) -> score for every mapping that could be evaluated
    chain_map_dict = {}
    if len(native_chains) > 2:
        print(f"{pdb_id}: Warning: Native structure ({native}) has more than two chains!")

    # Each chain_map is keyed by native chain ID with the model chain ID as
    # value (NOTE(review): this is the orientation DockQ's chain_map is
    # understood to expect - confirm against the installed DockQ version).
    for chain_map in get_all_chain_mappings(native_chains, model_chains):
        try:
            # Element [1] of the returned tuple is used as the score
            # (presumably the total DockQ over all native interfaces).
            dockQ = run_on_all_native_interfaces(model, native, chain_map=chain_map)[1]
        except Exception as e:
            print(f"Exception for {pdb_id}: {e}. Comment in review: {row['comment']}")
            # Give up on the remaining mappings of this structure; scores
            # collected so far are still used below.
            break
        chain_map_dict[str(chain_map)] = dockQ

    if chain_map_dict:
        best_chain_map = max(chain_map_dict, key=chain_map_dict.get)
        best_dockq_score = chain_map_dict[best_chain_map]

        # Store results in the DataFrame
        good_structs.at[index, 'chain_map'] = best_chain_map
        good_structs.at[index, 'dockq_score'] = best_dockq_score

if no_model:
    print("Missing model files:")
    print(no_model)
if no_native:
    print("Missing native files:")
    for missing_native in no_native:
        print(f"  NATIVE: {missing_native}")


In [None]:
# pair_id: string form of the alphabetically sorted, upper-cased
# (arm, tf) accession pair, where the accession is the text before the
# first '|' of the query column.
good_structs['pair_id'] = good_structs.apply(
    lambda row: str(tuple(sorted(
        row[column].split('|')[0].upper() for column in ('query_arm', 'query_tf')
    ))),
    axis=1,
)

In [None]:
# Directory handed to find_summary_files() to collect the job summaries.
RESULTS_DIR_ALL = "/home/markus/MPI_local/HPC_results"
from functions_analysis import *

In [None]:
# Raw ("uc") table built from whatever find_summary_files() returns for the
# results directory.
results_df_all_uc = pd.DataFrame(find_summary_files(RESULTS_DIR_ALL))
print(f"Total jobs processed: {results_df_all_uc.shape[0]}")
# Row-wise pair_id, used below as the merge key with good_structs.
results_df_all_uc['pair_id'] = results_df_all_uc.apply(create_pair_id, axis=1)
results_df_all = clean_results(results_df_all_uc)

In [None]:
# Left join: every reviewed structure is kept, with the prediction columns
# attached wherever a row with the same pair_id exists.
good_structs_annotated = good_structs.merge(results_df_all, how='left', on='pair_id')

In [None]:
from functions_plotting import *

In [None]:
# One scatter plot per score column ('ranking_score', 'iptm', 'ptm'), each
# against 'dockq_score', all drawn from good_structs_annotated.
create_scatter_plot(good_structs_annotated, 'ranking_score', 'dockq_score')
create_scatter_plot(good_structs_annotated, 'iptm', 'dockq_score')
create_scatter_plot(good_structs_annotated, 'ptm', 'dockq_score')

In [None]:
from Bio.PDB import MMCIFParser, PDBIO

In [None]:
def get_pdb_path(cif_path: str, pdb_dir: str) -> str:
    """Return the path of the PDB-format copy of a CIF file, converting on demand.

    The PDB file lives in pdb_dir and has the CIF file's base name with a
    '.pdb' extension. An existing file is reused as-is. Otherwise the CIF is
    parsed and written to a temporary file that is renamed to the final name
    only after the write succeeded, so a failed conversion never leaves a
    truncated '.pdb' behind that later calls would treat as a valid cache hit.

    Args:
        cif_path (str): Path to the input CIF file
        pdb_dir (str): Directory to store/find PDB files (created if missing)

    Returns:
        str: Path to the PDB file

    Raises:
        Any exception raised while parsing the CIF or writing the PDB file is
        propagated unchanged.
    """
    # Create pdb_dir if it doesn't exist
    os.makedirs(pdb_dir, exist_ok=True)

    # <pdb_dir>/<cif base name without extension>.pdb
    pdb_filename = os.path.splitext(os.path.basename(cif_path))[0] + '.pdb'
    pdb_path = os.path.join(pdb_dir, pdb_filename)

    # Reuse an earlier conversion
    if os.path.exists(pdb_path):
        return pdb_path

    # Convert CIF to PDB
    parser = MMCIFParser(QUIET=False)
    structure = parser.get_structure("structure", cif_path)

    io = PDBIO()
    io.set_structure(structure)

    # Write next to the final location so the rename stays on one filesystem
    tmp_path = pdb_path + '.partial'
    try:
        io.save(tmp_path)
        os.replace(tmp_path, pdb_path)
    finally:
        # Only still present when save/replace failed
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    return pdb_path

In [None]:
import subprocess
from pathlib import Path

TM_BIN = '/home/markus/MPI_local/bin/TMscore'
USALIGN_BIN = '/home/markus/MPI_local/bin/USalign'
def calc_USalign(model, native, output_dir):
    """Run US-align on a model/native pair and cache its stdout in a file.

    The output file is "<output_dir>/<model stem>--<native stem>". If it
    already exists it is returned without running US-align again. Otherwise
    stdout is captured in a temporary file that is renamed to the final name
    only when US-align exits successfully; a failed run therefore never
    leaves an empty or truncated file that later calls would return as a
    cached result.

    Args:
        model: Path to the model structure file.
        native: Path to the native structure file.
        output_dir: Directory for the output files (created if missing).

    Returns:
        str: Path of the output file. When US-align exits with a non-zero
        status the error is printed and the path is still returned, but no
        file exists at that location.

    Raises:
        OSError: If the US-align binary cannot be started (e.g. it is missing).
    """
    os.makedirs(output_dir, exist_ok=True)

    out_path = f"{output_dir}/{Path(model).stem}--{Path(native).stem}"
    if os.path.exists(out_path):
        return out_path

    tmp_path = f"{out_path}.partial"
    try:
        with open(tmp_path, 'w') as output_file:
            subprocess.run([
                USALIGN_BIN,
                model,
                native,
                "-ter", "1",
                "-mm", "1",
                "-outfmt", "2"
            ], stdout=output_file, check=True)
        # Publish the result only after a successful run
        os.replace(tmp_path, out_path)
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
    finally:
        # Still present only when the run failed
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    return out_path

In [None]:
PDB_DIR = '../../production1/pdb_cache'
ALIGN_DIR = '../../production1/us_align_out'


# One US-align result table per successfully processed row; concatenated once
# after the loop instead of re-copying a growing DataFrame on every iteration.
us_align_frames = []
error_count = 0
# (path, job_name) tuples of the rows skipped because a file was missing
no_model = []
no_native = []

for _, row in good_structs.iterrows():

    job_name = get_job_name(row['query_tf'].split('|')[0], row['query_arm'].split('|')[0], all_uniprot)
    model_path_cif = get_file_path(f'{job_name}_model.cif', HPC_FULL_RESULTS_DIR)
    native_path_cif = f'{NATIVE_PATH_PREFIX}{row["pdb_id"].lower()}.cif'

    # Check if both paths exist before loading
    if not model_path_cif:
        no_model.append((model_path_cif, job_name))
        continue

    if not os.path.exists(native_path_cif):
        no_native.append((native_path_cif, job_name))
        continue

    # Best effort per row: any failure is printed and counted, and the loop
    # moves on to the next structure.
    try:
        native_path_pdb = get_pdb_path(native_path_cif, PDB_DIR)
        model_path_pdb = get_pdb_path(model_path_cif, PDB_DIR)
        us_align = calc_USalign(model_path_pdb, native_path_pdb, ALIGN_DIR)
        us_align_df = pd.read_csv(us_align, sep='\t')
        us_align_df['job_name'] = job_name
        us_align_df['pdb_id'] = row['pdb_id']
        us_align_frames.append(us_align_df)
    except Exception as e:
        error_count += 1
        print(e)

# pd.concat raises on an empty list, so fall back to an empty frame
us_align_score_df = (
    pd.concat(us_align_frames, ignore_index=True) if us_align_frames else pd.DataFrame()
)

print(error_count)
print(f"{len(no_model)}")
print(f"{len(no_native)}")

# Do not name the loop variable `list`: that rebinds the builtin for every
# cell executed afterwards.
for missing_model in no_model:
    print(missing_model)
    