In [None]:
import os
import pandas as pd
import plotly.express as px
import numpy as np

In [None]:
# function to load fasta file

def load_fasta(file):
    fasta_dict = {}
    header = None
    sequence_lines = []

    with open(file) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            if line.startswith(">"):
                if header:
                    fasta_dict[header] = ''.join(sequence_lines)
                header = line[1:].strip()
                sequence_lines = []
            else:
                sequence_lines.append(line)

        # Add the last record
        if header:
            fasta_dict[header] = ''.join(sequence_lines)

    return fasta_dict
    

## Load data

### Sequence data

In [None]:
# load sequences
sequences = load_fasta("data/all.fasta")
len(sequences)

In [None]:
# understand length of sequences
sequences_lengths = [len(seq) for seq in sequences.values()]

min(sequences_lengths), max(sequences_lengths)

In [None]:
# plot sequence lengths

df = pd.DataFrame(sequences_lengths, columns=["length"])
df["length"] = df["length"].astype(int)
# add title 
df["title"] = "Length of sequences in all.fasta"
fig = px.histogram(df, x="length", nbins=100)
fig.show()

### Data splits

In [None]:
# Load data splits 

data_splits = pd.DataFrame(columns=["id", "split"])

for split in range(1, 6):
    ids = pd.read_csv(f"data/ids_split{split}.txt", header=None)
    ids = ids[0].tolist()
    data = {"id": ids, "split": split}
    data = pd.DataFrame(data)
    data_splits = pd.concat([data_splits, data])

data_splits['split'].value_counts(), data_splits

In [None]:
# read in test labels
test_ids = pd.read_csv("data/uniprot_test.txt", header=None)
test_ids = test_ids[0].tolist()
len(test_ids)

In [None]:
all_ids = data_splits['id'].tolist() + test_ids
len(all_ids), len(set(all_ids))

In [None]:
# create a subset of the sequences with only training sequences

training_ids = data_splits['id'].tolist()
train_sequences = {k: v for k, v in sequences.items() if k in training_ids}
len(train_sequences)

In [None]:
seq_lengths = [len(v) for k, v in train_sequences.items()]
residue_number_all = sum(seq_lengths)
len(training_ids), residue_number_all # number of required embeddings

### Binding site files

In [None]:
# load binding sites files

binding_sites_files = [f for f in os.listdir("data") if f.startswith("binding_residues")]
binding_sites = {}

for file in binding_sites_files:
    ligand = file.split("_")[-1].replace(".txt", "")
    with open(f"data/{file}") as f:
        binding_sites[ligand] = {}
        lines = f.read().splitlines()
        for line in lines:
            id, sites = line.split("\t")
            id = id.strip()
            sites = sites.split(" ")
            binding_sites[ligand][id] = sites

In [None]:
binding_sites_df = pd.DataFrame(columns=["protein_id", "ligand", "site"])

for ligand, ids in binding_sites.items():
    for id, sites in ids.items():
        for site in sites:
            data = {"protein_id": id, "ligand": ligand, "site": site}
            binding_sites_df = pd.concat([binding_sites_df, pd.DataFrame(data, index=[0])])

In [None]:
binding_sites_df.head()

In [None]:
# number of protein_ids in binding sites data 
print("Number of protein_ids in binding sites data: ", len(binding_sites_df["protein_id"].unique()))

In [None]:
binding_sites_df = binding_sites_df.assign(site=binding_sites_df['site'].str.split(',')).explode('site').reset_index(drop=True)

In [None]:
# show any rows for which protein_id and site are duplicated
grouped = binding_sites_df.groupby(['protein_id', 'site'])['ligand'].nunique().reset_index(name='n_unique_ligands')
filtered = grouped[grouped['n_unique_ligands'].isin([2, 3])]
print(f"Number of protein id's with multiple ligands and sites: {filtered['protein_id'].nunique()}")

# plot histogram of number of ligands per protein
fig = px.histogram(grouped, x="n_unique_ligands", nbins=10)
fig.update_layout(title="Number of ligands per protein", width=600, height=400)

#### Subset binding data to proteins from the training set

In [None]:
# check howmany of the train_ids are in the binding sites data

train_ids_in_binding_sites = binding_sites_df[binding_sites_df["protein_id"].isin(train_sequences.keys())]['protein_id'].unique()
len(train_ids_in_binding_sites)

In [None]:
# filter down binding sites to training ids

binding_sites_train_df = binding_sites_df[binding_sites_df["protein_id"].isin(train_sequences.keys())]
print("Shape of binding sites train df: ", binding_sites_train_df.shape)
print("Number of binding sites in training data: ", len(binding_sites_train_df))
print("Number of unique protein ids in training data: ", binding_sites_train_df["protein_id"].nunique())
print("Number of unique ligands in training data: ", binding_sites_train_df["ligand"].nunique())

In [None]:
n_unique_combinations = binding_sites_train_df[['protein_id', 'site']].drop_duplicates().shape[0]
print("Number of unique combinations of protein_id and site: ", n_unique_combinations)
print("Number of unique combinations of protein_id and site not in binding sites data: ", residue_number_all - n_unique_combinations)

