In [1]:
import numpy as np
from struct import *
import random

import PySide6
from PySide6 import QtWidgets
from PySide6.QtSerialPort import *
from PySide6.QtGui import QColor
from PySide6.QtWidgets import ( QApplication, QWidget, QTabWidget,
    QTableWidget,  QTableWidgetItem, QHeaderView,
    QPushButton, QLabel, QGridLayout, QHBoxLayout, QVBoxLayout )

### Import the classes defined in the Taros_GCS module, but run the main program from the notebook

In [2]:
from Taros_GCS import format_time, CommunicationsView, MissionControlView, MainWindow

# Qt allows exactly one QApplication per process. Re-running this cell must
# therefore reuse the existing instance and only create one when none exists.
# QApplication([]) is sufficient because no command-line arguments are used.
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
# QGuiApplication.primaryScreen().availableGeometry()

# Create the main window (imported from Taros_GCS above).
window = MainWindow(app)
window.show()  # IMPORTANT: windows are hidden by default.

# Start the event loop. Execution blocks here until the window is closed
# and the event loop has stopped.
app.exec()

# Release the serial port on exit. `port_available` only says that a port was
# detected; the QSerialPort object may never have been created (port found but
# never opened), so look it up defensively instead of assuming it exists.
# NOTE(review): assumes the Taros_GCS CommunicationsView stores the port as
# `serial`, as the original cell did.
if window.cv.port_available:
    open_port = getattr(window.cv, "serial", None)
    if open_port is not None and open_port.isOpen():
        open_port.close()

### Now we can override classes for development purposes

In [3]:
class MissionControlView(QWidget):
    """Early draft of the mission-control tab: sends ping commands over the serial link.

    The serial port itself is owned by the communications view, which is
    reached through the `cv` attribute of the object passed to the constructor.
    """

    def __init__(self, app):
        """Store the owner and a shortcut to its communications view.

        app -- object exposing the communications view as its `cv` attribute
        """
        QWidget.__init__(self)
        self.app = app
        # This line was truncated to `self.app.` (a SyntaxError); send_ping
        # reaches the serial port through `self.app.cv`, so that is the target.
        self.cv = self.app.cv

    def send_ping(self):
        """Transmit a ping command carrying a random 16-bit token.

        Prints the two token bytes that were sent, or 'port not open.' when
        there is no open serial port to write to.
        """
        # The `serial` attribute only exists after an open attempt, and may
        # exist without being open, so check both conditions.
        port = getattr(self.cv, 'serial', None)
        if port is not None and port.isOpen():
            # assemble an empty command message header (3 payload bytes)
            msg_buffer = bytearray(b'\xCC\x86\x03')
            # generate a random 16-bit token (named `token` to avoid shadowing
            # the built-in `hash`)
            token = random.randint(0,65535)
            msg_buffer.extend(token.to_bytes(2,'big'))
            msg_buffer.extend([0])
            # transmit
            port.write(msg_buffer)
            print("sent ping " + "".join(" %0.2X" % c for c in msg_buffer[3:5]))
        else:
            print('port not open.')

In [4]:
from Taros_GCS import format_time, CommunicationsView, MissionControlView, MainWindow

# Only one QApplication may exist per process, so pick up the instance created
# by an earlier run of a launch cell and construct a new one only if needed.
# No command-line arguments are used, hence the empty argument list.
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
# QGuiApplication.primaryScreen().availableGeometry()

# Build and show the main window; Qt windows stay hidden until show() is called.
window = MainWindow(app)
window.show()

# Run the Qt event loop. Nothing below executes until the window is closed.
app.exec()

# Clean up the serial connection after the GUI has exited. A detected port
# (`port_available`) is not necessarily an opened one, so the QSerialPort
# attribute is fetched with a default rather than accessed directly.
# NOTE(review): assumes the Taros_GCS CommunicationsView keeps the port in
# `serial`, as the original cell did.
if window.cv.port_available:
    open_port = getattr(window.cv, "serial", None)
    if open_port is not None and open_port.isOpen():
        open_port.close()

