# Пайплайн с полным пересчётом и записью только малой матрицы

In [3]:
import hail as hl

In [4]:
hl.init()
hl.default_reference('GRCh38')

Running on Apache Spark version 3.5.5
SparkUI available at http://172.16.0.135:4041
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.134-952ae203dbbe
LOGGING: writing to /home/julia/Documents/thesis/scripts/hail-20250504-1448-0.2.134-952ae203dbbe.log
2025-05-04 14:48:31.225 Hail: INFO: Reading table without type imputation
  Loading field 'ID' as type str (user-supplied)
  Loading field 'sex' as type str (user-supplied)
  Loading field 'profile' as type str (not specified)
2025-05-04 14:51:33.662 Hail: WARN: Filtered with contig 'MT', but 'MT' is not a valid contig in reference genome GRCh38
2025-05-04 14:51:37.459 Hail: INFO: scanning VCF for sortedness...
2025-05-04 14:51:53.982 Hail: INFO: Coerced sorted VCF - no additional import work to do
2025-05-04 14:51:54.050 Hail: INFO: scanning VCF for sortedness...
2025-05-04 14:52:09.533 Hail: INFO: Coerced sorted VCF - no additional import work to do
2025-05-04 14:52:09.616 Hail: IN

In [69]:
hl.stop()

In [70]:
hl.init(
    master="local[58]",  # Используем 16 ядер из ~32 (оставляем ресурсы для системы)
    spark_conf={
        # Память
        "spark.driver.memory": "64G",          # Главный процесс
        "spark.executor.memory": "48G",        # Рабочие процессы
        "spark.memory.fraction": "0.8",        # Доля памяти для Spark (остальное - буфер)
        
        # Параллелизм
        "spark.sql.shuffle.partitions": "400", # Для операций JOIN/GROUP BY
        "spark.default.parallelism": "200",     # Дефолтный уровень параллелизма
        
        # Настройки для больших данных
        "spark.driver.maxResultSize": "8G",    # Макс. размер возвращаемых данных
        "spark.kryoserializer.buffer.max": "1G",# Сериализация больших объектов
        "spark.network.timeout": "600s",       # Таймауты для тяжелых задач
        
        # Оптимизация Hail
        "spark.hadoop.mapreduce.input.fileinputformat.split.minsize": "134217728"  # 128MB (размер партиций)
    },
    log="/tmp/hail.log"
)

Running on Apache Spark version 3.5.5
SparkUI available at http://172.16.0.135:4041
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.134-952ae203dbbe
LOGGING: writing to /tmp/hail.log


In [71]:
hl.default_reference('GRCh38')

In [72]:
import glob
import os

In [92]:
# конфигурация

VCF_DIR = '/home/julia/Downloads/genomes/links/'  # папка с VCF
VCF_DIR_NEW = '/home/julia/Downloads/genomes/new/' # папка с VCF для добавления
# SEX_TABLE_PATH = '/home/julia/Downloads/gnomADru/sids.csv' # файл с полом
SEX_TABLE_PATH = '/home/julia/Downloads/genomes/links/info.csv' # файл с полом
BASE_DATA_PATH = '/home/julia/Downloads/genomes/cache/combined.mt' # первый пул данных
AF_PATH = '/home/julia/Downloads/genomes/cache/af.tsv'

In [None]:
# старые файлы

vcf_files = glob.glob(VCF_DIR + '*.vcf.gz')
print(vcf_files)

In [76]:
# новые файлы
new_vcf_files = glob.glob(VCF_DIR_NEW + '*.vcf.gz')
print(new_vcf_files)

['/home/julia/Downloads/genomes/new/000007029510.vcf.gz', '/home/julia/Downloads/genomes/new/000007029520.vcf.gz', '/home/julia/Downloads/genomes/new/000007029530.vcf.gz', '/home/julia/Downloads/genomes/new/000007030390.vcf.gz']


In [77]:
# добавление пола

def set_sex(mt, sex_table):
    # преобразуем пол в is_female (True для 'ж'/'f')
    sex_table = sex_table.annotate(
        is_female = (
            (sex_table.sex.lower() == 'ж') | 
            (sex_table.sex.lower() == 'f')
        )
    )
    
    # добавляем is_female к образцам (простое соединение)
    mt = mt.annotate_cols(
        is_female = sex_table[mt.s].is_female  # mt.s - ID образца
    )

    return mt

