In [None]:
import os
from typing import Sequence

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.patches import Patch

sns.set_style('white')
colors = sns.color_palette('Set2')

def get_matchnum(matched_ms2_metadata):
    target_match_num = np.array([], dtype=int)
    decoy_match_num = np.array([], dtype=int)

    for metadata in matched_ms2_metadata.values():
        if metadata['decoy']:
            decoy_match_num = np.append(decoy_match_num, len(metadata['candidate_ms2_metadata']['peaks']))
        else:
            target_match_num = np.append(target_match_num, len(metadata['candidate_ms2_metadata']['peaks']))

    return target_match_num, decoy_match_num


def plot_match_num(files):
    def get_patches(ax: Axes):
        # 获取条形图中的条形
        patches = [patch for patch in ax.patches if patch.get_height() > 0]

        # 获取有效的区间和对应的频数
        intervals = []
        for patch in patches:
            width = patch.get_width()
            left = patch.get_x()
            label = f'{left:.2f}-{left + width:.2f}'
            intervals.append(label)

        return patches, intervals
    
    def set_patches_xtick(ax: Axes, patches, intervals):
        # 设置 x 轴刻度
        ax.set_xticks([patch.get_x() + patch.get_width() / 2 for patch in patches])
        ax.set_xticklabels(intervals, rotation=90)
    
    def cal_bin_width(ax: Axes, length: int):
        print(len(ax.patches), length)
        return length / len(ax.patches)
    
    for file in files:
        path = os.path.join(file, end)
        
        match_num_metadata = np.load(path, allow_pickle=True).item()

        axes: Sequence[Sequence[Axes]]

        fig, axes = plt.subplots(2, 2, figsize=(5, 2.7), dpi=100, sharex='col')

        target_match_num, decoy_match_num = get_matchnum(match_num_metadata)

        sns.histplot(target_match_num, ax=axes[0][0], bins='auto', alpha=0.5, color=colors[0])
        # target_patches, target_intervals = get_patches(axes[0][0])
        sns.histplot(decoy_match_num, ax=axes[0][1], bins='auto', alpha=0.5, color=colors[1])
        # decoy_patches, decoy_intervals = get_patches(axes[0][1])

        sns.kdeplot(data=target_match_num, fill=True, ax=axes[1][0], color=colors[0])
        sns.kdeplot(data=decoy_match_num, fill=True, ax=axes[1][1], color=colors[1])
        # set_patches_xtick(axes[1][0], target_patches, target_intervals)
        # set_patches_xtick(axes[1][1], decoy_patches, decoy_intervals)

        for i in range(2):
            for j in range(2):
                axes[i][j].set_yticks([])

        handles = [Patch(facecolor=colors[0], alpha=0.5), Patch(facecolor=colors[1], alpha=0.5)]
        labels = ['target', 'decoy']
        fig.legend(handles=handles, labels=labels, bbox_to_anchor=(0.35, 0.95, 1, 0.1), ncol=2, loc='lower left', frameon=False)
        fig.tight_layout()

dir = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/match_ms2'
files = [os.path.join(dir, f) for f in os.listdir(dir)]
end = 'collection.npy'

In [None]:
files[2]

In [None]:
plot_match_num(
    files[:20]
)

In [None]:
plot_match_num(
    files[20:40]
)

In [None]:
files[22]

In [None]:
plot_match_num(
    files[40:60]
)

In [None]:
plot_match_num(
    files[60:80]
)

In [None]:
plot_match_num(
    files[80:100]
)

In [None]:
plot_match_num(
    files[100:120]
)

In [None]:
plot_match_num(
    files[120:140]
)

In [None]:
plot_match_num(
    files[140:160]
)

In [None]:
plot_match_num(
    files[160:180]
)

In [None]:
plot_match_num(
    files[180:]
)

In [None]:
files[2], files[22], files[68], files[84], files[114]

查看肽段六个离子的 XIC 图像
---

匹配原则为

$$
    |mz_1 - mz_2| < \delta mz_1
$$

可以自定义查看匹配成功图谱数量为 $N$ 的肽段六个离子的 XIC 图像，需要形成比较明显的色谱峰，图谱数量至少要达到 $10$ 以上

In [None]:
from typing import Sequence

import seaborn as sns
from matplotlib.axes import Axes
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

colors = sns.color_palette('Paired')

