In [1]:
import pandas as pd
import glob
import os
import json
from anarci import anarci
from tqdm.notebook import tqdm

In [111]:
def load_asd_data_with_pandas(data_path: str) -> pd.DataFrame:
    """
    Загружает все parquet файлы из папки asd в один pandas DataFrame

    Args:
        data_path: путь к папке с данными

    Returns:
        pd.DataFrame: объединенный DataFrame со всеми данными
    """
    # Получаем все parquet файлы из папки
    parquet_files = glob.glob(os.path.join(data_path, "part-*.parquet"))

    if not parquet_files:
        raise ValueError(f"Не найдено parquet файлов в папке {data_path}")

    print(f"Найдено {len(parquet_files)} parquet файлов")

    # Загружаем все файлы в список DataFrame'ов
    dataframes = []
    for file_path in parquet_files:
        df = pd.read_parquet(file_path)
        dataframes.append(df)

    # Объединяем все DataFrame'ы в один
    combined_df = pd.concat(dataframes, ignore_index=True)

    print(f"Общий размер данных: {combined_df.shape}")
    print(f"Колонки: {list(combined_df.columns)}")

    return combined_df

# Загружаем данные
agab_df = load_asd_data_with_pandas('./asd')

Найдено 20 parquet файлов
Общий размер данных: (1199759, 11)
Колонки: ['dataset', 'heavy_sequence', 'light_sequence', 'scfv', 'affinity_type', 'affinity', 'antigen_sequence', 'confidence', 'nanobody', 'metadata', 'processed_measurement']


In [112]:
# Переводим affinity и processed_measurement во float где возможно
# Convert strings to floats, handling ">8.8" or "<0.01" patterns
def convert_to_float(value):
    """Convert string values like '>8.8' or '<0.01' to floats (8.8 and 0.01)"""
    if pd.isna(value):
        return value
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        # Remove leading > or < and convert to float
        cleaned = value.strip()
        if cleaned.startswith('>') or cleaned.startswith('<'):
            cleaned = cleaned[1:].strip()
        try:
            return float(cleaned)
        except (ValueError, TypeError):
            return value
    return value

# Apply conversion to processed_measurement
agab_df = agab_df.copy()
agab_df.loc[:, 'affinity'] = pd.to_numeric(agab_df['affinity'], errors='coerce').fillna(agab_df['affinity'])
agab_df.loc[:, 'processed_measurement'] = pd.to_numeric(agab_df['processed_measurement'], errors='coerce').fillna(agab_df['processed_measurement'])

agab_df['affinity'] = agab_df['affinity'].apply(convert_to_float)
agab_df['processed_measurement'] = agab_df['processed_measurement'].apply(convert_to_float)

#### Base filtering

In [113]:
is_not_nanobody = agab_df['nanobody'] == False
# is_high_confidence = agab_df['confidence'].isin(['high', 'very_high'])
is_scfv = agab_df['scfv'] == True
has_both_chains = (
    agab_df['light_sequence'].notna() 
    & agab_df['heavy_sequence'].notna()
    & (agab_df['light_sequence'] != '')
    & (agab_df['heavy_sequence'] != '')
)

agab_df = agab_df[is_not_nanobody 
# & is_high_confidence 
& (is_scfv | has_both_chains)]
agab_df.shape

(844872, 11)

In [114]:
agab_df.groupby(['affinity_type', 'dataset'])['antigen_sequence'].agg([
    ('total_count', 'count'),
    ('unique_antigens', 'nunique')
]).sort_values('unique_antigens', ascending=False)

Unnamed: 0_level_0,Unnamed: 1_level_0,total_count,unique_antigens
affinity_type,dataset,Unnamed: 2_level_1,Unnamed: 3_level_1
bool,patents,21621,3478
bool,structures-antibodies,2711,1083
bool,genbank,98,23
bool,skempiv2,34,21
ddg,skempiv2,400,19
elisa_mut_to_wt_ratio,abdesign,658,13
bool,abdesign,14,13
bool,ab-bind,13,10
ic_50,dlgo,360,10
ddg,ab-bind,270,9


In [115]:
# (порог, оператор): '<', '>', '=='
AFFINITY_THRESHOLDS = {
    'fuzzy': ('h', '=='),
    'bool': (1, '=='),
    'alphaseq': (2, '<'),
    '-log KD': (7, '>'),
    'kd': (100, '<'),
    # 'delta_g': (-9.5, '<'), ее нет
    'ddg': (0, '<'),  
    'log_enrichment': (0, '>'),
    'elisa_mut_to_wt_ratio': (1, '>'),
    'ic_50': (1, '<'), # мкг/мл 
}

def apply_affinity_filter(df: pd.DataFrame, thresholds: dict) -> pd.DataFrame:
    masks = []
    for affinity_type, (threshold, op) in thresholds.items():
        type_mask = df['affinity_type'] == affinity_type
        if op == '==':
            mask = type_mask & (df['processed_measurement'] == threshold)
        else:
            numeric_affinity = pd.to_numeric(df['processed_measurement'], errors='coerce')
            if op == '<':
                mask = type_mask & (numeric_affinity < threshold)
            else:
                mask = type_mask & (numeric_affinity > threshold)
        masks.append(mask)

    if masks:
        combined_mask = masks[0]
        for mask in masks[1:]:
            combined_mask = combined_mask | mask
        return df[combined_mask]
    else:
        return df

print(f"После базовых фильтров: {agab_df.shape}")
print(f"Распределение по affinity_type до фильтрации по порогам:")
print(agab_df['affinity_type'].value_counts())

# Применяем фильтрацию по порогам аффиности
agab_df = apply_affinity_filter(agab_df, AFFINITY_THRESHOLDS)

