In [1]:
import gzip
from os import path
import re

from Bio import SeqIO
import gffpandas.gffpandas as gffpd
import numpy as np
import pandas as pd
import roman
from tqdm import tqdm

tqdm.pandas()

root_path = path.abspath(path.join(path.abspath(""), "../.."))

In [2]:
def format_chrom(x):
    if '_' not in x :
        x = x.replace('CHR', 'CHR_')
        return x
    return x


def format_roman_chrom(x):
    if 'MT' not in x :
        rom_nb = x.split('_')[1]
        nb = roman.fromRoman(rom_nb)
        x = 'CHR_'+ str(nb)
        return x
    return x


def format_numeric_chrom(x):
    if 'MT' not in x :
        num_nb = x.replace('CHR','')
        nb = roman.toRoman(int(num_nb))
        x = 'chr'+ str(nb)
        return x
    return 'chrXVII'

In [3]:
genome_annot_rossi_folder = f'{root_path}/data/genome_annotations/rossi_et_al_2021'
PROMOTER_TABLE = pd.read_csv(f'{genome_annot_rossi_folder}/nfr_ndr_feature_positions.csv').rename(columns={"Chromosome_Name":"seq_id", "systematic_id":"locus_id", "common_name":"name", "NFR/NDR_Start":"start", "NFR/NDR_End":"end", "nucleosome_stability":"description"})
PROMOTER_TABLE['seq_id'] = PROMOTER_TABLE['seq_id'].apply(format_numeric_chrom)
PROMOTER_TABLE['source'] = 'rossi_2021_promoters'
PROMOTER_TABLE['type'] = 'promoter'
PROMOTER_TABLE['score'] = 1
PROMOTER_TABLE['phase'] = '.'
PROMOTER_TABLE['name'] = PROMOTER_TABLE['name'].replace('IMP2\'','IMP21')
PROMOTER_TABLE['attributes'] = [f"Name={PROMOTER_TABLE['name'][idx]};Desc={PROMOTER_TABLE['description'][idx]}" for idx in PROMOTER_TABLE.index ]
prom_plus = PROMOTER_TABLE[PROMOTER_TABLE['strand'] == '+'].reset_index(drop=True)
prom_minus = PROMOTER_TABLE[PROMOTER_TABLE['strand'] == '-'].rename(columns={"start":"end", "end":"start"}).reset_index(drop=True)

PROMOTER_TABLE = pd.concat([prom_plus, prom_minus]).reset_index(drop=True)
PROMOTER_TABLE[['seq_id', 'source', 'type', 'start', 'end', 'score', 'strand', 'phase', 'attributes', 'locus_id']].to_csv(f'{genome_annot_rossi_folder}/Rossi_2021_Promoters_V64.csv', index=False)

#### Format SGD information table

In [4]:
genome_annot_sgd_folder = f'{root_path}/data/genome_annotations/sgd_database/original'
genome_annot_rossi_folder = f'{root_path}/data/genome_annotations/rossi_et_al_2021'

#### OTHER FEATURES
others_annotations = []
fasta_gz = f'{genome_annot_sgd_folder}/other_features_genomic_R64-3-1_20210421.fasta.gz'
with gzip.open(fasta_gz, "rt") as handle:
    for record in SeqIO.parse(handle, "fasta"):
        ID, location, genome_version, others_type = record.description.split(', ')[:4]
        others_id = ID.split(' ')[0]
        others_name = ID.split(' ')[1]
        sgd_id = ID.split(' ')[2]
        res = re.findall(r'(Chr [A-Za-z]+) from ([0-9]+)-([0-9]+)', str(location))[0]
        Chr = res[0].upper().replace(' ','_').replace('MITO', 'MT')
        description = ' ; '.join(record.description.split(', ')[4:])
        if 'reverse complement' == others_type:
            Start = res[2]
            End = res[1]
            cds_type = description.split(' ; ')[0]
            description = ' ; '.join(description.split(' ; ')[1:])
        else :
            Start = res[1]
            End = res[2]

        others_annotations.append([others_id, others_name, sgd_id, others_type, Chr, Start, End, description, genome_version])

