In [1]:
import matplotlib
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

In [None]:
# Mass constants shared by the notebook.
# NOTE(review): the values look like monoisotopic masses in Da (the loaders
# rename 'Monoisotopic Mass' to 'Mass') — confirm units and derivation.

# Presumably the four RNA nucleotide building blocks — TODO confirm.
A = 329.0525
C = 305.0413
G = 345.0474
U = 306.0253
H2O = 18.0106  # presumably water
M = 14.01  # presumably a methylation increment; note only 2 decimals — TODO confirm

# Presumably adduct / modification mass deltas — TODO confirm against usage.
P = 79.9663  # presumably phosphate
Na = 21.9819  # presumably sodium replacing a proton
K = 37.9559  # presumably potassium replacing a proton
CO = 27.9949  # presumably carbonyl

In [None]:
def thermo_df(df, key_rows_only=True):
    """Convert a Thermo BioPharma Finder export to the short column names used here.

    :param df: pandas DataFrame, the table as exported.
    :param key_rows_only: bool, when True keep only the key columns and cast
        them to float64; when False only the column renaming is applied.
    :return: a new DataFrame; the input frame is not modified.
    """
    short_names = {
        'Monoisotopic Mass': 'Mass',
        'Apex RT': 'RT',
        'Sum Intensity': 'Vol',
        'Relative Abundance': 'RA',
        'Fractional Abundance': 'FA',
    }
    renamed = df.rename(columns=short_names)
    if not key_rows_only:
        return renamed

    wanted = ['Mass', 'RT', 'Vol', 'RA', 'FA']
    try:
        # All five columns present: rows with a missing value are discarded.
        subset = renamed[wanted].dropna()
    except KeyError:
        # Abundance columns absent: fall back to the core three, rows kept as-is.
        subset = renamed[wanted[:3]]
    return subset.astype('float64')

def load_data(fpath, csv_format=False):
    """Read a sample file and convert it with thermo_df().

    :param fpath: str, location of the Excel or CSV file.
    :param csv_format: bool, True to parse the file as CSV; the default
        (False) parses it as Excel.
    :return: the DataFrame produced by thermo_df() for the loaded table.
    """
    if csv_format:
        raw = pd.read_csv(fpath)
    else:
        raw = pd.read_excel(fpath)
    return thermo_df(raw)

In [None]:
def drop_dups(df_dup, digit=0):
    """Collapse rows whose Mass rounds to the same value.

    Rows are ordered by descending Vol before de-duplication, so the row kept
    for each rounded mass is the first one in that ordering.

    :param df_dup: pandas DataFrame with 'Mass' and 'Vol' columns.
    :param digit: int, number of decimals used when rounding Mass.
    :return: a new DataFrame sorted by Mass that carries an extra
        'RoundedMass' column; the input frame is not modified.
    """
    keyed = df_dup.assign(RoundedMass=np.round(df_dup.Mass, digit))
    strongest_first = keyed.sort_values('Vol', ascending=False)
    return strongest_first.drop_duplicates('RoundedMass').sort_values('Mass')

In [None]:
# def match_dfs(df_src, df_dst, ppm=10, shift=0):
#     """find the subset contains common Mass values.
    
#     :param df_src, df_dst: pandas DataFrame, two datasets to find common subset from.
#     :param ppm: int, the PPM used to compare two Mass values.
#     :param shift: float, the difference of two Mass values.
#     :return: the subset dots from df_dst that have similar Mass values in df_src.
#     """
#     def _find_mass(df, mass, ppm=10):
#         if df.empty:
#             return df
#         df = df[(df.Mass < mass+1) & (df.Mass > mass-1)]
#         if df.shape[0] == 0:
#             return df
        
#         df_ppm = abs(1E6 * (df.Mass - mass)) / mass
#         mask = df_ppm < ppm
#         df_found = df[mask].copy()
#         df_found['PPM'] = df_ppm[mask]
#         return df_found
    
