In [None]:
import pandas as pd

In [None]:
chromosomes = ['chr3', 'chr7', 'chr16', 'chr17']

all_dfs = []
first = True

for chromosome in chromosomes:
    df_temp = pd.read_table(f'output/dosage_{chromosome}.raw', sep=' ', low_memory=False)
    if first:
        # For the first dataframe, keep both IID and dosage sum columns
        all_dfs.append(df_temp[['IID', f'dosage_sum_{chromosome}']])
        first = False
    else:
        # For subsequent dataframes, only keep the dosage sum column
        all_dfs.append(df_temp[[f'dosage_sum_{chromosome}']])

dys = pd.concat(all_dfs, axis=1)

In [None]:
covariates = {
    # General
    'eid': 'IID', 
    'p31': 'sex',
    'p21022': 'Age at recruitment',
    'p22189': 'TDI',
    'p22006': 'Genetic ethnic grouping',
    # Category 120: Numeric memory (Cognitive function online)
    '20240': 'Maximum digits remembered correctly',
    # Category 1358: Broken letter recognition
    '20139': 'Number of letters correctly identified',
       
    # Category 100027: Fluid intelligence / reasoning
    '20016': 'Fluid intelligence score',
    
    # Category 100029: Numeric memory
    '4282': 'Maximum digits remembered correctly',

    # Category 100032: Reaction time
    '20023': 'Mean time to correctly identify matches',

    # Category 503: Tower rearranging
    '21004': 'Number of puzzles correct',
    # Category 504: Picture vocabulary
    '26302': 'Specific cognitive ability (AS)',
    '6364': 'Vocabulary level',
    # Category 501: Matrix pattern completion
    '6373': 'Number of puzzles correctly solved',
}

In [None]:
dys['dosage'] = dys.iloc[:, 1:].sum(axis=1)

In [None]:
analysis = Analysis()
all_participants = analysis.retrieve_data(covs)

all_participants.rename(columns=covs, inplace=True)

all_participants['IID'] = all_participants['IID'].astype(str)
dys['IID'] = dys['IID'].astype(str)

merged = dys[['IID', 'dosage']].merge(all_participants, on='IID', how='inner')
merged = merged[merged['Genetic ethnic grouping'] == 1]

In [None]:
def calculate_means(covariates, df=merged):
    means = {}
    for description in covariates.values():
        if description not in ('IID', 'sex', 'Age at recruitment', 'TDI', 'Genetic ethnic grouping'):
            base_col_name = description.replace(' ', '_')
            temp_cols = df.columns[df.columns.str.startswith(base_col_name)]
            mean = df[temp_cols].mean(axis=1)
            means[f"mean_{base_col_name}"] = mean
    return means

merged_means = pd.concat([merged, pd.DataFrame(calculate_means(covariates))], axis=1)

desc_and_PRS = ['IID', 'dosage', 'sex', 'Age at recruitment', 'TDI']
mean_cols = list(merged_means.columns[merged_means.columns.str.startswith('mean')])

cols = desc_and_PRS + mean_cols

df = merged_means[cols].dropna(axis=1, how='all')

df['case'] = df.loc[:, 'dosage'].apply(lambda x: 0 if x == 0 else 1)

In [None]:
from scipy import stats
from scipy.stats import mannwhitneyu, iqr  # explicitly import iqr
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import pandas as pd

# Initialize dictionary to store results
results_dict = {
    'Column': [],
    'Test_Used': [],
    'Test_Statistic': [],
    'P_Value': [],
    'Control_N': [],
    'Case_N': [],
    'Control_Mean': [],
    'Control_SD': [],
    'Control_Median': [],
    'Control_IQR': [],
    'Case_Mean': [],
    'Case_SD': [],
    'Case_Median': [],
    'Case_IQR': []
}