In [78]:
# нормализация гемизигот у мужчин и МХ у всех

def normalize_ploidy(mt):
    return mt.annotate_entries(
        GT = hl.case()
            # Митохондриальная ДНК (гаплоидная у всех)
            .when(mt.locus.contig == "chrM",
                hl.if_else(
                    mt.GT.is_hom_ref(),
                    hl.call(0),
                    hl.if_else(
                        mt.GT.is_hom_var(),
                        hl.call(1),
                        hl.if_else(
                            (mt.VAF[0] > 0.3) | (mt.VAF[1] > 0.3),  # Учитываем оба аллеля
                            hl.call(1),
                            hl.call(0)
                        )
                    )
                )
            )
            # Гемизиготные участки у мужчин (X/Y)
            .when((~mt.is_female) & ((mt.locus.contig == "chrX") | (mt.locus.contig == "chrY")),
                hl.if_else(
                    mt.GT.is_hom_ref(),
                    hl.call(0),
                    hl.if_else(
                        mt.GT.is_hom_var(),
                        hl.call(1),
                        hl.if_else(
                            mt.VAF[0] > 0.3,
                            hl.call(1),
                            hl.call(0)
                        )
                    )
                )
            )
            # Все остальные случаи (аутосомы, X у женщин)
            .default(mt.GT)
    )

In [79]:
# нормализация гемизигот у мужчин

def gemizygote_normalize(mt):
    return mt.annotate_entries(
        GT = hl.if_else(
            (~mt.is_female) & ((mt.locus.contig == "chrX") | (mt.locus.contig == "chrY")),
            hl.if_else(
                mt.GT.is_hom_ref(),  # гомозигота по REF 0/0 → 0
                hl.call(0),
                hl.if_else(
                    mt.GT.is_hom_var(),  # гомозигота по ALT 1/1 → 1
                    hl.call(1),
                    hl.if_else(
                        mt.VAF[0] > 0.3,  # для гетерозигот - если VAF > 30% → 1 считаем гомозиготой по ALT
                        hl.call(1),
                        hl.call(0)     # иначе → 0 считаем гомозиготой по REF
                    )
                )
            ),
            mt.GT  # Для женщин и аутосом оставляем без изменений
        )
    )

In [80]:
# фильтрация по глубине

def filter_variants_by_DP(combined_mt_all, dp):

    # отсекаем варианты, если нет ни одного образца с DP больше порога
    filtered_mt = combined_mt_all.filter_rows(
        hl.agg.count_where(
            (hl.is_defined(combined_mt_all.DP)) & 
            (combined_mt_all.DP >= dp)
        ) >= 1
    )

    # корректируем генотипы - варианты с DP меньше порога исключаем из расчёта частот, помечая как NA
    return filtered_mt.annotate_entries(
        GT = hl.if_else(
            (hl.is_defined(filtered_mt.DP)) & 
            (filtered_mt.DP >= dp),
            filtered_mt.GT,
            hl.missing(hl.tcall)
        )
    )


In [81]:
#препроцессинг до фильтрации включительно

def preprocessing(vcf_files, sex_table):
    # комбайн
    mts_all = []
    for vcf in vcf_files: 
        mt = hl.import_vcf(vcf, force_bgz=True, array_elements_required=False)
        mt = set_sex(mt, sex_table)
        # mt = gemizygote_normalize(mt)
        mt = normalize_ploidy(mt)
        mts_all.append(mt)

    # Объединение MatrixTable по колонкам (образцам)
    combined_mt_all = mts_all[0]
    if len(mts_all) > 1:
        for mt in mts_all[1:]:
            combined_mt_all = combined_mt_all.union_cols(mt, row_join_type='outer')

    #фильтрация по глубине
    return filter_variants_by_DP(combined_mt_all, 3)

In [82]:
# расчёт частот - 4 версия.попытаться ускорить.пока ориентировочно лучший вариант

def mt_AF_calculated(mt):
    freq_mt_all = mt.annotate_rows(
    call_stats=hl.agg.call_stats(mt.GT, mt.alleles)
    )

    # извлечение частот аллелей
    return freq_mt_all.annotate_rows(
        allele_frequencies=freq_mt_all.call_stats.AF  # AF — это массив частот аллелей, включая мультиаллели
    )

