In [None]:
%%capture
!pip install rdata


In [None]:
import rdata
import pandas as pd

In [None]:
%%capture
!wget https://github.com/rondolab/MR-PRESSO/raw/master/data/SummaryStats.rda # Changed to the raw URL for direct download.


In [None]:
converted = rdata.read_rda(rdata.TESTDATA_PATH / "/content/SummaryStats.rda")

In [None]:

# Assuming 'converted' already holds the data from 'rdata.read_rda'
df = pd.DataFrame(converted['SummaryStats']) # Access the 'SummaryStats' key in the 'converted' dictionary
df.head()

Unnamed: 0,E1_effect,E1_se,E1_pval,E2_effect,E2_se,E2_pval,Y_effect,Y_se,Y_pval
1,2.050695,0.13819,8.315548000000001e-27,0.160536,0.14719,0.27809,1.484821,0.174916,2.273003e-13
2,2.034668,0.160594,2.312328e-22,-0.139266,0.145707,0.341528,0.857612,0.212624,0.000108891
3,2.010319,0.143271,3.5313010000000003e-25,0.032016,0.146139,0.827046,1.147766,0.194145,4.93337e-08
4,2.062171,0.147082,3.718466e-25,-0.122786,0.150158,0.415504,0.841744,0.190183,2.494011e-05
5,2.114919,0.134503,1.506785e-28,0.003687,0.157837,0.981412,1.121555,0.172139,3.140024e-09


In [None]:
import pandas as pd
df = pd.read_csv('/content/presso.csv')

In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.linalg import eig, inv