#     df_src = df_src.copy()
#     if shift != 0:
#         df_src.Mass += shift
#     idxs = list()
#     for idx, row in df_src.iterrows():
#         mass = row.Mass
#         df_res = _find_mass(df_dst, mass, ppm)
#         if not df_res.empty:
#             df_src.loc[idx, 'Match'] = True
#             idxs.extend(list(df_res.index))
    
#     idxs = list(set(idxs))
#     df_common = df_dst[df_dst.index.isin(idxs)]
#     return df_common.copy()

def match_dfs(df_src, df_dst, ppm=10, shift=0, inplace=False, copy_cols=[]):
    """Find the rows of df_dst whose Mass matches a Mass in df_src.

    Each df_src mass (after adding ``shift``) is looked up in df_dst; a df_dst
    row matches when its mass error relative to that source mass is below
    ``ppm`` parts per million.

    :param df_src: pandas DataFrame with a 'Mass' column, the masses to look up.
    :param df_dst: pandas DataFrame with a 'Mass' column, the table searched.
    :param ppm: number, exclusive upper bound of the accepted mass error in ppm.
    :param shift: float, offset added to every df_src Mass before matching.
    :param inplace: bool. When True, df_src itself receives the columns 'Match',
        'MatchedMass', 'PPM' and any ``copy_cols`` (values taken from the first
        matching df_dst row), and a non-zero ``shift`` is also applied to its
        Mass column. When False, df_src is not modified.
    :param copy_cols: list of df_dst column names to copy onto matched df_src
        rows; the list is never mutated here.
    :return: a copy of the matching df_dst rows. df_dst is re-indexed from 0
        first, so the result is labelled by row position.
    """
    def _find_mass(df, mass, ppm=10):
        # Rows of df within `ppm` of `mass`, with the error in a 'PPM' column.
        if df.empty:
            return df
        # Cheap +/-1 window before the relative (ppm) comparison.
        df = df[(df.Mass < mass+1) & (df.Mass > mass-1)]
        if df.empty:
            return df

        df_ppm = abs(1E6 * (df.Mass - mass)) / mass
        mask = df_ppm < ppm
        df_found = df[mask].copy()
        df_found['PPM'] = df_ppm[mask]
        return df_found

    df_dst = df_dst.reset_index(drop=True)
    if not inplace:
        # Private scratch copy. drop=True because the old index is never used:
        # inserting it as a column raised ValueError whenever df_src already
        # had both 'index' and 'level_0' columns.
        df_src = df_src.copy().reset_index(drop=True)
    if shift != 0:
        df_src['Mass'] = df_src['Mass'] + shift

    matched_labels = set()
    df_src['Match'] = False
    for idx, row in df_src.iterrows():
        hits = _find_mass(df_dst, row.Mass, ppm)
        if hits.empty:
            continue
        # The first candidate in df_dst row order is the one recorded.
        first = hits.iloc[0]
        df_src.loc[idx, 'Match'] = True
        df_src.loc[idx, 'MatchedMass'] = first.Mass
        for col in copy_cols or ():
            df_src.loc[idx, col] = first[col]
        # Recorded error is relative to the matched mass.
        df_src.loc[idx, 'PPM'] = abs(row.Mass - first.Mass)*1E6/first.Mass
        matched_labels.update(hits.index)

    df_common = df_dst[df_dst.index.isin(sorted(matched_labels))]
    return df_common.copy()


