In [1]:
import click
import pandas as pd
from pathlib import Path
import gffutils
import csv
from collections import namedtuple

In [2]:
def read_gff_db(db_file):
    """Read annotation sqlite database and return FeatureDB instance"""
    click.echo(f"Reading {db_file} database...")
    return gffutils.FeatureDB(db_file)

def match_mRNAs_and_CDSs(mRNA_list, CDS_list):
    """Join together into a tuple all CDS objects in list and their parent mRNA"""
    matched = []
    for mRNA in mRNA_list:
        # Start with an empty list of CDSs for each mRNA
        all_CDSs_of_mRNA = []
        for cds in CDS_list:
            if(mRNA.id == cds.attributes.get("Parent")[0]):
                all_CDSs_of_mRNA.append(cds)
        matched.append((mRNA, all_CDSs_of_mRNA))
    return matched

def collect_all_mRNAs_and_CDSs(db, gene):
    """
    Gather all mRNA and CDS features of a gene into lists and return 
    them as a tuple
    """
    mRNAs = []
    CDSs = []
    for feature in db.children(gene):
        if(feature.featuretype == "mRNA"):
            mRNAs.append(feature)
            continue
        if(feature.featuretype == "CDS"):
            CDSs.append(feature)
            continue
    return (mRNAs, CDSs)

def remove_dublicates(list_of_products):
    """Remove dublicate gene product names from a list"""
    return list(set(list_of_products))

def get_products_from_cds_list(cds_list):
    """Get a list of unique gene product strings from a list of CDS objects"""
    all_products = []
    found_products = False
    for cds in cds_list:
        # If there is a product it is returned inside of a list
        product = cds.attributes.get("Product")
        if product:
            all_products.append(product[0])
            found_products = True
    if found_products:
        return remove_dublicates(all_products)
    else:
        return list()
        
def get_IDs_from_cds_list(cds_list):
    """Get a list of CDS IDs from a list of CDS objects"""
    return [cds.id for cds in cds_list]

def create_gene_annotations(db):
    """Create gene annotions dictionary with all essential info in memory"""
    click.echo("Creation of annotions dictionary started...")
    gene_annotations = {}
    for gene in db.features_of_type('gene'):
        # Gather the gene's all mRNAs and CDSs into own lists
        mRNAs, CDSs = collect_all_mRNAs_and_CDSs(db, gene)
        # Iterate over CDSs and their parent mRNA and store all relevant info into
        # gene_annotation dictionary
        for tx_cds in match_mRNAs_and_CDSs(mRNAs, CDSs):        
            mRNA = tx_cds[0]
            all_CDSs_of_mRNA = tx_cds[1]
            cds_products = get_products_from_cds_list(all_CDSs_of_mRNA)
            cds_IDs = get_IDs_from_cds_list(all_CDSs_of_mRNA)
            # Populate gene_annotations dictionary
            if gene.id in gene_annotations:
                gene_annotations[gene.id].append({
                    "Gene_name" : gene.attributes.get("Name"),
                    "Transcript_ID" : mRNA.id,
                    "Ontology_term" : mRNA.attributes.get("Ontology_term"),
                    "Dbxref" : mRNA.attributes.get("Dbxref"),
                    "CDS_ID" : cds_IDs,
                    "CDS_Product" : cds_products
                    })
            else:
                gene_annotations[gene.id] = [{
                    "Gene_name" : gene.attributes.get("Name"),
                    "Transcript_ID" : mRNA.id,
                    "Ontology_term" : mRNA.attributes.get("Ontology_term"),
                    "Dbxref" : mRNA.attributes.get("Dbxref"),
                    "CDS_ID" : cds_IDs,
                    "CDS_Product" : cds_products
                    }]
    click.echo("Gene annotions dictionary finished")
    return gene_annotations

In [3]:
# {gene1_ID:
# #[
# #{'Gene_name': ['HIBN'],
# # 'Transcript_ID': 'gbgene20309.t1', 
# # 'Ontology_term': ['GO:0005634', 'GO:0061676'], 
# # 'Dbxref': ['UniProtKB/Swiss-Prot:P06180', 'GeneID:397901'], 
# # 'CDS_ID': 'gbgene5187.t2.CDS7', 
# # 'CDS_Product': ['Histone-binding protein N1/N2']}, 
# #{'Gene_name': ['HIBN'],
# # 'Transcript_ID': 'gbgene20309.t2', 
# # 'Ontology_term': ['GO:0005634', 'GO:0061676'], 
# # 'Dbxref': ['UniProtKB/Swiss-Prot:P06180', 'GeneID:397901'], 
# # 'CDS_ID': 'gbgene5187.t2.CDS7', 
# # 'CDS_Product': ['Histone-binding protein N1/N2']

def ntuple_to_list(ntuple):
    return list(ntuple)