OTHERS_TABLE = pd.DataFrame(others_annotations, columns=['locus_id', 'name', 'sgd_id', 'genome_annotations', 'chrom', 'start', 'end', 'description', 'genome_version'])
OTHERS_TABLE['chrom'] = OTHERS_TABLE['chrom'].apply(format_roman_chrom)
OTHERS_TABLE.to_csv(f'{root_path}/data/genome_annotations/sgd_database/other_features_genomic_R64-3-1.csv', index=False)
OTHERS_dict = OTHERS_TABLE.set_index('locus_id').to_dict()


### RNA
rna_annotations = []
fasta_gz = f'{genome_annot_sgd_folder}/rna_coding_R64-3-1_20210421.fasta.gz'
with gzip.open(fasta_gz, "rt") as handle:
    for record in SeqIO.parse(handle, "fasta"):
        ID, location, genome_version, rna_type = record.description.split(', ')[:4]
        rna_id = ID.split(' ')[0]
        rna_name = ID.split(' ')[1]
        sgd_id = ID.split(' ')[2]
        res = re.findall(r'(Chr [A-Za-z]+) from ([0-9]+)-([0-9]+)', str(location))[0]
        Chr = res[0].upper().replace(' ','_').replace('MITO', 'MT')
        description = (' ; '.join(record.description.split(', ')[4:]))
        if 'reverse complement' == rna_type:
            Start = res[2]
            End = res[1]
            try :
                rna_type = re.findall(r'([A-Za-z0-9]+\-)\)',str(description.split(' ; ')[0]))[0]
            except :
                rna_type = str(description.split(' ; ')[0])
        else :
            Start = res[1]
            End = res[2]

        rna_annotations.append([rna_id, rna_name, sgd_id, rna_type, Chr, Start, End, description, genome_version])

RNA_TABLE = pd.DataFrame(rna_annotations, columns=['locus_id', 'name', 'sgd_id', 'genome_annotations', 'chrom', 'start', 'end', 'description', 'genome_version'])
RNA_TABLE['chrom'] = RNA_TABLE['chrom'].apply(format_roman_chrom)
RNA_TABLE.to_csv(f'{root_path}/data/genome_annotations/sgd_database/rna_coding_R64-3-1.csv', index=False)
RNA_dict = RNA_TABLE.set_index('locus_id').to_dict()

#### INTERGENIC

intergenic_annotations = []
fasta_gz = f'{genome_annot_sgd_folder}/NotFeature_R64-3-1_20210421.fasta.gz'
with gzip.open(fasta_gz, "rt") as handle:
    for record in SeqIO.parse(handle, "fasta"):
        ID, location, genome_version, description = record.description.split(', ')
        intergenic_id = ID
        res = re.findall(r'(Chr [A-Za-z]+) from ([0-9]+)-([0-9]+)', str(location))[0]
        Chr = res[0].upper().replace(' ','_').replace('MITO', 'MT')
        Start = res[1]
        End = res[2]

        intergenic_annotations.append([intergenic_id, Chr, Start, End, description, genome_version])

INTERGENIC_TABLE = pd.DataFrame(intergenic_annotations, columns=['locus_id', 'chrom', 'start', 'end', 'description', 'genome_version'])
INTERGENIC_TABLE['chrom'] = INTERGENIC_TABLE['chrom'].apply(format_roman_chrom)
INTERGENIC_TABLE.to_csv(f'{root_path}/data/genome_annotations/sgd_database/notFeature_R64-3-1.csv', index=False)
INTERGENIC_dict = INTERGENIC_TABLE.set_index('locus_id').to_dict()

