In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
from definitions import ROOT_DIR
import matplotlib.pyplot as plt
from matplotlib import rc, rc_context
import matplotlib
from anndata import AnnData
import scanpy as sc

matplotlib.rcParams['pdf.fonttype'] = 42
rc('font',**{'family':'sans-serif',
             'sans-serif':['Arial'],
             'size':12})

In [2]:
def filter_neutral_losses(df, neutral_losses=['']):
    '''
    Filter out entries for ions with neutral losses that are not in the list provided.

    neutral_losses can be:
      - a list-like of neutral loss values: only rows whose neutral_loss is one of them are kept
        (the default [''] keeps only rows with an empty neutral_loss);
      - the string "only_nl": only rows with a non-empty neutral_loss are kept;
      - None: df is returned unfiltered.

    The default list is only read, never mutated.
    '''
    # Compare against the sentinel only when a string was passed: `==` on a
    # numpy array or pandas Series is element-wise and cannot be used as an
    # `if` condition.
    if isinstance(neutral_losses, str) and neutral_losses == "only_nl":
        return df[df.neutral_loss != ""]

    # Identity check for the same reason: `!= None` is element-wise on arrays.
    if neutral_losses is not None:
        return df[df.neutral_loss.isin(neutral_losses)]

    return df


def filter_adducts(df, adducts=['']):
    '''
    Filter out entries for ions with adducts that are not in the list provided.

    adducts is a list-like of adduct values to keep (the default [''] keeps only
    rows with an empty adduct value). Pass None to return df unfiltered.
    The default list is only read, never mutated.
    '''
    # `is not None` rather than `!= None`: the latter is evaluated element-wise
    # when adducts is a numpy array or pandas Series and then fails inside `if`.
    if adducts is not None:
        df = df[df.adduct.isin(adducts)]
    return df


def filter_polarity(df, polarity=None):
    '''
    Keep only entries whose Polarity column equals the given polarity
    (the notebook uses 'positive' and 'negative').
    If polarity is None, df is returned unfiltered.
    '''
    # Identity comparison is the idiomatic None test and matches the other filters
    if polarity is None:
        return df
    return df[df.Polarity == polarity]


def filter_data(data, polarity=None, adducts=None, neutral_losses=None):
    '''
    Run the three row filters in sequence: polarity, then adducts, then neutral losses.
    Each argument is forwarded unchanged to its filter; with the default None
    the corresponding filter leaves the data untouched.
    '''
    by_polarity = filter_polarity(data, polarity)
    by_adduct = filter_adducts(by_polarity, adducts)
    return filter_neutral_losses(by_adduct, neutral_losses)


def group_by_molecule(df, groupby_columns):
    '''
    Collapse df to one row per unique combination of groupby_columns.

    Only the 'detectability' column is aggregated: its maximum is taken within
    each group, so a group counts as detected if any of its rows (ions) was detected.
    The grouping keys are returned as ordinary columns.
    '''
    aggregations = {'detectability': 'max'}
    grouped = df.groupby(groupby_columns)
    return grouped.agg(aggregations).reset_index()


def assemble_adata(pca):
    '''
    Wrap a pivoted table in an AnnData object.

    The row index of pca is turned into the obs table (every index level becomes
    a categorical column), the column index is turned into the var table, and
    pca.values is passed as the data matrix.
    '''
    # Row index levels -> categorical annotation columns
    observables = pca.index.to_frame(index=False).astype('category')
    # Column index levels -> variable annotation columns
    variables = pca.columns.to_frame(index=False)
    return AnnData(pca.values, obs=observables, var=variables)

# Interlaboratory comparison PCA

In [3]:
# Project locations: input data folder and the folder that receives the PCA figures
p_root_dir = Path(ROOT_DIR)
p_data = p_root_dir.joinpath("data")
p_out = p_root_dir.joinpath("plots", "PCA")
p_out.mkdir(parents=True, exist_ok=True)  # create the output folder (and parents) if missing

# Predictions table to analyse (alternative input: "All_data_19Apr2023.csv")
p_predictions = p_data.joinpath("Interlab_data_19Apr2023.csv")
source = p_predictions.stem  # file name without extension, reused in the output file names

In [4]:
# Load predictions and format neutral loss column
df = pd.read_csv(p_predictions)
# Missing neutral losses become '' so they can be matched by the neutral-loss filter.
# The filled column is assigned back explicitly: calling fillna(..., inplace=True)
# on `df.neutral_loss` is a chained in-place update, which does not modify `df`
# once pandas Copy-on-Write is active.
df['neutral_loss'] = df['neutral_loss'].fillna('')

# Only consider data of detected ions: an ion counts as detected when its
# predicted value reaches the threshold
threshold = 0.8
df['detectability'] = df.pred_val >= threshold
data = df[df.detectability]

In [5]:
# Metadata columns used as the pivot index: each unique combination of these
# values becomes one observation (row) in the PCA input
metadata_columns = [
    'Sample name',
    'Participant lab',
    'Technology',
    'Original technology',
    'Ionisation source',
    'Mass analyser',
    'Source pressure',
    'Matrix short',
    'Polarity',
    'Slide code',
]

# Metadata fields used to colour the PCA scatter, one panel per field
labels = [
    'Participant lab',
    'Technology',
    'Ionisation source',
    'Mass analyser',
    'Source pressure',
]

for polarity in ['positive', 'negative']:

    # Keep ions of the current polarity that have no neutral loss (adducts are not filtered)
    filtered_data = filter_data(data, polarity=polarity, neutral_losses=[''])

    # Summarise data per metabolite and dataset
    molecule_data = group_by_molecule(
        filtered_data,
        groupby_columns=[*metadata_columns, 'name_short'],
    )

    # Reshape: datasets as rows, metabolites as columns; missing combinations are filled with 0
    pca = molecule_data.pivot_table(
        values=['detectability'],
        index=metadata_columns,
        columns='name_short',
        fill_value=0,
    )

    # Assemble adata
    adata = assemble_adata(pca)

    # Apply Z-score normalisation: If you use only detectability for PCA, this is not needed
    sc.pp.scale(adata, zero_center=True)

    # Compute PCA
    sc.tl.pca(adata, svd_solver='arpack')

    fname = f"PCA_interlab_{polarity}_{source}"

    with rc_context():
        # Optional fixed palette, currently disabled:
        #   palette=['#1965B0', '#DC050C', '#4EB265', '#7BAFDE', '#F7F056']
        ax = sc.pl.pca(
            adata,
            components=['1, 2'],
            color=labels,
            size=200,
            ncols=1,
            legend_loc='right margin',
            show=False,
            wspace=0.5,
            edgecolor='#777777',
            linewidth=0.1,
            annotate_var_explained=True,
        )

        # Make every panel square (one panel per entry in labels)
        for n in range(len(labels)):
            ax[n].set_box_aspect(1)

        plt.tight_layout()
        plt.savefig(p_out / f"test{fname}.png")
        plt.savefig(p_out / f"{fname}.pdf", transparent=True)
        plt.close()

  adata = AnnData(pca.values, obs=observables, var=variables)
  cax = scatter(
  cax = scatter(
  cax = scatter(
  cax = scatter(
  cax = scatter(
  plt.tight_layout()
  adata = AnnData(pca.values, obs=observables, var=variables)
  cax = scatter(
  cax = scatter(
  cax = scatter(
  cax = scatter(
  cax = scatter(
  plt.tight_layout()
