## Install and Import Necessary Libraries

In [None]:
# Installation for Colab environment. If not in Colab, manage environment separately.
import sys
# Colab detection: true only when the `google.colab` module is already loaded
# in this interpreter.
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("Running in Google Colab. Installing dependencies...")
    # condacolab is installed first because the ANARCI install below goes
    # through `conda` (bioconda channel).
    # NOTE(review): condacolab.install() is documented to restart the Colab
    # kernel; the remaining lines of this cell may only run when the cell is
    # executed a second time -- confirm.
    !pip install -q condacolab
    import condacolab
    condacolab.install()
    !conda install -c bioconda anarci -y
    !pip install joblib dill
    # Pinned versions for potential Colab compatibility, adjust as needed for local env.
    !pip install keras==2.11.0 tensorflow==2.11.0 scikit-learn==1.0.2
else:
    # Outside Colab nothing is installed; the user is only reminded to do so.
    print("Not running in Colab. Ensure ANARCI and other dependencies are installed in your environment.")

In [None]:
# Import libraries
import os

# --- TensorFlow runtime flags ---
# These must be in the environment *before* `import tensorflow` runs: the
# native TensorFlow runtime reads them while it is being loaded, so setting
# them after the import (as this cell previously did) has no effect.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'

import subprocess
import random
import logging
import shutil

import numpy as np
import pandas as pd
import joblib
import tensorflow as tf
from tensorflow.keras.models import model_from_json
from tensorflow.keras.optimizers import Adam # Using tf.keras.optimizers
# from tensorflow.keras.utils import plot_model # Optional: if model plotting is needed

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

# --- Configuration and Constants ---
# The notebook is assumed to be started from its own directory: the current
# working directory is taken as the notebook directory and its parent as the
# project root that holds `data/`.
NOTEBOOK_DIR = os.path.abspath('')
BASE_PROJECT_DIR = os.path.dirname(NOTEBOOK_DIR)
BASE_DATA_DIR = os.path.join(BASE_PROJECT_DIR, 'data')
INPUT_DIR = os.path.join(BASE_DATA_DIR, 'input')
DEEPSP_MODEL_DIR = os.path.join(BASE_DATA_DIR, 'DeepSP_CNN_model')
DEEPVISCOSITY_SCALER_DIR = os.path.join(BASE_DATA_DIR, 'DeepViscosity_scaler')
DEEPVISCOSITY_MODEL_DIR = os.path.join(BASE_DATA_DIR, 'DeepViscosity_ANN_ensemble_models')
TEMP_OUTPUT_DIR = os.path.join(NOTEBOOK_DIR, 'temp_outputs') # For notebook-specific temporary files

# Ensure directories exist
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(TEMP_OUTPUT_DIR, exist_ok=True)

# Logging setup
# NOTE(review): basicConfig does nothing when the root logger already has
# handlers (some notebook front-ends pre-configure it) -- confirm INFO
# messages actually appear in the target environment.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

## Helper Functions (Adapted from script)

In [None]:
def create_fasta_file_nb(sequences: list[str], names: list[str], file_path: str):
    """Write the given sequences to ``file_path`` in FASTA format.

    Record ``i`` takes its residues from ``sequences[i]`` and its id from
    ``names[i]``; the record name and description are passed as empty strings.
    Any existing file at ``file_path`` is overwritten.
    """
    logging.info(f"Creating FASTA file: {file_path}")
    # Records are produced lazily, so they are built and written one by one.
    fasta_records = (
        SeqRecord(Seq(residues), id=names[idx], name="", description="")
        for idx, residues in enumerate(sequences)
    )
    with open(file_path, "w") as fasta_handle:
        SeqIO.write(fasta_records, fasta_handle, "fasta")
    logging.info(f"FASTA file created: {file_path}")

