In [None]:
"""
HeaterController class provides all the methods
required to control HC module. It looks through any ports
in appropriate devicelist, listening for key hello messages.

See readme.md for more information on protocol

06/15/2017 Nikita Kuklev
"""

In [1]:
import serial, threading, sys, time
import numpy as np

In [2]:
# Constants
devicelist_win = ['COM4','COM3'] # List of devices that will be attempted, in order
devicelist_unix = ['/dev/ADR_heatercontrol']
devicekey = 'HCA011' # HeaterControl type A0 and serial number 11
debug = 1
run_tests = 1

In [3]:
# Detect opeating system and choose right device list
def get_device_list(): 
    if sys.platform.startswith('win'):
        devicelist = devicelist_win
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
        devicelist = devicelist_unix
    else:
        raise EnvironmentError('Unsupported platform')

    if(debug): print('HCDBG: Platform ({}) -> device list is {}'.format(sys.platform,devicelist))
    return devicelist

In [5]:
def establish_comms(ser):
    devicefound = False
    deviceportname = ''
    linelim = 20
    if(debug): print('HCDBG: Starting device search')
    for device in devicelist:
        try:
            ser.port = device
            if (ser.isOpen()):
                if(debug): print('HCDBG: Port {} already open, skipping'.format(device))
                #ser.close()
            else:
                ser.open()
                time.sleep(1)
                for i in range(1,linelim):
                    ln = ser.readline().decode('ASCII').rstrip()
                    time.sleep(0.1)
                    if(ln == devicekey):
                        devicefound = True
                        deviceportname = device
                        if(debug): print('HCDBG: Port {} CORRECT msg: {}'.format(device, ln))
                        break  
                    else:
                        if(debug): print('HCDBG: Port {} incorrect msg: {}'.format(device, ln))

        except serial.SerialException as err:
            if(debug): print('HCDBG: Port {} did not work, exception: {}'.format(device, err))

        finally:
            if (not devicefound):
                ser.close()
            else:
                break

    if(not devicefound):
        print('HC: Did not find any suitable ports -> terminating')
        raise EnvironmentError('No appropriate devices found!')
    else:
        if(debug): print('HCDBG: Found device on port {} -> connected!'.format(device))
           
    # Reply with greeting
    time.sleep(0.1)
    ser.write((devicekey+'\n').encode('ASCII'))
    time.sleep(0.01)
    ser.reset_input_buffer()
    ser.reset_output_buffer()
    time.sleep(0.5)
    rdyresp = ser.readline().decode('ASCII').rstrip()
    if(rdyresp == "RDY"):
        if(debug): print('HCDBG: Device READY!')
    else:
        if(debug): print('HCDBG: Device rdy reply was {}'.format(rdyresp))
        raise EnvironmentError('Device did not reply it was ready')

In [None]:
def send_command(msg):
    msg = msg+'\n'
    msg = msg.encode('ASCII')
    ser.write(msg)
    time.sleep(0.05)
    return ser.readline().decode('ASCII').rstrip()

In [14]:
devicelist = get_device_list()
ser = prepare_serial_port()
establish_comms(ser)

HCDBG: Platform (win32) -> device list is ['COM4', 'COM3']
HCDBG: Port ready: Serial<id=0x1e9f1237710, open=False>(port=None, baudrate=9600, bytesize=8, parity='N', stopbits=1, timeout=0.2, xonxoff=False, rtscts=False, dsrdtr=False)
HCDBG: Starting device search
HCDBG: Port COM4 incorrect msg: 
HCDBG: Port COM4 incorrect msg: 
HCDBG: Port COM4 incorrect msg: I am HeaterController: 1E 95 F
HCDBG: Port COM4 incorrect msg: Type: A0
HCDBG: Port COM4 incorrect msg: FW: 12
HCDBG: Port COM4 incorrect msg: SN: 11
HCDBG: Port COM4 incorrect msg: Freq: 16000000
HCDBG: Port COM4 incorrect msg: Checking EEPROM...TYPE OK...SN OK!
HCDBG: Port COM4 incorrect msg: Timer 1 setup: TOP=31250 and f=4
HCDBG: Port COM4 CORRECT msg: HCA011
HCDBG: Found device on port COM4 -> connected!
HCDBG: Device READY!


In [15]:
if (debug):
    print('HCDBG: Device type is {}'.format(send_command('TYPE??')))
    print('HCDBG: Device serial is {}'.format(send_command('SN????')))
    print('HCDBG: Device status is {}'.format(send_command('QS????')))
    print('HCDBG: Device duty cycle is {}'.format(send_command('QC????')))

