Skip to content

Commit

Permalink
乐动模块
Browse files Browse the repository at this point in the history
  • Loading branch information
lijutsang committed May 12, 2023
1 parent f3a24a4 commit bf18b8f
Show file tree
Hide file tree
Showing 6 changed files with 698 additions and 0 deletions.
145 changes: 145 additions & 0 deletions port/modules/ATGM336H_5N.py
@@ -0,0 +1,145 @@
from machine import UART, Pin
import time

class GPS():
def __init__(self,uart_num=1, pin_tx=Pin.P14, pin_rx=Pin.P13):
self.__uart_port = UART(uart_num, baudrate=9600, tx=pin_tx, rx=pin_rx)
self.GNSS_RX_Buffer = ""
self.GNSS_Buffer = ""
self.UTC_Time = ""
self.latitude = ""
self.N_S = ""
self.longitude = ""
self.E_W = ""
self.speed_to_groud = ""
self.speed_to_groud_kh = 0
self.course_over_ground = ""
self.date = ""
self.isDecodeData = False
self.isParseData = False
self.DataIsUseful = False # 是否有有效数据信息

def GNSS_Read(self):
self.GNSS_RX_Buffer = self.__uart_port.read(1200)
while type(self.GNSS_RX_Buffer)!=bytes:
self.GNSS_RX_Buffer = self.__uart_port.read(1200)

# 由于电路连接不可靠等原因,有时接收到的信息含有非 ASCII 字符,引发 UnicodeError
# AttributeError 引发原因未知
while self.isDecodeData==False:
try:
self.GNSS_Buffer = self.GNSS_RX_Buffer.decode('ASCII')
self.isDecodeData = True
except UnicodeError:
self.GNSS_RX_Buffer = self.__uart_port.read(1200)
except AttributeError:
self.GNSS_RX_Buffer = self.__uart_port.read(1200)

# 测试模块工作是否正常的代码,实际应用请注释掉以避免输出信息过多
#print("This is GNSS_RX_Buffer, origin data from uart:")
#print(self.GNSS_RX_Buffer)
#print("This is GNSS_Buffer, after decoding:")
#print(self.GNSS_Buffer)

# 模块输出的信息很多,为了简便起见我们只选择 RMC 最小定位信息,此部分代码用于从 GNSS_Buffer 中截取 RMC 部分
GNSS_BufferHead = self.GNSS_Buffer.find("$GPRMC,")
if GNSS_BufferHead == -1:
GNSS_BufferHead = self.GNSS_Buffer.find("GNRMC,")
#print(GNSS_BufferHead)
if GNSS_BufferHead == -1:
print("Cannot read the GPS , RMC imformation")
self.isDecodeData = False
else:
GNSS_BufferTail = self.GNSS_Buffer[GNSS_BufferHead:].find("\r\n")
#print(GNSS_BufferTail)
if GNSS_BufferTail == -1:
print("Not end with newline")
self.isDecodeData = False

self.GNSS_Buffer=self.GNSS_Buffer[GNSS_BufferHead:GNSS_BufferHead+GNSS_BufferTail]

# 测试模块工作是否正常的代码,实际应用请注释掉以避免输出信息过多
# print("This is GNSS_RX_Buffer, origin data from uart:")
# print(self.GNSS_RX_Buffer)
# print("This is GNSS_Buffer, including RMC info:")
# print(self.GNSS_Buffer)

def GNSS_Parese(self):
if(self.isDecodeData == True):
self.isDecodeData= False
print("*****************************************")

temp = self.GNSS_Buffer.split(',')

try:
# RMC 信息中自带的标识符, A 代表有效, V 代表无效
if temp[2] == 'A':
self.DataIsUseful = True
else:
self.DataIsUseful = False

if temp[1] == "":
self.UTC_Time = "-1"
else:
self.UTC_Time = temp[1]
self.UTC_Time_Beijing = str(int(self.UTC_Time[0:2])+8)+':'+self.UTC_Time[2:4]+':'+self.UTC_Time[4:6]
self.UTC_Time = self.UTC_Time[0:2]+':'+self.UTC_Time[2:4]+':'+self.UTC_Time[4:6]

if temp[3] == "":
self.latitude = "-1"
else:
self.latitude = temp[3]
self.latitude = self.latitude[0:2]+' degree '+self.latitude[2:]+'\''

self.N_S = temp[4]