def run_anarci_nb(input_fasta_path: str, output_base_name: str, chain_type: str):
    """Run the ``ANARCI`` command-line tool on a FASTA file.

    The tool is invoked with the IMGT scheme (``-s imgt``), restricted to
    ``chain_type`` (``-r``), and asked for CSV output (``--csv``) written
    under the prefix ``output_base_name``.

    Raises:
        FileNotFoundError: the ``ANARCI`` executable could not be started.
        subprocess.CalledProcessError: ANARCI exited with a non-zero status;
            its captured stderr is logged before re-raising.
    """
    logging.info(f"Running ANARCI for {chain_type} chain: {input_fasta_path}")
    command = [
        'ANARCI', '-i', input_fasta_path, '-o', output_base_name,
        '-s', 'imgt', '-r', chain_type, '--csv',
    ]
    try:
        # Output is captured only so that stderr is available for the error log.
        subprocess.run(command, check=True, capture_output=True, text=True)
    except FileNotFoundError:
        # Raised by subprocess when the executable itself is missing; log it
        # so the failure is as visible as a non-zero exit status.
        logging.error(f"ANARCI executable not found while processing {chain_type} chain. Is ANARCI installed and on PATH?")
        raise
    except subprocess.CalledProcessError as e:
        logging.error(f"ANARCI failed for {chain_type} chain. Error: {e.stderr}")
        raise
    logging.info(f"ANARCI alignment successful for {chain_type} chain. Output files prefixed with: {output_base_name}")

def preprocess_aligned_sequences_nb(h_aligned_path: str, l_aligned_path: str, outfile_path: str):
    """Merge heavy- and light-chain alignment CSVs into fixed-width strings.

    Both CSVs are read with pandas. For every row of the heavy-chain file a
    145-slot heavy template and a 127-slot light template are filled from the
    position-labelled columns (slots with no matching column stay ``'-'``),
    concatenated, and written to ``outfile_path`` as
    ``"<first heavy column value> <272-character string>"``, one line per row.

    Heavy and light rows are paired by row index, so both files must contain
    the same number of rows.

    Raises:
        FileNotFoundError: one of the CSV files is missing (logged first).
        KeyError: the heavy-chain CSV has no ``Id`` column.
        ValueError: the two CSVs have different row counts, which would pair
            heavy and light chains of different antibodies.
    """
    logging.info(f"Preprocessing aligned sequences. H: {h_aligned_path}, L: {l_aligned_path} -> {outfile_path}")
    try:
        infile_H = pd.read_csv(h_aligned_path)
        infile_L = pd.read_csv(l_aligned_path)
    except FileNotFoundError as e:
        logging.error(f"Error reading ANARCI output files: {e}")
        raise

    N_mAbs = len(infile_H["Id"])
    if len(infile_L) != N_mAbs:
        # Pairing by index is only meaningful when both files list the same
        # entries; fail before writing anything rather than emit wrong pairs.
        raise ValueError(
            f"Heavy and light alignment files have different row counts "
            f"({N_mAbs} vs {len(infile_L)}); cannot pair chains by row."
        )

    # Heavy layout: 1..111, insertions 111A..111H, insertions 112I..112A
    # (descending letters), then 112..128 -> 145 slots in total.
    h_labels = (
        [str(i) for i in range(1, 112)]
        + [f"111{letter}" for letter in "ABCDEFGH"]
        + [f"112{letter}" for letter in "IHGFEDCBA"]
        + [str(i) for i in range(112, 129)]
    )
    h_pos_map = {label: slot for slot, label in enumerate(h_labels)}
    # Light layout: 1..127 with no insertion columns -> 127 slots.
    l_pos_map = {str(i): i - 1 for i in range(1, 128)}

    # Pull each relevant column out once instead of indexing the frame per cell.
    h_columns = [(h_pos_map[col], infile_H[col].tolist()) for col in infile_H.columns if col in h_pos_map]
    l_columns = [(l_pos_map[col], infile_L[col].tolist()) for col in infile_L.columns if col in l_pos_map]

    with open(outfile_path, "w") as outfile:
        for i in range(N_mAbs):
            H_tmp = len(h_pos_map) * ['-']
            L_tmp = len(l_pos_map) * ['-']
            for slot, values in h_columns:
                H_tmp[slot] = values[i]
            for slot, values in l_columns:
                L_tmp[slot] = values[i]
            aa_string = "".join(H_tmp + L_tmp)
            outfile.write(f"{infile_H.iloc[i, 0]} {aa_string}\n")
    logging.info("Sequence preprocessing finished.")