def match_dfs_v2(df_src, df_dst, ppm=10, shift=0, inplace=False, copy_cols=[]):
    """Annotate df_dst with matches found in df_src and return the matched rows.

    Each df_dst mass is looked up among the df_src masses (after ``shift`` has
    been added to them); a df_src row matches when its mass error relative to
    the df_dst mass is below ``ppm`` parts per million.

    :param df_src: pandas DataFrame with a 'Mass' column, the reference masses.
        It is never modified; the shift is applied to a re-indexed copy.
    :param df_dst: pandas DataFrame with a 'Mass' column, the rows to annotate.
    :param ppm: number, exclusive upper bound of the accepted mass error in ppm.
    :param shift: float, offset added to every df_src Mass before matching.
    :param inplace: bool. When True the annotation columns are written to the
        caller's df_dst. When False a copy is annotated instead; that copy has
        its index reset, so the old index appears as an extra column in the
        result.
    :param copy_cols: list of df_src column names to copy from the first
        matching df_src row; the list is never mutated here.
    :return: a copy of the df_dst rows that found a match, carrying 'Match',
        'MatchedMass', 'Sft', 'PPM' and any ``copy_cols``.
    """
    def _find_mass(df, mass, ppm=10):
        # Rows of df within `ppm` of `mass`, with the error in a 'PPM' column.
        if df.empty:
            return df
        # Cheap +/-1 window before the relative (ppm) comparison.
        df = df[(df.Mass < mass+1) & (df.Mass > mass-1)]
        if df.empty:
            return df

        df_ppm = abs(1E6 * (df.Mass - mass)) / mass
        mask = df_ppm < ppm
        df_found = df[mask].copy()
        df_found['PPM'] = df_ppm[mask]
        return df_found

    df_src = df_src.reset_index(drop=True)
    if not inplace:
        # New frame; the previous index is kept as a column (existing behaviour).
        df_dst = df_dst.reset_index()
    if shift != 0:
        df_src['Mass'] = df_src['Mass'] + shift

    df_dst['Match'] = False
    for idx, row in df_dst.iterrows():
        hits = _find_mass(df_src, row.Mass, ppm)
        if hits.empty:
            continue
        # The first candidate in df_src row order is the one recorded.
        first = hits.iloc[0]
        df_dst.loc[idx, 'Match'] = True
        df_dst.loc[idx, 'MatchedMass'] = first.Mass
        df_dst.loc[idx, 'Sft'] = shift
        for col in copy_cols or ():
            df_dst.loc[idx, col] = first[col]
        # Recorded error is relative to the matched (shifted) source mass.
        df_dst.loc[idx, 'PPM'] = abs(row.Mass - first.Mass)*1E6/first.Mass

    # Select by df_dst's own match flag. The earlier code filtered df_dst with
    # labels taken from df_src hits, which picked unrelated rows.
    return df_dst[df_dst['Match']].copy()


In [None]:
def plotly_zones(df_a, df_b, y='RT', title=None, names=None):
    """Show two datasets in one interactive scatter plot, coloured per dataset.

    :param df_a, df_b: pandas DataFrame, the datasets to plot; 'Mass' is used
        for the x axis. Neither frame is modified.
    :param y: name of the column used for the y axis.
    :param title: optional figure title.
    :param names: optional pair of legend labels for df_a and df_b; without it
        the labels 'ladder_a' and 'ladder_b' are used.
    """
    if names:
        label_a, label_b = names[0], names[1]
    else:
        label_a, label_b = 'ladder_a', 'ladder_b'
    # A 'type' column tells the two sources apart after concatenation.
    merged = pd.concat([df_a.assign(type=label_a), df_b.assign(type=label_b)])
    fig = px.scatter(merged, x='Mass', y=y, color='type')
    if title:
        fig.update_layout(title=title)
    fig.show()
    
def plotly_zone(df, y='RT', title=None):
    """Show one dataset as an interactive scatter plot.

    :param df: pandas DataFrame to plot; 'Mass' is used for the x axis.
    :param y: name of the column used for the y axis.
    :param title: optional figure title.
    """
    scatter_fig = px.scatter(df, x='Mass', y=y)
    if title:
        scatter_fig.update_layout(title=title)
    scatter_fig.show()