#### CDS
cds_annotations = []
fasta_gz = f'{genome_annot_sgd_folder}/orf_coding_all_R64-3-1_20210421.fasta.gz'
with gzip.open(fasta_gz, "rt") as handle:
    for record in SeqIO.parse(handle, "fasta"):
        ID, location, genome_version, cds_type = record.description.split(', ')[:4]
        cds_id = ID.split(' ')[0]
        cds_name = ID.split(' ')[1]
        sgd_id = ID.split(' ')[2]
        res = re.findall(r'(Chr [A-Za-z]+) from ([0-9]+)-([0-9]+)', str(location))[0]
        Chr = res[0].upper().replace(' ','_').replace('MITO', 'M')
        description = ' ; '.join(record.description.split(', ')[4:])
        if 'reverse complement' == cds_type:
            Start = res[2]
            End = res[1]
            cds_type = description.split(' ; ')[0]
            description = ' ; '.join(description.split(' ; ')[1:])
        else :
            Start = res[1]
            End = res[2]

        cds_annotations.append([cds_id, cds_name, sgd_id, cds_type, Chr, Start, End, description, genome_version])

CDS_TABLE = pd.DataFrame(cds_annotations, columns=['locus_id', 'name', 'sgd_id', 'genome_annotations', 'chrom', 'start', 'end', 'description', 'genome_version'])
CDS_TABLE['chrom'] = CDS_TABLE['chrom'].apply(format_roman_chrom)
CDS_TABLE.to_csv(f'{root_path}/data/genome_annotations/sgd_database/orf_coding_R64-3-1.csv', index=False)
CDS_dict = CDS_TABLE.set_index('locus_id').to_dict()


#### 1KB-AWAY

gene_annotations = []
fasta_gz = f'{genome_annot_sgd_folder}/orf_genomic_1000_all.fasta.gz'
with gzip.open(fasta_gz, "rt") as handle:
    for record in SeqIO.parse(handle, "fasta"):
        ID, location, genome_version, cds_type = record.description.split(', ')[:4]
        cds_id = ID.split(' ')[0]
        cds_name = ID.split(' ')[1]
        sgd_id = ID.split(' ')[2]
        res = re.findall(r'(Chr [A-Za-z]+) from ([0-9]+)-([0-9]+)', str(location))[0]
        Chr = res[0].upper().replace(' ','_').replace('MITO', 'MT')
        description = ' ; '.join(record.description.split(', ')[4:])
        if 'reverse complement' == cds_type:
            Start = res[2]
            End = res[1]
            cds_type = description.split(' ; ')[0]
            description = ' ; '.join(description.split(' ; ')[1:])
        else :
            Start = res[1]
            End = res[2]

        gene_annotations.append([cds_id, cds_name, sgd_id, cds_type, Chr, Start, End, description, genome_version])

GENE_TABLE = pd.DataFrame(gene_annotations, columns=['locus_id', 'name', 'sgd_id', 'genome_annotations', 'chrom', 'start', 'end', 'description', 'genome_version'])
GENE_TABLE['chrom'] = GENE_TABLE['chrom'].apply(format_roman_chrom)
GENE_TABLE.to_csv(f'{root_path}/data/genome_annotations/sgd_database/orf_genomic_1000_R64-3-1.csv', index=False)
GENE_dict = GENE_TABLE.set_index('locus_id').to_dict()

#### PROMOTERS
PROMOTER_TABLE = pd.read_csv(f'{genome_annot_rossi_folder}/nfr_ndr_feature_positions.csv').rename(columns={"Chromosome_Name":"chrom", "systematic_id":"locus_id", "common_name":"name", "NFR/NDR_Start":"start", "NFR/NDR_End":"end", "nucleosome_stability":"description"})
PROMOTER_TABLE['chrom'] = PROMOTER_TABLE['chrom'].apply(format_chrom)
PROMOTER_TABLE['start'] = PROMOTER_TABLE['start'].astype(int)
PROMOTER_TABLE['end'] = PROMOTER_TABLE['end'].astype(int)
PROMOTER_dict = PROMOTER_TABLE[['locus_id', 'name', 'chrom','strand','start', 'end', 'description']].set_index('locus_id').to_dict()