In [7]:
class MissionControlView(QWidget):
    """Mission-control tab holding the buttons for commands sent to the vehicle.

    The serial port is owned by the communications view of the main window;
    this view only writes command messages to it.
    """

    def __init__(self, main_window):
        """Build the button column.

        main_window -- the owning window; its `cv` attribute (the
                       communications view) must already exist
        """
        QWidget.__init__(self)
        self.mv = main_window
        self.cv = main_window.cv
        layout = QVBoxLayout()
        ping_button = QPushButton("Ping")
        layout.addWidget(ping_button)
        ping_button.clicked.connect(self.send_ping)
        calsave_button = QPushButton("Save Calibration")
        layout.addWidget(calsave_button)
        # NOTE: no handler is connected to the "Save Calibration" button yet.
        self.setLayout(layout)

    def send_ping(self):
        """Transmit a ping command carrying a random 16-bit token.

        Prints the two token bytes that were sent, or 'port not open.' when
        there is no open serial port to write to.
        """
        # The communications view creates `serial` on the first open attempt,
        # so the attribute can be missing, or present but not open. Writing to
        # a closed port would fail while this method still reported success.
        port = getattr(self.cv, 'serial', None)
        if port is not None and port.isOpen():
            # assemble an empty command message header (3 payload bytes)
            msg_buffer = bytearray(b'\xCC\x87\x03')
            # generate a random 16-bit token (named `token` to avoid shadowing
            # the built-in `hash`)
            token = random.randint(0,65535)
            msg_buffer.extend(token.to_bytes(2,'big'))
            msg_buffer.extend([0])
            # transmit
            port.write(msg_buffer)
            print("sent ping " + "".join(" %0.2X" % c for c in msg_buffer[3:5]))
        else:
            print('port not open.')
        
class MainWindow(QWidget):
    """Top-level window of the ground control station, one tab per view."""

    def __init__(self, rect):
        """Create the tab widget and the individual views.

        rect -- rectangle providing width() and height(); the launch script
                passes the primary screen's available geometry. The window is
                sized to 60% of its width and 80% of its height.
        """
        QWidget.__init__(self)
        # Ask Qt for the running application instead of reading the notebook
        # global `app`, which is not a parameter of this constructor.
        self.app = QApplication.instance()
        self.setWindowTitle("TAROS ground control station")
        # setGeometry takes integer pixel values; convert the scaled sizes
        # explicitly instead of relying on implicit float conversion.
        width = int(0.6*rect.width())
        height = int(0.8*rect.height())
        self.setGeometry(100, 100, width, height)
        layout = QGridLayout()
        self.setLayout(layout)
        tabwidget = QTabWidget()
        label1 = QLabel("Widget in Tab 1.")
        tabwidget.addTab(label1, "Primary Flight Display")
        # The communications view must exist before the mission-control view,
        # because the latter reads `main_window.cv` in its constructor.
        self.cv = CommunicationsView(self)
        tabwidget.addTab(self.cv, "Communications")
        self.mcv = MissionControlView(self)
        tabwidget.addTab(self.mcv, "Mission Control")
        layout.addWidget(tabwidget, 0, 0)

# A process may hold only one QApplication, so reuse the one that is already
# alive in this kernel and create it only on the first run. The empty list
# stands in for sys.argv because no command-line arguments are needed.
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
# QGuiApplication.primaryScreen().availableGeometry()

# Create the main window, sized relative to the usable area of the primary
# screen, and make it visible (windows are hidden by default).
window = MainWindow(app.primaryScreen().availableGeometry())
window.show()

# Enter the event loop; this call returns only after the window is closed.
app.exec()

