In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from glob import glob
import os

class TCRDataLoader:
    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.aa_dict = None
        self.v_dict = None
        self.j_dict = None
        self.label_map = None
        self.max_length = None
        
    def load_files(self):
        df_list = []
        files = glob(self.data_dir + "/*/*.tsv")
        
        for file_path in files:
            df = pd.read_csv(file_path, sep='\t')
            
            # Extract folder name from file path
            folder_name = os.path.basename(os.path.dirname(file_path))
            
            # Split folder name to get Antigen
            # "HLA-A-CMV" → ["HLA", "A", "CMV"]
            parts = folder_name.split('-')
            antigen = parts[-1]  # Get last part (the antigen)
            
            # Add antigen column to dataframe
            df['Antigen'] = antigen
            
            df_list.append(df)
        
        combined_df = pd.concat(df_list, ignore_index=True)
        return combined_df
    
    def build_vocabulary(self, df):
        # Amino acids
        all_sequences = df['aminoAcid'].dropna()
        giant_string = ''.join(all_sequences)
        unique_letters = sorted(set(giant_string))
        self.aa_dict = {}
        for i, letter in enumerate(unique_letters):
            self.aa_dict[letter] = i+1
        
        # V genes
        v_genes = df['v_beta'].dropna().unique()
        v_genes = sorted(v_genes)
        self.v_dict = {}
        for i, gene in enumerate(v_genes):
            self.v_dict[gene] = i + 1

        # J genes
        j_genes = df['j_beta'].dropna().unique()
        j_genes = sorted(j_genes)
        self.j_dict = {}
        for i, gene in enumerate(j_genes):
            self.j_dict[gene] = i + 1

        # Labels
        antigens = df['Antigen'].unique()
        self.label_map = {}
        for i, antigen in enumerate(sorted(antigens)):
            self.label_map[antigen] = i

        # Max length (keep for reference, but won't use for padding)
        sequence_lengths = [len(seq) for seq in df['aminoAcid'].dropna()]
        self.max_length = max(sequence_lengths)
    
    def encode_sequences_ragged(self, sequences):
        """Encode sequences without padding for ragged tensors"""
        encoded_aa = []
        for sequence in sequences:
            sequence_numbers = []
            for letter in sequence:
                number = self.aa_dict.get(letter, 0)  # 0 if letter not found
                sequence_numbers.append(number)
            # NO PADDING - keep original length
            encoded_aa.append(sequence_numbers)
        return encoded_aa
    
    def encode_v_genes(self, v_genes):
        encoded_v = []
        for v_gene in v_genes:
            encoded_v.append(self.v_dict.get(v_gene, 0))
        return encoded_v
    
    def encode_j_genes(self, j_genes):
        encoded_j = []
        for j_gene in j_genes:
            encoded_j.append(self.j_dict.get(j_gene, 0))
        return encoded_j
    
    def load_and_encode_data(self, batch_size=100, shuffle=True):
        # 1. Load files
        df = self.load_files()
        
        # 2. Build vocabularies
        self.build_vocabulary(df)
        
        # 3. Filter out rows with missing amino acid sequences AND very short sequences
        valid_rows = df['aminoAcid'].notna()
        df_valid = df[valid_rows].copy()
        
        # Filter out very short sequences that would cause issues with CNN
        min_length = 10
        long_enough = df_valid['aminoAcid'].str.len() >= min_length
        df_valid = df_valid[long_enough].copy()
        
        print(f"Filtered dataset: {len(df_valid)} sequences (min length: {min_length})")
        
        # 4. Get the data you want to encode
        sequences = df_valid['aminoAcid'].values
        v_genes = df_valid['v_beta'].fillna('UNK').values
        j_genes = df_valid['j_beta'].fillna('UNK').values
        labels = [self.label_map[antigen] for antigen in df_valid['Antigen']]
        
        # 5. Encode everything (NO PADDING)
        X_sequences = self.encode_sequences_ragged(sequences)
        X_v_genes = self.encode_v_genes(v_genes)
        X_j_genes = self.encode_j_genes(j_genes)
        
        # 6. Create ragged tensors
        X_sequences_ragged = tf.ragged.constant(X_sequences, dtype=tf.int32)
        X_v_genes_tensor = tf.constant(X_v_genes, dtype=tf.int32)
        X_j_genes_tensor = tf.constant(X_j_genes, dtype=tf.int32)
        y_labels_tensor = tf.constant(labels, dtype=tf.int32)
        
        # 7. Create TensorFlow dataset from ragged tensors
        dataset = tf.data.Dataset.from_tensor_slices({
            'cdr3_sequence': X_sequences_ragged,
            'v_gene': X_v_genes_tensor,
            'j_gene': X_j_genes_tensor,
            'labels': y_labels_tensor
        })
        
        # Map to the format expected by the model: ((inputs), labels)
        dataset = dataset.map(lambda x: (
            (x['cdr3_sequence'], x['v_gene'], x['j_gene']),
            tf.one_hot(x['labels'], len(self.label_map))
        ))
        
        if shuffle:
            dataset = dataset.shuffle(len(X_sequences))
        
        # Batch ragged tensors
        dataset = dataset.batch(batch_size, drop_remainder=True)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        
        return dataset

    def get_vocab_sizes(self):
        """Return vocabulary sizes for model construction"""
        return {
            'aa_vocab_size': len(self.aa_dict) + 1,  # +1 for padding/unknown
            'v_vocab_size': len(self.v_dict) + 1,    # +1 for unknown
            'j_vocab_size': len(self.j_dict) + 1,    # +1 for unknown
            'num_classes': len(self.label_map),
            'max_length': self.max_length
        }
    
    def get_mappings(self):
        """Return the created mappings for inspection"""
        return {
            'aa_dict': self.aa_dict,
            'v_dict': self.v_dict,
            'j_dict': self.j_dict,
            'label_map': self.label_map
        }

  _warn(("h5py is running against HDF5 {0} when it was built against {1}, "


In [None]:
def create_tcr_model_cnn(vocab_sizes):
    """Create a CNN model that handles ragged tensors by converting to dense"""
    # Input layers for ragged tensors
    cdr3_input = tf.keras.Input(shape=[None], dtype=tf.int32, name='cdr3_sequence')
    v_gene_input = tf.keras.Input(shape=(), dtype=tf.int32, name='v_gene')
    j_gene_input = tf.keras.Input(shape=(), dtype=tf.int32, name='j_gene')
    
    # Convert ragged tensor to dense tensor for CNN processing
    # This adds minimal padding per batch
    cdr3_dense = cdr3_input.to_tensor(default_value=0)
    
    # CDR3 sequence encoder
    cdr3_embed = tf.keras.layers.Embedding(
        input_dim=vocab_sizes['aa_vocab_size'], 
        output_dim=64, 
        mask_zero=True  # Handle the padding we just added
    )(cdr3_dense)
    
    # CNN layers with smaller kernels to handle shorter sequences
    conv1 = tf.keras.layers.Conv1D(
        filters=64, 
        kernel_size=3,
        strides=1, 
        padding='valid',
        activation='relu'
    )(cdr3_embed)
    
    conv2 = tf.keras.layers.Conv1D(
        filters=128, 
        kernel_size=3,
        strides=1, 
        padding='valid',
        activation='relu'
    )(conv1)
    
    conv3 = tf.keras.layers.Conv1D(
        filters=256, 
        kernel_size=2,
        strides=1, 
        padding='valid',
        activation='relu'
    )(conv2)
    
    # Global pooling to handle remaining variable lengths
    cdr3_encoded = tf.keras.layers.GlobalMaxPooling1D()(conv3)
    
    # Gene embeddings
    v_embed = tf.keras.layers.Embedding(vocab_sizes['v_vocab_size'], 32)(v_gene_input)
    j_embed = tf.keras.layers.Embedding(vocab_sizes['j_vocab_size'], 32)(j_gene_input)
    
    # Flatten gene embeddings
    v_flat = tf.keras.layers.Flatten()(v_embed)
    j_flat = tf.keras.layers.Flatten()(j_embed)
    
    # Concatenate all features
    fused = tf.keras.layers.Concatenate()([cdr3_encoded, v_flat, j_flat])
    
    # Classifier
    dropout1 = tf.keras.layers.Dropout(rate=0.1)(fused)
    dense1 = tf.keras.layers.Dense(units=128, activation='relu')(dropout1)
    dropout2 = tf.keras.layers.Dropout(rate=0.05)(dense1)
    dense2 = tf.keras.layers.Dense(units=64, activation='relu')(dropout2)
    output = tf.keras.layers.Dense(units=vocab_sizes['num_classes'], activation='softmax')(dense2)
    
    model = tf.keras.Model(
        inputs=[cdr3_input, v_gene_input, j_gene_input],
        outputs=output
    )
    
    return model

In [7]:
# Initialize loader
data_dir = 'Data/Human_Antigens'
loader = TCRDataLoader(data_dir)

# Load data
print("Loading data...")
dataset = loader.load_and_encode_data(batch_size=100, shuffle=True)
print("Data loaded successfully!")

Loading data...
Filtered dataset: 2063 sequences (min length: 10)
Data loaded successfully!


In [8]:
# Create model
vocab_sizes = loader.get_vocab_sizes()
print("Creating model...")
model = create_tcr_model_cnn(vocab_sizes)

# Compile model
model.compile(
    optimizer='adam', 
    loss='categorical_crossentropy', 
    metrics=['accuracy']
)

# Display model summary
model.summary()

Creating model...


AttributeError: 'KerasTensor' object has no attribute 'to_tensor'