In [25]:
import serial
import threading
import time
# from utils import LogLevel
import os
# from subsystem import interlocks

# Ask electronics team if the G9 is storing the data that it is reading from sensors or if it is just checking 
# if it is just checking we will have to figure out how to read that data thur the G9, to display on GUI

# Ask if we need to communicate the status should be also end the the power supplies 

# Ask what the unwritten area of response data should be (0s or Fs, or random)

# Ask him what he says that we don't understand 

# What configuration data in the G9SP configuration data from the config program
# - system settings, saftey program I/O terminal settings

# what does the PLC mean in the manual refer to? Our program? or something else?
inStatus = {
    0: "No error",
    1: "Invalid configuration",
    2: 'External test signal failure',
    3: 'Internal circuit error',
    4: 'Discrepancy error',
    5: 'Failure of the associated dual-channel input'
}

outStatus = {
    0: 'No error',
    1: 'Invalid configuration',
    2: 'Overcurrent detection',
    3: 'Short circuit detection',
    4: 'Stuck-at-high detection',
    5: 'Failure of the associated dual-channel output',
    6: 'Internal circuit error',
    8: 'Dual channel violation'
}

usStatus = {
    32: "Output Power Supply Error Flag",
    64: "Safety I/O Terminal Error Flag",
    512: "Function Block Error Flag"
}