# Close the serial port once the GUI is gone. CommunicationsView.refresh sets
# `port_available` as soon as a port is detected, while the `serial` attribute
# is only created by open_serial_port, so it may be missing here.
if window.cv.port_available:
    open_port = getattr(window.cv, "serial", None)
    if open_port is not None and open_port.isOpen():
        open_port.close()

### development overrides

class CommunicationsView() is initially loaded from Taros_GCS.py but can be overridden here.

In [5]:
class CommunicationsView(QWidget):
    """Communications tab: serial-port controls and a table of received messages.

    Incoming frames are parsed as

        0xCC, <message id>, <n_bytes>, <n_bytes payload bytes>, <RSI byte>

    where the RSI byte is appended after the counted payload. Two message
    ids are displayed: 129 (system message) and 136 (ping response).
    """

    HEADER_MARKER = 204         # 0xCC, first byte of every frame
    MSG_SYSTEM = 129            # 0x81, system message
    MSG_PING_RESPONSE = 136     # 0x88, ping response
    KNOWN_MESSAGES = (MSG_SYSTEM, MSG_PING_RESPONSE)
    HEADER_SIZE = 3             # marker, message id, payload length
    TRAILER_SIZE = 1            # RSI byte following the payload
    # sender (8 bytes) + level (1 byte) + time (4 bytes); the text may be empty
    SYSTEM_MIN_PAYLOAD = 13

    # background colour (r, g, b) per message level
    LEVEL_COLOURS = {
        1: (200, 0, 0),         # MSG_LEVEL_FATALERROR
        3: (255, 0, 0),         # MSG_LEVEL_CRITICAL
        5: (0, 200, 0),         # MSG_LEVEL_MILESTONE
        8: (255, 200, 200),     # MSG_LEVEL_ERROR
        10: (200, 255, 200),    # MSG_LEVEL_STATE_CHANGE
        12: (255, 255, 100),    # MSG_LEVEL_WARNING
    }
    DEFAULT_COLOUR = (210, 210, 210)    # MSG_LEVEL_STATUSREPORT and anything else

    def __init__(self, main_window=None):
        """Build the message table (left) and the port controls (right).

        main_window -- optional owning window. MainWindow constructs this view
                       as CommunicationsView(self), so the argument has to be
                       accepted; it is only stored as `self.mv`.
        """
        QWidget.__init__(self)
        self.mv = main_window
        layout = QHBoxLayout()
        layout.setContentsMargins(20,20,20,20)
        self.setLayout(layout)
        # receive buffer for the messages
        self.receive_buffer = bytearray(b'')
        # the left side - message table
        self.table = QTableWidget()
        self.table.setRowCount(0)
        self.table.setColumnCount(4)
        self.table.setHorizontalHeaderLabels(["Source", "Time", "Message", "RSI"])
        header = self.table.horizontalHeader()
        # only the message column stretches, the others fit their contents
        column_modes = (
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.ResizeToContents,
            QHeaderView.ResizeMode.Stretch,
            QHeaderView.ResizeMode.ResizeToContents,
        )
        for column, mode in enumerate(column_modes):
            header.setSectionResizeMode(column, mode)
        layout.addWidget(self.table,75)
        layout.addSpacing(20)
        # the right side - column of buttons
        rlayout = QVBoxLayout()
        layout.addLayout(rlayout,25)
        self.status = QLabel("None.")
        self.port_available = False
        self.port = None
        refresh_button = QPushButton("Refresh")
        refresh_button.clicked.connect(self.refresh)
        open_button = QPushButton("Open Port")
        rlayout.addWidget(self.status)
        rlayout.addWidget(refresh_button)
        rlayout.addWidget(open_button)
        open_button.clicked.connect(self.open_serial_port)

    def refresh(self):
        """Search for serial ports and select the first one found.

        Sets `port_available` and `port` and reports the result in the
        status label. Exceptions are printed, not propagated.
        """
        self.status.setText("searching devices ...")
        try:
            avail = QSerialPortInfo.availablePorts()
            self.port_available = (len(avail)>0)
            if self.port_available:
                self.port = avail[0]
                self.status.setText("using port " + self.port.portName())
            else:
                self.status.setText("no available port found.")
        except Exception as e:
            print(e)

    def open_serial_port(self):
        """Open the selected port (115200 baud, 8N1, no flow control).

        Does nothing when no port has been selected by `refresh`. On failure
        `port_available` is cleared and the status label shows an error.
        """
        if not self.port_available:
            return
        try:
            # Close a port left open by an earlier click before replacing it.
            previous = getattr(self, 'serial', None)
            if previous is not None and previous.isOpen():
                previous.close()
            self.serial = QSerialPort()
            self.serial.setPortName(self.port.portName())
            self.serial.setBaudRate(QSerialPort.Baud115200, QSerialPort.AllDirections)
            self.serial.setParity(QSerialPort.NoParity)
            self.serial.setStopBits(QSerialPort.OneStop)
            self.serial.setDataBits(QSerialPort.Data8)
            self.serial.setFlowControl(QSerialPort.NoFlowControl)
            # open() reports failure through its return value, not an
            # exception; without this check "port open." was shown even when
            # the device could not be opened.
            if not self.serial.open(QSerialPort.ReadWrite):
                raise IOError(self.serial.errorString())
            self.serial.readyRead.connect(self.on_serial_read)
            text = self.status.text()
            text += "\n"
            text += "port open."
            self.status.setText(text)
            print(text)
        except Exception as e:
            self.port_available = False
            self.status.setText("error opening port.")
            print(e)

    def on_serial_read(self):
        """
        Called when the application gets data from the connected device.

        Appends the new bytes to the receive buffer, prints them as hex and
        displays every complete frame the buffer now contains.
        """
        chunk = bytes(self.serial.readAll())
        self.receive_buffer.extend(chunk)
        print("".join(" %0.2X" % c for c in chunk))

        # One read may deliver several frames; handle all complete ones now
        # instead of leaving them buffered until more data arrives.
        frame = self._extract_frame(self.receive_buffer)
        while frame is not None:
            self._handle_frame(frame)
            frame = self._extract_frame(self.receive_buffer)

    @classmethod
    def _extract_frame(cls, buffer):
        """Remove and return the first complete frame in `buffer`, else None.

        Bytes in front of a recognised header are discarded one at a time.
        A recognised but still incomplete frame stays in the buffer.
        """
        while len(buffer) >= cls.HEADER_SIZE:
            marker, msg_id, n_bytes = buffer[0], buffer[1], buffer[2]
            if marker == cls.HEADER_MARKER and msg_id in cls.KNOWN_MESSAGES:
                # The RSI trailer belongs to the frame: waiting for only
                # header + payload led to an IndexError when reading the RSI.
                frame_len = cls.HEADER_SIZE + n_bytes + cls.TRAILER_SIZE
                if len(buffer) < frame_len:
                    return None
                frame = bytes(buffer[:frame_len])
                del buffer[:frame_len]
                return frame
            # not a frame start - drop one byte and resynchronise
            del buffer[0]
        return None

    def _handle_frame(self, frame):
        """Decode one complete frame and show it as a new table row."""
        msg_id = frame[1]
        n_bytes = frame[2]
        # the RSI is appended after the counted payload bytes
        rsi = frame[n_bytes+3]
        if msg_id == self.MSG_SYSTEM:
            if n_bytes < self.SYSTEM_MIN_PAYLOAD:
                print("dropping short system message (%d payload bytes)" % n_bytes)
                return
            # errors='replace' keeps a corrupted byte from raising in the slot
            sender = frame[3:11].decode(encoding='utf-8', errors='replace')
            level = frame[11]
            # NOTE(review): 'I' uses native byte order - confirm it matches
            # the byte order used by the sender.
            timestamp, = unpack('I', frame[12:16])
            text = frame[16:n_bytes+3].decode(encoding='utf-8', errors='replace')
            rgb = self.LEVEL_COLOURS.get(level, self.DEFAULT_COLOUR)
            self._append_row(sender, format_time(timestamp), text, rsi, QColor.fromRgb(*rgb))
            print("%3d"%level, sender, format_time(timestamp), text)
        elif msg_id == self.MSG_PING_RESPONSE:
            # bytes 6 and 7 hold the two RSI values
            if len(frame) < 8:
                print("dropping short ping response (%d payload bytes)" % n_bytes)
                return
            up_rsi = frame[6]
            down_rsi = frame[7]
            # ':d' is the integer format spec; the former ':%d' raised ValueError
            text = f'ping RSI up={up_rsi:d} down = {down_rsi:d}'
            self._append_row("", "", text, rsi, QColor.fromRgb(*self.DEFAULT_COLOUR))

    def _append_row(self, sender, time_text, text, rsi, colour):
        """Append a coloured row (source, time, message, RSI) and select it."""
        self.next_index = self.table.rowCount()
        # insertRow already grows the table by one row
        self.table.insertRow(self.next_index)
        for column, value in enumerate((sender, time_text, text, "%3d"%rsi)):
            item = QTableWidgetItem(value)
            item.setBackground(colour)
            self.table.setItem(self.next_index, column, item)
        self.table.setCurrentCell(self.next_index, 0)


