In [1]:
"""A module that makes downloading and using many GSE datasets fast and easy.

Author: Joshua Blanchard, last update: 4/28/2021

A typical workflow:

    Download wanted GSE family files using the download() function.

    Extract content from these files using the extract() function.

    Use the family_dict() function to convert data to pandas.DataFrame objects.

    With the info() function, perform data analysis on the previously created pandas.DataFrame objects.
"""

'A module that makes downloading and using many GSE datasets fast and easy.\n\nAuthor: Joshua Blanchard, last update: 4/28/2021\n\nA typical workflow:\n\n    Download wanted GSE family files using the download() function.\n\n    Extract content from these files using the extract() function.\n\n    Use the family_dict() function to convert data to pandas.DataFrame objects.\n\n    With the info() function, perform data analysis on the previously created pandas.DataFrame objects.\n'

In [3]:
import pandas as pd
import os
import numpy as np
import re as re
import xmltodict
import shutil
import ftplib
import tarfile
import pickle
# import requests
# import wget

In [2]:
# Directory (relative to the working directory) that download() writes the family
# archives into and that extract() and the path helpers read from.
__base_path = "data"

In [23]:
# Directory in which family_dict() stores its pickled per-family dictionaries.
__hidden_path = ".aidp_files"

In [27]:
def family_dict(GSE_family):

    """
    Build, or load from the on-disk cache, the per-sample DataFrames of a GSE family.

    Parameters:
        GSE_family: family ID string (e.g. "GSE41037"). The family's sample files are
            read from the directory returned by __family_path(GSE_family).

    Returns:
        A dictionary whose keys are sample IDs ("GSM" followed by digits) and whose
        values are the pandas.DataFrame objects produced by __load_file() for the
        matching files.

    The dictionary is pickled to "<__hidden_path>/<GSE_family>_dict" the first time it
    is built and is read back from that file on later calls.

    NOTE: pickle.load can execute arbitrary code; the cache directory must only
    contain files written by this function.
    """

    os.makedirs(__hidden_path, exist_ok=True)

    dict_path = __hidden_path + "/" + GSE_family + "_dict"

#     reuse the cached dictionary when one was saved earlier
    if os.path.exists(dict_path):
        with open(dict_path, 'rb') as dict_file:
            return pickle.load(dict_file)

    family_directory = __family_path(GSE_family)

    samples = {}
    for file_name in os.listdir(family_directory):
#         only files named "GSM<digits>..." are sample files; anything else is skipped
        id_match = re.match(r"GSM\d+", file_name)
        if id_match is None:
            continue
        samples[id_match.group(0)] = __load_file(os.path.join(family_directory, file_name))

#     write to a temporary name first so an interrupted dump never leaves a truncated
#     cache file behind for the next call to load
    tmp_path = dict_path + ".tmp"
    with open(tmp_path, 'wb') as dict_file:
        pickle.dump(samples, dict_file)
    os.replace(tmp_path, dict_path)

    return samples


In [4]:
def __family_path(GSE_family):

    """
    Return the path of the directory holding a family's extracted files.

    The result is __base_path joined with "<GSE_family>/" (trailing slash included),
    which is where extract() places the content of a family archive fetched by
    download().
    """

    directory_name = GSE_family + "/"
    return os.path.join(__base_path, directory_name)


In [3]:
def __load_file(file_directory):

    """
    Given the path of a sample file, return the corresponding pandas.DataFrame object.

    Parameters:
        file_directory: path of the sample's text file (a file, despite the name).

    Returns:
        A DataFrame with a single "measurement" column holding the second column of
        the file, indexed by the first column. Values are the strings returned by
        __clean(); no numeric conversion is done here.

    Raises:
        PermissionError: when the path cannot be opened, which typically means a
            directory was passed instead of a file.
    """

    try:
        clean_dict = __clean(file_directory)
    except PermissionError as err:
#         previously this only printed and then failed on the undefined clean_dict;
#         raise the same exception type with the explanation attached instead
        raise PermissionError(
            "You likely inputted the path of a directory, not a file: " + repr(file_directory)
        ) from err

    return pd.DataFrame(data= clean_dict["col_2"], index= clean_dict["col_1"], columns= ["measurement"])


In [6]:
def __clean(file_path):

    """
    Parse a tab-separated text file, keeping the first two columns of each valid line.

    A line is valid when it starts with two runs of non-whitespace characters
    separated by exactly one tab; anything after the second column is ignored.

    Parameters:
        file_path: path of the text file to read.

    Returns a dictionary:

    "col_1": list of the first-column strings of the valid lines
    "col_2": list of the second-column strings of the valid lines
    "bad_rows": list of all the lines (unmodified) that were not valid
    """

    row_pattern = re.compile(r"(\S+)\t(\S+)")

    col_1 = []
    col_2 = []
    not_valid_rows = []