In [None]:
# for each ligand class - show number of unique protein_ids, number of sites

for ligand in binding_sites_train_df['ligand'].unique():
    ligand_df = binding_sites_train_df[binding_sites_train_df['ligand'] == ligand]
    n_unique_proteins = ligand_df['protein_id'].nunique()
    n_sites = ligand_df.shape[0]
    print(f"Ligand: {ligand}, Unique Proteins: {n_unique_proteins}, Unique Sites: {n_sites}")

In [None]:
#  again check for duplicates of binding labels for multiple binding types

# show any rows for which protein_id and site are duplicated
grouped = binding_sites_train_df.groupby(['protein_id', 'site'])['ligand'].nunique().reset_index(name='n_unique_ligands')
filtered = grouped[grouped['n_unique_ligands'].isin([2, 3])]
print(f"Number of protein id's with multiple ligands and sites: {filtered['protein_id'].nunique()}")

# plot histogram of number of ligands per protein
fig = px.histogram(grouped, x="n_unique_ligands", nbins=10)
fig.update_layout(title="Number of ligands per protein", width=600, height=400)

In [None]:
# check ligand information for duplicated ligand labels 

ligands_per_site = binding_sites_train_df[binding_sites_train_df["protein_id"].isin(filtered['protein_id'].unique())].groupby(['protein_id', 'site'])['ligand'].unique().reset_index(name='ligands')
ligands_per_site['ligands_tuple'] = ligands_per_site['ligands'].apply(lambda x: tuple(sorted(x)))
ligand_comb_counts = ligands_per_site['ligands_tuple'].value_counts()
ligand_comb_counts


## Generate a joint feature data

In [None]:
records = []
for protein_id, sequence in train_sequences.items():
    for pos in range(len(sequence)):
        records.append({
            'protein_id': protein_id,
            'pos': pos +1,  # 0-based, can change to 1-based if needed
            'residue_id': f"{protein_id}_{pos +1}"
        })

feature_df = pd.DataFrame(records)
feature_df

In [None]:
# Ensure binding_sites_train_df['site'] is int
binding_sites_train_df['site'] = binding_sites_train_df['site'].astype(int)

In [None]:
binding_set = set(zip(binding_sites_train_df['protein_id'], binding_sites_train_df['site']))
feature_df['binding'] = feature_df.apply(lambda row: 'yes' if (row['protein_id'], row['pos']) in binding_set else 'no', axis=1)

In [None]:
feature_df['binding'].value_counts()

In [None]:
unique_ligands = binding_sites_train_df['ligand'].unique()
# Add one column per unique ligand
triplet_set = set(binding_sites_train_df[['protein_id', 'site', 'ligand']].itertuples(index=False, name=None))

# Map of ligand â†’ set of proteins that have ANY site annotated for it
ligand_proteins = binding_sites_train_df.groupby('ligand')['protein_id'].unique().to_dict()

for ligand in unique_ligands:
    annotated_proteins = set(ligand_proteins[ligand])
    
    def determine_label(row):
        key = (row['protein_id'], row['pos'], ligand)
        if key in triplet_set:
            return 'yes'
        elif row['protein_id'] in annotated_proteins:
            return 'no'
        else:
            return 'no_information'
    
    feature_df[ligand] = feature_df.apply(determine_label, axis=1)

In [None]:
feature_df['nuclear'].value_counts(), feature_df['small'].value_counts(), feature_df['metal'].value_counts()

In [None]:
ligand_cols = [col for col in feature_df.columns if col not in ['protein_id', 'pos', 'residue_id', 'binding']]

def concat_ligands(row):
    present_ligands = [lig for lig in ligand_cols if row[lig] == 'yes']
    return ",".join(sorted(present_ligands)) if present_ligands else "none"

feature_df['ligand'] = feature_df.apply(concat_ligands, axis=1)

In [None]:
feature_df[(feature_df['small']=="no_information") & (feature_df['metal']=="no_information") ]

In [None]:
feature_df['ligand'].value_counts()

In [None]:
sequence_lengths = {pid: len(seq) for pid, seq in train_sequences.items()}

# Add sequence_length column
feature_df['sequence_length'] = feature_df['protein_id'].map(sequence_lengths)

# Bin the sequence lengths
bin_size = 50  # You can change this to whatever granularity you prefer

def length_to_bin(length):
    start = (length // bin_size) * bin_size + 1
    end = start + bin_size - 1
    return f"{start}-{end}"

feature_df['sequence_length_bin'] = feature_df['sequence_length'].apply(length_to_bin)

In [None]:
feature_df['sequence_length_bin'].value_counts().sort_index()

In [None]:
# export as csv

feature_df.to_csv("binding_train_features.csv", index=False)

## Export sequence fasta file containing only proteins from the train set

In [None]:
# export training sequences to fasta file without line breaks

with open("binding_train_sequences.fasta", "w") as f:
    for protein_id, sequence in train_sequences.items():
        f.write(f">{protein_id}\n")
        f.write(f"{sequence}\n")