# Пайплайн с неполным пересчётом и перезаписью матрицы

In [1]:
import hail as hl

In [None]:
hl.init()
hl.default_reference('GRCh38')

Running on Apache Spark version 3.5.5
SparkUI available at http://172.16.0.135:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.134-952ae203dbbe
LOGGING: writing to /home/julia/thesis/hail-20250521-1600-0.2.134-952ae203dbbe.log


2025-05-21 16:01:33.526 Hail: INFO: Reading table without type imputation
  Loading field 'ID' as type str (user-supplied)
  Loading field 'sex' as type str (user-supplied)
  Loading field 'profile' as type str (not specified)


In [3]:
import glob
import os

In [None]:
# конфигурация
VCF_DIR = '/vcfs/'  # папка с VCF
VCF_DIR_NEW = '/new/' # папка с VCF для добавления
SEX_TABLE_PATH = 'sids_test.csv' # файл с полом
BASE_DATA_PATH = '/vcfs/cache/combined.mt' # первый пул данных

In [6]:
# старые файлы

vcf_files = glob.glob(VCF_DIR + '*.vcf.gz')
print(vcf_files)

['/vcfs/vcf_test/000007000020.vcf.gz', '/vcfs/vcf_test/000007000040.vcf.gz', '/vcfs/vcf_test/000007000070.vcf.gz', '/vcfs/vcf_test/000007000030.vcf.gz', '/vcfs/vcf_test/000007000060.vcf.gz']


In [7]:
# новые файлы
new_vcf_files = glob.glob(VCF_DIR_NEW + '*.vcf.gz')
print(new_vcf_files)

['/vcfs/vcf_new/000007000050.vcf.gz']


In [8]:
# добавление пола

def set_sex(mt, sex_table):
    # преобразуем пол в is_female (True для 'ж'/'f')
    sex_table = sex_table.annotate(
        is_female = (
            (sex_table.sex.lower() == 'ж') | 
            (sex_table.sex.lower() == 'f')
        )
    )
    
    # добавляем is_female к образцам (простое соединение)
    mt = mt.annotate_cols(
        is_female = sex_table[mt.s].is_female  # mt.s - ID образца
    )

    return mt

In [10]:
# нормализация гемизигот у мужчин и МХ у всех

def normalize_ploidy(mt):
    return mt.annotate_entries(
        GT = hl.case()
            # Митохондриальная ДНК (гаплоидная у всех)
            .when(mt.locus.contig == "chrM",
                hl.if_else(
                    mt.GT.is_hom_ref(),
                    hl.call(0),
                    hl.if_else(
                        mt.GT.is_hom_var(),
                        hl.call(1),
                        hl.if_else(
                            (mt.VAF[0] > 0.3) | (mt.VAF[1] > 0.3),  # Учитываем оба аллеля
                            hl.call(1),
                            hl.call(0)
                        )
                    )
                )
            )
            # Гемизиготные участки у мужчин (X/Y)
            .when((~mt.is_female) & ((mt.locus.contig == "chrX") | (mt.locus.contig == "chrY")),
                hl.if_else(
                    mt.GT.is_hom_ref(),
                    hl.call(0),
                    hl.if_else(
                        mt.GT.is_hom_var(),
                        hl.call(1),
                        hl.if_else(
                            mt.VAF[0] > 0.3,
                            hl.call(1),
                            hl.call(0)
                        )
                    )
                )
            )
            # Все остальные случаи (аутосомы, X у женщин)
            .default(mt.GT)
    )

In [11]:
# фильтрация по глубине

def filter_variants_by_DP(combined_mt_all, dp):

    # отсекаем варианты, если нет ни одного образца с DP больше порога
    filtered_mt = combined_mt_all.filter_rows(
        hl.agg.count_where(
            (hl.is_defined(combined_mt_all.DP)) & 
            (combined_mt_all.DP >= dp)
        ) >= 1
    )

    # корректируем генотипы - варианты с DP меньше порога исключаем из расчёта частот, помечая как NA
    return filtered_mt.annotate_entries(
        GT = hl.if_else(
            (hl.is_defined(filtered_mt.DP)) & 
            (filtered_mt.DP >= dp),
            filtered_mt.GT,
            hl.missing(hl.tcall)
        )
    )


In [None]:
#препроцессинг до фильтрации включительно