In [4]:
db = read_gff_db(Path("genome.genes.sqlite"))
gene_annotations = create_gene_annotations(db)

Reading genome.genes.sqlite database...
Creation of annotions dictionary started...
Gene annotions dictionary finished


In [41]:
def parse_refs(ref):
    """Read an unparsed reference element and return db id"""
    return (":").join(ref.split(":")[1:])

def add_ref(ref_dict, reference_db_name, unparsed_ref_element):
    if reference_db_name in ref_dict:
        ref_dict[reference_db_name].append(parse_refs(unparsed_ref_element))
    else:
        ref_dict[reference_db_name] = [parse_refs(unparsed_ref_element)]
    return ref_dict
        
def parse_Dbxref(dbxref_list):
    """Parse a list dbxrefs and return a dict of db as key and ids as values"""
    dbxref = {}
    # Handle completely missing Dbxref data
    if dbxref_list is None:
        return {
            "UniProtKB":[""],
            "GeneID":[""],
            "KEGG":[""],
            "PFAM":[""],
            "InterPro":[""],
            "EMBL":[""]
               }
    
    # Keep track of partially missing Dbxref data
    all_found = {
    "UniProtKB" : False,
    "GeneID" : False,
    "KEGG" : False,
    "PFAM" : False,
    "InterPro" : False,
    "EMBL" : False,
    "Unknown" : False
    }
    
    for unparse_reference in dbxref_list:
        if (unparse_reference.startswith("UniProtKB") or unparse_reference.startswith("Swiss-Prot")):
            dbxref = add_ref(dbxref, "UniProtKB", unparse_reference)
            all_found['UniProtKB'] = True
        elif (unparse_reference.startswith("GeneID")):
            dbxref = add_ref(dbxref, "GeneID", unparse_reference)
            all_found['GeneID'] = True
        elif (unparse_reference.startswith("KEGG")):
            dbxref = add_ref(dbxref, "KEGG", unparse_reference)
            all_found['KEGG'] = True
        elif (unparse_reference.startswith("PFAM")):
            dbxref = add_ref(dbxref, "PFAM", unparse_reference)
            all_found['PFAM'] = True
        elif (unparse_reference.startswith("InterPro")):
            dbxref = add_ref(dbxref, "InterPro", unparse_reference)
            all_found['InterPro'] = True
        elif (unparse_reference.startswith("EMBL")):
            dbxref = add_ref(dbxref, "EMBL", unparse_reference)
            all_found['EMBL'] = True
        else:
            # If not matched with any, create an unparsed unknown element
            dbxref["Unknown"] = unparse_reference
            all_found['Unknown'] = True
            print("Unknown reference added"+unparse_reference)
    
    # Fill in missing data with empty strings in a list
    for db, found in all_found.items():
        if not found:
            dbxref[db] = [""]
    return dbxref

def remove_duplicate_values(dbxref_dict):
    for key, value in dbxref_dict.items():
        dbxref_dict[key] = remove_dublicates(value)
    return dbxref_dict

def check_if_NoneType(element):
    if element is None:
        return []
    else:
        return element


def concat_list(in_list,concat_char = ","):
    """Concatenate all list elements and return a single element list of the results"""
    if isinstance(in_list, list):
        return [concat_char.join(in_list)]
    else:
        return [""]
    

with open(Path("shrunken_padj-filtered.tsv"), newline="") as infile, open('output.tsv', 'wt') as out_file:
    # Initialise reading and writing tsvs
    tsv_writer = csv.writer(out_file, delimiter='\t')
    reader = csv.reader(infile, delimiter="\t")
    
    # Write tsv header
    header = next(reader)
    gff_columns = ["Gene_name", "GeneID", "CDS_Products", "Ontology_terms", "Transcript_IDs", "UniProtKB_Swiss-Prot", "KEGG", "PFAM", "InterPro", "EMBL"]
    tsv_writer.writerow(header+gff_columns)
    
    # Read tsv file row-wise
    Data = namedtuple("Data", header)  # get names from column headers
    for data in map(Data._make, reader):
        # Capture DE data into a list
        data_row = ntuple_to_list(data)
        # Reset all lists/dict for each gene
        ontology_terms = []
        transcript_IDs = []
        CDS_Products = []
        dbxref_dict = {}
        
        # Gather all annotation data 
        # Gene annotations for each gene ID consist of a list of one or more 
        # transcripts with all available annotations
        for transcript in gene_annotations[data.Genes]:
            # gene_name is the same for all the transcripts
            gene_name = transcript["Gene_name"]
            transcript_IDs.append(transcript["Transcript_ID"])
            ontology_terms = ontology_terms + check_if_NoneType(transcript["Ontology_term"])
            dbxref_dict = parse_Dbxref(transcript["Dbxref"])
            CDS_Products = CDS_Products + transcript["CDS_Product"]
        
        # Remove dublicates among different transcripts
        ontology_terms = remove_dublicates(ontology_terms)
        CDS_Products = remove_dublicates(CDS_Products)
        dbxref_dict = remove_duplicate_values(dbxref_dict)
        
        # Write all new data (don't need the DE data) to a row
        tsv_writer.writerow([data.Genes]+
                            concat_list(gene_name)+
                            concat_list(dbxref_dict["GeneID"])+
                            concat_list(CDS_Products)+
                            concat_list(ontology_terms)+
                            concat_list(transcript_IDs)+
                            concat_list(dbxref_dict["UniProtKB"])+
                            concat_list(dbxref_dict["KEGG"])+
                            concat_list(dbxref_dict["PFAM"])+
                            concat_list(dbxref_dict["InterPro"])+
                            concat_list(dbxref_dict["EMBL"]))