def load_aligned_data_nb(filename: str) -> tuple[list[str], list[str]]:
    """Read ``"<name> <sequence>"`` lines from ``filename``.

    Each line is split on whitespace. Lines that do not yield exactly two
    fields (including blank lines) are skipped with a warning.

    Returns:
        ``(names, sequences)`` as two parallel lists.

    Raises:
        FileNotFoundError: ``filename`` does not exist (logged, then re-raised).
    """
    logging.info(f"Loading aligned data from: {filename}")
    identifiers: list[str] = []
    aligned: list[str] = []
    try:
        with open(filename) as handle:
            for raw_line in handle:
                stripped = raw_line.strip()
                fields = stripped.split()
                if len(fields) != 2:
                    logging.warning(f"Skipping malformed line in {filename}: {stripped}")
                    continue
                identifier, residues = fields
                identifiers.append(identifier)
                aligned.append(residues)
    except FileNotFoundError:
        logging.error(f"Aligned data file not found: {filename}")
        raise
    return identifiers, aligned

def one_hot_encode_sequence_nb(sequence: str) -> np.ndarray:
    """One-hot encode ``sequence`` into a ``(21, len(sequence))`` float array.

    Rows 0-19 correspond to the amino-acid letters ``ACDEFGHIKLMNPQRSTVWY`` in
    that order. Row 20 is shared by the gap character ``'-'``, the letter
    ``'X'`` and every character that is not in the table.
    """
    row_of = {residue: row for row, residue in enumerate('ACDEFGHIKLMNPQRSTVWY')}
    fallback_row = 20
    row_of['-'] = fallback_row
    row_of['X'] = fallback_row

    # '-' and 'X' share one row, so there is one row fewer than table entries.
    n_rows = len(row_of) - 1
    n_cols = len(sequence)

    row_index = np.array([row_of.get(ch, fallback_row) for ch in sequence], dtype=int)
    encoded = np.zeros((n_rows, n_cols))
    encoded[row_index, np.arange(n_cols)] = 1
    return encoded

def predict_deepsp_features_nb(X_encoded: np.ndarray, model_type: str) -> np.ndarray:
    """Run one DeepSP model from ``DEEPSP_MODEL_DIR`` on ``X_encoded``.

    The architecture is read from ``Conv1D_regression<MODEL_TYPE>.json`` and
    the weights from ``Conv1D_regression_<model_type>.h5``. The model is
    compiled with the ``adam`` optimizer and MAE loss before ``predict`` is
    called. Any failure is logged and re-raised.

    Returns:
        Whatever ``model.predict`` returns for ``X_encoded``.
    """
    logging.info(f"Predicting DeepSP for model: {model_type}")
    # NOTE(review): the JSON name upper-cases model_type and the weights name
    # lower-cases it; on a case-sensitive filesystem the files on disk must
    # match that exact casing -- confirm against the model directory.
    architecture_name = f'Conv1D_regression{model_type.upper()}.json'
    weights_name = f'Conv1D_regression_{model_type.lower()}.h5'
    json_path = os.path.join(DEEPSP_MODEL_DIR, architecture_name)
    weights_path = os.path.join(DEEPSP_MODEL_DIR, weights_name)
    try:
        with open(json_path, 'r') as architecture_file:
            architecture_json = architecture_file.read()
        network = model_from_json(architecture_json)
        network.load_weights(weights_path)
        network.compile(optimizer='adam', loss='mae', metrics=['mae'])
        outputs = network.predict(X_encoded, verbose=0)
        logging.info(f"DeepSP {model_type} prediction successful.")
        return outputs
    except Exception as e:
        logging.error(f"Error in DeepSP {model_type} prediction: {e}")
        raise