def preprocessing(vcf_files, sex_table):
    # комбайн
    mts_all = []
    for vcf in vcf_files: 
        mt = hl.import_vcf(vcf, force_bgz=True, array_elements_required=False)
        mt = set_sex(mt, sex_table)
        mt = normalize_ploidy(mt)
        mts_all.append(mt)

    # Объединение MatrixTable по колонкам (образцам)
    combined_mt_all = mts_all[0]
    if len(mts_all) > 1:
        for mt in mts_all[1:]:
            combined_mt_all = combined_mt_all.union_cols(mt, row_join_type='outer')

    #фильтрация по глубине
    return filter_variants_by_DP(combined_mt_all, 3)

In [None]:
# расчёт частот

def mt_AF_calculated(mt):
    freq_mt_all = mt.annotate_rows(
    call_stats=hl.agg.call_stats(mt.GT, mt.alleles)
    hwe_stats=hl.agg.hardy_weinberg_test(mt.GT)
    )

    # извлечение частот аллелей
    return freq_mt_all.annotate_rows(
        allele_frequencies=freq_mt_all.call_stats.AF  # AF — это массив частот аллелей, включая мультиаллели
    )

In [19]:
# функция сбора первых данных

def save_base_data(vcf_files, sex_table):    
    #препроцессированные данные - установка пола, нормализация гемизигот, фильтрация по глубине
    mt_combined = preprocessing(vcf_files, sex_table)
    
    #расчёт частот
    mt_af = mt_AF_calculated(mt_combined)

    #сохранение
    mt_af.write(BASE_DATA_PATH)
    return mt_af


In [None]:
# определение пола
sex_table = hl.import_table(SEX_TABLE_PATH,
        delimiter=',',
        types={'ID': hl.tstr, 'sex': hl.tstr},
        key='ID'
    )

In [None]:
%%time
# не нужен, если данные уже сохранены
mt = save_base_data(vcf_files, sex_table)

In [None]:
mt.show(n_cols=6, n_rows=10)



Unnamed: 0_level_0,Unnamed: 1_level_0,'000007000020','000007000020','000007000020','000007000020','000007000020','000007000020','000007000020','000007000020','000007000040','000007000040','000007000040','000007000040','000007000040','000007000040','000007000040','000007000040','000007000070','000007000070','000007000070','000007000070','000007000070','000007000070','000007000070','000007000070','000007000030','000007000030','000007000030','000007000030','000007000030','000007000030','000007000030','000007000030','000007000060','000007000060','000007000060','000007000060','000007000060','000007000060','000007000060','000007000060'
locus,alleles,GT,GQ,DP,MIN_DP,AD,VAF,PL,MED_DP,GT,GQ,DP,MIN_DP,AD,VAF,PL,MED_DP,GT,GQ,DP,MIN_DP,AD,VAF,PL,MED_DP,GT,GQ,DP,MIN_DP,AD,VAF,PL,MED_DP,GT,GQ,DP,MIN_DP,AD,VAF,PL,MED_DP
locus<GRCh38>,array<str>,call,int32,int32,int32,array<int32>,array<float64>,array<int32>,int32,call,int32,int32,int32,array<int32>,array<float64>,array<int32>,int32,call,int32,int32,int32,array<int32>,array<float64>,array<int32>,int32,call,int32,int32,int32,array<int32>,array<float64>,array<int32>,int32,call,int32,int32,int32,array<int32>,array<float64>,array<int32>,int32
chr1:10177,"[""A"",""AC""]",,,,,,,,,,,,,,,,,1/1,4.0,3.0,,"[0,2]",[6.67e-01],"[1,13,0]",,,,,,,,,,,,,,,,,
chr1:10230,"[""AC"",""A""]",,,,,,,,,,,,,,,,,,,,,,,,,,13.0,5.0,,"[3,2]",[4.00e-01],"[0,21,13]",,,,,,,,,
chr1:10241,"[""T"",""C""]",,,,,,,,,,,,,,,,,,,,,,,,,,18.0,5.0,,"[3,2]",[4.00e-01],"[0,21,19]",,,,,,,,,
chr1:10291,"[""C"",""T""]",,,,,,,,,,,,,,,,,,,,,,,,,,18.0,6.0,,"[4,2]",[3.33e-01],"[0,20,20]",,,,,,,,,
chr1:10315,"[""C"",""T""]",,,,,,,,,,,,,,,,,,,,,,,,,,14.0,5.0,,"[2,3]",[6.00e-01],"[0,18,15]",,,,,,,,,
chr1:10333,"[""CT"",""C""]",,,,,,,,,,,,,,,,,,,,,,,,,,13.0,5.0,,"[2,2]",[4.00e-01],"[0,16,16]",,,,,,,,,
chr1:10407,"[""T"",""C""]",,5.0,3.0,,"[1,2]",[6.67e-01],"[0,8,5]",,,,,,,,,,,,,,,,,,,3.0,4.0,,"[2,2]",[5.00e-01],"[0,8,1]",,,3.0,6.0,,"[2,4]",[6.67e-01],"[0,6,0]",
chr1:10417,"[""C"",""G""]",,6.0,4.0,,"[2,2]",[5.00e-01],"[0,5,12]",,,,,,,,,,,,,,,,,,,6.0,3.0,,"[1,2]",[6.67e-01],"[0,14,5]",,,4.0,6.0,,"[2,4]",[6.67e-01],"[0,14,1]",
chr1:10428,"[""CCCTAA"",""C""]",0/1,3.0,4.0,,"[1,3]",[7.50e-01],"[0,0,7]",,,,,,,,,,,,,,,,,,,4.0,5.0,,"[3,2]",[4.00e-01],"[0,14,2]",,,4.0,7.0,,"[3,4]",[5.71e-01],"[0,15,1]",
chr1:10433,"[""A"",""AC""]",,,,,,,,,,,,,,,,,,,,,,,,,1/1,9.0,3.0,,"[0,3]",[1.00e+00],"[11,12,0]",,,,,,,,,