## Пайплайн с добавлением новых данных - отсюда и до конца

In [83]:
# определение пола
sex_table = hl.import_table(SEX_TABLE_PATH,
        delimiter=',',
        types={'ID': hl.tstr, 'sex': hl.tstr},
        key='ID'
    )

In [84]:
# vcf_files = new_vcf_files + vcf_files
# print(vcf_files)
print(len(vcf_files))

996


In [85]:
%%time

# препроцессинг данных
data = preprocessing(vcf_files, sex_table)

#16 образцов
#CPU times: user 774 ms, sys: 14.8 ms, total: 789 ms
#Wall time: 1.74 s

#996
#CPU times: user 49.3 s, sys: 1.26 s, total: 50.6 s
#Wall time: 1min 5s

CPU times: user 49.3 s, sys: 1.26 s, total: 50.6 s
Wall time: 1min 5s


In [86]:
%%time

# расчёт частот
data_af = mt_AF_calculated(data)

#16 образцов
#CPU times: user 21.2 ms, sys: 2.73 ms, total: 23.9 ms
#Wall time: 21.6 ms

CPU times: user 16.9 ms, sys: 832 μs, total: 17.7 ms
Wall time: 17.5 ms


In [89]:
data_af.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'is_female': bool
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        END: int32
    }
    'call_stats': struct {
        AC: array<int32>, 
        AF: array<float64>, 
        AN: int32, 
        homozygote_count: array<int32>
    }
    'allele_frequencies': array<float64>
----------------------------------------
Entry fields:
    'GT': call
    'GQ': int32
    'DP': int32
    'MIN_DP': int32
    'AD': array<int32>
    'VAF': array<float64>
    'PL': array<int32>
    'MED_DP': int32
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------


In [None]:
data_af.count()

In [90]:
def make_af_table(data_af):
    filtered_mt = data_af.select_entries('DP', 'AD') \
                   .select_rows('rsid', 'allele_frequencies')
    return filtered_mt

In [91]:
%%time

af = make_af_table(data_af)

# 16 образцов
# CPU times: user 16.1 ms, sys: 142 μs, total: 16.3 ms
# Wall time: 15.9 ms

# 996 образцов
#CPU times: user 11.8 ms, sys: 0 ns, total: 11.8 ms
# Wall time: 11.3 ms

CPU times: user 11.8 ms, sys: 0 ns, total: 11.8 ms
Wall time: 11.3 ms


In [None]:
%%time
af.write(AF_PATH, overwrite=True)

In [93]:
%%time

# Выбираем нужные поля
def save_tsv(data_af):
    filtered_mt = data_af.select_entries('DP', 'AD') \
                   .select_rows('rsid', 'allele_frequencies')
    
    # 1. Создаем таблицу с плоской структурой
    table_to_export = filtered_mt.entries()
    
    # 2. Распаковываем вложенные поля (если нужно)
    table_to_export = table_to_export.annotate(
        AD_ref=table_to_export.AD[0],
        AD_alt=table_to_export.AD[1],
        AF=table_to_export.allele_frequencies.get('AF')
    )
    
    # 3. Выбираем только нужные колонки для экспорта
    table_to_export = table_to_export.select(
        'rsid',
        'locus',
        'alleles',
        'DP',
        'AD_ref',
        'AD_alt',
        'AF'
    )
    
    # 4. Экспортируем в TSV
    table_to_export.export(AF_PATH)
    
save_tsv(data_af)

AttributeError: 'ArrayNumericExpression' object has no attribute 'get'

In [44]:
af.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'is_female': bool
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'allele_frequencies': array<float64>
----------------------------------------
Entry fields:
    'DP': int32
    'AD': array<int32>
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------


In [45]:
af.count()



(10918244, 6)

In [None]:
final_af = combined_mt.filter_rows(
    (combined_mt.locus.contig == "22") & 
    (combined_mt.locus.position == 50808270)
)

final_af.rows().show()