def predict_deepviscosity_nb(df_deepsp_features: pd.DataFrame, num_models: int = 102) -> np.ndarray:
    """Classify each row with the DeepViscosity ANN ensemble.

    Every column after the first of ``df_deepsp_features`` is used as a
    feature. Features are transformed with the scaler stored in
    ``DEEPVISCOSITY_SCALER_DIR``, then passed to models ``ANN_logo_0`` ..
    ``ANN_logo_<num_models - 1>`` from ``DEEPVISCOSITY_MODEL_DIR``. The model
    outputs are averaged and thresholded at 0.5.

    Args:
        df_deepsp_features: frame whose first column is skipped and whose
            remaining columns are the features.
        num_models: number of ensemble members to load (default 102).

    Returns:
        A NumPy array of 0/1 class labels (mean prediction >= 0.5 -> 1). Note
        this is an ``np.ndarray``, not a DataFrame.

    Raises:
        ValueError: ``num_models`` is smaller than 1.
        Exception: any scaler or model loading/prediction error is logged and
            re-raised unchanged.
    """
    if num_models < 1:
        raise ValueError(f"num_models must be at least 1, got {num_models}")

    logging.info("Predicting DeepViscosity...")
    X_features = df_deepsp_features.iloc[:, 1:].values
    scaler_path = os.path.join(DEEPVISCOSITY_SCALER_DIR, "DeepViscosity_scaler.save")
    try:
        # joblib.load unpickles the file: only load scaler files you trust.
        scaler = joblib.load(scaler_path)
        X_scaled = scaler.transform(X_features)
    except Exception as e:
        logging.error(f"Error with scaler: {e}")
        raise

    model_preds = []
    for i in range(num_models):
        file_prefix = f'ANN_logo_{i}'
        model_json_path = os.path.join(DEEPVISCOSITY_MODEL_DIR, f'{file_prefix}.json')
        model_h5_path = os.path.join(DEEPVISCOSITY_MODEL_DIR, f'{file_prefix}.h5')
        try:
            with open(model_json_path, 'r') as json_file:
                loaded_model_json = json_file.read()
            model = model_from_json(loaded_model_json)
            model.load_weights(model_h5_path)
            model.compile(optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])
            model_preds.append(model.predict(X_scaled, verbose=0))
        except Exception as e:
            logging.error(f"Error with ANN model {file_prefix}: {e}")
            raise

    # Average across ensemble members (axis 0), then binarize.
    mean_pred = np.array(model_preds).mean(axis=0)
    final_pred = np.where(mean_pred >= 0.5, 1, 0)
    logging.info("DeepViscosity prediction finished.")
    return final_pred

## 1. Import Dataset

In [None]:
# Load the input table; a missing file leaves an empty frame so the
# following cells can detect it and skip their work.
dataset_path = os.path.join(INPUT_DIR, 'DeepViscosity_input.csv')
try:
    dataset = pd.read_csv(dataset_path)
except FileNotFoundError:
    logging.error(f"Input dataset not found: {dataset_path}")
    dataset = pd.DataFrame() # Ensure dataset is defined for subsequent cells
else:
    logging.info(f"Dataset loaded successfully from {dataset_path}")
    display(dataset.head())

# Column extraction is only attempted when there is data to read.
if dataset.empty:
    names, heavy_seqs, light_seqs = [], [], []
else:
    names = dataset['Name'].to_list()
    heavy_seqs = dataset['Heavy_Chain'].to_list()
    light_seqs = dataset['Light_Chain'].to_list()

## 2. Create FASTA files and Run ANARCI