def plotly_multi_zones(dfs, y='RT', title=None, names=None):
    """Show any number of datasets in one interactive scatter plot.

    :param dfs: list of pandas DataFrame to plot; 'Mass' is used for the x
        axis. The frames are not modified.
    :param y: name of the column used for the y axis.
    :param title: optional figure title.
    :param names: optional legend labels, one per dataset; without it the
        datasets are labelled 'ladder_1', 'ladder_2', ...
    """
    # Tag each frame with its legend label in a 'type' column.
    labelled = [
        frame.assign(type=names[pos] if names else 'ladder_{}'.format(pos+1))
        for pos, frame in enumerate(dfs)
    ]
    merged = pd.concat(labelled)
    fig = px.scatter(merged, x='Mass', y=y, color='type')
    if title:
        fig.update_layout(title=title)
    fig.show()
    

In [None]:
def plotly_basecalling(df, mass_pairs, annotate=False, endpoints=pd.DataFrame(),
                       df_ori=pd.DataFrame(), y='RT', title=None, mark_vol=False):
    """Plot compounds and the base calls linking them as an interactive figure.

    :param df: pandas DataFrame of compounds; needs 'Mass' and the ``y`` column
        (plus 'Vol' when ``mark_vol`` is set).
    :param mass_pairs: iterable of sequences as produced by mass_sum(). Each
        one is used to select the df rows whose Mass it contains, and its third
        element is used as the label of the connecting line.
    :param annotate: bool, write each compound's Mass (2 decimals) under its marker.
    :param endpoints: optional pandas DataFrame of points drawn as an extra
        trace; its 'Mass', 'RT' and 'Vol' columns are also printed.
    :param df_ori: optional pandas DataFrame drawn as an extra marker trace.
    :param y: name of the column used for the y axis of every trace.
    :param title: optional figure title.
    :param mark_vol: bool, write each compound's Vol (2 decimals) under its marker.
    """
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=df.Mass, y=df[y], mode='markers'))

    def _label_points(column):
        # Write every point's `column` value, 2 decimals, just below its marker.
        for _, row in df.iterrows():
            fig.add_annotation(x=row.Mass, y=row[y], yshift=-10,
                               text='{:.2f}'.format(row[column]),
                               showarrow=False,
                               arrowhead=1)

    if annotate:
        # Was '{:2f}' (a width, giving 6 decimals); 2-decimal precision is meant.
        _label_points('Mass')

    if mark_vol:
        _label_points('Vol')

    if not df_ori.empty:
        # Use the requested y column; the literal 'y' raised KeyError.
        fig.add_trace(go.Scatter(x=df_ori.Mass, y=df_ori[y], mode='markers'))

    for t in mass_pairs:
        df_pair = df[df.Mass.isin(t)]
        if df_pair.empty:
            continue
        fig.add_trace(go.Scatter(x=df_pair.Mass, y=df_pair[y], mode='lines+markers',
                                 name=t[2], line=go.scatter.Line(color="pink")))

        # Label the link at the centroid of the selected points.
        fig.add_annotation(x=df_pair.Mass.mean(), y=df_pair[y].mean(), yshift=5,
                           text=t[2],
                           showarrow=False,
                           arrowhead=1)

    if not endpoints.empty:
        fig.add_trace(go.Scatter(x=endpoints.Mass, y=endpoints[y], mode='markers'))
        print(endpoints[['Mass', 'RT', 'Vol']])

    if title:
        fig.update_layout(title=title)
    fig.show()