gbgene898
['347']
gbgene18399
['100337595']
gbgene7672
['3713']
gbgene2487
['']
gbgene15915
['199692']
gbgene12445
['']
gbgene7693
['3713']
gbgene5076
['27385']
gbgene6642
['658208']
gbgene11933
['100135492']
gbgene9359
['27385']
gbgene14984
['']
gbgene15509
['']
gbgene8521
['10847']
gbgene8756
['']
gbgene15500
['']
gbgene17373
['184247']
gbgene15499
['']
gbgene11357
['3290499']
gbgene12463
['396865']
gbgene15504
['']
gbgene5757
['27385']
gbgene15147
['']
gbgene1080
['']
gbgene9384
['']
gbgene17286
['']
gbgene7683
['3713']
gbgene9382
['']
gbgene16793
['']
gbgene15534
['']
gbgene17833
['']
gbgene13284
['57473']
gbgene8288
['507264']
gbgene5761
['57473']
gbgene7681
['3713']
gbgene18912
['3290499']
gbgene19283
['']
gbgene15749
['3290499']
gbgene1837
['']
gbgene7689
['']
gbgene14858
['']
gbgene7125
['57473']
gbgene11967
['']
gbgene15506
['']
gbgene11360
['3290499']
gbgene10081
['']
gbgene9996
['34805']
gbgene2941
['3290499']
gbgene15166
['']
gbgene15505
['']
gbgene16792
['']
gbgene4699
[''

In [20]:
len(ontology_terms)

5

In [30]:
gene_name, dbxref_dict, CDS_Products, transcript_IDs

(['APOD_0'],
 {'UniProtKB': ['P05090'],
  'GeneID': ['347'],
  'KEGG': ['hsa:347'],
  'PFAM': ['PF08212'],
  'InterPro': ['IPR026222',
   'IPR002969',
   'IPR012674',
   'IPR022272',
   'IPR022271',
   'IPR000566'],
  'EMBL': ['J02611',
   'M16696',
   'CR456838',
   'AK312090',
   'BC007402',
   'CH471052',
   'BT019860',
   'CR541773',
   'S80440',
   'BT019861']},
 ['Apolipoprotein D'],
 ['gbgene898.t1'])

In [65]:
parse_refs("EMBL:AL162596")

'AL162596'

In [42]:
print(gene_annotations['gbgene7693'])

[{'Gene_name': ['IVL_13'], 'Transcript_ID': 'gbgene7693.t1', 'Ontology_term': ['GO:0005813', 'GO:0001533', 'GO:0005737', 'GO:0005829', 'GO:0070062', 'GO:0016604', 'GO:0070268', 'GO:0018153', 'GO:0030216', 'GO:0018149', 'GO:0010224'], 'Dbxref': ['UniProtKB/Swiss-Prot:P07476', 'GeneID:3713', 'KEGG:hsa:3713', 'PFAM:PF00904', 'PFAM:PF10583', 'InterPro:IPR002360', 'InterPro:IPR019743', 'InterPro:IPR019571', 'InterPro:IPR000354', 'EMBL:M13903', 'EMBL:AL162596', 'EMBL:BC046391'], 'CDS_ID': ['gbgene7693.t1.CDS1', 'gbgene7693.t1.CDS2'], 'CDS_Product': ['Involucrin']}, {'Gene_name': ['IVL_13'], 'Transcript_ID': 'gbgene7693.t2', 'Ontology_term': ['GO:0005813', 'GO:0001533', 'GO:0005737', 'GO:0005829', 'GO:0070062', 'GO:0016604', 'GO:0070268', 'GO:0018153', 'GO:0030216', 'GO:0018149', 'GO:0010224'], 'Dbxref': ['UniProtKB/Swiss-Prot:P07476', 'GeneID:3713', 'KEGG:hsa:3713', 'PFAM:PF00904', 'PFAM:PF10583', 'InterPro:IPR002360', 'InterPro:IPR019743', 'InterPro:IPR019571', 'InterPro:IPR000354', 'EMBL:M

In [None]:
def annotate(gene_annotations, unannotated_DE_results):
    