In [None]:
# All intermediate files of this step live in the temporary output directory.
seq_H_fasta_path_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_H.fasta')
seq_L_fasta_path_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_L.fasta')
anarci_base_out_name_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_aligned') # ANARCI output prefix
# CSV paths that the preprocessing cell expects to find after ANARCI has run.
anarci_H_out_csv_path_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_aligned_H.csv')
anarci_L_out_csv_path_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_aligned_KL.csv')

if not names:
    logging.warning("Dataset is empty. Skipping FASTA creation and ANARCI.")
else:
    # Both FASTA files are written first, then ANARCI runs once per chain type.
    for chain_seqs_nb, fasta_path_nb in ((heavy_seqs, seq_H_fasta_path_nb),
                                         (light_seqs, seq_L_fasta_path_nb)):
        create_fasta_file_nb(chain_seqs_nb, names, fasta_path_nb)

    for fasta_path_nb, chain_label_nb in ((seq_H_fasta_path_nb, 'heavy'),
                                          (seq_L_fasta_path_nb, 'light')):
        run_anarci_nb(fasta_path_nb, anarci_base_out_name_nb, chain_label_nb)

## 3. Preprocess Aligned Sequences

In [None]:
aligned_HL_txt_path_nb = os.path.join(TEMP_OUTPUT_DIR, 'seq_aligned_HL.txt')

# Preprocessing needs a non-empty dataset and both ANARCI CSV outputs on disk.
anarci_outputs_ready_nb = bool(names) and all(
    os.path.exists(csv_path_nb)
    for csv_path_nb in (anarci_H_out_csv_path_nb, anarci_L_out_csv_path_nb)
)
if not anarci_outputs_ready_nb:
    logging.warning("Skipping sequence preprocessing due to missing ANARCI outputs or empty dataset.")
else:
    preprocess_aligned_sequences_nb(anarci_H_out_csv_path_nb, anarci_L_out_csv_path_nb, aligned_HL_txt_path_nb)

## 4. Load Aligned Data and One-Hot Encode

In [None]:
# Defaults keep the names defined when this step is skipped.
X_one_hot_nb = np.array([]) # Initialize to avoid NameError if skipped
loaded_names_nb = []
if not os.path.exists(aligned_HL_txt_path_nb):
    logging.warning("Skipping one-hot encoding as aligned sequence file is missing.")
else:
    loaded_names_nb, loaded_seqs_nb = load_aligned_data_nb(aligned_HL_txt_path_nb)
    logging.info("One-hot encoding sequences...")
    X_encoded_list_nb = list(map(one_hot_encode_sequence_nb, loaded_seqs_nb))

    # Transpose each encoding so that rows index sequence positions.
    X_transposed_list_nb = [encoded_nb.T for encoded_nb in X_encoded_list_nb]
    max_seq_len_nb = max((matrix_nb.shape[0] for matrix_nb in X_transposed_list_nb), default=0)
    num_features_nb = X_transposed_list_nb[0].shape[1] if X_transposed_list_nb else 0

    # Append all-zero rows to shorter matrices so they all share one length.
    X_padded_list_nb = []
    for matrix_nb in X_transposed_list_nb:
        missing_rows_nb = max_seq_len_nb - matrix_nb.shape[0]
        if missing_rows_nb > 0:
            zero_rows_nb = np.zeros((missing_rows_nb, num_features_nb))
            matrix_nb = np.vstack((matrix_nb, zero_rows_nb))
        X_padded_list_nb.append(matrix_nb)

    if X_padded_list_nb:
        X_one_hot_nb = np.asarray(X_padded_list_nb)
    shape_report_nb = X_one_hot_nb.shape if X_one_hot_nb.size > 0 else 'Empty'
    logging.info(f"One-hot encoded data shape: {shape_report_nb}")

## 5. DeepSP Predictions

In [None]:
df_deepsp_nb = pd.DataFrame() # Initialize
if X_one_hot_nb.size == 0:
    logging.warning("Skipping DeepSP predictions as one-hot encoded data is not available.")