class G9Driver:
    #TODO: Return to this and check if these parms are good by default
    def __init__(self, port=None, baudrate=9600, timeout=0.5, logger=None, debug_mode=False):
        try:
            self.ser = serial.Serial("COM11", 9600, parity=serial.PARITY_EVEN, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=timeout)
        except:
            print("here")
        self.debug_mode = debug_mode
        self.logger = logger
        self.lastResponse = None
        self.msgOptData = None


    #TODO: send query for data
    #TODO: decided if we want to store command args in here (like with a dict) or if we should do it in the callee file
    def sendCommand(self):
        # TODO: frontend topic : decided how we want to display the exception
        if not self.is_connected():
            raise ConnectionError("Seiral Port is Not Open.")
        query = b'\x40\x00\x00\x0F\x4B\x03\x4D\x00\x01' # could also use bytes.fromhex() method in future for simplicity
        data = data.ljust(6, b'\x00')[:6]
        self.msgOptData = data
        checksum_data = query + data
        checksum = self.calculate_checksum(checksum_data)
        footer = b'\x2A\x0D' # marks the end of the command 
        self.ser(query + data + checksum + footer)

        self.response()

    # used mainly for the check sum but can also be used to check for error flags
    # needs an input of a byte string and the range of bytes that need to be sum
    # will return the sum of the bytes in the a byte string in the form of b'\x12'
    def calculate_checksum(byteString, startByte, endByte):
        assert isinstance(byteString, bytes)
        return sum(byteString[startByte:endByte + 1]).to_bytes(1, "big") 

    # helper function to convert bytes to bits for checking flags
    # not currently being used but many be helpful in the future for getting errors
    def bytesToBinary(byte_string):
        return ''.join(format(byte, '08b') for byte in byte_string)
    
    # this method is made to check the error flags, right not only checks the last 13 bits
    # of a byte string
    def checkFlags13(self, byteString, inputs = 13, norm = 1):
        assert isinstance(byteString, bytes)
        # this is for if we only need the last 13 bits (more or less hardcoding this 
        # just including the rest if it might be helpful in the future
        if inputs == -1:
            if sum(byteString[-1] >= inputs):
                # all flags we care about are 1
                return True
            else:
                return False


    #TODO: async function, waiting for responce from query
    #TODO: how do we want to handle the data 
    def response(self):
        # if not self.is_connected():
        #     raise ConnectionError("Seiral Port is Not Open.")
        
        # data = self.ser.read(size=199)
        # self.lastResponse = data
        data = self.lastResponse
        if len(data) == 199:
            alwaysHeader = data[0:3]
            alwaysFooter = data[-2:]
            print(alwaysFooter, alwaysHeader)
            if alwaysHeader != b'\x40\x00\x00' or alwaysFooter != b'\x2A\x0D':
                
                raise ValueError("Always bits are incorrect")
            OCTD = data[7:11]
            print("OCTD: ", OCTD)
            # if OCTD != self.msgOptData:
            #     raise ValueError("Optional Transmission data doesn't match data sent to the G9SP")

            # TODO: Need to add SITDF functionality
            SITDF = data[11:17]
            print("SITDF: ", SITDF)

            # TODO: Need to add SOTDF functionality
            SOTDF = data[17:21]
            print("SOTDF: ", SOTDF)

            SITSF = data[21:27]
            print("SITSF: ", SITSF)

            if not self.checkFlags13(SITSF):
                if self.safetyInTerminalError(data[31:55]):
                    raise ValueError("Error was detected but was not found")
                
            SOTSF = data[27:31]
            print("SOTSF: ", SITSF)

            if not self.checkFlags13(SOTSF):
                if self.safetyOutTerminalError(data[55:71]):
                    raise ValueError("Error was detected but was not found")
                
            # TODO: Need to add error cause

            # US - Unit Status
            US = data[73:75]
            print("US: ", US)
            if US != 0:
                if self.unitStateError(US):
                    raise ValueError("Error was detected in Unit State. Could be more than one")
                
            
            # TODO: Need to add error log
            errorLog = data[108:149]

            # TODO: Need to add operation log
            operationLog = data[148:199]
                


        else:
            self.sendCommand()

        pass


    """
    0: No error
    1: Invalid configuration
    2: External test signal failure
    3: Internal circuit error
    4: Discrepancy error
    5: Failure of the associated dual-channel input
    """

    # checks all the SITSFs, throws error is one is found
    def safetyInTerminalError(self, data):
        if len(data) != 24:
            raise ValueError(f"Expected 24 bytes, but received {len(data)}.")

        last_bytes = data[-13:]
        last_bytes = last_bytes[::-1]

        for i, byte in enumerate(last_bytes):
            msb = byte >> 4  # most sig bits
            lsb = byte & 0x0F  # least sig bits

            # check high bits for errors
            if msb in inStatus and msb != 0:
                raise ValueError(f"Error at byte {i}H, MSB: {inStatus[msb]} (code {msb})")
            # check low bits for errors
            if lsb in inStatus and lsb != 0:
                raise ValueError(f"Error at byte {i}L, LSB: {inStatus[lsb]} (code {lsb})")
        return True
        


    """
    0: No error
    1: Invalid configuration
    2: Overcurrent detection
    3: Short circuit detection
    4: Stuck-at-high detection
    5: Failure of the associated dual-channel output
    6: Internal circuit error
    8: Dual channel violation
    """

    # checks all the SOTSFs, throws error is one is found 
    def safetyOutTerminalError(self, data, inputs = 13):
        if len(data) != 16:
            raise ValueError(f"Expected 16 bytes, but received {len(data)}.")

        # only keep needs bytes
        last_bytes = data[-inputs:]
        # flip direction so enumerate can if us the byte number in the error
        last_bytes = last_bytes[::-1]

        for i, byte in enumerate(last_bytes):
            msb = byte >> 4  # most sig bits
            lsb = byte & 0x0F  # least sig bits

            # check high bits for errors
            if msb in outStatus and msb != 0:
                raise ValueError(f"Error at byte {i}H, MSB: {outStatus[msb]} (code {msb})")
            # check low bits for errors
            if lsb in outStatus and lsb != 0:
                raise ValueError(f"Error at byte {i}L, LSB: {outStatus[lsb]} (code {lsb})")
        return True
    
    """
    32: Output Power Supply Error Flag
    64: Safety I/O Terminal Error Flag
    512: Function Block Error Flag
    """
    
    # rn am hoping that only one of the error flags can be set at a time
    def unitStateError(self, data):
        if len(data) != 2:
            raise ValueError(f"Expected at least 2 bytes, but received {len(data)}.")
        
        er = sum(data)
        
        if er in usStatus:
            raise ValueError(f"Unit State Error: {usStatus[er]} (code {er})")
        return True

    #TODO: make a method that is constantly running to be pulling data all the time. 
    def run(self):
        if self.is_connected():
            # call sendCommand to get G9 to send new data
            self.sendCommand()

            time.sleep(0.)

    #TODO: Check to see if the G9 switch is allowing high Voltage or not
    # this function will need to be constantly sending requests/receiving to check when the high voltage is off/on
    def checkStatus():
        pass

    def flush_serial(self):
        self.ser.reset_input_buffer()    # flushes the input buffer to rid of unwanted bits


    #TODO: make funtion to turn all interlocks to red
    def is_connected(self):
        try:
            #TODO: check if this works with G9 copied from Power Supply Driver
            # Attempt to write a simple command to the device
            self.ser.write(b'\r')  # Send a carriage return
            # Try to read a response (there might not be one)
            self.ser.read(1)

            self.isConnected = True
            return True
        except serial.SerialException:
            return False
        

    #TODO: Figure out how to handle all the errors (end task)