#     the with-block closes the file even if reading fails part-way through
    with open(file_path, 'r') as file:
        for line in file:
#             one match both validates the line and splits it into its two columns
            row_match = row_pattern.match(line)
            if row_match:
                col_1.append(row_match.group(1))
                col_2.append(row_match.group(2))
            else:
                not_valid_rows.append(line)

    return {"col_1":col_1, "col_2":col_2, "bad_rows":not_valid_rows}


In [4]:
def info(GSE_family, sample_id, info):

    """
    Given a GSE family and the ID of the wanted sample will return the desired information of the sample.

    Parameters:
        GSE_family: family ID string; its "_family.xml" file must already be extracted.
        sample_id: ID of the sample ("GSM...") to look up.
        info: name of the wanted piece of information.

    Possible values for the info parameter:

    "age": The first run of digits found in the sample's age characteristic, returned
    as a string. The unit is the same as how it is documented by the study
    (typically in years).

    Raises:
        RuntimeError: when the family's .xml cannot be read, when info is not a
            supported value, or when no usable age value exists for the sample.
    """

    xml_path = __xml_path(GSE_family)

    try:
        family_dict = __xml_to_dict(xml_path)
    except Exception as err:
#         keep the original message, but chain the cause and let KeyboardInterrupt /
#         SystemExit through instead of swallowing them with a bare except
        raise RuntimeError("It seems the files for the GSE family you inputted have not been extracted.") from err

    family_dict_index = __dict_index(GSE_family, sample_id)
    characteristics = family_dict["MINiML"]["Sample"][family_dict_index]["Channel"]["Characteristics"]

    if info != "age":
        raise RuntimeError("You likely gave an invalid string for the info parameter.")

#     xmltodict yields a single mapping (or plain string) rather than a list when the
#     sample has only one <Characteristics> element
    if not isinstance(characteristics, list):
        characteristics = [characteristics]

#     as before, any tag containing "age" (case-insensitive) counts and the last
#     matching entry wins
    age_entry = None
    for entry in characteristics:
        if not isinstance(entry, dict):
            continue
        if re.search(r"age", entry.get("@tag") or "", re.IGNORECASE):
            age_entry = entry

    if age_entry is None:
        raise RuntimeError("No age characteristic was found for sample " + str(sample_id) + ".")

    age_str = age_entry.get("#text") or ""
    int_match = re.search(r"\d+", age_str)
    if int_match is None:
        raise RuntimeError("The age characteristic of sample " + str(sample_id) + " contains no number: " + repr(age_str))

    return int_match.group(0)


In [22]:
def __xml_path(GSE_family):

    """
    Return the path of a family's .xml file.

    The file is "<GSE_family>_family.xml" inside the family's own directory under
    __base_path, which is where extract() places it after download() has fetched
    the family archive.
    """

    xml_name = GSE_family + "_family.xml"
    relative_path = "/".join((GSE_family, xml_name))

    return os.path.join(__base_path, relative_path)
    

In [9]:
def __xml_to_dict(xml_path):

    """
    Exists to convert a .xml file at xml_path to a dictionary.

    Parameters:
        xml_path: path of the .xml file to parse.

    Returns:
        The mapping produced by xmltodict.parse() for the file's content.
    """

#     read-only binary mode is enough for parsing ('r+b' also demanded write access),
#     and the with-block closes the file even when parsing fails
    with open(xml_path, 'rb') as family_file:
        return xmltodict.parse(family_file)


In [10]:
def __dict_index(GSE_family, sample_id):

    """
    Look up the position of one sample's information within the family dictionary.

    Builds the sample-to-index mapping for the family and returns the entry for
    sample_id; an unknown sample_id raises KeyError.
    """

    indices_by_sample = __sample_indices(GSE_family)
    return indices_by_sample[sample_id]


In [11]:
def __sample_indices(GSE_family):

    """
    Exists to return the indices of each sample's information within the associated family dictionary. Returns a dictionary
    with keys equal to the sample ID ("GSM***") and values equal to the index of that sample's information within the family
    dictionary.

    The index of a sample is the position of its file among the "GSM<digits>..." files
    listed in the family's directory. Files whose names do not start with "GSM"
    followed by digits are ignored.
    """

    family_dir = __family_path(GSE_family)

#     NOTE(review): os.listdir returns entries in arbitrary order, so these positions
#     line up with the order of the samples in the .xml only if the two happen to
#     coincide - TODO confirm, or match samples on their ID instead.
    sample_ids = []
    for file_name in os.listdir(family_dir):
#         matching on "GSM\d+" directly avoids failing on names such as "GSM_notes.txt"
        id_match = re.match(r"GSM\d+", file_name)
        if id_match:
            sample_ids.append(id_match.group(0))