print(f"\nПосле фильтрации по порогам аффинитета: {agab_df.shape}")
print(f"Распределение по affinity_type после фильтрации:")
print(agab_df['affinity_type'].value_counts())


После базовых фильтров: (844872, 11)
Распределение по affinity_type до фильтрации по порогам:
affinity_type
fuzzy                    524346
-log KD                  152401
alphaseq                 131645
bool                      24491
kd                         6849
long_enrichment            3452
ddg                         670
elisa_mut_to_wt_ratio       658
ic_50                       360
Name: count, dtype: int64

После фильтрации по порогам аффинитета: (401505, 11)
Распределение по affinity_type после фильтрации:
affinity_type
fuzzy                    172149
-log KD                  141805
alphaseq                  55670
bool                      24491
kd                         6744
ic_50                       309
ddg                         169
elisa_mut_to_wt_ratio       168
Name: count, dtype: int64


In [116]:
agab_df.groupby(['affinity_type', 'dataset'])['antigen_sequence'].agg([
    ('total_count', 'count'),
    ('unique_antigens', 'nunique')
]).sort_values('unique_antigens', ascending=False)

Unnamed: 0_level_0,Unnamed: 1_level_0,total_count,unique_antigens
affinity_type,dataset,Unnamed: 2_level_1,Unnamed: 3_level_1
bool,patents,21621,3478
bool,structures-antibodies,2711,1083
bool,genbank,98,23
bool,skempiv2,34,21
ddg,skempiv2,112,17
bool,abdesign,14,13
elisa_mut_to_wt_ratio,abdesign,168,12
bool,ab-bind,13,10
ic_50,dlgo,309,10
ddg,ab-bind,57,9


#### Affinity variation

In [117]:
print("=== АНАЛИЗ РАЗЛИЧИЙ В ПОЛЯХ ДЛЯ ДУБЛИКАТОВ ПО ПОСЛЕДОВАТЕЛЬНОСТЯМ (agab_df) ===\n")

# Поля для проверки
fields = [
    "dataset",
    "scfv",
    "affinity_type",
    "affinity",
    "confidence",
    "nanobody",
    "processed_measurement",
]

# --- Поиск групп дубликатов по heavy_sequence + light_sequence + antigen_sequence---
# Находим группы, где хотя бы 2 записи с одинаковой парой последовательностей
group_cols = ["heavy_sequence", "light_sequence", "antigen_sequence"]
grouped = agab_df.groupby(group_cols, sort=False)

# Маска строк, которые входят в группы размером > 1
dup_mask = grouped[group_cols[0]].transform("size") > 1
hl_duplicates = agab_df.loc[dup_mask].copy()

# Переиспользуем groupby только на дубликатах
dup_groups = hl_duplicates.groupby(group_cols, sort=False)
unique_groups_count = dup_groups.ngroups

print(f"Всего групп дубликатов: {unique_groups_count}\n")

# --- Анализ различий по полям ---

# Для каждой группы считаем, сколько уникальных значений в каждом поле
# dropna=False, чтобы различия NaN / не-NaN тоже учитывались
nunique_per_group = dup_groups[fields].nunique(dropna=False)

# Булева матрица: True, если в группе по полю есть различия
diff_mask = nunique_per_group > 1

# Есть ли вообще различия в группе
group_has_diffs = diff_mask.any(axis=1)

groups_with_diffs = int(group_has_diffs.sum())
groups_identical = int((~group_has_diffs).sum())
total = groups_with_diffs + groups_identical if (groups_with_diffs + groups_identical) > 0 else 1

print(f"Групп с различиями: {groups_with_diffs} ({groups_with_diffs / total * 100:.2f}%)")
print(f"Групп без различий: {groups_identical} ({groups_identical / total * 100:.2f}%)\n")

# Количество строк (записей) с различиями и без различий
rows_with_diffs = group_has_diffs[group_has_diffs].index  # индексы групп с различиями
rows_without_diffs = group_has_diffs[~group_has_diffs].index  # индексы групп без различий

num_rows_with_diffs = dup_groups.size().loc[rows_with_diffs].sum() if len(rows_with_diffs) > 0 else 0
num_rows_without_diffs = dup_groups.size().loc[rows_without_diffs].sum() if len(rows_without_diffs) > 0 else 0
rows_total = num_rows_with_diffs + num_rows_without_diffs if (num_rows_with_diffs + num_rows_without_diffs) > 0 else 1

print(f"Строк (записей) с различиями: {num_rows_with_diffs} ({num_rows_with_diffs / rows_total * 100:.2f}%)")
print(f"Строк (записей) без различий: {num_rows_without_diffs} ({num_rows_without_diffs / rows_total * 100:.2f}%)\n")

# Считаем, по скольким группам отличается каждое поле
field_diffs_counts = diff_mask.sum().sort_values(ascending=False)

if groups_with_diffs > 0 and not field_diffs_counts.empty:
    print("=== ПОЛЯ, КОТОРЫЕ ЧАЩЕ ВСЕГО РАЗЛИЧАЮТСЯ ===\n")
    for field, count in field_diffs_counts.items():
        if count == 0:
            continue
        print(f"{field}: {count} групп ({count / groups_with_diffs * 100:.2f}%)")

=== АНАЛИЗ РАЗЛИЧИЙ В ПОЛЯХ ДЛЯ ДУБЛИКАТОВ ПО ПОСЛЕДОВАТЕЛЬНОСТЯМ (agab_df) ===

Всего групп дубликатов: 64063

Групп с различиями: 63380 (98.93%)
Групп без различий: 683 (1.07%)