def mr_presso(BetaOutcome, BetaExposure, SdOutcome, SdExposure, data,
              OUTLIERtest=False, DISTORTIONtest=False,
              SignifThreshold=0.05, NbDistribution=1000, seed=None):

    if seed is not None:
        np.random.seed(seed)

    if SignifThreshold > 1:
        raise ValueError("The significance threshold cannot be greater than 1")

    if len(BetaExposure) != len(SdExposure):
        raise ValueError("BetaExposure and SdExposure must have the same number of elements")

    if not isinstance(data, pd.DataFrame):
        raise TypeError("data must be a pandas DataFrame")

    # Helper function for matrix power using eigenvalue decomposition
    def matrix_power(x, n):
        eigenvalues, eigenvectors = np.linalg.eig(x)
        diag_power = np.diag(eigenvalues ** n)
        inv_eigenvectors = np.linalg.inv(eigenvectors)
        return eigenvectors @ diag_power @ inv_eigenvectors

    # Function to compute the residual sum of squares using leave-one-out approach
    def getRSS_LOO(BetaOutcome, BetaExposure, data, returnIV):
        dataW = data[[BetaOutcome] + BetaExposure].apply(
            lambda x: x * np.sqrt(data['Weights'])
        )
        X = dataW[BetaExposure].values
        Y = dataW[BetaOutcome].values.reshape(-1, 1)
        n = len(dataW)

        CausalEstimate_LOO = []
        for i in range(n):
            X_loo = np.delete(X, i, axis=0)
            Y_loo = np.delete(Y, i, axis=0)

            XtX_inv = inv(X_loo.T @ X_loo)
            beta_hat = XtX_inv @ X_loo.T @ Y_loo
            CausalEstimate_LOO.append(beta_hat.flatten())

        CausalEstimate_LOO = np.array(CausalEstimate_LOO)

        if len(BetaExposure) == 1:
            residuals = Y.flatten() - CausalEstimate_LOO.flatten() * X.flatten()
            RSS = np.nansum(residuals ** 2)
        else:
            residuals = Y.flatten() - np.sum(CausalEstimate_LOO * X, axis=1)
            RSS = np.nansum(residuals ** 2)

        if returnIV:
            return [RSS, CausalEstimate_LOO]
        else:
            return RSS

    # Function to generate random data for simulations
    def getRandomData(BetaOutcome, BetaExposure, SdOutcome, SdExposure, data):
        n = len(data)
        mod_IVW = []
        for i in range(n):
            data_loo = data.drop(data.index[i])
            X_loo = data_loo[BetaExposure]
            Y_loo = data_loo[BetaOutcome]
            W_loo = data_loo['Weights']
            model_loo = sm.WLS(Y_loo, X_loo, weights=W_loo)
            results_loo = model_loo.fit()
            mod_IVW.append(results_loo)

        # Simulate exposures
        random_exposures = pd.DataFrame()
        for exp_var, sd_var in zip(BetaExposure, SdExposure):
            random_exposures[exp_var] = np.random.normal(
                loc=data[exp_var], scale=data[sd_var]
            )

        # Simulate outcomes
        random_outcome = []
        for i in range(n):
            pred = mod_IVW[i].predict(data.iloc[i][BetaExposure])
            sim_outcome = np.random.normal(
                loc=pred, scale=data.iloc[i][SdOutcome]
            )
            random_outcome.append(sim_outcome)

        random_data = pd.concat([random_exposures], axis=1)
        random_data[BetaOutcome] = random_outcome
        random_data['Weights'] = data['Weights'].values
        return random_data

    # Data preparation and validation
    required_columns = [BetaOutcome] + BetaExposure + [SdOutcome] + SdExposure
    data = data[required_columns].dropna()
    data = data.reset_index(drop=True)

    # Adjust signs based on the first exposure variable
    sign_exposure = np.sign(data[BetaExposure[0]])
    data[BetaOutcome] = data[BetaOutcome] * sign_exposure
    for exp_var in BetaExposure:
        data[exp_var] = data[exp_var] * sign_exposure

    # Calculate weights
    data['Weights'] = 1 / data[SdOutcome] ** 2

    # Validate number of observations
    if len(data) <= len(BetaExposure) + 2:
        raise ValueError("Not enough instrumental variables")

    if len(data) >= NbDistribution:
        raise ValueError("Not enough elements to compute empirical P-values, increase NbDistribution")

    # Step 1: Compute observed RSS
    RSSobs = getRSS_LOO(BetaOutcome, BetaExposure, data, returnIV=OUTLIERtest)

    # Step 2: Compute expected RSS distribution
    randomData = [getRandomData(BetaOutcome, BetaExposure, SdOutcome, SdExposure, data)
                  for _ in range(NbDistribution)]

    RSSexp = []
    for rd in randomData:
        rss = getRSS_LOO(BetaOutcome, BetaExposure, rd, returnIV=OUTLIERtest)
        if OUTLIERtest:
            RSSexp.append(rss[0])
        else:
            RSSexp.append(rss)

    RSSexp = np.array(RSSexp)

    if OUTLIERtest:
        GlobalTest = {
            'RSSobs': RSSobs[0],
            'Pvalue': np.sum(RSSexp > RSSobs[0]) / NbDistribution
        }
    else:
        GlobalTest = {
            'RSSobs': RSSobs,
            'Pvalue': np.sum(RSSexp > RSSobs) / NbDistribution
        }

    # Step 3: Perform single IV outlier test
    if GlobalTest['Pvalue'] < SignifThreshold and OUTLIERtest:
        OutlierTest = []
        n_snps = len(data)
        for snv in range(n_snps):
            randomSNP = pd.DataFrame([rd.iloc[snv] for rd in randomData])
            if len(BetaExposure) == 1:
                Dif = data.iloc[snv][BetaOutcome] - data.iloc[snv][BetaExposure] * RSSobs[1][snv]
                Exp = randomSNP[BetaOutcome] - randomSNP[BetaExposure].values.flatten() * RSSobs[1][snv]
            else:
                Dif = data.iloc[snv][BetaOutcome] - np.sum(data.iloc[snv][BetaExposure].values * RSSobs[1][:, snv])
                Exp = randomSNP[BetaOutcome] - np.sum(randomSNP[BetaExposure].values * RSSobs[1][:, snv], axis=1)
            pval = np.sum(Exp ** 2 > Dif ** 2) / NbDistribution
            OutlierTest.append({
                'RSSobs': Dif ** 2,
                'Pvalue': pval
            })

        OutlierTest = pd.DataFrame(OutlierTest)
        OutlierTest.index = data.index

        # Bonferroni correction
        OutlierTest['Pvalue'] = np.minimum(OutlierTest['Pvalue'] * n_snps, 1)
    else:
        OUTLIERtest = False

    # Step 4: Perform distortion test
    X_all = data[BetaExposure]
    Y_all = data[BetaOutcome]
    W_all = data['Weights']
    model_all = sm.WLS(Y_all, X_all, weights=W_all)
    results_all = model_all.fit()

    if DISTORTIONtest and OUTLIERtest:
        refOutlier = OutlierTest[OutlierTest['Pvalue'] <= SignifThreshold].index.tolist()

        if len(refOutlier) > 0:
            if len(refOutlier) < len(data):
                def getRandomBias(BetaOutcome, BetaExposure, data, refOutlier):
                    n = len(data)
                    indices = refOutlier + list(
                        np.random.choice(
                            data.index.difference(refOutlier),
                            size=n - len(refOutlier),
                            replace=False
                        )
                    )
                    data_subset = data.loc[indices[:n - len(refOutlier)]]
                    X_subset = data_subset[BetaExposure]
                    Y_subset = data_subset[BetaOutcome]
                    W_subset = data_subset['Weights']
                    model_random = sm.WLS(Y_subset, X_subset, weights=W_subset)
                    results_random = model_random.fit()
                    return results_random.params

                BiasExp = []
                for _ in range(NbDistribution):
                    bias = getRandomBias(BetaOutcome, BetaExposure, data, refOutlier)
                    BiasExp.append(bias)
                BiasExp = pd.DataFrame(BiasExp)

                data_no_outliers = data.drop(refOutlier)
                X_no_outliers = data_no_outliers[BetaExposure]
                Y_no_outliers = data_no_outliers[BetaOutcome]
                W_no_outliers = data_no_outliers['Weights']
                model_no_outliers = sm.WLS(Y_no_outliers, X_no_outliers, weights=W_no_outliers)
                results_no_outliers = model_no_outliers.fit()

                BiasObs = (results_all.params - results_no_outliers.params) / np.abs(results_no_outliers.params)
                BiasExp = (results_all.params.values.reshape(1, -1) - BiasExp) / np.abs(BiasExp)

                pvalues = []
                for idx in range(len(BetaExposure)):
                    pvalue = np.sum(np.abs(BiasExp.iloc[:, idx]) > np.abs(BiasObs[idx])) / NbDistribution
                    pvalues.append(pvalue)

                BiasTest = {
                    'Outliers Indices': refOutlier,
                    'Distortion Coefficient (%)': 100 * BiasObs,
                    'Pvalue': pvalues
                }
            else:
                BiasTest = {
                    'Outliers Indices': "All SNPs considered as outliers",
                    'Distortion Coefficient (%)': None,
                    'Pvalue': None
                }
        else:
            BiasTest = {
                'Outliers Indices': "No significant outliers",
                'Distortion Coefficient (%)': None,
                'Pvalue': None
            }
    else:
        DISTORTIONtest = False

    # Step 5: Formatting the results
    if GlobalTest['Pvalue'] == 0:
        GlobalTest['Pvalue'] = f"<{1 / NbDistribution}"

    results = {
        'Global Test': GlobalTest
    }

    if OUTLIERtest:
        OutlierTest['Pvalue'] = OutlierTest['Pvalue'].apply(lambda x: f"<{1 / NbDistribution}" if x == 0 else x)
        results['Outlier Test'] = OutlierTest

        if DISTORTIONtest:
            BiasTest['Pvalue'] = [f"<{1 / NbDistribution}" if p == 0 else p for p in BiasTest['Pvalue']]
            results['Distortion Test'] = BiasTest

    # Main MR results
    MR_results = pd.DataFrame({
        'Exposure': BetaExposure,
        'MR Analysis': 'Raw',
        'Causal Estimate': results_all.params.values,
        'Std Error': results_all.bse.values,
        'T-stat': results_all.tvalues,
        'P-value': results_all.pvalues
    })

    if DISTORTIONtest and 'Outliers Indices' in BiasTest and isinstance(BiasTest['Outliers Indices'], list):
        data_no_outliers = data.drop(BiasTest['Outliers Indices'])
        X_no_outliers = data_no_outliers[BetaExposure]
        Y_no_outliers = data_no_outliers[BetaOutcome]
        W_no_outliers = data_no_outliers['Weights']
        model_no_outliers = sm.WLS(Y_no_outliers, X_no_outliers, weights=W_no_outliers)
        results_no_outliers = model_no_outliers.fit()

        MR_results_no_outliers = pd.DataFrame({
            'Exposure': BetaExposure,
            'MR Analysis': 'Outlier-corrected',
            'Causal Estimate': results_no_outliers.params.values,
            'Std Error': results_no_outliers.bse.values,
            'T-stat': results_no_outliers.tvalues,
            'P-value': results_no_outliers.pvalues
        })
    else:
        MR_results_no_outliers = pd.DataFrame({
            'Exposure': BetaExposure,
            'MR Analysis': 'Outlier-corrected',
            'Causal Estimate': [np.nan] * len(BetaExposure),
            'Std Error': [np.nan] * len(BetaExposure),
            'T-stat': [np.nan] * len(BetaExposure),
            'P-value': [np.nan] * len(BetaExposure)
        })

    MR_main_results = pd.concat([MR_results, MR_results_no_outliers], ignore_index=True)
    results['Main MR results'] = MR_main_results

    # Warning if necessary
    if OUTLIERtest and (len(data) / NbDistribution > SignifThreshold):
        print(f"Warning: Outlier test unstable. The significance threshold of {SignifThreshold} for the outlier test "
              f"is not achievable with only {NbDistribution} simulations to compute the null distribution. "
              f"The current precision is <{len(data) / NbDistribution}. Increase NbDistribution.")

    return results