#     when several files share a sample ID the last position is kept, as before
    return {sample_id: index for index, sample_id in enumerate(sample_ids)}


In [12]:
# url = "ftp://ftp.ncbi.nlm.nih.gov/geo/series/GSEnnn/" + "GSE41037" + "/miniml/" + "GSE41027" + "_family.xml.tgz"
# url = "ftp://ftp.ncbi.nlm.nih.gov/geo/series/GSEnnn/GSE1/miniml/GSE1_family.xml.tgz"

def download(GSE_family_list):

    """
    Will download family .tgz files to the following directory: "./data/"

    Parameters:
        GSE_family_list: a family ID string or a list of family ID strings
            (e.g. "GSE41037").

    A family is skipped when its .tgz file or its extracted directory already exists
    under __base_path. If a transfer fails, the partially written .tgz is removed
    before the error is re-raised, so that a later call downloads it again.
    """

    if isinstance(GSE_family_list, str):
        GSE_family_list = [GSE_family_list]

    os.makedirs(__base_path, exist_ok=True)

    url = "ftp.ncbi.nlm.nih.gov"

#     the with-block closes the connection even when a transfer raises
    with ftplib.FTP(url) as ftp:
        ftp.login()

        for GSE_family in GSE_family_list:

            filename = GSE_family + "_family.xml.tgz"
            local_path = __base_path + "/" + filename

            if os.path.exists(local_path) or os.path.exists(__base_path + "/" + GSE_family):
                continue

            ftp.cwd("/geo/series/" + __sub_directory(GSE_family) + "/" + GSE_family + "/miniml/")

            try:
                with open(local_path, 'wb') as local_file:
                    ftp.retrbinary('RETR ' + filename, local_file.write, blocksize= 16_384)
            except BaseException:
#                 a truncated archive would otherwise count as "already downloaded"
                if os.path.exists(local_path):
                    os.remove(local_path)
                raise
    

In [13]:
def __sub_directory(GSE_ID):

    """
    Given a GSE ID will return the corresponding sub-directory.

    The sub-directory name is the ID with its last three digits replaced by "nnn";
    IDs below 1000 all share "GSEnnn".

    Examples:

    "GSE1" -> "GSEnnn", "GSE1234" -> "GSE1nnn", "GSE41037" -> "GSE41nnn",
    "GSE171000" -> "GSE171nnn".
    """

    gse_int = __ID_to_int(GSE_ID)

#     dropping the last three digits is the same as integer division by 1000
    thousands = gse_int // 1000

    if thousands == 0:
        return "GSEnnn"

    return "GSE" + str(thousands) + "nnn"


In [14]:
def __ID_to_int(GSE_ID):

    """
    Given some GSE ID will return the corresponding integer as an int.

    The integer is the first run of digits found in GSE_ID.

    Example:

    __ID_to_int("GSE41037") will return 41037.

    Raises:
        ValueError: when GSE_ID contains no digits.
    """

    match = re.search(r"\d+", GSE_ID)

    if match is None:
#         previously the ID was printed and the next line failed on None.group
        raise ValueError("No integer found in GSE ID: " + repr(GSE_ID))

    return int(match.group(0))


In [3]:
def extract():

    """
    Will extract files from all downloaded family .tgz files to a respective directory: ./data/GSE***/

    This will also delete the compressed .tgz files.

    Only files in __base_path whose names end in ".tgz" and contain a family ID
    ("GSE" followed by digits) are processed. When opening or extracting an archive
    fails, a message is printed, that archive is kept, and the remaining archives are
    still processed.
    """

    for filename in os.listdir(__base_path):

#         a real suffix test; the old pattern ".tgz" matched any character followed
#         by "tgz" anywhere in the name
        if not filename.endswith(".tgz"):
            continue

        family_match = re.search(r"GSE\d+", filename)
        if family_match is None:
#             not a family archive, so there is no directory to extract it into
            continue
        family_id = family_match.group(0)

        full_path = __base_path + "/" + filename
        out_path = "./" + __base_path + "/" + family_id

        try:
#             opening happens inside the try so a corrupt archive does not abort the
#             whole run; the with-block closes the archive in every case
            with tarfile.open(full_path) as file:
                if hasattr(tarfile, "data_filter"):
#                     refuse members that would be written outside out_path
                    file.extractall(out_path, filter="data")
                else:
                    file.extractall(out_path)
        except Exception:
#             NOTE(review): whatever was extracted before the failure stays in
#             out_path, and download() treats an existing directory as done.
            print("Something weird happened while extracting from the " + family_id + " compressed file. Ended extraction early for " + family_id + ".")
            continue

        os.remove(full_path)
    