def view_XIC(macth_ms2_metadata, num_matched_ms2: int, is_decoy=False):

    def drop_spines(ax: Axes):
        ax.set_frame_on(False)

    def get_ion_label(fragment):
        return f'{fragment[0]}{fragment[1]}'

    match_num = np.array([])
    decoy = np.array([], dtype=bool)

    for metadata in macth_ms2_metadata.values():
        num = len(metadata['candidate_ms2_metadata']['peaks'])
        match_num = np.append(match_num, num)
        decoy = np.append(decoy, metadata['decoy'])

    array_items = np.array(list(macth_ms2_metadata.items()), dtype=object)
    if not is_decoy:
        bool_experission = (match_num == num_matched_ms2) & ~decoy
    else:
        bool_experission = (match_num == num_matched_ms2) & decoy
    
    print(len(np.nonzero(bool_experission)[0]))

    select_indices = np.nonzero(bool_experission)[0][:20]
    select_view_metadata = array_items[select_indices]

    for modified_peptide, metadata in select_view_metadata:
        print(metadata['FeaturedIons'])
        modified_peptide = tuple(modified_peptide)
        spectrum = metadata['Spectrum']
        mz, intensity = spectrum[:, 0], spectrum[:, 1]
        rts = metadata['candidate_ms2_metadata']['rt']
        peaks = metadata['candidate_ms2_metadata']['peaks']

        if not is_decoy:
            fragment = metadata['Fragment']
            non_match_color = '#b6c9b8' # , '#bae3eb', '#eaeae8'
            non_match_ion_handles = []
            non_match_ion_labels = []

            handles, labels = [], []

        view_data = []

        for select_ion_index in range(0, 6):
            if mz[select_ion_index] == 0:
                continue
            if not is_decoy:
                ion_label = get_ion_label(fragment[select_ion_index])

            rt_seq = np.array([])
            intensity = np.array([])

            for rt, peak in zip(rts, peaks):
                peak_intensity = peak[select_ion_index][1]
                if peak_intensity > 0:
                    intensity = np.append(intensity, peak_intensity)
                    rt_seq = np.append(rt_seq, rt)
            
            if len(intensity) > 0:
                view_data.append({
                    'x': rt_seq,
                    'y': intensity,
                    'color': colors[select_ion_index]
                })

            if not is_decoy:
                if len(intensity) == 0:
                    non_match_ion_handles.append(Line2D([0], [0], color=non_match_color))
                    non_match_ion_labels.append(f'{ion_label}')
                else:
                    handles.append(Line2D([0], [0], color=colors[select_ion_index]))
                    labels.append(f'{ion_label}')
        if not is_decoy:
            handles.extend(non_match_ion_handles)
            labels.extend(non_match_ion_labels)

        axes: Sequence[Axes]
        fig, axes = plt.subplots(len(view_data), 1, figsize=(5, 5), dpi=300, sharex=True)
        
        for data, ax in zip(view_data, axes):
            sns.lineplot(x=data['x'], y=data['y'], color=data['color'], ax=ax, marker='o', markersize=3)
        """
            bbox_to_anchor 参数 (x, y, width, height)
            x, y 表示你基准点的坐标，默认的 loc = 'lower left' 就是图例左下角，它在整个 figure 中的坐标
            width: 图例宽度
            height: 图例高度
        """
        for ax in axes:
            ax.set_yticks([])
        for ax in axes:
            drop_spines(ax)
        if not is_decoy:
            axes[0].legend(handles, labels, bbox_to_anchor=(0, 1.02, 1, 0.1),  loc='lower left', mode='expand', borderaxespad=0., frameon=False, ncol=6, alignment='center')
        
        axes[-1].set_xlabel('Retention Time(min)')

        fig.suptitle(f'{modified_peptide[0]}.{modified_peptide[1]}', fontsize=10)
        fig.set_edgecolor('black')
        fig.set_linewidth(1.0)

        fig.tight_layout()

In [None]:
import os

import seaborn as sns
import numpy as np

sns.set_style('white')

dir = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/match_ms2'

files = [os.path.join(dir, f) for f in os.listdir(dir)]
file = os.path.join(dir, '20231016_300ngPlasmaSample_7p5min_100um-13cm_P1-46')

end = 'collection.npy'

path = os.path.join(file, end)

data = np.load(path, allow_pickle=True).item()