#### UNSTABLE TRANSCRIPTS 2009
CUTs_2009_annotation = gffpd.read_gff3(f'{root_path}/data/genome_annotations/xu_2009/Xu_2009_CUTs_V64.gff3')
CUTs_2009_annotation = CUTs_2009_annotation.df.rename(columns={'seq_id':'chrom'})
CUTs_2009_annotation['chrom'] = [ CUTs_2009_annotation['chrom'][idx].replace("chr", 'CHR_') for idx in CUTs_2009_annotation.index ]
CUTs_2009_annotation['locus_id'] = [ CUTs_2009_annotation['attributes'][idx].split(';')[1].replace("Name=", '') for idx in CUTs_2009_annotation.index ]
CUTs_2009_annotation['description'] = [ 'CUT' for idx in CUTs_2009_annotation.index ]
CUTs_2009_annotation['chrom'] = CUTs_2009_annotation['chrom'].apply(format_roman_chrom)
CUT_dict = CUTs_2009_annotation[['locus_id', 'chrom','strand','start', 'end', 'description']].set_index('locus_id').to_dict()


SUTs_2009_annotation = gffpd.read_gff3(f'{root_path}/data/genome_annotations/xu_2009/Xu_2009_SUTs_V64.gff3')
SUTs_2009_annotation = SUTs_2009_annotation.df.rename(columns={'seq_id':'chrom'})
SUTs_2009_annotation['chrom'] = [ SUTs_2009_annotation['chrom'][idx].replace("chr", 'CHR_') for idx in SUTs_2009_annotation.index ]
SUTs_2009_annotation['locus_id'] = [ SUTs_2009_annotation['attributes'][idx].split(';')[1].replace("Name=", '') for idx in SUTs_2009_annotation.index ]
SUTs_2009_annotation['description'] = [ 'SUT' for idx in SUTs_2009_annotation.index ]
SUTs_2009_annotation['chrom'] = SUTs_2009_annotation['chrom'].apply(format_roman_chrom)
SUT_dict = SUTs_2009_annotation[['locus_id', 'chrom','strand','start', 'end', 'description']].set_index('locus_id').to_dict()

#### UNSTABLE TRANSCRIPTS 2011
XUTs_2011_annotation = gffpd.read_gff3(f'{root_path}/data/genome_annotations/van_dikj_2011/van_Dijk_2011_XUTs_V64.gff3')
XUTs_2011_annotation = XUTs_2011_annotation.df.rename(columns={'seq_id':'chrom'})
XUTs_2011_annotation['chrom'] = [ XUTs_2011_annotation['chrom'][idx].replace("chr", 'CHR_') for idx in XUTs_2011_annotation.index ]
XUTs_2011_annotation['locus_id'] = [ XUTs_2011_annotation['attributes'][idx].split(';')[0].replace("ID=", 'XUT_') for idx in XUTs_2011_annotation.index ]
XUTs_2011_annotation['description'] = [ XUTs_2011_annotation['attributes'][idx].split(';')[-1].replace("desc=", '') for idx in XUTs_2011_annotation.index ]
XUTs_2011_annotation['chrom'] = XUTs_2011_annotation['chrom'].apply(format_roman_chrom)
XUT_dict = XUTs_2011_annotation[['locus_id', 'chrom','strand','start', 'end', 'description']].set_index('locus_id').to_dict()

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/work/data/genome_annotations/sgd_database/original/other_features_genomic_R64-3-1_20210421.fasta.gz'

_____