using port ttyUSB0
port open.


In [10]:
# Scratch cell: show the built-in help for the receive buffer object of the
# communications view (a bytearray, as the output below confirms).
help(window.cv.receive_buffer)

Help on bytearray object:

class bytearray(object)
 |  bytearray(iterable_of_ints) -> bytearray
 |  bytearray(string, encoding[, errors]) -> bytearray
 |  bytearray(bytes_or_buffer) -> mutable copy of bytes_or_buffer
 |  bytearray(int) -> bytes array of size given by the parameter initialized with null bytes
 |  bytearray() -> empty bytes array
 |  
 |  Construct a mutable bytearray object from:
 |    - an iterable yielding integers in range(256)
 |    - a text string encoded using the specified encoding
 |    - a bytes or a buffer object
 |    - any object implementing the buffer API.
 |    - an integer
 |  
 |  Methods defined here:
 |  
 |  __add__(self, value, /)
 |      Return self+value.
 |  
 |  __alloc__(...)
 |      B.__alloc__() -> int
 |      
 |      Return the number of bytes actually allocated.
 |  
 |  __contains__(self, key, /)
 |      Return key in self.
 |  
 |  __delitem__(self, key, /)
 |      Delete self[key].
 |  
 |  __eq__(self, value, /)
 |      Return self==va