view_XIC(data, 100, False)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.lines import Line2D

colors = sns.color_palette('Paired')

sns.set_style('white')

def plot_xic(key, spectrum, fragment, peaks, rts):
    def get_ion_label(fragment):
        return f'{fragment[0]}{fragment[1]}'
    
    def drop_spines(ax: Axes):
        ax.set_frame_on(False)

    mz = spectrum[:, 0]
    view_data = []
    non_match_ion_handles = []
    non_match_ion_labels = []
    handles = []
    labels = []
    non_match_color = '#b6c9b8'
    for select_ion_index in range(0, 6):
        if mz[select_ion_index] == 0:
            continue
        ion_label = get_ion_label(fragment[select_ion_index])

        rt_seq = np.array([])
        intensity = np.array([])

        for rt, peak in zip(rts, peaks):
            peak_intensity = peak[select_ion_index][1]
            if peak_intensity > 0:
                intensity = np.append(intensity, peak_intensity)
                rt_seq = np.append(rt_seq, rt)
        
        if len(intensity) > 0:
            view_data.append({
                'x': rt_seq,
                'y': intensity,
                'color': colors[select_ion_index]
            })

        if len(intensity) == 0:
            non_match_ion_handles.append(Line2D([0], [0], color=non_match_color))
            non_match_ion_labels.append(f'{ion_label}')
        else:
            handles.append(Line2D([0], [0], color=colors[select_ion_index]))
            labels.append(f'{ion_label}')
    handles.extend(non_match_ion_handles)
    labels.extend(non_match_ion_labels)

    axes: Sequence[Axes]
    fig, axes = plt.subplots(len(view_data), 1, figsize=(6, 5), dpi=300, sharex=True)
    
    for data, ax in zip(view_data, axes):
        sns.lineplot(x=data['x'], y=data['y'], color=data['color'], ax=ax, marker='o', markersize=3)
    
    for ax in axes:
        ax.set_yticks([])
        
    for ax in axes:
        drop_spines(ax)

    axes[0].legend(handles, labels, bbox_to_anchor=(0, 1.02, 1, 0.1),  loc='lower left', mode='expand', borderaxespad=0., frameon=False, ncol=6, alignment='center')

    axes[-1].set_xlabel('Retention Time(min)')

    fig.set_edgecolor('black')
    fig.set_linewidth(1.0)
    fig.suptitle(f'{key[0]}.{key[1]}')

def get_topN_metadata(data, key, scores_func, n):
    metadata = data[key]

    peaks = metadata['candidate_ms2_metadata']['peaks']
    spectrum = metadata['Spectrum']
    rts = metadata['candidate_ms2_metadata']['rt']
    ids = metadata['candidate_ms2_metadata']['id']
    fragment = metadata['Fragment']
    ex_inten = peaks[:, :, 1]
    ref_inten = spectrum[:, 1]
    cosine_scores = scores_func([ref_inten], ex_inten)[0]
    argmax_indices = np.argsort(cosine_scores)[::-1][:n]
    argmax_indices_top10 = np.sort(argmax_indices)

    return spectrum, fragment, peaks[argmax_indices_top10], rts[argmax_indices_top10], ids[argmax_indices_top10]

In [None]:
import os
import numpy as np

dir = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/match_ms2'
file = '20231016_300ngPlasmaSample_7p5min_100um-13cm_P4-22'
end = 'collection.npy'
file = os.path.join(dir, file, end)
data = np.load(file, allow_pickle=True).item()
key = ('_ASQSVSSSYLTWYQQKPGQAPR_', 4)

print(data[key]['FeaturedIons'])

plot_xic(key, data[key]['Spectrum'], data[key]['Fragment'],data[key]['candidate_ms2_metadata']['peaks'], data[key]['candidate_ms2_metadata']['rt'])

In [None]:
import os
import numpy as np

dir = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/filter_ms2/penalty_MAE_peaksum'
file = '20231016_300ngPlasmaSample_7p5min_100um-13cm_P1-46'
end = 'collection.npy'
file = os.path.join(dir, file, end)

data = np.load(file, allow_pickle=True).item()