else:
    sap_pos_nb = predict_deepsp_features_nb(X_one_hot_nb, 'SAPpos')
    scm_pos_nb = predict_deepsp_features_nb(X_one_hot_nb, 'SCMpos')
    scm_neg_nb = predict_deepsp_features_nb(X_one_hot_nb, 'SCMneg')

    # Column order in the table: Name, SAP_pos, SCM_neg, SCM_pos.
    deepsp_frames_nb = [pd.DataFrame(loaded_names_nb, columns=['Name'])]
    deepsp_frames_nb.extend(
        pd.DataFrame(block_nb) for block_nb in (sap_pos_nb, scm_neg_nb, scm_pos_nb)
    )
    df_deepsp_nb = pd.concat(deepsp_frames_nb, axis=1)

    # Define column names (ensure this matches the actual number of features from DeepSP models)
    # A 1-D prediction array counts as a single feature column.
    num_sap_f = sap_pos_nb.shape[1] if sap_pos_nb.ndim > 1 else 1
    num_scm_neg_f = scm_neg_nb.shape[1] if scm_neg_nb.ndim > 1 else 1
    num_scm_pos_f = scm_pos_nb.shape[1] if scm_pos_nb.ndim > 1 else 1

    deepsp_cols_nb = ['Name']
    for prefix_nb, width_nb in (('SAP_pos', num_sap_f),
                                ('SCM_neg', num_scm_neg_f),
                                ('SCM_pos', num_scm_pos_f)):
        deepsp_cols_nb.extend(f'{prefix_nb}_{k}' for k in range(1, width_nb + 1))

    if len(deepsp_cols_nb) != len(df_deepsp_nb.columns):
        logging.warning("Mismatch in DeepSP feature columns count for notebook. Using default numbered columns.")
    else:
        df_deepsp_nb.columns = deepsp_cols_nb

    deepsp_descriptors_path_nb = os.path.join(BASE_DATA_DIR, 'DeepSP_descriptors_notebook.csv')
    df_deepsp_nb.to_csv(deepsp_descriptors_path_nb, index=False)
    logging.info(f"DeepSP descriptors (notebook) saved to: {deepsp_descriptors_path_nb}")
    display(df_deepsp_nb.head())

## 6. DeepViscosity Predictions

In [None]:
df_deepvis_nb = pd.DataFrame() # Initialize
if df_deepsp_nb.empty:
    logging.warning("Skipping DeepViscosity prediction as DeepSP features are not available.")
else:
    final_pred_nb = predict_deepviscosity_nb(df_deepsp_nb)

    # One row per loaded sequence name with its flattened class label.
    deepvis_columns_nb = {
        'Name': loaded_names_nb,
        'DeepViscosity_classes': final_pred_nb.flatten(),
    }
    df_deepvis_nb = pd.DataFrame(deepvis_columns_nb)

    deepviscosity_classes_path_nb = os.path.join(BASE_DATA_DIR, 'DeepViscosity_classes_notebook.csv')
    df_deepvis_nb.to_csv(deepviscosity_classes_path_nb, index=False)
    logging.info(f"DeepViscosity classes (notebook) saved to: {deepviscosity_classes_path_nb}")
    display(df_deepvis_nb.head())

## 7. Clean up temporary files

In [None]:
# Remove the notebook's temporary directory; a failure is logged, not raised.
logging.info(f"Attempting to clean up temporary directory: {TEMP_OUTPUT_DIR}")
if not os.path.exists(TEMP_OUTPUT_DIR):
    logging.info("Temporary directory not found, no cleanup needed.")
else:
    try:
        shutil.rmtree(TEMP_OUTPUT_DIR)
    except Exception as e:
        logging.error(f"Error cleaning up temporary directory {TEMP_OUTPUT_DIR}: {e}")
    else:
        logging.info("Temporary directory cleaned successfully.")