In [1]:
import os
import zipfile
import numpy as np
import pandas as pd

In [2]:
def extract_tf_data(results_dir):
    """
    Collects per-sample summary values from ichorCNA ``*.params.txt`` files.

    Every sub-directory of ``results_dir`` is treated as one sample. The first
    file (in ``os.listdir`` order) whose name ends with ``.params.txt`` is read
    line by line; for each line containing a recognised label, the text after
    the first colon is stored under the matching output column.

    Parameters:
    - results_dir (str): Path to the directory containing ichorCNA results folders.

    Returns:
    - DataFrame: One row per sample folder that has a params file, sorted by
      ``library`` (the folder name). Labels missing from a file are left as NaN.
    """

    def parse_number(raw):
        # 'NA' marks a missing value; anything else must parse as a float
        return float(raw) if raw != 'NA' else np.nan

    # (label looked for in the line, output column, converter).
    # Order matters: the first label found in a line wins.
    field_specs = (
        ("Gender:", "gender", str),
        ("Tumor Fraction:", "tumor_fraction", parse_number),
        ("Ploidy:", "ploidy", parse_number),
        ("ChrY coverage fraction:", "ChrY_coverage_fraction", parse_number),
        ("ChrX median log ratio:", "ChrX_median_log_ratio", parse_number),
    )
    output_columns = ["library", "tumor_fraction", "ploidy", "gender", "ChrY_coverage_fraction", "ChrX_median_log_ratio"]

    records = []
    for entry in os.listdir(results_dir):
        folder = os.path.join(results_dir, entry)
        if not os.path.isdir(folder):
            continue

        # Only the first matching file of each folder is used
        params_name = next(
            (name for name in os.listdir(folder) if name.endswith(".params.txt")),
            None,
        )
        if params_name is None:
            print(f"No params.txt file found for sample: {entry}")
            continue

        record = {"library": entry}
        with open(os.path.join(folder, params_name), "r") as handle:
            for raw_line in handle:
                text = raw_line.strip()
                for label, key, convert in field_specs:
                    if label in text:
                        record[key] = convert(text.split(":")[1].strip())
                        break
        records.append(record)

    table = pd.DataFrame(records, columns=output_columns)
    return table.sort_values('library').reset_index(drop=True)

In [3]:
import os
import pandas as pd

def extract_cna_data(parent_directory, logR_column_choice="logR_Copy_Number"):
    """
    Combines one per-library column from ichorCNA ``*.cna.seg`` files into a long table.

    Each sub-directory of ``parent_directory`` is scanned for files ending in
    ``.cna.seg``. The library name is the file name with ``.cna.seg`` removed,
    and the column ``"<library>.<logR_column_choice>"`` is read from the
    tab-separated file together with ``chr``, ``start`` and ``end``.

    Files that do not contain the expected column are skipped, and a message
    naming the missing column and the file is printed so the omission is visible.

    Parameters:
    - parent_directory (str): Directory containing one sub-folder per library.
    - logR_column_choice (str): Suffix of the per-library column to extract
      (e.g. "logR" or "logR_Copy_Number"); also the name of the value column
      in the output.

    Returns:
    - DataFrame: Columns library, chr, start, end and ``logR_column_choice``,
      sorted by library, chr, start, end. Empty (same columns) if nothing matched.
    """
    output_columns = ['library', 'chr', 'start', 'end', logR_column_choice]
    frames = []

    for item in os.listdir(parent_directory):
        item_path = os.path.join(parent_directory, item)
        if not os.path.isdir(item_path):
            continue

        for file_name in os.listdir(item_path):
            if not file_name.endswith(".cna.seg"):
                continue
            cna_seg_file = os.path.join(item_path, file_name)
            if not os.path.isfile(cna_seg_file):
                continue

            library = file_name.replace(".cna.seg", "")
            df = pd.read_csv(cna_seg_file, sep="\t")

            # The value column is prefixed with the library name inside the file
            logR_column = f"{library}.{logR_column_choice}"
            if logR_column not in df.columns:
                # Report instead of dropping the file silently
                print(f"Column '{logR_column}' not found in {cna_seg_file}; skipping this file")
                continue

            extracted = (
                df[['chr', 'start', 'end', logR_column]]
                .rename(columns={logR_column: logR_column_choice})
                .assign(library=library)
            )[output_columns]

            # Header-only files contribute no rows; leaving them out keeps
            # the concatenated dtypes driven by real data only
            if not extracted.empty:
                frames.append(extracted)

    if frames:
        combined_df = pd.concat(frames, ignore_index=True)
    else:
        combined_df = pd.DataFrame(columns=output_columns)

    return combined_df.sort_values(by=['library', 'chr', 'start', 'end']).reset_index(drop=True)

In [4]:
def create_params_zip(results_dir, output_zip):
    """
    Bundles the ``.params.txt`` file of every ichorCNA sample folder into one zip.

    For each sub-directory of ``results_dir`` only the first matching file (in
    ``os.listdir`` order) is added, stored under its bare file name. A message
    is printed for folders that contain no such file.

    Parameters:
    - results_dir (str): Path to the directory containing ichorCNA results folders.
    - output_zip (str): Path to the output zip file.
    """
    suffix = ".params.txt"

    with zipfile.ZipFile(output_zip, 'w') as archive:
        for entry in os.listdir(results_dir):
            folder = os.path.join(results_dir, entry)
            if not os.path.isdir(folder):
                continue

            # First file with the wanted suffix, or None when the folder has none
            match = next(
                (name for name in os.listdir(folder) if name.endswith(suffix)),
                None,
            )
            if match is None:
                print(f"No params.txt file found for sample: {entry}")
            else:
                archive.write(os.path.join(folder, match), arcname=match)