In [None]:
%%time

# если начальные данные уже сохранены - считываем, если нет - сохраняем
if os.path.exists(BASE_DATA_PATH):
    old_data = hl.read_matrix_table(BASE_DATA_PATH)
else:
    old_data = save_base_data(vcf_files, sex_table)

In [None]:
%%time

# подготовка новых данных
new_data = preprocessing(new_vcf_files, sex_table)

CPU times: user 120 ms, sys: 0 ns, total: 120 ms
Wall time: 323 ms


In [None]:
%%time

# поиск уникальных вариантов
new_variants = new_data.rows()

# фильтруем старые данные: оставляем только варианты, которые есть в новых образцах
old_mt_to_update = old_data.filter_rows(
    hl.is_defined(new_variants[old_data.row_key])
)

CPU times: user 55.9 ms, sys: 1.62 ms, total: 57.5 ms
Wall time: 54.2 ms


In [None]:
%%time

# объединение старых и новых данных с пересчётом частот PART 1

# объединяем старые и новые данные по этим вариантам
updated_mt = old_mt_to_update.union_cols(new_data, row_join_type="outer")

# пересчитываем статистики для обновлённых вариантов
updated_mt = mt_AF_calculated(updated_mt)

In [None]:
%%time

# объединяем пересчитанные данные со старыми

old_mt = old_data.filter_rows(
    ~hl.is_defined(new_variants[old_data.row_key])
)

# объединяем таблицы
combined_mt = hl.experimental.full_outer_join_mt(old_mt, updated_mt)

# создаем новое неключевое поле с объединенными sample IDs
combined_mt = combined_mt.annotate_cols(
    combined_s = hl.coalesce(combined_mt.right_col.s, \
        combined_mt.left_col.s),
    is_female = hl.coalesce(combined_mt.right_col.is_female, \
        combined_mt.left_col.is_female)
)

# переключаем ключ столбца на новое поле
combined_mt = combined_mt.key_cols_by('combined_s')

# возвращаем структуру ключей
combined_mt = combined_mt.drop('s', 'left_col', 'right_col')
combined_mt = combined_mt.rename({'combined_s': 's'})


# собираем поля для локуса и аллелей
combined_mt = combined_mt.annotate_rows(
    combined_locus = hl.coalesce(combined_mt.right_row.locus, \
        combined_mt.left_row.locus),
    combined_alleles = hl.coalesce(combined_mt.right_row.alleles, \
        combined_mt.left_row.alleles),
    rsid = hl.coalesce(combined_mt.right_row.rsid, \
        combined_mt.left_row.rsid),
    qual = hl.coalesce(combined_mt.right_row.qual, \
        combined_mt.left_row.qual),
    call_stats = hl.coalesce(combined_mt.right_row.call_stats, \
        combined_mt.left_row.call_stats),
    allele_frequencies = hl.coalesce(combined_mt.right_row.\
        allele_frequencies, combined_mt.left_row.allele_frequencies),
)