In [None]:
BetaOutcome = 'Y_effect'
BetaExposure = ['E1_effect', 'E2_effect']
SdOutcome = 'Y_se'
SdExposure = ['E1_se', 'E2_se']

In [None]:
#single variable MR_PRESSO
import numpy as np
import pandas as pd
import statsmodels.api as sm
from joblib import Parallel, delayed

def mr_presso_single_optimized(BetaOutcome, BetaExposure, SdOutcome, SdExposure, data,
                               OUTLIERtest=False, DISTORTIONtest=False,
                               SignifThreshold=0.05, NbDistribution=1000, seed=None, n_jobs=-1):
    """
    Optimized version of MR-PRESSO analysis for a single exposure and outcome.
    """

    if seed is not None:
        np.random.seed(seed)

    if SignifThreshold > 1 or SignifThreshold < 0:
        raise ValueError("The significance threshold must be between 0 and 1")

    if not isinstance(data, pd.DataFrame):
        raise TypeError("data must be a pandas DataFrame")

    # Ensure that inputs are in the correct format
    if isinstance(BetaExposure, str):
        BetaExposure = [BetaExposure]
    if isinstance(SdExposure, str):
        SdExposure = [SdExposure]

    if len(BetaExposure) != len(SdExposure):
        raise ValueError("BetaExposure and SdExposure must have the same number of elements")

    # Data preparation and validation
    required_columns = [BetaOutcome] + BetaExposure + [SdOutcome] + SdExposure
    data = data[required_columns].dropna()
    data = data.reset_index(drop=True)  # Reset index to ensure alignment
    n_snps = len(data)

    # Adjust signs based on the first exposure variable
    sign_exposure = np.sign(data[BetaExposure[0]].values)
    data[BetaOutcome] *= sign_exposure
    data[BetaExposure[0]] *= sign_exposure

    # Calculate weights
    data['Weights'] = 1 / data[SdOutcome].values ** 2

    # Validate number of observations
    if len(data) <= len(BetaExposure) + 2:
        raise ValueError("Not enough instrumental variables")

    if len(data) >= NbDistribution:
        raise ValueError("Not enough elements to compute empirical P-values; increase NbDistribution")

    # Precompute variables for efficiency
    sqrt_weights = np.sqrt(data['Weights'].values)
    dataW = data[[BetaOutcome] + BetaExposure].multiply(sqrt_weights[:, np.newaxis], axis=0)
    X = dataW[BetaExposure].values
    Y = dataW[BetaOutcome].values
    n = len(dataW)

    # Function to compute the residual sum of squares using leave-one-out approach
    def getRSS_LOO(X, Y):
        XtX = X.T @ X
        XtY = X.T @ Y
        XtX_inv = np.linalg.inv(XtX)
        beta_hat_full = XtX_inv @ XtY
        residuals_full = Y - X @ beta_hat_full
        RSS_full = np.sum(residuals_full ** 2)

        if OUTLIERtest:
            # Precompute leave-one-out estimates
            CausalEstimate_LOO = np.zeros(n)
            residuals_LOO = np.zeros(n)
            for i in range(n):
                X_loo = np.delete(X, i, axis=0)
                Y_loo = np.delete(Y, i)
                XtX_loo = X_loo.T @ X_loo
                XtY_loo = X_loo.T @ Y_loo
                beta_hat_loo = np.linalg.inv(XtX_loo) @ XtY_loo
                CausalEstimate_LOO[i] = beta_hat_loo[0]
                residuals_LOO[i] = Y[i] - X[i] @ beta_hat_loo
            RSS = np.sum(residuals_LOO ** 2)
            return RSS, CausalEstimate_LOO
        else:
            return RSS_full

    # Function to generate random data for simulations
    def getRandomData(data_np):
        n = len(data_np['BetaOutcome'])
        # Simulate exposures
        random_exposures = np.random.normal(
            loc=data_np['BetaExposure'], scale=data_np['SdExposure']
        )
        # Simulate outcomes
        pred_outcomes = random_exposures * data_np['IVW_estimates']
        random_outcome = np.random.normal(
            loc=pred_outcomes, scale=data_np['SdOutcome']
        )
        random_data = {
            'BetaOutcome': random_outcome,
            'BetaExposure': random_exposures,
            'Weights': data_np['Weights']
        }
        random_data = pd.DataFrame(random_data)
        return random_data

    # Precompute IVW estimates for leave-one-out
    if OUTLIERtest:
        IVW_estimates = np.zeros(n)
        for i in range(n):
            indices = np.arange(n) != i
            X_loo = X[indices]
            Y_loo = Y[indices]
            XtX_loo = X_loo.T @ X_loo
            XtY_loo = X_loo.T @ Y_loo
            beta_hat_loo = np.linalg.inv(XtX_loo) @ XtY_loo
            IVW_estimates[i] = beta_hat_loo[0]
        data_np = {
            'BetaOutcome': data[BetaOutcome].values,
            'BetaExposure': data[BetaExposure[0]].values,
            'SdOutcome': data[SdOutcome].values,
            'SdExposure': data[SdExposure[0]].values,
            'Weights': data['Weights'].values,
            'IVW_estimates': IVW_estimates
        }
    else:
        # Compute full IVW estimate
        XtX = X.T @ X
        XtY = X.T @ Y
        beta_hat_full = np.linalg.inv(XtX) @ XtY
        data_np = {
            'BetaOutcome': data[BetaOutcome].values,
            'BetaExposure': data[BetaExposure[0]].values,
            'SdOutcome': data[SdOutcome].values,
            'SdExposure': data[SdExposure[0]].values,
            'Weights': data['Weights'].values,
            'IVW_estimates': np.full(n, beta_hat_full[0])
        }

    # Step 1: Compute observed RSS
    RSSobs = getRSS_LOO(X, Y)

    # Step 2: Compute expected RSS distribution (Parallelized)
    random_data_list = Parallel(n_jobs=n_jobs)(
        delayed(getRandomData)(data_np) for _ in range(NbDistribution)
    )

    # Function to process each random dataset
    def process_random_data(random_data):
        # Recompute weights for the random data
        sqrt_weights_random = np.sqrt(random_data['Weights'].values)
        X_random = random_data[['BetaExposure']].values * sqrt_weights_random[:, np.newaxis]
        Y_random = random_data['BetaOutcome'].values * sqrt_weights_random
        return getRSS_LOO(X_random, Y_random)

    RSSexp_list = Parallel(n_jobs=n_jobs)(
        delayed(process_random_data)(random_data) for random_data in random_data_list
    )

    # Extract RSS values from RSSexp_list
    if OUTLIERtest:
        RSSexp = np.array([rss[0] for rss in RSSexp_list])
    else:
        RSSexp = np.array(RSSexp_list)

    RSSobs_value = RSSobs[0] if OUTLIERtest else RSSobs
    GlobalTest_Pvalue = np.mean(RSSexp > RSSobs_value)
    if GlobalTest_Pvalue == 0:
        GlobalTest_Pvalue = f"<{1 / NbDistribution}"

    GlobalTest = {
        'RSSobs': RSSobs_value,
        'Pvalue': GlobalTest_Pvalue
    }

    # Step 3: Perform single IV outlier test
    if ((float(str(GlobalTest['Pvalue']).lstrip('<')) if isinstance(GlobalTest['Pvalue'], str) else GlobalTest['Pvalue'])
        < SignifThreshold) and OUTLIERtest:
        # Vectorized calculation for outlier test
        CausalEstimate_LOO = RSSobs[1]
        Dif = data[BetaOutcome].values - data[BetaExposure[0]].values * CausalEstimate_LOO
        RSSobs_squared = Dif ** 2

        Exp_diff = np.empty((NbDistribution, n_snps))
        for i, rd in enumerate(random_data_list):
            # Recompute weights for each random dataset
            sqrt_weights_random = np.sqrt(rd['Weights'].values)
            X_random = rd[['BetaExposure']].values * sqrt_weights_random[:, np.newaxis]
            Y_random = rd['BetaOutcome'].values * sqrt_weights_random
            _, CausalEstimate_LOO_random = getRSS_LOO(X_random, Y_random)
            Exp_diff[i] = rd['BetaOutcome'].values - rd['BetaExposure'].values * CausalEstimate_LOO_random

        Pvalues = np.mean(Exp_diff ** 2 > RSSobs_squared[None, :], axis=0)
        Pvalues_corrected = np.minimum(Pvalues * n_snps, 1)
        Pvalues_corrected = np.where(Pvalues_corrected == 0, f"<{1 / NbDistribution}", Pvalues_corrected)

        OutlierTest = pd.DataFrame({
            'SNP': data.index,
            'RSSobs': RSSobs_squared,
            'Pvalue': Pvalues_corrected
        })
    else:
        OUTLIERtest = False
        OutlierTest = pd.DataFrame()

    # Step 4: Perform distortion test
    X_all = data[BetaExposure].values
    Y_all = data[BetaOutcome].values
    W_all = data['Weights'].values
    model_all = sm.WLS(Y_all, X_all, weights=W_all)
    results_all = model_all.fit()

    BiasTest = {}
    if DISTORTIONtest and OUTLIERtest:
        significant_mask = OutlierTest['Pvalue'].apply(
            lambda x: float(x) if isinstance(x, str) and not x.startswith('<') else 0.0
        ).astype(float) <= SignifThreshold
        refOutlier = OutlierTest.loc[significant_mask, 'SNP'].tolist()

        if len(refOutlier) > 0 and len(refOutlier) < len(data):
            # Function to get random distortion coefficients
            def getRandomBias(refOutlier_indices):
                indices_remaining = list(set(range(n_snps)) - set(refOutlier_indices))
                random_indices = refOutlier_indices + list(
                    np.random.choice(indices_remaining, size=n_snps - len(refOutlier_indices), replace=False)
                )
                data_subset = data.iloc[random_indices]
                X_subset = data_subset[BetaExposure].values
                Y_subset = data_subset[BetaOutcome].values
                W_subset = data_subset['Weights'].values
                model_random = sm.WLS(Y_subset, X_subset, weights=W_subset)
                results_random = model_random.fit()
                return results_random.params[0]

            BiasExp = Parallel(n_jobs=n_jobs)(
                delayed(getRandomBias)(refOutlier) for _ in range(NbDistribution)
            )
            BiasExp = np.array(BiasExp)

            data_no_outliers = data.drop(index=refOutlier).reset_index(drop=True)
            X_no_outliers = data_no_outliers[BetaExposure].values
            Y_no_outliers = data_no_outliers[BetaOutcome].values
            W_no_outliers = data_no_outliers['Weights'].values
            model_no_outliers = sm.WLS(Y_no_outliers, X_no_outliers, weights=W_no_outliers)
            results_no_outliers = model_no_outliers.fit()

            BiasObs = (results_all.params[0] - results_no_outliers.params[0]) / np.abs(results_no_outliers.params[0])
            BiasExp = (results_all.params[0] - BiasExp) / np.abs(BiasExp)

            BiasTest_Pvalue = np.mean(np.abs(BiasExp) > np.abs(BiasObs))
            if BiasTest_Pvalue == 0:
                BiasTest_Pvalue = f"<{1 / NbDistribution}"

            BiasTest = {
                'Outliers Indices': refOutlier,
                'Distortion Coefficient (%)': 100 * BiasObs,
                'Pvalue': BiasTest_Pvalue
            }
        else:
            BiasTest = {
                'Outliers Indices': "All SNPs considered as outliers or no significant outliers",
                'Distortion Coefficient (%)': None,
                'Pvalue': None
            }

    # Step 5: Formatting results
    results = {
        'Global Test': GlobalTest
    }

    if OUTLIERtest:
        results['Outlier Test'] = OutlierTest

        if DISTORTIONtest:
            results['Distortion Test'] = BiasTest

    # Main MR results
    MR_results_list = []

    # Original MR
    MR_results_list.append({
        'Exposure': BetaExposure[0],
        'MR Analysis': 'Raw',
        'Causal Estimate': results_all.params[0],
        'Sd': results_all.bse[0],
        'T-stat': results_all.tvalues[0],
        'P-value': results_all.pvalues[0]
    })

    # Outlier-corrected MR
    if DISTORTIONtest and 'Outliers Indices' in BiasTest and \
            isinstance(BiasTest['Outliers Indices'], list) and len(BiasTest['Outliers Indices']) < len(data):
        data_no_outliers = data.drop(index=BiasTest['Outliers Indices']).reset_index(drop=True)
        X_no_outliers = data_no_outliers[BetaExposure].values
        Y_no_outliers = data_no_outliers[BetaOutcome].values
        W_no_outliers = data_no_outliers['Weights'].values
        model_no_outliers = sm.WLS(Y_no_outliers, X_no_outliers, weights=W_no_outliers)
        results_no_outliers = model_no_outliers.fit()

        MR_results_list.append({
            'Exposure': BetaExposure[0],
            'MR Analysis': 'Outlier-corrected',
            'Causal Estimate': results_no_outliers.params[0],
            'Sd': results_no_outliers.bse[0],
            'T-stat': results_no_outliers.tvalues[0],
            'P-value': results_no_outliers.pvalues[0]
        })
    else:
        MR_results_list.append({
            'Exposure': BetaExposure[0],
            'MR Analysis': 'Outlier-corrected',
            'Causal Estimate': np.nan,
            'Sd': np.nan,
            'T-stat': np.nan,
            'P-value': np.nan
        })

    MR_results = pd.DataFrame(MR_results_list)
    results['Main MR results'] = MR_results

    # Warning if necessary
    if OUTLIERtest and (len(data) / NbDistribution > SignifThreshold):
        print(f"Warning: Outlier test unstable. The significance threshold of {SignifThreshold} for the outlier test "
              f"is not achievable with only {NbDistribution} simulations to compute the null distribution. "
              f"The current precision is <{len(data) / NbDistribution}. Increase NbDistribution.")

    return results

In [None]:
import warnings
warnings.filterwarnings('ignore')

# Assuming you have a DataFrame 'data' with the necessary columns
results = mr_presso_single_optimized(
    BetaOutcome='Y_effect',
    BetaExposure='E1_effect',
    SdOutcome='Y_se',
    SdExposure='E1_se',
    data=df2,
    OUTLIERtest=True,
    DISTORTIONtest=True,  # Enable the DISTORTIONtest
    SignifThreshold=0.05,
    NbDistribution=8000,
    seed=80
)
results