In [5]:
def create_cna_seg_zip(results_dir, output_zip):
    """
    Bundles the ``.cna.seg`` file of every ichorCNA sample folder into one zip.

    For each sub-directory of ``results_dir`` only the first matching file (in
    ``os.listdir`` order) is added, stored under its bare file name. A message
    is printed for folders that contain no such file.

    Parameters:
    - results_dir (str): Path to the directory containing ichorCNA results folders.
    - output_zip (str): Path to the output zip file.
    """
    suffix = ".cna.seg"

    with zipfile.ZipFile(output_zip, 'w') as archive:
        for entry in os.listdir(results_dir):
            folder = os.path.join(results_dir, entry)
            if not os.path.isdir(folder):
                continue

            # First file with the wanted suffix, or None when the folder has none
            seg_name = next(
                (name for name in os.listdir(folder) if name.endswith(suffix)),
                None,
            )
            if seg_name is None:
                print(f"No cna.seg file found for sample: {entry}")
            else:
                archive.write(os.path.join(folder, seg_name), arcname=seg_name)

In [6]:
# Directory holding the ichorCNA result folders; the extraction calls in the
# following cells all read from this location.
results_dir = "/aclm350-zpool1/jlinford/bioinfo_software/ichorCNA/scripts/snakemake/results_no_trim_low_1Mb_Taylor/ichorCNA"

In [None]:
# Per-sample summary table; the '_filt.bam' text is removed from the
# folder-derived library names in the same expression that builds the frame.
tf_data = extract_tf_data(results_dir).assign(
    library=lambda frame: frame['library'].str.replace('_filt.bam', '', regex=False)
)
tf_data

In [None]:
# Long-format table of the per-library "logR" column, with the '_filt.bam'
# text removed from the library names.
cna_data_logR = extract_cna_data(results_dir, logR_column_choice="logR").assign(
    library=lambda frame: frame['library'].str.replace('_filt.bam', '', regex=False)
)
cna_data_logR

In [None]:
# Long-format table of the per-library "logR_Copy_Number" column.
cna_data_logR_Copy_Number = extract_cna_data(results_dir, logR_column_choice = "logR_Copy_Number")
# Clean this frame's own 'library' column. Assigning a column taken from a
# different frame aligns on the row index, so it would mislabel rows (or insert
# NaN) whenever the two tables differ in length or order, and it would make
# this cell depend on another cell having been run first.
cna_data_logR_Copy_Number['library'] = cna_data_logR_Copy_Number['library'].str.replace('_filt.bam', '', regex = False)
cna_data_logR_Copy_Number

In [None]:
# Wide matrix of logR: one row per (chr, start, end), one column per library.
cna_matrix_logR = (
    cna_data_logR
    .pivot(index=['chr', 'start', 'end'], columns='library', values='logR')
    .reset_index()
)
# The pivot leaves 'library' as the name of the column index; clear it so the
# header shows only the column labels.
cna_matrix_logR.columns.name = None
cna_matrix_logR

In [None]:
# Wide matrix of logR_Copy_Number: one row per (chr, start, end), one column per library.
cna_matrix_logR_Copy_Number = (
    cna_data_logR_Copy_Number
    .pivot(index=['chr', 'start', 'end'], columns='library', values='logR_Copy_Number')
    .reset_index()
)
# The pivot leaves 'library' as the name of the column index; clear it so the
# header shows only the column labels.
cna_matrix_logR_Copy_Number.columns.name = None
cna_matrix_logR_Copy_Number

In [10]:
# Save the per-sample summary table as tab-separated text, without the row index.
tf_data.to_csv(
    path_or_buf='/home/jupyter/jlinford/urine/analysis/tf_low_1Mb.txt',
    sep='\t',
    index=False,
)

In [34]:
# Save the long-format logR table as tab-separated text, without the row index.
cna_data_logR.to_csv(
    path_or_buf='/home/jupyter/jlinford/urine/analysis/cna_logR_long_lowtumor_1Mb.txt',
    sep='\t',
    index=False,
)

In [None]:
# Save the long-format logR_Copy_Number table as tab-separated text, without the row index.
cna_data_logR_Copy_Number.to_csv(
    path_or_buf='/home/jupyter/jlinford/urine/analysis/cna_logR_Copy_Number_long_lowtumor_1Mb.txt',
    sep='\t',
    index=False,
)

In [35]:
# cna_matrix_logR.to_csv('/home/jupyter/jlinford/urine/analysis/cna_logR_matrix_lowtumor_1Mb.txt', sep = '\t', index = False)

In [None]:
# cna_matrix_logR_Copy_Number.to_csv('/home/jupyter/jlinford/urine/analysis/cna_logR_Copy_Number_matrix_lowtumor_1Mb.txt', sep = '\t', index = False)

In [36]:
# create_params_zip(results_dir, '/home/jupyter/jlinford/urine/analysis/urine_lowtumor_1Mb_params.zip')

In [37]:
# create_cna_seg_zip(results_dir, '/home/jupyter/jlinford/urine/analysis/urine_lowtumor_1Mb_cna_seg.zip')