Строк (записей) с различиями: 127441 (98.55%)
Строк (записей) без различий: 1870 (1.45%)

=== ПОЛЯ, КОТОРЫЕ ЧАЩЕ ВСЕГО РАЗЛИЧАЮТСЯ ===

affinity: 63361 групп (99.97%)
dataset: 51 групп (0.08%)
affinity_type: 3 групп (0.00%)
processed_measurement: 3 групп (0.00%)


In [118]:
print("=== АНАЛИЗ РАЗБРОСА АФФИНИТЕТА В ГРУППАХ ДУБЛИКАТОВ ===\n")

# Работаем с копией дубликатов
analysis_df = hl_duplicates.copy()

# Преобразуем аффинитет в числа, нечисловые значения станут NaN
analysis_df['affinity_numeric'] = pd.to_numeric(analysis_df['affinity'], errors='coerce')

# Группируем
grouped_analysis = analysis_df.groupby(group_cols, sort=False)

# Считаем размах (max - min) и проверяем единственность типа аффинитета
agg_funcs = {
    'affinity_numeric': lambda x: x.max() - x.min(),
    'affinity_type': 'nunique'
}

group_stats = grouped_analysis.agg(agg_funcs)
group_stats.columns = ['affinity_range', 'affinity_type_count']

# Фильтруем группы, где affinity_numeric удалось посчитать (не NaN)
valid_stats = group_stats.dropna(subset=['affinity_range'])

print(f"Групп с валидными числовыми аффинитетами: {len(valid_stats)}")
print(f"Средний разброс аффинитета в группе: {valid_stats['affinity_range'].mean():.4f}")
print(f"Медианный разброс: {valid_stats['affinity_range'].median():.4f}")
print(f"Максимальный разброс: {valid_stats['affinity_range'].max():.4f}\n")

# Проверка на разные типы аффинитета в одной группе
mixed_types = valid_stats[valid_stats['affinity_type_count'] > 1]
if not mixed_types.empty:
    print(f"ВНИМАНИЕ: В {len(mixed_types)} группах смешаны разные типы аффинитета (сравнение может быть некорректным).")
else:
    print("Типы аффинитета внутри каждой группы совпадают (корректно сравнивать числа).\n")

# Топ-5 групп с самым большим разбросом
print("=== ТОП-5 ГРУПП С САМЫМ БОЛЬШИМ РАЗБРОСОМ АФФИНИТЕТА ===")
top_diff_groups = valid_stats.nlargest(5, 'affinity_range')

for idx, (indices, row) in enumerate(top_diff_groups.iterrows()):
    heavy, light, antigen = indices
    print(f"\nГруппа {idx+1} (разброс {row['affinity_range']:.4f}):")
    # Получаем строки этой группы
    group_rows = analysis_df[
        (analysis_df['heavy_sequence'] == heavy) & 
        (analysis_df['light_sequence'] == light) &
        (analysis_df['antigen_sequence'] == antigen)
    ]
    display(group_rows[['dataset', 'affinity_type', 'affinity', 'confidence', 'processed_measurement']])

=== АНАЛИЗ РАЗБРОСА АФФИНИТЕТА В ГРУППАХ ДУБЛИКАТОВ ===



Групп с валидными числовыми аффинитетами: 64063
Средний разброс аффинитета в группе: 2.8959
Медианный разброс: 3.2736
Максимальный разброс: 3.8345

ВНИМАНИЕ: В 3 группах смешаны разные типы аффинитета (сравнение может быть некорректным).
=== ТОП-5 ГРУПП С САМЫМ БОЛЬШИМ РАЗБРОСОМ АФФИНИТЕТА ===

Группа 1 (разброс 3.8345):


Unnamed: 0,dataset,affinity_type,affinity,confidence,processed_measurement
845585,abbd,-log KD,9.834508,high,7.917254
845586,abbd,-log KD,6.0,high,7.917254



Группа 2 (разброс 3.7936):


Unnamed: 0,dataset,affinity_type,affinity,confidence,processed_measurement
187342,abbd,-log KD,9.793633,high,7.896816
187343,abbd,-log KD,6.0,high,7.896816



Группа 3 (разброс 3.7556):


Unnamed: 0,dataset,affinity_type,affinity,confidence,processed_measurement
1147556,abbd,-log KD,9.755628,high,7.877814
1147557,abbd,-log KD,6.0,high,7.877814



Группа 4 (разброс 3.7512):


Unnamed: 0,dataset,affinity_type,affinity,confidence,processed_measurement
124856,abbd,-log KD,9.75118,high,7.87559
124857,abbd,-log KD,6.0,high,7.87559



Группа 5 (разброс 3.7379):


Unnamed: 0,dataset,affinity_type,affinity,confidence,processed_measurement
1026354,abbd,-log KD,9.737911,high,7.868956
1026355,abbd,-log KD,6.0,high,7.868956


#### Drop duplicates

In [119]:
# Удаляем дубликаты по heavy_sequence + light_sequence + antigen_sequence
# Оставляем первую запись из каждой группы (affinity отличается, но это не важно)

print(f"Размер до удаления дубликатов: {agab_df.shape}")

agab_df = agab_df.drop_duplicates(
    subset=['heavy_sequence', 'light_sequence', 'antigen_sequence'],
    keep='first'
)

print(f"Размер после удаления дубликатов: {agab_df.shape}")

Размер до удаления дубликатов: (401505, 11)


Размер после удаления дубликатов: (336257, 11)


#### Анализ длинных и коротких антигенов

In [122]:
agab_df['antigen_sequence'].str.len().describe()

count    336257.00000
mean        473.43476
std         306.16660
min          14.00000
25%         318.00000
50%         607.00000
75%         607.00000
max       34350.00000
Name: antigen_sequence, dtype: float64