In [26]:
_ = G9Driver()

res = b'\x40\x00\x00\xc3\x00\x00\xcb\x00\x00\x00\x00\xff\x0f\x00\x00\x00\x00$\x00\x00\x00\xff\xff\x0f\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x12\x00\x9a\x08\xac\x14\x00\x0020000012X17M\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\n\n?\x00\x14\xac?\x00\x14\xac?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\x9a?\x00\x14\x9a\x13\x00\x14\x9a\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^\x19\xfe\x2A\x0D'

here


@
\x00\x00
\xc3
\x00\x00
\xcb
\x00\x00\x00\x00\xff\x0f\x00\x00\x00\x00$\x00\x00\x00\xff\xff\x0f\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x12\x00\x9a\x08\xac\x14\x00\x0020000012X17M\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\n\n?\x00\x14\xac?\x00\x14\xac?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\x9a?\x00\x14\x9a\x13\x00\x14\x9a\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^\x19
\xfe
*

Always 
40
0000
Response length:
c3
End Code:
0000
Service Code:
cb
Data:
OCTD - \x00\x00\x00\x00
SITDF - \xff\x0f\x00\x00\x00\x00
SOTDF - $\x00\x00\x00
SITSF - \xff\xff\x0f\x00\x00\x00
SOTSF - \xff\x00\x00\x00
Input causes - \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
Output Causes - \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
Reserved - \x08\x00
Unit Status - \x12\x00
ID - \x9a\x08
Unit Conduct time - \xac\x14\x00\x00
Reserve2 - 20000012X17M\x00\x00\x00\x00\x00\x00\x00\x00
Persent err - \x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00
err cnt - \n
ope cnt - \n
Error - ?\x00\x14\xac?\x00\x14\xac?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\x9a?\x00\x14\x9a\x13\x00\x14\x9a
Operation -  \x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^


Check Sum:
19fe
Always
2a

In [27]:

_.lastResponse = res
len(res)
print(_.lastResponse)

b'@\x00\x00\xc3\x00\x00\xcb\x00\x00\x00\x00\xff\x0f\x00\x00\x00\x00$\x00\x00\x00\xff\xff\x0f\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x12\x00\x9a\x08\xac\x14\x00\x0020000012X17M\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00\n\n?\x00\x14\xac?\x00\x14\xac?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\x9a?\x00\x14\x9a\x13\x00\x14\x9a\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^\x19\xfe*\r'


In [28]:
res[21:27]
_.response()


b'*\r' b'@\x00\x00'
OCTD:  b'\x00\x00\x00\x00'
SITDF:  b'\xff\x0f\x00\x00\x00\x00'
SOTDF:  b'$\x00\x00\x00'
SITSF:  b'\xff\xff\x0f\x00\x00\x00'


ValueError: Error was detected but was not found

In [73]:
temp = b'?\x00\x14\xac?\x00\x14\xac?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\xa0?\x00\x14\x9a?\x00\x14\x9a\x13\x00\x14\x9a\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^'
temp[40:]

b'\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^'

In [9]:
len(b'\x06\x00\x14\x9a\x01\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14\x9a\x04\x00\x14\x9a\x01\x00\x14\x9a\x06\x00\x14p\x01\x00\x14p\x06\x00\x14p\x06\x00\x14^')

40

In [18]:
_ = res[21:27]
len(_) == 6

True

In [29]:
def checkFlags13(byteString, norm = 1):
    assert isinstance(byteString, bytes)
    # this is for if we only need the last 13 bits (more or less hardcoding this 
    # just including the rest if it might be helpful in the future
    if norm == 1:
        if sum(bytesToBinary(byteString)[-13:] >= 13):
            # all flags we care about are 1
            return True
        else:
            return False
        
def bytesToBinary(byte_string):
    return ''.join(format(byte, '08b') for byte in byte_string)

In [43]:
# checkFlags13(b'\xff\xff\x0f\x00\x00\x00')
bin(bytesToBinary(b'\xff\xff\x0f\x00\x00\x00')[-13:])


TypeError: 'str' object cannot be interpreted as an integer