keys = [
    ('_YVGGQEHFAHLLILR_', 3), ('_YFIDFVAR_', 2), ('_SSPVVIDASTAIDAPSNLR_', 3), ('_GQGTLSVVTMYHAK_', 2),
    ('_EKAEGDVAALNR_', 3), ('_DRDGNTLTYYR_', 3), ('_NTLHLQM[Oxidation (M)]NSLR_', 3), ('_GGPLDGTYR_', 2),
    ('_RLGMFNIQHC[Carbamidomethyl (C)]K_', 3), ('_FVTQAEGAK_', 2)   
]

for key in keys:
    plot_xic(key, data[key]['Spectrum'], data[key]['Fragment'],data[key]['filter_ms2_metadata']['peaks'], data[key]['filter_ms2_metadata']['rt'])

In [None]:
import numpy.typing as npt

def cosine(A: npt.NDArray, B: npt.NDArray):
    norm_A = np.linalg.norm(A, axis=1, keepdims=True)
    norm_B = np.linalg.norm(B, axis=1, keepdims=True)
    norlize_A = A / norm_A
    norlize_B = B / norm_B
    return np.dot(norlize_A, norlize_B.T)

def MAE(A: npt.NDArray, B: npt.NDArray):
    sum_norm_B = B / np.sum(B, axis=1, keepdims=True)
    scores = np.zeros((len(A), len(B)))
    for i in range(len(A)):
        for j in range(len(B)):
            scores[i, j] = np.sum(np.abs(A[i] - sum_norm_B[j]))
    return -scores

def penalty_MAE(A: npt.NDArray, B: npt.NDArray):
    sum_norm_B = B / np.sum(B, axis=1, keepdims=True)
    scores = np.zeros((len(A), len(B)))
    sum_norm_B[sum_norm_B == 0] = 1
    for i in range(len(A)):
        for j in range(len(B)):
            scores[i, j] = np.sum(np.abs(A[i] - sum_norm_B[j]))
    return -scores

def peaksum(A: npt.NDArray, B: npt.NDArray):
    max_norm_B = B / np.max(B)
    scores = np.zeros((len(A), len(B)))
    for i in range(len(A)):
        for j in range(len(B)):
            scores[i, j] = np.sum(max_norm_B[j])
    return scores

def penalty_MAE_peaksum(A: npt.NDArray, B: npt.NDArray):
    peaksum_score = peaksum(A, B)
    penalty_MAE_score = penalty_MAE(A, B)
    return peaksum_score + penalty_MAE_score

n = 6

for key in keys:
    spectrum, fragment, peaks, rts, ids = get_topN_metadata(data, key, cosine, n)
    plot_xic(key, spectrum, fragment, peaks, rts)

In [None]:
for key in keys:
    spectrum, fragment, peaks, rts, ids = get_topN_metadata(data, key, MAE, n)
    plot_xic(key, spectrum, fragment, peaks, rts)

In [None]:
for key in keys:
    spectrum, fragment, peaks, rts, ids = get_topN_metadata(data, key, penalty_MAE, n)
    plot_xic(key, spectrum, fragment, peaks, rts)

In [None]:
for key in keys:
    spectrum, fragment, peaks, rts, ids = get_topN_metadata(data, key, peaksum, n)
    plot_xic(key, spectrum, fragment, peaks, rts)

In [None]:
for key in keys:
    spectrum, fragment, peaks, rts, ids = get_topN_metadata(data, key, penalty_MAE_peaksum, n)
    plot_xic(key, spectrum, fragment, peaks, rts)

In [None]:
import numpy as np

path = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/train/identification/cosine/collection.npy'

data = np.load(path, allow_pickle=True)

In [None]:
len(np.nonzero(data['Mask'] == True)[0]) / len(data['Mask']) / 10

In [None]:
path = '/data/xp/train_test_data/astral_20231016_300ngPlasmaSample/test/identification/cosine/collection.npy'

data1 = np.load(path, allow_pickle=True)

In [1]:
import numpy as np

dtype = np.dtype([('x', np.float32, (1, 6)), ('y', np.float32, (5, ))])

x = np.random.rand(6)
y = np.random.rand(5)

In [2]:
d = []
for i in range(10):
    d.append(np.array([(x, y)], dtype=dtype))

In [3]:
data = np.concatenate(d, axis=0)

In [8]:
import torch

data

array([([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.59721065, 0.43082952, 0.85167235]),
       ([[0.48884606, 0.47345838, 0.24402949, 0.6571189 , 0.3344257 , 0.54226094]], [0.2928983 , 0.7954803 , 0.5972106