In [None]:
agab_df[agab_df['antigen_sequence'].str.len() > 1000]['antigen_sequence'].nunique()

497

In [134]:
#### Analysis of Long Antigens (>700 AA)

# Filter for long antigens
long_antigen_threshold = 1030
long_antigens_df = agab_df[agab_df['antigen_sequence'].str.len() > long_antigen_threshold].copy()

print(f"=== ANALYSIS OF LONG ANTIGENS (> {long_antigen_threshold} AA) ===\n")
print(f"Total records with long antigens: {len(long_antigens_df)}")
print(f"Percentage of total dataset: {len(long_antigens_df) / len(agab_df) * 100:.2f}%\n")

# Add antigen length column for analysis
long_antigens_df['antigen_length'] = long_antigens_df['antigen_sequence'].str.len()

# Basic statistics
print("=== ANTIGEN LENGTH STATISTICS ===")
print(long_antigens_df['antigen_length'].describe())
print(f"\nMin length: {long_antigens_df['antigen_length'].min()}")
print(f"Max length: {long_antigens_df['antigen_length'].max()}")
print(f"Median length: {long_antigens_df['antigen_length'].median():.1f}")
print(f"Mean length: {long_antigens_df['antigen_length'].mean():.1f}\n")

# Distribution by dataset
print("=== DISTRIBUTION BY DATASET ===")
dataset_counts = long_antigens_df['dataset'].value_counts()
print(dataset_counts)
print(f"\nUnique datasets: {long_antigens_df['dataset'].nunique()}\n")

# Distribution by affinity_type
print("=== DISTRIBUTION BY AFFINITY TYPE ===")
affinity_counts = long_antigens_df['affinity_type'].value_counts()
print(affinity_counts)
print(f"\nUnique affinity types: {long_antigens_df['affinity_type'].nunique()}\n")

# Unique antigens
print("=== UNIQUE ANTIGENS ===")
unique_antigens = long_antigens_df['antigen_sequence'].nunique()
print(f"Unique antigen sequences: {unique_antigens}")
print(f"Average records per unique antigen: {len(long_antigens_df) / unique_antigens:.2f}\n")

# Sample of longest antigens
print("=== TOP 10 LONGEST ANTIGENS ===")
top_longest = long_antigens_df.nlargest(10, 'antigen_length')[
    ['dataset', 'affinity_type', 'antigen_length', 'confidence']
]
print(top_longest)
print()

# Check for patterns in metadata if available
if 'metadata' in long_antigens_df.columns:
    print("=== METADATA ANALYSIS ===")
    non_null_metadata = long_antigens_df['metadata'].notna().sum()
    print(f"Records with metadata: {non_null_metadata} ({non_null_metadata / len(long_antigens_df) * 100:.1f}%)")
    print()

# Display sample records
print("=== SAMPLE RECORDS (first 5) ===")
sample_cols = ['dataset', 'affinity_type', 'antigen_length', 'confidence', 'scfv']
display_cols = [col for col in sample_cols if col in long_antigens_df.columns]
display(long_antigens_df[display_cols].head())


=== ANALYSIS OF LONG ANTIGENS (> 1030 AA) ===

Total records with long antigens: 2928
Percentage of total dataset: 0.87%

=== ANTIGEN LENGTH STATISTICS ===
count     2928.000000
mean      1877.172473
std       1898.660162
min       1032.000000
25%       1210.000000
50%       1321.000000
75%       2165.000000
max      34350.000000
Name: antigen_length, dtype: float64

Min length: 1032
Max length: 34350
Median length: 1321.0
Mean length: 1877.2

=== DISTRIBUTION BY DATASET ===
dataset
patents                    2264
flab_shanehsazzadeh2023     341
dlgo                        309
genbank                       7
skempiv2                      6
structures-antibodies         1
Name: count, dtype: int64

Unique datasets: 6

=== DISTRIBUTION BY AFFINITY TYPE ===
affinity_type
bool     2274
kd        341
ic_50     309
ddg         4
Name: count, dtype: int64

Unique affinity types: 4

=== UNIQUE ANTIGENS ===
Unique antigen sequences: 479
Average records per unique antigen: 6.11

=== TOP 10 LONGE

Unnamed: 0,dataset,affinity_type,antigen_length,confidence,scfv
12,skempiv2,ddg,1267,very_high,False
48243,dlgo,ic_50,1283,very_high,False
48244,dlgo,ic_50,1283,very_high,False
48245,dlgo,ic_50,1283,very_high,False
48246,dlgo,ic_50,1283,very_high,False


Сделаем фильтр до 1000 аминокислот. Нужно обосновать это так, что во-первых мы будем тюнить с не очень большим кол-вом параметров и данных в целом, поэтому слишком длинные антигены будут для модели скорее каким-то шумом. Помимо этого, мы будем подавать сиквенс как эмбеддинг скорее всего ESM-модели, а у них есть ограничение на длину белка. 

Для коротких тоже какой-нибудь порог типа 30 АК

#### ANARCI

In [68]:
def is_empty(x):
    return x is None or (isinstance(x, float) and np.isnan(x)) or (isinstance(x, str) and x.strip() == "")

scfv_df = agab_df[agab_df['scfv'] == True].copy()

# сколько scFv-строк уже имеют обе цепи
both_present = (~scfv_df["heavy_sequence"].apply(is_empty)) & (~scfv_df["light_sequence"].apply(is_empty))
only_heavy   = (~scfv_df["heavy_sequence"].apply(is_empty)) & ( scfv_df["light_sequence"].apply(is_empty))
only_light   = ( scfv_df["heavy_sequence"].apply(is_empty)) & (~scfv_df["light_sequence"].apply(is_empty))