In [40]:
hl.stop()

Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.storage.BlockManager$RemoteBlockDownloadFileManager$$Lambda$985/0x0000000100489040.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.storage.BlockManager$RemoteBlockDownloadFileManager.org$apache$spark$storage$BlockManager$RemoteBlockDownloadFileManager$$keepCleaning(BlockManager.scala:2228)
	at org.apache.spark.storage.BlockManager$RemoteBlockDownloadFileManager$$anon$2.run(BlockManager.scala:2194)


In [22]:
def check_ploidy(mt, contig):
    contig_mt = mt.filter_rows(mt.locus.contig == contig)
    return contig_mt.aggregate_entries(
        hl.struct(
            hom_ref=hl.agg.count_where(contig_mt.GT.is_hom_ref()),
            hom_alt=hl.agg.count_where(contig_mt.GT.is_hom_var()),
            het=hl.agg.count_where(contig_mt.GT.is_het()),
            missing=hl.agg.count_where(hl.is_missing(contig_mt.GT))
    ))

# Проверка для MT, X, Y и аутосомы
contigs_to_check = ["chrM", "chrX", "chrY", "chr1"]
results = {contig: check_ploidy(data_af, contig) for contig in contigs_to_check}



In [23]:
print(results)

{'chrM': Struct(hom_ref=3, hom_alt=137, het=0, missing=340), 'chrX': Struct(hom_ref=86928, hom_alt=505736, het=89489, missing=1012582), 'chrY': Struct(hom_ref=93869, hom_alt=24713, het=1447, missing=259546), 'chr1': Struct(hom_ref=150792, hom_alt=761510, het=1051723, missing=2406045)}


In [27]:
data_af.filter_rows(data_af.locus.contig == "chrM").GT.show(10)

[Stage 43:>                                                         (0 + 1) / 1]

Unnamed: 0_level_0,Unnamed: 1_level_0,'000007000020','000007000040','000007000070','000007000030'
locus,alleles,GT,GT,GT,GT
locus<GRCh38>,array<str>,call,call,call,call
chrM:150,"[""C"",""T""]",,,,
chrM:152,"[""T"",""C""]",,,,
chrM:195,"[""T"",""C""]",,,,
chrM:263,"[""A"",""G""]",1.0,1.0,1.0,1.0
chrM:285,"[""C"",""T""]",,1.0,,
chrM:295,"[""C"",""T""]",,,,1.0
chrM:309,"[""C"",""CCCT"",""CCT""]",1.0,,,
chrM:309,"[""C"",""CCT""]",,1.0,1.0,1.0
chrM:310,"[""T"",""C""]",1.0,1.0,1.0,1.0
chrM:456,"[""C"",""T""]",1.0,,,


In [26]:
data_af.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'is_female': bool
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        END: int32
    }
    'call_stats': struct {
        AC: array<int32>, 
        AF: array<float64>, 
        AN: int32, 
        homozygote_count: array<int32>
    }
    'allele_frequencies': array<float64>
----------------------------------------
Entry fields:
    'GT': call
    'GQ': int32
    'DP': int32
    'MIN_DP': int32
    'AD': array<int32>
    'VAF': array<float64>
    'PL': array<int32>
    'MED_DP': int32
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------


In [28]:
data_af.filter_rows(data_af.locus.contig == "chrY").GT.show(10)

[Stage 50:>                                                         (0 + 1) / 1]

Unnamed: 0_level_0,Unnamed: 1_level_0,'000007000020','000007000040','000007000070','000007000030'
locus,alleles,GT,GT,GT,GT
locus<GRCh38>,array<str>,call,call,call,call
chrY:2781761,"[""CA"",""C""]",,1.0,1.0,1.0
chrY:2783604,"[""G"",""C""]",,,,
chrY:2789135,"[""C"",""T""]",,,,1.0
chrY:2790748,"[""T"",""TA""]",,1.0,,
chrY:2791980,"[""T"",""G""]",,0.0,,
chrY:2793653,"[""A"",""G""]",,1.0,1.0,1.0
chrY:2795902,"[""C"",""T""]",,1.0,,
chrY:2796208,"[""C"",""T""]",,,,
chrY:2797100,"[""C"",""CA""]",,,,
chrY:2797491,"[""A"",""C""]",,,,


In [None]:
final_af = data_af.filter_rows(
    (data_af.locus.contig == "chrM") & 
    (data_af.locus.position == 16278)
)

data_af.rows().show()