HCDBG: Device type is A0
HCDBG: Device serial is 11
HCDBG: Device status is 0
HCDBG: Device duty cycle is 0


In [24]:
#Testing block
if(run_tests): 
    ser.reset_input_buffer()
    ser.reset_output_buffer()
    send_command('SC0000')
    send_command('OFF000')
    commands = ['ON0000','OFF000','ON0000','SC0000','SC9999','SC0000','SC1000',\
              'SC0000','SC9999','OFF000','ON0000','OFF000','SC0000','ON0000',\
              'SC0050','SC1000','SC3000','SC5000','SC7000','SC9950','SC5000',\
               '666666','SC-100','SC100+','BLA','']
    expected_s = [1,0,1,1,1,1,1,\
                  1,1,0,1,0,0,1,\
                  1,1,1,1,1,1,1,\
                  1,1,1,1,1,1,1]
    expected_c = [0,0,0,0,9999,0,1000,\
                  0,9999,9999,9999,9999,0,0,\
                  50,1000,3000,5000,7000,9950,5000,\
                  5000,5000,5000,5000,5000,5000,5000]
    expected_r = ['OK','OK','OK','OK','OK','OK','OK',\
                  'OK','OK','OK','OK','OK','OK','OK',\
                  'OK','OK','OK','OK','OK','OK','OK',\
                  'BAD CMD','BAD SC NUMBER','BAD SC NUMBER','BAD CMD LEN','BAD CMD LEN']
    print('HCDBG:    Command {:2} | R{:19} | S{:1} | C{:4}'.format('','','',''))
    for i,cmd  in enumerate(commands):        
        resp1 = send_command(cmd)
        resp2 = send_command('QS????')
        resp3 = send_command('QC????')
        print('HCDBG:{:2}| {:10} | {:20} | {:2} | {:4}'.format(i,cmd,resp1,resp2,resp3))
        if (resp1 != expected_r[i]):
            print('HCDBG: Wrong resp {}, wanted {}'.format(resp1,expected_r[i]))
        if (resp2 != str(expected_s[i])):
            print('HCDBG: Wrong S {}, wanted {}'.format(resp2,expected_s[i]))
        if (resp3 != str(expected_c[i])):
            print('HCDBG: Wrong C {}, wanted {}'.format(resp3,expected_c[i]))
        time.sleep(1)
    send_command('SC0000')
    send_command('OFF000')

HCDBG:    Command    | R                    | S  | C    
HCDBG: 0| ON0000     | OK                   | 1  | 0   
HCDBG: 1| OFF000     | OK                   | 0  | 0   
HCDBG: 2| ON0000     | OK                   | 1  | 0   
HCDBG: 3| SC0000     | OK                   | 1  | 0   
HCDBG: 4| SC9999     | OK                   | 1  | 9999
HCDBG: 5| SC0000     | OK                   | 1  | 0   
HCDBG: 6| SC1000     | OK                   | 1  | 1000
HCDBG: 7| SC0000     | OK                   | 1  | 0   
HCDBG: 8| SC9999     | OK                   | 1  | 9999
HCDBG: 9| OFF000     | OK                   | 0  | 9999
HCDBG:10| ON0000     | OK                   | 1  | 9999
HCDBG:11| OFF000     | OK                   | 0  | 9999
HCDBG:12| SC0000     | OK                   | 0  | 0   
HCDBG:13| ON0000     | OK                   | 1  | 0   
HCDBG: Wrong S 1, wanted 0
HCDBG:14| SC0100     | OK                   | 1  | 100 
HCDBG:15| SC1000     | OK                   | 1  | 1000
HCDBG:16| SC3000    

In [17]:
send_command('QS????')

'0'

In [18]:
send_command('QC????')

'0'

In [26]:
send_command('ON0000')

'OK'

In [34]:
send_command('SC0001')

'OK'

def handle_data(data):
    print(data)

def read_from_port(ser):
    while not connected:
        #serin = ser.read()
        connected = True

        while True:
           print("test")
           reading = ser.readline().decode()
           handle_data(reading)

thread = threading.Thread(target=read_from_port, args=(serial_port,))
thread.start()

In [None]:
ser.in_waiting

In [None]:
ser.readline()

In [None]:
ser.readable()

In [13]:
ser.close()