print("scFv всего:", len(scfv_df))
print("оба домена уже раздельно:", both_present.sum())
print("только heavy заполнен:", only_heavy.sum())
print("только light заполнен:", only_light.sum())
print("оба пустые:", ((~both_present) & (~only_heavy) & (~only_light)).sum())

scFv всего: 55670
оба домена уже раздельно: 0
только heavy заполнен: 55670
только light заполнен: 0
оба пустые: 0


In [None]:
from multiprocessing import Pool
import numpy as np
import sys
import os

# Add current directory to path to import worker module
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())

# Import worker function from external file to fix pickling error
try:
    from data.anarci_worker import run_anarci_batch
except ImportError:
    # Fallback if running from data directory directly
    try:
        from anarci_worker import run_anarci_batch
    except ImportError:
        print("Error: Could not import anarci_worker. Make sure data/anarci_worker.py exists.")

def process_antibody_sequences_optimized(df, n_jobs=8, batch_size=5000):
    """
    Optimized version of antibody processing using multiprocessing.
    Runs ANARCI in parallel batches and minimizes DataFrame operations.
    """
    df = df.copy()
    
    # 1. Prepare sequences list efficiently
    print("Preparing sequences...")
    sequences_to_anarci = []
    mapping_list = [] 
    
    rows_iter = zip(df.index, df['scfv'], df['heavy_sequence'], df['light_sequence'])
    
    for idx, is_scfv, h_seq, l_seq in tqdm(rows_iter, total=len(df), desc="Collecting sequences"):
        if is_scfv:
            seq = h_seq
            if pd.isna(seq) or len(seq) == 0:
                 seq = l_seq if isinstance(l_seq, str) else ''
            
            if isinstance(seq, str) and len(seq) > 0:
                internal_id = f"{idx}_scfv"
                sequences_to_anarci.append((internal_id, seq))
                mapping_list.append((idx, 'scfv', seq))
        else:
            if isinstance(h_seq, str) and len(h_seq) > 0:
                internal_id = f"{idx}_heavy"
                sequences_to_anarci.append((internal_id, h_seq))
                mapping_list.append((idx, 'heavy', h_seq))
            
            if isinstance(l_seq, str) and len(l_seq) > 0:
                internal_id = f"{idx}_light"
                sequences_to_anarci.append((internal_id, l_seq))
                mapping_list.append((idx, 'light', l_seq))
                
    total_seqs = len(sequences_to_anarci)
    print(f"Running ANARCI on {total_seqs} sequences using {n_jobs} processes...")
    
    # 2. Parallel execution
    chunk_size = min(batch_size, max(1, total_seqs // (n_jobs * 2)))
    chunks = [sequences_to_anarci[i : i + chunk_size] for i in range(0, total_seqs, chunk_size)]
    
    all_numbering = []
    all_alignment_details = []
    
    with Pool(processes=n_jobs) as pool:
        results = list(tqdm(pool.imap(run_anarci_batch, chunks), total=len(chunks), desc="ANARCI Parallel"))
    
    # Unpack results
    for num, aln, err in results:
        if err and not isinstance(err, str): 
             pass
        if num is None:
            print(f"Batch failed: {err}")
            continue 
        all_numbering.extend(num)
        all_alignment_details.extend(aln)

    # 3. Process results efficiently
    print("Processing ANARCI results...")
    
    heavy_seqs = {}
    light_seqs = {}
    heavy_specs = {}
    light_specs = {}
    heavy_germs = {}
    light_germs = {}
    heavy_nums = {}
    light_nums = {}
    
    for i, (num_hits, hits) in tqdm(enumerate(zip(all_numbering, all_alignment_details)), total=len(all_numbering), desc="Parsing results"):
        if not hits or not num_hits: 
            continue
            
        original_idx, seq_type, full_seq = mapping_list[i]
        
        for domain_idx, hit in enumerate(hits):
            if not hit: continue
            
            chain_type = hit.get('chain_type', 'unknown')
            start = hit.get('query_start')
            end = hit.get('query_end')
            species = hit.get('species')
            germlines = hit.get('germlines')
            
            domain_data = num_hits[domain_idx]
            numbering_json = None
            if domain_data:
                residues_json = [
                    {"pos": r[0][0], "ins": r[0][1].strip(), "aa": r[1]}
                    for r in domain_data[0]
                ]
                numbering_obj = {
                    "domain_start": domain_data[1],
                    "domain_end": domain_data[2],
                    "residues": residues_json
                }
                numbering_json = json.dumps([numbering_obj])
            
            domain_seq = full_seq[start : end] if (start is not None and end is not None) else None

            is_heavy = False
            if seq_type == 'scfv':
                if chain_type == 'H': is_heavy = True
                elif chain_type in ['K', 'L']: is_heavy = False
                else: continue
            elif seq_type == 'heavy':
                if chain_type == 'H': is_heavy = True
                else: continue
            elif seq_type == 'light':
                if chain_type in ['K', 'L']: is_heavy = False
                else: continue
            
            if is_heavy:
                heavy_seqs[original_idx] = domain_seq
                heavy_specs[original_idx] = species
                heavy_germs[original_idx] = str(germlines)
                heavy_nums[original_idx] = numbering_json
            else:
                light_seqs[original_idx] = domain_seq
                light_specs[original_idx] = species
                light_germs[original_idx] = str(germlines)
                light_nums[original_idx] = numbering_json

    print("Updating DataFrame columns...")
    
    # Convert dictionaries to Series once
    heavy_seqs_series = pd.Series(heavy_seqs, name='heavy_sequence')
    light_seqs_series = pd.Series(light_seqs, name='light_sequence')
    
    # Use combine_first to update only found values
    # heavy_seqs_series contains only updated sequences, combine_first fills gaps from original
    df['heavy_sequence'] = heavy_seqs_series.combine_first(df['heavy_sequence'])
    df['light_sequence'] = light_seqs_series.combine_first(df['light_sequence'])
    
    # For new columns, map is fine as they are empty or overwritten
    df['heavy_species'] = df.index.map(heavy_specs)
    df['light_species'] = df.index.map(light_specs)
    df['heavy_germlines'] = df.index.map(heavy_germs)
    df['light_germlines'] = df.index.map(light_germs)
    df['heavy_numbering'] = df.index.map(heavy_nums)
    df['light_numbering'] = df.index.map(light_nums)
    found_h = df['heavy_sequence'].notna().sum()
    found_l = df['light_sequence'].notna().sum()
    print(f"Extraction complete. Found {found_h} Heavy chains and {found_l} Light chains.")
    
    return df

# Apply the optimized function
if __name__ == '__main__':
    agab_df = process_antibody_sequences_optimized(agab_df, n_jobs=16)


# Сохраняем датафрейм в .parquet
output_path = 'agab_after_anarci.parquet'
df = agab_df

# Fix mixed types for Parquet export
# 'affinity' contains both numbers and strings (e.g. 'h'), so we must save as string
df['affinity'] = df['affinity'].astype(str)
df['processed_measurement'] = df['processed_measurement'].astype(str)

df.to_parquet(output_path, index=False, engine='pyarrow')
print(f"Отфильтрованные данные сохранены в {output_path}")
print(f"Размер сохраненных данных: {df.shape}")


Preparing sequences...


Collecting sequences:   0%|          | 0/337196 [00:00<?, ?it/s]

Running ANARCI on 620789 sequences using 16 processes...


ANARCI Parallel:   0%|          | 0/125 [00:00<?, ?it/s]

Processing ANARCI results...


Parsing results:   0%|          | 0/620789 [00:00<?, ?it/s]

Updating DataFrame columns...
Extraction complete. Found 337196 Heavy chains and 337196 Light chains.


### Фильтрация после ANARCI разметки


Ответ на вопрос про антитела > 140 а.к.
Стоит ли их оставлять?
Да, однозначно стоит, но с проверкой.
Длинные последовательности (>140 а.к.) в большинстве случаев — это не ошибка, а антитела с ультра-длинным CDRH3 (петлей, связывающей антиген). Такие антитела встречаются в природе (например, у ВИЧ-инфицированных пациентов или у некоторых видов животных, таких как коровы) и могут обладать уникальными свойствами нейтрализации.
Если вы удалите их просто по порогу длины, вы потеряете самые интересные и редкие данные. Проверка совокупной длины CDR — отличная идея, чтобы убедиться, что "лишняя" длина находится именно в вариабельных петлях (где ей и место), а не в каркасных регионах (что было бы ошибкой секвенирования).

In [None]:
agab_df = pd.read_parquet('agab_after_anarci.parquet')
print(f"Всего записей: {agab_df.shape}")

Всего записей: (337196, 17)


CDR3 <= 37: "У тебя не может быть одной петли длиной в километр".  

total_cdr_len: "Если ты в целом длинный гигант, докажи это длиной своих петель, а не ошибкой в каркасе".

In [15]:
import pandas as pd
import json

# --- Функции --- (те же самые, без изменений)
def has_conservative_cysteines(numbering_json):
    if not numbering_json or pd.isna(numbering_json):
        return False
    try:
        data = json.loads(numbering_json)
        if not data: return False
        residues = data[0].get('residues', [])
        has_c23 = any(r['pos'] == 23 and r['aa'] == 'C' for r in residues)
        has_c104 = any(r['pos'] == 104 and r['aa'] == 'C' for r in residues)
        return has_c23 and has_c104
    except: return False

def check_cdr3_length(numbering_json, threshold=37):
    if not numbering_json or pd.isna(numbering_json): return False
    try:
        data = json.loads(numbering_json)
        if not data: return False
        residues = data[0].get('residues', [])
        cdr3_len = sum(1 for r in residues if 105 <= r['pos'] <= 117)
        return cdr3_len <= threshold
    except: return False

def get_total_cdr_length(numbering_json):
    if not numbering_json or pd.isna(numbering_json): return 0
    try:
        data = json.loads(numbering_json)
        if not data: return 0
        residues = data[0].get('residues', [])
        cdr_len = 0
        for r in residues:
            p = int(r['pos'])
            if (27 <= p <= 38) or (56 <= p <= 65) or (105 <= p <= 117):
                cdr_len += 1
        return cdr_len
    except: return 0

# === ПРИМЕНЕНИЕ ФИЛЬТРОВ ===

print(f"ИСХОДНЫЙ размер датасета: {len(agab_df)}")
current_df = agab_df.copy()

# --- ШАГ 1: Фильтрация по цистеинам (C23 & C104) ---
print("\n[Шаг 1/3] Проверка консервативных цистеинов (C23 + C104)...")
if 'heavy_cys_ok' not in current_df.columns:
    current_df['heavy_cys_ok'] = current_df['heavy_numbering'].apply(has_conservative_cysteines)
    current_df['light_cys_ok'] = current_df['light_numbering'].apply(has_conservative_cysteines)

h_cys = current_df['heavy_sequence'].isna() | current_df['heavy_cys_ok']
l_cys = current_df['light_sequence'].isna() | current_df['light_cys_ok']
not_empty = current_df['heavy_sequence'].notna() | current_df['light_sequence'].notna()

before_len = len(current_df)
current_df = current_df[h_cys & l_cys & not_empty]
removed_cys = before_len - len(current_df)

print(f"   -> Осталось: {len(current_df)}")
print(f"   -> Удалено: {removed_cys} (отсутствуют критические цистеины)")


# --- ШАГ 2: Фильтрация по длине CDR3 (<= 37 а.к.) ---
print("\n[Шаг 2/3] Проверка длины CDR3 (порог OAS: <= 37 а.к.)...")
current_df['heavy_cdr3_ok'] = current_df['heavy_numbering'].apply(check_cdr3_length)
current_df['light_cdr3_ok'] = current_df['light_numbering'].apply(check_cdr3_length)

h_cdr3 = current_df['heavy_sequence'].isna() | current_df['heavy_cdr3_ok']
l_cdr3 = current_df['light_sequence'].isna() | current_df['light_cdr3_ok']

before_len = len(current_df)
current_df = current_df[h_cdr3 & l_cdr3]
removed_cdr3 = before_len - len(current_df)

print(f"   -> Осталось: {len(current_df)}")
print(f"   -> Удалено: {removed_cdr3} (CDR3 > 37 аминокислот)")


# --- ШАГ 3: Проверка длинных последовательностей (>140 а.к.) ---
print("\n[Шаг 3/3] Анализ сверхдлинных цепей (>140 а.к.)...")
current_df['heavy_len'] = current_df['heavy_sequence'].str.len().fillna(0)
long_mask = current_df['heavy_len'] > 140

long_count = long_mask.sum()
if long_count > 0:
    print(f"   Найдено длинных цепей (>140): {long_count}")
    
    # Считаем сумму длин CDR только для длинных цепей
    current_df.loc[long_mask, 'total_cdr_len'] = current_df.loc[long_mask, 'heavy_numbering'].apply(get_total_cdr_length)
    
    # Условие удаления: Длинная цепь (>140) НО короткие CDR (<35)
    # Это значит, что длина набрана за счет каркаса (ошибка)
    suspicious_mask = long_mask & (current_df['total_cdr_len'] < 35)
    suspicious_count = suspicious_mask.sum()
    
    if suspicious_count > 0:
        before_len = len(current_df)
        current_df = current_df[~suspicious_mask]
        print(f"   -> Удалено: {suspicious_count} (длинные цепи с подозрительно короткими CDR)")
    else:
        print("   -> Удалено: 0 (все длинные цепи имеют адекватно длинные CDR)")
else:
    print("   -> Длинных цепей (>140 а.к.) не обнаружено. Пропуск шага.")

# Удаляем технические колонки перед сохранением
cols_to_drop = ['heavy_cys_ok', 'light_cys_ok', 'heavy_cdr3_ok', 'light_cdr3_ok', 'total_cdr_len', 'heavy_len']
agab_filtered = current_df.drop(columns=cols_to_drop, errors='ignore').copy()

print(f"\n=== ИТОГ ===")
print(f"Финальный размер: {len(agab_filtered)}")
print(f"Всего удалено: {len(agab_df) - len(agab_filtered)}")

ИСХОДНЫЙ размер датасета: 337196

[Шаг 1/3] Проверка консервативных цистеинов (C23 + C104)...
   -> Осталось: 336751
   -> Удалено: 445 (отсутствуют критические цистеины)

[Шаг 2/3] Проверка длины CDR3 (порог OAS: <= 37 а.к.)...
   -> Осталось: 336741
   -> Удалено: 10 (CDR3 > 37 аминокислот)

[Шаг 3/3] Анализ сверхдлинных цепей (>140 а.к.)...
   -> Длинных цепей (>140 а.к.) не обнаружено. Пропуск шага.

=== ИТОГ ===
Финальный размер: 336741
Всего удалено: 455


### Save to agab.parquet

In [16]:
# Сохраняем датафрейм в .parquet
output_path = 'agab.parquet'
df = agab_filtered

# Fix mixed types for Parquet export
# 'affinity' contains both numbers and strings (e.g. 'h'), so we must save as string
df['affinity'] = df['affinity'].astype(str)
df['processed_measurement'] = df['processed_measurement'].astype(str)

df.to_parquet(output_path, index=False, engine='pyarrow')
print(f"Отфильтрованные данные сохранены в {output_path}")
print(f"Размер сохраненных данных: {df.shape}")


Отфильтрованные данные сохранены в agab.parquet
Размер сохраненных данных: (336741, 17)


### Делим на train test

сделаем разбиение данных в два этапа:
1) easy-cluster на антигенах с --min-seq-id 0.4, --cov-mode 2, -c 0.7. Рандомно поделим КЛАСТЕРЫ на train и test, чтобы примерно соблюсти 80/20 соотношение (по уникальным антигенам, не по числу строк в датасете, т.к. все равно потом будем бороться с дисбалансом) 
2) easy-search: train против test с теми же параметрами. Если кто-то из test оказывается похож на train, переносим его из test в train и повторяем шаг (2) полностью, пока похожие не перестанут находиться
3) easy-search: train против test уже на антителах, --min-seq-id 0.7. (т.к. CDR занимают) 