# переключаем ключи строк на новые поля
combined_mt = combined_mt.key_rows_by('combined_locus', \
    'combined_alleles')

combined_mt = combined_mt.drop('locus', 'alleles', \
    'right_row', 'left_row')
combined_mt = combined_mt.rename({'combined_locus': 'locus', \
    'combined_alleles': 'alleles'})

# объединяем записи (entry fields)
combined_mt = combined_mt.annotate_entries(
    # используем записи из правой таблицы, если они есть, иначе из левой
    GT = hl.coalesce(combined_mt.right_entry.GT, \
        combined_mt.left_entry.GT),
    DP = hl.coalesce(combined_mt.right_entry.DP, \
        combined_mt.left_entry.DP),
    AD = hl.coalesce(combined_mt.right_entry.AD, \
        combined_mt.left_entry.AD),
    VAF = hl.coalesce(combined_mt.right_entry.VAF, \
        combined_mt.left_entry.VAF),
)

# убираем временные поля
combined_mt = combined_mt.drop('left_entry', 'right_entry')

# перезаписываем таблицу
combined_mt.write(BASE_DATA_PATH, overwrite = True)

In [None]:
combined_mt.show(n_cols = 10)



Unnamed: 0_level_0,Unnamed: 1_level_0,'000007000020','000007000020','000007000020','000007000020','000007000030','000007000030','000007000030','000007000030','000007000040','000007000040','000007000040','000007000040','000007000050','000007000050','000007000050','000007000050','000007000060','000007000060','000007000060','000007000060','000007000070','000007000070','000007000070','000007000070'
locus,alleles,GT,DP,AD,VAF,GT,DP,AD,VAF,GT,DP,AD,VAF,GT,DP,AD,VAF,GT,DP,AD,VAF,GT,DP,AD,VAF
locus<GRCh38>,array<str>,call,int32,array<int32>,array<float64>,call,int32,array<int32>,array<float64>,call,int32,array<int32>,array<float64>,call,int32,array<int32>,array<float64>,call,int32,array<int32>,array<float64>,call,int32,array<int32>,array<float64>
chr1:10177,"[""A"",""AC""]",,,,,,,,,,,,,,,,,,,,,1/1,3.0,"[0,2]",[6.67e-01]
chr1:10230,"[""AC"",""A""]",,,,,,5.0,"[3,2]",[4.00e-01],,,,,,,,,,,,,,,,
chr1:10241,"[""T"",""C""]",,,,,,5.0,"[3,2]",[4.00e-01],,,,,,,,,,,,,,,,
chr1:10291,"[""C"",""T""]",,,,,,6.0,"[4,2]",[3.33e-01],,,,,,,,,,,,,,,,
chr1:10315,"[""C"",""T""]",,,,,,5.0,"[2,3]",[6.00e-01],,,,,,,,,,,,,,,,
chr1:10333,"[""CT"",""C""]",,,,,,5.0,"[2,2]",[4.00e-01],,,,,,,,,,,,,,,,
chr1:10407,"[""T"",""C""]",,3.0,"[1,2]",[6.67e-01],,4.0,"[2,2]",[5.00e-01],,,,,,6.0,"[2,4]",[6.67e-01],,6.0,"[2,4]",[6.67e-01],,,,
chr1:10417,"[""C"",""G""]",,4.0,"[2,2]",[5.00e-01],,3.0,"[1,2]",[6.67e-01],,,,,,6.0,"[2,4]",[6.67e-01],,6.0,"[2,4]",[6.67e-01],,,,
chr1:10428,"[""CCCTAA"",""C""]",0/1,4.0,"[1,3]",[7.50e-01],,5.0,"[3,2]",[4.00e-01],,,,,,7.0,"[3,4]",[5.71e-01],,7.0,"[3,4]",[5.71e-01],,,,
chr1:10433,"[""A"",""AC""]",,,,,1/1,3.0,"[0,3]",[1.00e+00],,,,,,,,,,,,,,,,


In [None]:
hl.stop()