def plot_and_test(col, df=df):

    warnings.filterwarnings('ignore')
    
    df = df.dropna(subset=[col])
    controls = df[df['case'] == 0][col]
    cases = df[df['case'] == 1][col]
    
    stats_dict = {
        'Controls': {
            'N': len(controls),
            'Mean': np.mean(controls),
            'SD': np.std(controls),
            'Median': np.median(controls),
            'IQR': iqr(controls)
        },
        'Cases': {
            'N': len(cases),
            'Mean': np.mean(cases),
            'SD': np.std(cases),
            'Median': np.median(cases),
            'IQR': iqr(cases)
        }
    }
    
    # Test for normality using Shapiro-Wilk test
    sw_control = stats.shapiro(controls) if len(controls) <= 5000 else (None, None)
    sw_cases = stats.shapiro(cases) if len(cases) <= 5000 else (None, None)
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Raw distribution plot
    bins = min(50, int(np.sqrt(len(df))))
    ax1.hist(controls, bins=bins, alpha=0.5, color='blue', 
             label=f'Controls (n={len(controls):,})', density=False)
    ax1.hist(cases, bins=bins, alpha=0.5, color='red', 
             label=f'Cases (n={len(cases):,})', density=False)
    ax1.set_title(f'Raw Distribution of {col}')
    ax1.set_xlabel(col)
    ax1.set_ylabel('Frequency')
    ax1.legend()
    
    # Density plot
    sns.kdeplot(data=controls, ax=ax2, color='blue', label='Controls', 
                fill=True, alpha=0.5)
    sns.kdeplot(data=cases, ax=ax2, color='red', label='Cases', 
                fill=True, alpha=0.5)
    ax2.set_title(f'Density Distribution of {col}')
    ax2.set_xlabel(col)
    ax2.set_ylabel('Density')
    ax2.legend()
    
    plt.tight_layout()
    plt.show()
    
    # Determine if data is normal
    is_normal = (sw_control[1] is None or sw_control[1] > 0.05) and \
               (sw_cases[1] is None or sw_cases[1] > 0.05)
    
    # Perform statistical testing
    if is_normal:
        test_name = "Welch's t-test"
        stat, p_value = stats.ttest_ind(controls, cases, equal_var=False)
    else:
        test_name = "Mann-Whitney U test"
        stat, p_value = mannwhitneyu(controls, cases, alternative='two-sided')
    
    # Store results
    results_dict['Column'].append(col)
    results_dict['Test_Used'].append(test_name)
    results_dict['Test_Statistic'].append(stat)
    results_dict['P_Value'].append(p_value)
    results_dict['Control_N'].append(stats_dict['Controls']['N'])
    results_dict['Case_N'].append(stats_dict['Cases']['N'])
    results_dict['Control_Mean'].append(stats_dict['Controls']['Mean'])
    results_dict['Control_SD'].append(stats_dict['Controls']['SD'])
    results_dict['Control_Median'].append(stats_dict['Controls']['Median'])
    results_dict['Control_IQR'].append(stats_dict['Controls']['IQR'])
    results_dict['Case_Mean'].append(stats_dict['Cases']['Mean'])
    results_dict['Case_SD'].append(stats_dict['Cases']['SD'])
    results_dict['Case_Median'].append(stats_dict['Cases']['Median'])
    results_dict['Case_IQR'].append(stats_dict['Cases']['IQR'])
    
    # Print results
    print(f"#### Statistical Analysis Results for {col}")
    print("\nDescriptive Statistics:")
    for group, stats_values in stats_dict.items():
        print(f"\n{group}:")
        print(f"N: {stats_values['N']:,}")
        print(f"Mean ± SD: {stats_values['Mean']:.2f} ± {stats_values['SD']:.2f}")
        print(f"Median (IQR): {stats_values['Median']:.2f} ({stats_values['IQR']:.2f})")
    
    if sw_control[1] is not None:
        print("\nNormality Tests (Shapiro-Wilk):")
        print(f"Controls: p = {sw_control[1]:.2e}")
        print(f"Cases: p = {sw_cases[1]:.2e}")
    else:
        print("\nNormality tests not performed (n>5000)")
    
    print(f"\n{test_name}:")
    print(f"Statistic: {stat:.2f}")
    print(f"P-value: {p_value:.2e}")
    
    print("-"*80)
    print("\n")

In [None]:
for col in df.columns[df.columns.str.startswith('mean')]:
    plot_and_test(col, df)

In [None]:
results_df = pd.DataFrame(results_dict)
# Format P-values and test statistics
results_df['P_Value'] = results_df['P_Value'].apply(lambda x: f"{x:.2e}")
results_df['Test_Statistic'] = results_df['Test_Statistic'].apply(lambda x: f"{x:.2f}")
print("\n#### Summary of Statistical Analysis Results")
# results_df.to_string(index=False)