In [None]:
def plot_zones(df3p, df5p, trend=False, y='RT', figsize=(16, 12),
               title='', xlabel='Monoisotopic Mass (Da)', ylabel='Retention Time (min)',
               colors=[None, None]):
    """Draw two datasets on one new matplotlib figure, Mass against ``y``.

    :param df3p, df5p: pandas DataFrame, the two datasets; each needs 'Mass'
        and the ``y`` column.
    :param trend: bool. When True each dataset is drawn with seaborn's regplot
        (default order for df3p, order=2 for df5p); when False both are drawn
        as plain scatters.
    :param y: name of the column used for the y axis.
    :param figsize: size passed to plt.figure.
    :param title, xlabel, ylabel: figure title and axis labels.
    :param colors: two-element sequence with the scatter colour of df3p and
        df5p; only used when ``trend`` is False. Never mutated here.
    :return: the ``plt`` module object, whose current figure is the new one.
    """
    plt.figure(figsize=figsize)
    plt.title(title)
    plt.xlabel(xlabel, fontname="Arial", fontsize=15, color='black')
    plt.ylabel(ylabel, fontname="Arial", fontsize=15, color='black')
    plt.xticks(fontname="Arial", size=13, color='black')
    plt.yticks(fontname="Arial", size=13, color='black')
    if trend:
        # x/y are passed by keyword: seaborn >= 0.12 made them keyword-only,
        # so the positional form raises TypeError there.
        sns.regplot(x=df3p.Mass, y=df3p[y])
        sns.regplot(x=df5p.Mass, y=df5p[y], order=2)
    else:
        plt.scatter(df3p.Mass, df3p[y], c=colors[0])
        plt.scatter(df5p.Mass, df5p[y], c=colors[1])

    return plt

def plot_basecalling(df, mass_pairs, endpoints=pd.DataFrame(),
                     xlabel='Monoisotopic Mass (Da)', ylabel='Retention Time (min)',
                     annotate=False, plt=None, title='', y='RT', figsize=(12, 9)):
    """Plot compounds and the base calls linking them on a new matplotlib figure.

    :param df: pandas DataFrame of compounds; needs 'Mass' and the ``y`` column.
    :param mass_pairs: iterable of sequences. Each one is used to select the df
        rows whose Mass it contains, and its third element is used as the
        label of the connecting line.
    :param endpoints: optional pandas DataFrame of points drawn in red; its
        'Mass', 'RT' and 'Vol' columns are also printed.
    :param xlabel, ylabel: axis labels.
    :param annotate: bool, also write the Mass (2 decimals) next to the first
        two points of every link (or the single point when only one is found).
    :param plt: pyplot-like object to draw with; matplotlib.pyplot when omitted.
    :param title: figure title.
    :param y: name of the column used for the y axis of every series.
    :param figsize: size passed to ``plt.figure``.
    :return: tuple ``(plt, fig)`` with the drawing object used and the new figure.
    """
    if not plt:
        plt = matplotlib.pyplot
    fig = plt.figure(figsize=figsize)
    plt.title(title)
    plt.xlabel(xlabel, fontname="Arial", fontsize=15, color='black')
    plt.ylabel(ylabel, fontname="Arial", fontsize=15, color='black')

    plt.scatter(df.Mass, df[y], color='C0')

    for t in mass_pairs:
        df_pair = df[df.Mass.isin(t)]
        if df_pair.empty:
            continue
        plt.plot(df_pair.Mass, df_pair[y], marker='o', color='black')

        # Label the link at the centroid of the selected points.
        plt.annotate(text=t[2], size=15, xy=(df_pair.Mass.mean(), df_pair[y].mean()),
                     textcoords="offset points", xytext=(-10, 10), ha='center', color='black')

        if not annotate:
            continue

        # At most two points are labelled; a link with only one of its masses
        # present in df used to raise IndexError here.
        for pos in range(min(2, len(df_pair))):
            point = df_pair.iloc[pos]
            plt.annotate(text='{:.2f}'.format(point.Mass), size=13, xy=(point.Mass, point[y]),
                         textcoords="offset points", xytext=(10, -20), ha='center')

    if not endpoints.empty:
        # Honour the requested y column (was hard-coded to RT).
        plt.scatter(endpoints.Mass, endpoints[y], color='r')
        print(endpoints[['Mass', 'RT', 'Vol']])

    return plt, fig