In [6]:
import pandas as pd

# Загружаем из agab_mmseq.parquet
agab_mmseq = pd.read_parquet('agab_mmseq.parquet', engine='pyarrow')
print(f"Загружено: {agab_mmseq.shape}")

Загружено: (336741, 18)


In [7]:
agab_mmseq.head()

Unnamed: 0,dataset,heavy_sequence,light_sequence,scfv,affinity_type,affinity,antigen_sequence,confidence,nanobody,metadata,processed_measurement,heavy_species,light_species,heavy_germlines,light_germlines,heavy_numbering,light_numbering,mmseq_cluster_rep
0,abdesign,QVQLVQSGAEVKKPGASVKVSCKASGYTFTSYWINWVRQAPGQGLE...,DIVMTQSPDSLAVSLGERATINCKSSQSLWDSGNQKNFLTWYQQKP...,False,elisa_mut_to_wt_ratio,1.008535785,WNPPTFSPALLVVTEGDNATFTCSFSNTSESFVLNWYRMSPSNQTD...,very_high,False,{'heavy_riot_numbering': {'cdr1_aa': 'GYTFTSYW...,1.008535785,mouse,human,"{'v_gene': [('human', 'IGHV1-46*01'), 0.857142...","{'v_gene': [('human', 'IGKV4-1*01'), 0.8877551...","[{""domain_start"": 0, ""domain_end"": 116, ""resid...","[{""domain_start"": 0, ""domain_end"": 112, ""resid...",HHHHHHGENLYFQGSLDSPDRPWNPPTFSPALLVVTEGDNATFTCS...
1,abdesign,VQLVQSGVEVKKPGASVKVSCKASGYTFTNYYMYWVRQAPGQGLEW...,EIVLTQSPATLSLSPGERATLSCRASKGVSTSGYSYLHWYQQKPGQ...,False,elisa_mut_to_wt_ratio,1.052205794,PWNPPTFSPALLVVTEGDNATFTCSFSNTSESFVLNWYRMSPSNQT...,very_high,False,{'heavy_riot_numbering': {'cdr1_aa': 'GYTFTNYY...,1.052205794,mouse,human,"{'v_gene': [('human', 'IGHV1-2*02'), 0.7857142...","{'v_gene': [('human', 'IGKV3-11*01'), 0.858695...","[{""domain_start"": 0, ""domain_end"": 118, ""resid...","[{""domain_start"": 0, ""domain_end"": 110, ""resid...",HHHHHHGENLYFQGSLDSPDRPWNPPTFSPALLVVTEGDNATFTCS...
2,structures-antibodies,QVQLQQPGAELVKPGASVKLSCKASGYTFTSDWIHWVKQRPGHGLE...,DILLTQSPAILSVSPGERVSFSCRASQSIGTDIHWYQQRTNGSPRL...,False,bool,1.0,SALHWRAAGAATVLLVIVLLAGSYLAVLAERGAPGAQLITYPRALW...,very_high,False,{'heavy_riot_numbering': {'cdr1_aa': 'GYTFTSDW...,1.0,mouse,mouse,"{'v_gene': [('mouse', 'IGHV1-69-5*01'), 0.8877...","{'v_gene': [('mouse', 'IGKV5-48*01'), 0.978260...","[{""domain_start"": 0, ""domain_end"": 117, ""resid...","[{""domain_start"": 0, ""domain_end"": 106, ""resid...",MAPMLSGLLARLVKLLLGRHGSALHWRAAGAATVLLVIVLLAGSYL...
3,structures-antibodies,EVQLVESGGGLVQPGGSLRLSCAASGFTFDDYSMNWVRQAPGKGLE...,DIQLTQSPSSLSASVGDRVTITCQASQDISNYLNWYQQKPGKAPKL...,False,bool,1.0,FKVATPYSLYVCPEGQNVTLTCRLLGPVDKGHDVTFYKTWYRSSRG...,very_high,False,{'heavy_riot_numbering': {'cdr1_aa': 'GFTFDDYS...,1.0,human,human,"{'v_gene': [('human', 'IGHV3-48*01'), 0.948979...","{'v_gene': [('human', 'IGKV1-33*01'), 0.967391...","[{""domain_start"": 0, ""domain_end"": 121, ""resid...","[{""domain_start"": 0, ""domain_end"": 107, ""resid...",FKVATPYSLYVCPEGQNVTLTCRLLGPVDKGHDVTFYKTWYRSSRG...
4,structures-antibodies,VKLQESGAVVQPGGSLRLSCAASGFTGSDYDMSWIRQAPGKGLEWV...,DIQMTQSPASLAVSPGQRATITCRASESVSNYGINFINWFQQKPGQ...,False,bool,1.0,ADPGYLLEFDTLCIGYHANNSTDTVDTVLEKNVTVTHSVNLLEDKH...,very_high,False,{'heavy_riot_numbering': {'cdr1_aa': 'GFTGSDYD...,1.0,human,mouse,"{'v_gene': [('human', 'IGHV3-66*01'), 0.773195...","{'v_gene': [('mouse', 'IGKV3-2*01'), 0.8020833...","[{""domain_start"": 0, ""domain_end"": 119, ""resid...","[{""domain_start"": 0, ""domain_end"": 110, ""resid...",MEEIVLLFAIVSLARSDQICIGYHANNSTKQVDTIMEKNVTVTHAQ...


In [None]:
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

df = pd.read_parquet('data/agab_mmseq.parquet')

# Используем GroupShuffleSplit
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# Группируем по представителю кластера!
splitter = gss.split(df, groups=df['mmseq_cluster_rep'])
train_idx, test_idx = next(splitter)

train_df = df.iloc[train_idx]
test_df = df.iloc[test_idx]

print(f"Train size: {len(train_df)}")
print(f"Test size: {len(test_df)}")

# Проверка на утечки (должно быть 0 пересечений по кластерам)
train_clusters = set(train_df['mmseq_cluster_rep'])
test_clusters = set(test_df['mmseq_cluster_rep'])
print(f"Пересечение кластеров: {len(train_clusters.intersection(test_clusters))}") 
# Если 0 — значит, сплит идеальный.