In [2]:
import csv
import gzip

In [3]:
headers = ['Chr', 'Position', 'rsID', 'Ref', 'Alt', 'Consequence',
           'Gene_symbol', 'LoF_flag', 'LoF_filter',
           'AC', 'AC_afr', 'AC_amr', 'AC_nfe', 'AC_asj', 'AC_sas', 'AC_eas', 'AC_mid', 'AC_fin',
           'AN', 'AN_afr', 'AN_amr', 'AN_nfe', 'AN_asj', 'AN_sas', 'AN_eas', 'AN_mid', 'AN_fin',
           'AF', 'AF_afr', 'AF_amr', 'AF_nfe', 'AF_asj', 'AF_sas', 'AF_eas', 'AF_mid', 'AF_fin']

In [4]:
vcf_columns = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO']
vcf_field_mapping = dict(zip(vcf_columns, range(len(vcf_columns))))

In [5]:
vep_field_names = 'Allele|Consequence|IMPACT|SYMBOL|Gene|Feature_type|Feature|BIOTYPE|EXON|INTRON|HGVSc|HGVSp|cDNA_position|CDS_position|Protein_position|Amino_acids|Codons|ALLELE_NUM|DISTANCE|STRAND|FLAGS|VARIANT_CLASS|SYMBOL_SOURCE|HGNC_ID|CANONICAL|MANE_SELECT|MANE_PLUS_CLINICAL|TSL|APPRIS|CCDS|ENSP|UNIPROT_ISOFORM|SOURCE|DOMAINS|miRNA|HGVS_OFFSET|PUBMED|MOTIF_NAME|MOTIF_POS|HIGH_INF_POS|MOTIF_SCORE_CHANGE|TRANSCRIPTION_FACTORS|LoF|LoF_filter|LoF_flags|LoF_info'.split('|')

In [15]:
population_data = {
    'AC': None, 'AC_afr': None, 'AC_amr': None, 'AC_nfe': None, 'AC_asj': None, 'AC_sas': None, 'AC_eas': None, 'AC_mid': None, 'AC_fin': None,
    'AN': None, 'AN_afr': None, 'AN_amr': None, 'AN_nfe': None, 'AN_asj': None, 'AN_sas': None, 'AN_eas': None, 'AN_mid': None, 'AN_fin': None,
    'AF': None, 'AF_afr': None, 'AF_amr': None, 'AF_nfe': None, 'AF_asj': None, 'AF_sas': None, 'AF_eas': None, 'AF_mid': None, 'AF_fin': None
}

In [11]:
# Функция для извлечения информации из нужных полей vep

def get_unique_vep_info(line, target_field):
    info_index = vcf_field_mapping['INFO']
    column_with_info = line[info_index].split(';')  # делим все колонки vep по ;
    vep_info = column_with_info[-1].split('|')

    all_values = []
    unique_values = set()

    start_index = vep_field_names.index(target_field)
    step = 47  # т.к. всего 47 полей в поле vep

    for i in range(start_index, len(vep_info), step):
        value = vep_info[i]
        if value:  # проверяем, что значение не пустое
            all_values.append(value)
            unique_values.add(value)

    return list(unique_values)

In [12]:
# Функция для получения любой информации для каждого транскрипта

def get_transcript_info(line, target_field):
    info_index = vcf_field_mapping['INFO']
    column_with_info = line[info_index].split(';')  # делим все колонки vep по ;
    vep_info = column_with_info[-1].split('|')
    transcript_info_pairs = []
    
    transcript_field_index = 6
    target_field_index = vep_field_names.index(target_field)
    
    step = 47  # т.к. всего 46 полей в vep
    
    for i in range(transcript_field_index, len(vep_info), step):
        transcript = vep_info[i]
        info = vep_info[target_field_index]
        
        if info:  # оставляем только непустые значения
            transcript_info_pairs.append(f'{transcript}: {info}')
            
        target_field_index += step  # увеличиваем значение для следующей итерации
        
    return transcript_info_pairs

In [13]:
# Функция для парсинга вцф-файла

def get_parse_vcf(vcf_file, output_file = ''):
    
    # Если имя выходного файла не задано
    if output_file == '':
        output_file = vcf_file.replace('.vcf.bgz', '.tsv')
    # Если не указано расширение файла (указано неверно)
    else:
        if not output_file.endswith('.tsv'):
            output_file += '.tsv'
    
    # Запись шапки таблицы в выходной файл
    with open(output_file, 'w', newline='', encoding='utf-8') as table_file:
        writer = csv.writer(table_file, delimiter='\t')
        writer.writerow(headers)

    # Открываем на чтение файл, который надо распарсить
    with gzip.open(vcf_file, 'rt') as input_file, open(output_file, 'a', newline='', encoding='utf-8') as output_file:
        vcf_reader = csv.reader(input_file, delimiter='\t')
        tsv_writer = csv.writer(output_file, delimiter='\t')

        for line in vcf_reader:
            if line[vcf_field_mapping['CHROM']].startswith('chr'):
                if line[vcf_field_mapping['FILTER']] == 'PASS':
                    
                    # Общая информация
                    chrom = line[vcf_field_mapping['CHROM']][3:]
                    position = line[vcf_field_mapping['POS']]
                    rsID = line[vcf_field_mapping['ID']]
                    Ref = line[vcf_field_mapping['REF']]
                    Alt = line[vcf_field_mapping['ALT']]
                    
                    # Получаем пары транскрипт: характеристика
                    consequence = get_transcript_info(line, 'Consequence')
                    lof_filters = get_transcript_info(line, 'LoF_filter')
                    lof_flags = get_transcript_info(line, 'LoF_flags')
                    
                    # Получаем уникальные значения нужного поля
                    unique_genes = get_unique_vep_info(line, 'SYMBOL')
                    
                    # Популяционные данные
                    info_index = vcf_field_mapping['INFO']
                    column_with_info = line[info_index].split(';')
                    for element in column_with_info:
                        # содержит ли элемент (поле) информацию о популяционных частотах, указанных в словаре
                        for category in population_data:
                            if element.startswith(f'{category}='):
                                frequency_value = element.split('=')
                                population_data[category] = frequency_value[-1]  # забираем только численное значение                
                    
                    # Записываем новые данные в таблицу
                    filtered_data = [chrom, position, rsID, Ref, Alt, consequence] + [', '.join(unique_genes)] + [', '.join(lof_flags)] + [', '.join(lof_filters)] + list(population_data.values()) 
                    tsv_writer.writerow(filtered_data)


In [14]:
# Указываем input файл (или путь к нему), а также имя output (необязательно, задаётся по умолчанию по названию вцф)

# get_parse_vcf('example.vcf.bgz')