In [1]:
import numpy as np
import serial, time
import matplotlib.pyplot as plt
from IPython import display
import datetime
%matplotlib inline
import pandas as pd
import openpyxl
import requests

In [12]:
def open_serial(port, baudrate=None):
    if baudrate == None:
        try:
            return serial.Serial(port)
        except Exception as e:
            print(e)
            return None
    else:
        try:
            return serial.Serial(port, baudrate)
        except Exception as e:
            print(e)
            return None
        
def close_serial(*devices):
    for device in devices:
        try:
            device.close()
        except Exception as e:
            print(e)

def turn_curtain(curtain_device, angle, TEST = False):
    if TEST:
        print(f'the curtain turned at an {angle}')
    else:
        try:
            command = str(angle)
            curtain_device.write(command.encode('utf-8'))
        except Exception as e:
            print(e)

def d_action(d_device, TEST = False):
    if TEST:
        print('diaphragm narrowed')
    else:
        try:
            values = bytes.fromhex('111140014400000000001111400153000A0000001111400150004600000011114001470000000000')
            d_device.write(values)
        except Exception as e:
            print(e)

def read_counts(counter_device, TEST = False):
    if TEST:
        return np.random.randint(low=0, high=100, size=3)
    else:
        try:
            counter_device.reset_input_buffer()
            raw_reading = counter_device.readline().decode(errors='ignore')
            return [int(part.strip()) for part in raw_reading.split('\t')] 
        except Exception as e:
            print(e)
            return None
        
def read_power(powermeter_device, TEST=False):
    if TEST:
        return float(np.random.random())
    else:
        try:
            powermeter_device.write(b'*CVU')
            return float(powermeter_device.readline().decode().strip())
        except Exception as e:
            print(e)


def telegram_bot_sendtext(bot_message, bot_token):

   bot_chatID ='-1001969155907'
   send_text = 'https://api.telegram.org/bot' + bot_token + '/sendMessage?chat_id=' + bot_chatID + '&parse_mode=Markdown&text=' + bot_message

   response = requests.get(send_text)

   return response.json()

In [4]:
s = open_serial('COM8', 115200)
curtain = open_serial('COM4', 9600)
d = open_serial('COM11')
maestro = open_serial('COM5', 115200)

f = open("token.txt", "r")
token = f.read()

could not open port 'COM8': PermissionError(13, 'Отказано в доступе.', None, 5)
could not open port 'COM4': PermissionError(13, 'Отказано в доступе.', None, 5)
could not open port 'COM11': FileNotFoundError(2, 'Не удается найти указанный файл.', None, 2)
could not open port 'COM5': PermissionError(13, 'Отказано в доступе.', None, 5)


In [8]:
close_serial(s, curtain, d, maestro)

'NoneType' object has no attribute 'close'
'NoneType' object has no attribute 'close'
'NoneType' object has no attribute 'close'
'NoneType' object has no attribute 'close'


In [None]:
length = 12 # количество усреднений

FLAG = True # в режиме эксперимента

data = pd.DataFrame(columns=['diaphragm',
                             'mas_sum_1', 'noise_1',
                             'mas_sum_2', 'noise_2',
                             'mas_sum_3', 'noise_3',
                             'power_abs', 'power_rel',
                             'init_power'])

telegram_bot_sendtext("Начало измерений", bot_token=token)

for i in range(55): 

    data.loc[i, 'diaphragm'] = 2.5 + 0.1 * i

    # измерение шумов

    turn_curtain(curtain, 90, TEST=FLAG)
    time.sleep(6)

    noise_counts = np.zeros(3, dtype=int)
    for j in range(length):
        noise_counts = noise_counts + read_counts(s, TEST=FLAG)
    noise_counts = noise_counts / length
    data.loc[i, ['noise_1', 'noise_2', 'noise_3']] = noise_counts

    # измерение счетов

    turn_curtain(curtain, 0, TEST=FLAG)
    time.sleep(6)

    if i == 0:
        init_power = 0
        for _ in range(length):
            init_power = init_power + read_power(maestro, TEST=FLAG)
        init_power = init_power / length
    
    abs_power = 0 
    true_counts = np.zeros(3, dtype=int)
    for j in range(length):
        true_counts = true_counts + read_counts(s, TEST=FLAG)
        abs_power = abs_power + read_power(maestro, TEST=FLAG)
    true_counts = true_counts / length
    abs_power = abs_power / length

    true_counts = (true_counts - noise_counts) / ( abs_power / init_power )

    data.loc[i, ['mas_sum_1', 'mas_sum_2', 'mas_sum_3']] = true_counts
    data.loc[i, ['power_abs', 'power_rel']] = abs_power, ( abs_power / init_power )
    data.loc[i, ['init_power']] = init_power

    # построение графиков

    display.clear_output(wait=True)

    fig, axs = plt.subplots(1, 3, figsize=(10, 10))

    for j in range(1, 4):
       ax = axs[j-1]
       ax.plot(data['diaphragm'], data[f'mas_sum_{j}'])
       ax.set_title(f'mas_sum_{j}')

    plt.show()

    telegram_bot_sendtext("{} / 55 : ".format(i) + str(true_counts),  bot_token=token)
        
    d_action(d, TEST=FLAG)

telegram_bot_sendtext("Конец работы", bot_token=token)

now = datetime.datetime.now()
filename = 'DWN_result_{}.xlsx'.format(now.strftime('%Y-%m-%d_%H-%M-%S'))
data.to_excel(filename, index=False)

In [None]:
close_serial(s, curtain, d, maestro)