if temp[5] == "":
self.longitude = "-1"
else:
self.longitude = temp[5]
#self.longitude = self.longitude[0:3]+'°'+self.latitude[3:]
self.longitude = self.longitude[0:3]+' degree '+self.longitude[3:]+'\''

self.E_W = temp[6]

# RMC 中的速度以节为单位
if temp[7] != "":
try:
self.speed_to_groud = float(temp[7])
self.speed_to_groud_kh = self.speed_to_groud*1.852
except ValueError:
pass
else:
self.speed_to_groud = -1
self.speed_to_groud_kh = -1

if temp[8] != "":
try:
self.course_over_ground = float(temp[8])
except ValueError:
pass
else:
self.course_over_ground = -1

if temp[9] == "":
self.date = "-1"
else:
self.date = temp[9]
self.date = self.date[4:6]+'.'+self.date[2:4]+'.'+self.date[0:2]

self.isParseData = True
except IndexError:
pass

def print_GNSS_info(self):
if(self.isParseData):
self.isParseData = False
if(self.DataIsUseful):
print("维度: %s %s",self.latitude,self.N_S)
print("经度: %s %s",self.longitude,self.E_W)
print("日期: %s",self.date,end=' ')
print("UTC时间: %s",self.UTC_Time)
print("卫星对地速度: %f km/h",self.speed_to_groud_kh)
# print("course_over_groud: %f°",self.course_over_ground)
else:
print(" !")
4 changes: 4 additions & 0 deletions port/modules/bluebit.py
Expand Up @@ -33,6 +33,10 @@

from spl06_001 import Barometric
from apds9960 import Gesture
from ATGM336H_5N import GPS
from weather import WEATHER
from pm25 import PM25
from max30102 import MAX30102