In [None]:
def check_position(annotation_type, annotation_dict, candidate, snp_pos):
    if 'Promoter' in annotation_type : 
        cdd_strand = annotation_dict['strand'][candidate]
        if cdd_strand == '+':
            cdd_start = annotation_dict['start'][candidate]
            cdd_end = annotation_dict['end'][candidate]
        elif cdd_strand == '-':
            cdd_start = annotation_dict['end'][candidate]
            cdd_end = annotation_dict['start'][candidate]
    elif 'Unstable Transcript' in annotation_type : 
        cdd_strand = annotation_dict['strand'][candidate]
        if cdd_strand == '+':
            cdd_start = annotation_dict['start'][candidate]
            cdd_end = annotation_dict['end'][candidate]
        elif cdd_strand == '-':
            cdd_start = annotation_dict['end'][candidate]
            cdd_end = annotation_dict['start'][candidate]
    else :
        cdd_start = annotation_dict['start'][candidate]
        cdd_end = annotation_dict['end'][candidate]
        if '1kb-away' in annotation_type :
            distance_start = np.abs(int(snp_pos) - int(cdd_start))
            distance_end = np.abs(int(snp_pos) - int(cdd_end))
            if min([distance_start,distance_end]) == distance_start:
                annotation_type = 'Close to 5\'-UTR'
            if min([distance_start,distance_end]) == distance_end:
                annotation_type = 'Close to 3\'-UTR'

    if (int(snp_pos) >= int(cdd_start)) & (int(snp_pos) <= int(cdd_end)):
        return True, annotation_type 
    return False, annotation_type


def isAnnotated(snp_id, args):
    snps_dict = args[0]
    annotation_dict = args[1]
    snps_class = args[2]

    snp_chrom = snps_dict['chrom'][snp_id]
    snp_pos = snps_dict['position'][snp_id]
    
    candidates = []
    for key, val in annotation_dict['chrom'].items():
        if snp_chrom == val : 
            candidates.append(key)
    for cdt in candidates :
        RES, snps_class = check_position(snps_class, annotation_dict, cdt, snp_pos)
        if RES == True :
            try: 
                return (int(snp_id), cdt, annotation_dict["name"][cdt], annotation_dict["sgd_id"][cdt], annotation_dict["description"][cdt], snps_class, annotation_dict["genome_annotations"][cdt])
            except: 
                try :
                    return (int(snp_id), cdt, np.nan, np.nan,  annotation_dict["description"][cdt], snps_class, snps_class)
                except:
                    return (int(snp_id), cdt, np.nan, np.nan,  np.nan, snps_class, snps_class)
    return (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan)

In [None]:
all_snps = pd.read_csv(f'{root_path}/data/genotype_information/yeast_snps_loc_dec2022.txt') #### need to be generated by 00a_generate_genotype_matrix.py
all_snps['position'] = all_snps['position'].replace(0,1)
all_snps['chrom'] = all_snps['chrom'].apply(format_chrom)
all_snps_dict = all_snps.set_index('snp_id').to_dict()

In [None]:
annotations = []
for snp_id in tqdm(all_snps_dict['chrom'].keys()):
    annotations.append(isAnnotated(snp_id, (all_snps_dict, INTERGENIC_dict, 'Intergenic region')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, GENE_dict, '1kb-away')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, PROMOTER_dict, 'Promoter')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, CDS_dict, 'ORF')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, RNA_dict, 'Non-coding RNA')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, CUT_dict, 'CUT')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, SUT_dict, 'SUT')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, XUT_dict, 'XUT')))
    annotations.append(isAnnotated(snp_id, (all_snps_dict, OTHERS_dict, 'Other features')))

In [None]:
ALL_ANNOTATIONS = pd.DataFrame(annotations, columns=['snp_id', 'locus_id', 'name', 'sgd_id', 'description','snps_class_up', 'genome_annotations']).dropna(how='all').reset_index(drop=True) # .drop_duplicates(subset=['snp_id'], keep='last')

In [None]:
ALL_ANNOTATIONS

In [None]:
snp_annotations = [] 
for snp_id in np.unique(ALL_ANNOTATIONS['snp_id']):
    TMP = ALL_ANNOTATIONS[ALL_ANNOTATIONS['snp_id'] == snp_id]
    snps_annots = []
    for snps_class in TMP['snps_class_up'] :
        if ('ORF' in snps_class) or ('CUT' in snps_class) or ('SUT' in snps_class) or ('XUT' in snps_class):
            snps_annots.append(snps_class)
    snps_annots = '_'.join(snps_annots)
    snp_annotations.append([int(snp_id), snps_annots])