In [5]:
# Manually release the serial port. The `serial` attribute only exists once
# a port has been opened, so the lookup is guarded to keep this cell from
# raising AttributeError when it is run without an open port.
open_port = getattr(window.cv, "serial", None)
if open_port is not None and open_port.isOpen():
    open_port.close()

In [14]:
# Scratch cell: print the class documentation of the main window object.
help(window)

Help on Window in module __main__ object:

class Window(PySide6.QtWidgets.QWidget)
 |  Window(rect)
 |  
 |  QWidget(self, parent: Union[PySide6.QtWidgets.QWidget, NoneType] = None, f: PySide6.QtCore.Qt.WindowType = Default(Qt.WindowFlags)) -> None
 |  
 |  Method resolution order:
 |      Window
 |      PySide6.QtWidgets.QWidget
 |      PySide6.QtCore.QObject
 |      PySide6.QtGui.QPaintDevice
 |      Shiboken.Object
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, rect)
 |      __init__(self, parent: Union[PySide6.QtWidgets.QWidget, NoneType] = None, f: PySide6.QtCore.Qt.WindowType = Default(Qt.WindowFlags)) -> None
 |      
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |  
 |  staticMetaObject = PySide6.QtCore.QMetaObject("Window" inherits "QWidg...
 |  
 |  ------------------------------------------------

In [107]:
# Scratch cell: list the names available in the PySide6.QtCore module.
dir(PySide6.QtCore)

['ClassInfo',
 'MetaFunction',
 'MetaSignal',
 'Property',
 'PyClassProperty',
 'QAbstractAnimation',
 'QAbstractEventDispatcher',
 'QAbstractItemModel',
 'QAbstractListModel',
 'QAbstractNativeEventFilter',
 'QAbstractProxyModel',
 'QAbstractTableModel',
 'QAnimationGroup',
 'QBasicMutex',
 'QBasicTimer',
 'QBitArray',
 'QBuffer',
 'QByteArray',
 'QByteArrayMatcher',
 'QCalendar',
 'QCborArray',
 'QCborError',
 'QCborKnownTags',
 'QCborMap',
 'QCborParserError',
 'QCborSimpleType',
 'QCborStreamReader',
 'QCborStreamWriter',
 'QCborStringResultByteArray',
 'QCborStringResultString',
 'QCborTag',
 'QCborValue',
 'QChildEvent',
 'QCollator',
 'QCollatorSortKey',
 'QCommandLineOption',
 'QCommandLineParser',
 'QConcatenateTablesProxyModel',
 'QCoreApplication',
 'QCryptographicHash',
 'QDataStream',
 'QDate',
 'QDateTime',
 'QDeadlineTimer',
 'QDir',
 'QDirIterator',
 'QDynamicPropertyChangeEvent',
 'QEasingCurve',
 'QElapsedTimer',
 'QEnum',
 'QEvent',
 'QEventLoop',
 'QFactoryInterface

In [108]:
from PySide6.QtGui import *

In [68]:
# Scratch cell: create a QSerialPortInfo object used below to list the ports.
portinfo = QSerialPortInfo()

In [83]:
# Scratch cell: pick the first available serial port and display its name.
# Indexing with [0] raises IndexError when no port is present.
port = portinfo.availablePorts()[0]
port.portName()

'ttyUSB0'

In [84]:
# Scratch cell: display the repr of the selected port-info object.
port

<PySide6.QtSerialPort.QSerialPortInfo at 0x7f45079d5708>

In [109]:
# Scratch cell: list the names available in the PySide6.QtGui module.
dir(PySide6.QtGui)

['QAbstractFileIconProvider',
 'QAbstractTextDocumentLayout',
 'QAccessible',
 'QAccessibleActionInterface',
 'QAccessibleEditableTextInterface',
 'QAccessibleEvent',
 'QAccessibleInterface',
 'QAccessibleObject',
 'QAccessibleStateChangeEvent',
 'QAccessibleTableCellInterface',
 'QAccessibleTableModelChangeEvent',
 'QAccessibleTextCursorEvent',
 'QAccessibleTextInsertEvent',
 'QAccessibleTextInterface',
 'QAccessibleTextRemoveEvent',
 'QAccessibleTextSelectionEvent',
 'QAccessibleTextUpdateEvent',
 'QAccessibleValueChangeEvent',
 'QAccessibleValueInterface',
 'QAction',
 'QActionEvent',
 'QActionGroup',
 'QBackingStore',
 'QBitmap',
 'QBrush',
 'QClipboard',
 'QCloseEvent',
 'QColor',
 'QColorConstants',
 'QColorSpace',
 'QColorTransform',
 'QConicalGradient',
 'QContextMenuEvent',
 'QCursor',
 'QDesktopServices',
 'QDoubleValidator',
 'QDrag',
 'QDragEnterEvent',
 'QDragLeaveEvent',
 'QDragMoveEvent',
 'QDropEvent',
 'QEnterEvent',
 'QEventPoint',
 'QExposeEvent',
 'QFileOpenEvent',