class Thermistor:
"""
Expand Down
178 changes: 178 additions & 0 deletions port/modules/hrcalc.py
@@ -0,0 +1,178 @@
# 25 samples per second (in algorithm.h)
SAMPLE_FREQ = 25
#计算HR时取4个样本的移动平均值
MA_SIZE = 4
#采样频率
BUFFER_SIZE = 100

#取平均值

def get_mean(ls):
return sum(ls)/len(ls)

def calc_hr_and_spo2(ir_data, red_data):

ir_mean = int(get_mean(ir_data))
x = []
for k in ir_data:
x.append((k-ir_mean)*-1)
m = len(x) - MA_SIZE
for i in range(m):
x[i] = sum(x[i:i+MA_SIZE]) / MA_SIZE

#计算阈值
n_th = int(get_mean(x))
n_th = 30 if n_th < 30 else n_th #允许的最小值
n_th = 60 if n_th > 60 else n_th #允许的最大值

ir_valley_locs, n_peaks = find_peaks(x, BUFFER_SIZE, n_th, 4, 15)
peak_interval_sum = 0
if n_peaks >= 2:
for i in range(1, n_peaks):
peak_interval_sum += (ir_valley_locs[i] - ir_valley_locs[i-1])
peak_interval_sum = int(peak_interval_sum / (n_peaks - 1))
hr = int(SAMPLE_FREQ * 60 / peak_interval_sum)
hr_valid = True
else:
hr = -999 #错误值
hr_valid = False

# ---------spo2 血氧浓度---------
exact_ir_valley_locs_count = n_peaks
for i in range(exact_ir_valley_locs_count):
if ir_valley_locs[i] > BUFFER_SIZE:
spo2 = -999
spo2_valid = False
return hr, hr_valid, spo2, spo2_valid

i_ratio_count = 0
ratio = []

# find max between two valley locations 在两个山谷位置之间找到最大值
# and use ratio between AC component of Ir and Red DC component of Ir and Red for SpO2 以及Ir的AC分量与Ir和Red的Red DC分量对于SpO2的使用比率
red_dc_max_index = -1
ir_dc_max_index = -1
for k in range(exact_ir_valley_locs_count-1):
red_dc_max = -16777216
ir_dc_max = -16777216
if ir_valley_locs[k+1] - ir_valley_locs[k] > 3:
for i in range(ir_valley_locs[k], ir_valley_locs[k+1]):
if ir_data[i] > ir_dc_max:
ir_dc_max = ir_data[i]
ir_dc_max_index = i
if red_data[i] > red_dc_max:
red_dc_max = red_data[i]
red_dc_max_index = i

red_ac = int((red_data[ir_valley_locs[k+1]] - red_data[ir_valley_locs[k]]) * (red_dc_max_index - ir_valley_locs[k]))
red_ac = red_data[ir_valley_locs[k]] + int(red_ac / (ir_valley_locs[k+1] - ir_valley_locs[k]))
red_ac = red_data[red_dc_max_index] - red_ac # subtract linear DC components from raw 从原始中减去线性直流分量

ir_ac = int((ir_data[ir_valley_locs[k+1]] - ir_data[ir_valley_locs[k]]) * (ir_dc_max_index - ir_valley_locs[k]))
ir_ac = ir_data[ir_valley_locs[k]] + int(ir_ac / (ir_valley_locs[k+1] - ir_valley_locs[k]))
ir_ac = ir_data[ir_dc_max_index] - ir_ac # subtract linear DC components from raw

nume = red_ac * ir_dc_max
denom = ir_ac * red_dc_max
if (denom > 0 and i_ratio_count < 5) and nume != 0:
# original cpp implementation uses overflow intentionally.
# but at 64-bit OS, Pyhthon 3.X uses 64-bit int and nume*100/denom does not trigger overflow
# so using bit operation ( &0xffffffff ) is needed
ratio.append(int(((nume * 100) & 0xffffffff) / denom))
i_ratio_count += 1

# choose median value since PPG signal may vary from beat to beat
ratio = sorted(ratio) # sort to ascending order
mid_index = int(i_ratio_count / 2)

ratio_ave = 0
if mid_index > 1:
ratio_ave = int((ratio[mid_index-1] + ratio[mid_index])/2)
else:
if len(ratio) != 0:
ratio_ave = ratio[mid_index]

# why 184?
# print("ratio average: ", ratio_ave)
if ratio_ave > 2 and ratio_ave < 184:
# -45.060 * ratioAverage * ratioAverage / 10000 + 30.354 * ratioAverage / 100 + 94.845
spo2 = -45.060 * (ratio_ave**2) / 10000.0 + 30.054 * ratio_ave / 100.0 + 94.845
spo2_valid = True
else:
spo2 = -999
spo2_valid = False

return hr, hr_valid, spo2, spo2_valid


def find_peaks(x, size, min_height, min_dist, max_num):
"""
Find at most MAX_NUM peaks above MIN_HEIGHT separated by at least MIN_DISTANCE
"""
ir_valley_locs, n_peaks = find_peaks_above_min_height(x, size, min_height, max_num)
ir_valley_locs, n_peaks = remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist)

n_peaks = min([n_peaks, max_num])

return ir_valley_locs, n_peaks


def find_peaks_above_min_height(x, size, min_height, max_num):
"""
Find all peaks above MIN_HEIGHT
"""

i = 0
n_peaks = 0
ir_valley_locs = [] # [0 for i in range(max_num)]
while i < size - 1:
if x[i] > min_height and x[i] > x[i-1]: # find the left edge of potential peaks
n_width = 1
# original condition i+n_width < size may cause IndexError
# so I changed the condition to i+n_width < size - 1
while i + n_width < size - 1 and x[i] == x[i+n_width]: # find flat peaks
n_width += 1
if x[i] > x[i+n_width] and n_peaks < max_num: # find the right edge of peaks
# ir_valley_locs[n_peaks] = i
ir_valley_locs.append(i)
n_peaks += 1 # original uses post increment
i += n_width + 1
else:
i += n_width
else:
i += 1

return ir_valley_locs, n_peaks


def remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist):
"""
Remove peaks separated by less than MIN_DISTANCE
"""

# should be equal to maxim_sort_indices_descend
# order peaks from large to small
# should ignore index:0
sorted_indices = sorted(ir_valley_locs, key=lambda i: x[i])
sorted_indices.reverse()

# this "for" loop expression does not check finish condition
# for i in range(-1, n_peaks):
i = -1
while i < n_peaks:
old_n_peaks = n_peaks
n_peaks = i + 1
# this "for" loop expression does not check finish condition
# for j in (i + 1, old_n_peaks):
j = i + 1
while j < old_n_peaks:
n_dist = (sorted_indices[j] - sorted_indices[i]) if i != -1 else (sorted_indices[j] + 1) # lag-zero peak of autocorr is at index -1
if n_dist > min_dist or n_dist < -1 * min_dist:
sorted_indices[n_peaks] = sorted_indices[j]
n_peaks += 1 # original uses post increment
j += 1
i += 1

sorted_indices[:n_peaks] = sorted(sorted_indices[:n_peaks])

return sorted_indices, n_peaks

0 comments on commit bf18b8f

Please sign in to comment.