In [None]:
annotation_stats = pd.DataFrame(snp_annotations, columns=['snp_id', 'snps_annotations']) 

In [None]:
np.unique(annotation_stats['snps_annotations'])

In [None]:
annotation_stats[annotation_stats['snps_annotations'] != ''].to_csv(f'{root_path}/data/genotype_information/snps_annotations_genome-version-3-64-1_UTs.txt', index=False)

In [None]:
def get_gene_name(x):
    try : 
        return GENE_TABLE[GENE_TABLE['locus_id'] == x ]['name'].values[0]
    except : 
        return ALL_ANNOTATIONS[ALL_ANNOTATIONS['locus_id'] == x ]['name'].values[0]

In [None]:
ALL_ANNOTATIONS['name'] = ALL_ANNOTATIONS['locus_id'].apply(get_gene_name)

In [None]:
ALL_ANNOTATIONS

In [None]:
ALL_ANNOTATIONS.groupby('snps_class_up').count()

In [None]:
def get_snps_class_down(x):
    locus_id = ALL_ANNOTATIONS[ALL_ANNOTATIONS['snp_id'] == x]['locus_id'].values[0]
    snps_class = ALL_ANNOTATIONS[ALL_ANNOTATIONS['snp_id'] == x]['snps_class_up'].values[0]
    genome_annotations = ALL_ANNOTATIONS[ALL_ANNOTATIONS['snp_id'] == x]['genome_annotations'].values[0]
    if snps_class == 'ORF' :
        if '_' in genome_annotations :
            genome_annotations = genome_annotations.replace('_',' ')
        return genome_annotations
    if snps_class == 'Intergenic region':
        return 'Intergenic region'
    if snps_class == 'Close to 5\'-UTR':
        return 'Close to 5\'-UTR'
    if snps_class == 'Close to 3\'-UTR':
        return 'Close to 3\'-UTR'
    if snps_class == 'Promoter':
        return 'Promoter'
    if snps_class == 'Non-coding RNA':
        if '_' in genome_annotations :
            genome_annotations = genome_annotations.replace('_',' ')
        return genome_annotations
    if 'UT' in locus_id : 
        try : 
            x = re.findall(r'^([CSXUT]{3})', locus_id)[0]
            return x 
        except: 
            return locus_id 
    if 'ARS' in locus_id : 
        return 'Replication' 
    if 'CEN' in locus_id : 
        return "Centromere"
    if 'TEL' in locus_id : 
        return "Telomere"
    if 'NTS1-2' in locus_id :
        return 'Mating-related region'
    if 'RE' in locus_id :
        return 'Recombination enhancer'
    if 'HM' in locus_id :
        return "Mating-related region"
    if 'MATALPHA' in locus_id :
        return "Mating-related region"
    else : 
        try : 
            res = re.findall(r'(^[A-Z]+[a-z]+)', locus_id)
            if len(res) == 1 :
                if genome_annotations == "LTR_retrotransposon":
                    return "LTR retrotransposon"
                else : 
                    return "LTR"
        except :
            return x 
    return np.nan

In [None]:
ALL_ANNOTATIONS["snps_class_down"] = ALL_ANNOTATIONS['snp_id'].progress_apply(get_snps_class_down)

In [None]:
ALL_ANNOTATIONS.groupby("snps_class_up").count()

In [None]:
ALL_ANNOTATIONS[(ALL_ANNOTATIONS['snps_class_up'] == 'CDS') & (ALL_ANNOTATIONS['snps_class_down'] != 'CDS')]

In [None]:
ALL_ANNOTATIONS.groupby("snps_class_down").count()

In [None]:
all_snps.merge(ALL_ANNOTATIONS, on='snp_id', how='outer').to_csv(f'{root_path}/data/genotype_information/snps_annotations_genome-version-3-64-1.txt', index=False)