In [1]:
from openpyxl import load_workbook, Workbook
import os
from tkinter.filedialog import askopenfilename
from numpy import mean
from array import array  # 识别getpar中的array
from bruker.api.topspin import Topspin
from bruker.data.nmr import *
from math import floor  # 精确取整
from decimal import Decimal    #精确四舍五入
import time
top = Topspin()
dp = top.getDataProvider()

In [2]:
def impxlsx():
    """ Read xls get the file context
    Return file context"""
    filepaths = askopenfilename()
    if filepaths:
        file = filepaths
    filepath = file.split('.')[-2]
    wb = load_workbook(file)
    sheet = wb.active
    secondcolum = [cell.value for cell in list(sheet.columns)[1]]

    return secondcolum, filepath

In [3]:
class Getpar:
    """get peaklist start chemical shift, end chemical shift """

    def __init__(self, pathloc):
        self.path = pathloc
        self.proton = dp.getNMRData(pathloc)

        if self.proton is None:
            top.showError("There is no data in %s " % pathloc)
            raise Exception("Open a 1D data set in Topspin")
        # 这里需要提高没有数据时的鲁棒性
        # print("processing on dataset: %s" % pathloc)

    def reprospec(self, mi=1):
        """set MI and lauch ppf to produce peak list """
        self.proton.setPar("MI", mi)
        self.proton.launch("ppf")  # 该命令可替换为 ppd基于倒数的算法  ppl指定区域 ppf 全谱取峰

    def getpeaks(self):
        """get the list"""
        peaklist = self.proton.getPeakList()  # 获取peak pick数据集
        if not peaklist:
            top.showError("There is no peaks info in %s " % self.path)
            print('There is no peaks info in %s ' % self.path)
        peakchemicalsfits = []  # 新建空峰位置列表
        for peak in peaklist:  # 信号峰位置
            peakchemicalsfits.extend(peak['position'])
        return array.array('d', peakchemicalsfits)

    def getpar(self):
        """get start range, end range, point number(SI),location of peak, intensity of points"""
        specdata = self.proton.getSpecDataPoints()

        requiredpar = {
            'srange': specdata['physicalRanges'][0]['start'],  # 获取谱图左边界
            'erange': specdata['physicalRanges'][0]['end'],  # 获取谱图右边界
            'pointnum': specdata['indexRanges'][0]['numberOfPoints'],  # 获取谱图点数（处理点数）numberOfPoints   of 0f 这里布鲁克原函数有错
            'peakpositions': self.getpeaks(),  # peak用array运算速度比列表快，
            'datapoints': specdata['dataPoints']  # 获取谱图数据点 数据类型为array使用类似list
        }  # 返回值考虑用键值对更稳定
        print("getting pars on dataset: %s" % self.path)
        return requiredpar


In [4]:
def sifer(paths, window=0.0025,peaknum=3):
    allpeaks = [Getpar(path).getpeaks() for path in paths]
    stdlist = [[-999]]
    for i in range(len(allpeaks)):
        for peak in allpeaks[i]:
            for j in range(len(stdlist)):
                if mean(stdlist[j]) - window <= peak <= mean(stdlist[j]) + window:
                    stdlist[j].append(peak)
                    break
                else:
                    if j == len(stdlist) - 1:
                        stdlist.append([peak])
    del stdlist[0]

    for peaklist in stdlist:
        if len(peaklist) <= peaknum:
            stdlist.remove(peaklist)
    resultlist = [mean(stdlist[k]) for k in range(len(stdlist))]
    resultlist.sort(reverse=True)
    print('finished peak collection')
    return resultlist

In [5]:
def peakin(centre, peaklist, window=0.0025):
    """ To find is any peak in list located in give window """
    for i in range(len(peaklist)):
        if (centre - window) <= peaklist[i] <= (centre + window):
            return peaklist[i]
            break
        elif i == len(peaklist) - 1:
            return None

In [6]:
def findhighpoint(peak, par):
    """find the highest intensity of servral point near paek"""
    s_range = par['srange']
    e_range = par['erange']
    point_num = par['pointnum']
    data_points = par['datapoints']

    left_point = floor((s_range - peak) / (s_range - e_range) * point_num)  # floor向下取整，峰
    max_value = max(data_points[left_point], data_points[left_point - 1], data_points[left_point + 1],
                    data_points[left_point + 2])  # 从left point附近四个点取最大值
    high_point = data_points.index(max_value)  # 通过最大值逆向索引点的位置
    return high_point


In [7]:
def calhighpoint(peak, par):
    """find the closest point near paek"""
    s_range = str(par['srange'])
    e_range = str(par['erange'])
    point_num = str(par['pointnum'])

    # left_point = floor((s_range - peak) / (s_range - e_range) * point_num)  # floor向下取整，峰
    close_point = Decimal((Decimal(s_range) - Decimal(peak)) / (Decimal(s_range) - Decimal(e_range)) * Decimal(point_num)).quantize(Decimal('0'))
    #20220927修改为十进制精确四舍五入
    return int(close_point)

In [8]:
def pointint(pos, data, sumpoint=11):
    sumint = 0
    startloc = int(pos - (sumpoint - 1) / 2)

    for calt in range(sumpoint):
        sumint += data[startloc + calt]
    return sumint

In [9]:
def exp2xlsx(exlname, colnum, colinput):
    if os.path.exists(exlname):
        wb = load_workbook(exlname)
    else:
        wb = Workbook()  # workbook = openpyxl.Workbook(encoding='UTF-8')
    worksheet = wb.active
    for i in range(len(colinput)):
        worksheet.cell(row=i + 1, column=colnum).value = colinput[i]  # 写入列
    wb.save(filename=exlname)


# 主程序开始位置

In [10]:
context, filepath = impxlsx()  # import pathfile and filepath .xlsx
time_start=time.time()
paths = list(filter(None, context[3:]))  # get the paths of nmr data
pointnum = context[0]
win = context[1]
droppeaks = context[2]
sifers = sifer(paths, window=win, peaknum=droppeaks)  # ger the mean value of peaks in different spectrum

exlname = filepath + '_P-Bin.xlsx'
exp2xlsx(exlname, 1, ['ppm'] + sifers)  # 写入第一列

for path in paths:
    curpar = Getpar(path).getpar()
    coltitle = path.rstrip().split('/')[-3]  # 每列名称为为expriment number实验序号
    intresults = [coltitle]
    for sifer in sifers:
        resultpeak = peakin(sifer, curpar['peakpositions'], window=win)  # 判断当前sifer范围内有无选中的peak
        if resultpeak:
            highpoint = findhighpoint(resultpeak, curpar)
            integral = pointint(highpoint, curpar['datapoints'], sumpoint=pointnum)
        else:
            highpoint = calhighpoint(sifer, curpar)
            integral = pointint(highpoint, curpar['datapoints'], sumpoint=pointnum)
        intresults.append(integral)
    pass
    columnnum = paths.index(path) + 2  # 第一列为peak ppm path索引从0开始
    exp2xlsx(exlname, columnnum, intresults)  # 写入列
pass
print('data set processing finished ')
time_end=time.time()
print('time cost',time_end-time_start,'s')

finished peak collection
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/10/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/11/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/12/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/13/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/14/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/15/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/16/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/17/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/18/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/19/pdata/1
getting pars on dataset: C:/Bruker/Topspin4.2.0/examdata/exam_DNMR_Me2NCOMe/20/pdata/1
data set processin