In [1]:
import psutil
import serial
import serial.tools.list_ports
import time
import yaml
import socket
import struct
import netifaces

In [2]:
# Show psutil's CPU frequency reading as a plain list: [current, min, max].
[*psutil.cpu_freq()]

[2892.6093124999998, 1500.0, 3600.0]

In [3]:
class RatePub:
    """Publish one system-load metric once per second over UART or UDP.

    Settings are read from ``publisher_config.yaml`` in the working directory:

    * ``publish_type`` -- key of the metric to send; the metrics collected
      are ``cpu_usage``, ``cpu_freq`` and ``mem_usage``.
    * ``baud_rate``    -- baud rate used when opening the serial port.
    * ``use_uart``     -- truthy: write to the first serial port found;
      falsy: send UDP datagrams to ``DEST_IP:DEST_PORT``.

    Every sample is a percentage (0~100) that is scaled to 0~255 and sent as
    ASCII decimal digits, e.g. ``b'241'``.
    """

    CONFIG_PATH = 'publisher_config.yaml'
    # UDP endpoints used when ``use_uart`` is false. The destination is a
    # fixed address; the default gateway reported by netifaces
    # (netifaces.gateways()['default'][netifaces.AF_INET][0]) is a possible
    # alternative that is not used here.
    LOCAL_PORT = 8265
    DEST_IP = "192.168.1.125"
    DEST_PORT = 8266
    # Seconds to wait between two published samples.
    PUBLISH_INTERVAL = 1.0

    def __init__(self) -> None:
        """Load the config, open the selected transport and cache CPU limits.

        Raises:
            serial.SerialException: UART mode and no port is present or the
                port cannot be opened.
            OSError: UDP mode and the local port cannot be bound.
        """
        self.__load__config()
        if self.use_uart:
            self.ser = self.__init__serial()
        else:
            self.__init__udp()
        self.usage = dict()
        freq = psutil.cpu_freq()
        # Fall back to 0.0 when no reading is available so that
        # __get__usage can detect the situation instead of dividing by zero.
        self.max_cpu_freq = freq.max if freq is not None else 0.0
        self.min_cpu_freq = freq.min if freq is not None else 0.0

    def close(self) -> None:
        """Close the UDP socket and/or serial port if they were opened.

        Safe to call on a partially constructed instance and safe to call
        more than once.
        """
        for attr in ("udp_socket", "ser"):
            handle = getattr(self, attr, None)
            if handle is None:
                continue
            try:
                handle.close()
            except OSError as e:
                print(e)

    def __del__(self):
        # __init__ may have failed before any attribute was set, so never
        # assume use_uart / udp_socket / ser exist here.
        self.close()

    def __init__udp(self):
        """Create the UDP socket, bind the local port and set the target."""
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp_socket.bind(("", self.LOCAL_PORT))
        self.dest_addr = (self.DEST_IP, self.DEST_PORT)

    def __init__serial(self):
        """Open the first serial port reported by pySerial.

        Returns:
            The opened ``serial.Serial`` instance.

        Raises:
            serial.SerialException: no serial device is connected, or the
                first device cannot be opened.
        """
        port_list = list(serial.tools.list_ports.comports())
        if not port_list:
            print("NO device connected")
            # Fail here with a clear error instead of indexing an empty list.
            raise serial.SerialException("NO device connected")
        for port in port_list:
            print("avaliable com device: ", port.device)
        port = port_list[0]
        # Let an open failure propagate: there is no usable port to return.
        ser = serial.Serial(port.device, baudrate=self.baud_rate)
        print("open {} success with baudrate {}".format(port.device, self.baud_rate))
        return ser

    def __load__config(self):
        """Read ``publish_type``, ``baud_rate`` and ``use_uart`` from YAML."""
        with open(self.CONFIG_PATH, 'r', encoding='utf-8') as f:
            # NOTE(review): FullLoader is kept as in the original; if this
            # file can come from an untrusted source, use yaml.safe_load.
            cfg = yaml.load(f, Loader=yaml.FullLoader)
        self.publish_topic = cfg['publish_type']
        self.baud_rate = int(cfg['baud_rate'])
        self.use_uart = bool(cfg['use_uart'])

    def __get__usage(self):
        """Refresh ``self.usage`` with the current readings (each 0~100)."""
        self.usage["cpu_usage"] = float(psutil.cpu_percent())
        freq = psutil.cpu_freq()
        if freq is not None and self.max_cpu_freq > 0:
            freq_percent = 100 * float(freq.current / self.max_cpu_freq)
        else:
            # No usable maximum to compare against.
            freq_percent = 0.0
        # Cap at 100 in case the current reading is above the reported max.
        self.usage["cpu_freq"] = min(100.0, freq_percent)
        self.usage["mem_usage"] = float(psutil.virtual_memory().percent)

    @staticmethod
    def _to_payload(percent):
        """Scale a percentage to 0~255 and encode it as ASCII digits.

        Input outside 0~100 is clamped so the result never leaves 0~255.
        """
        bounded = min(max(float(percent), 0.0), 100.0)
        return str(round(2.55 * bounded)).encode("utf-8")

    def run(self):
        """Sample and publish the configured metric forever.

        Never returns; interrupt the kernel/process to stop it. Send
        failures are printed and the loop carries on with the next sample.

        NOTE(review): the payload has no delimiter, so consecutive values on
        the UART stream run together -- confirm the receiver copes with that.
        """
        while True:
            self.__get__usage()
            pub_val = self._to_payload(self.usage[self.publish_topic])
            if self.use_uart:
                try:
                    # write() returns the number of bytes written.
                    if self.ser.write(pub_val):
                        print("write {} {}".format(self.publish_topic, pub_val))
                except serial.SerialTimeoutException as e:
                    print(e)
            else:
                try:
                    # sendto() returns the number of bytes sent.
                    if self.udp_socket.sendto(pub_val, self.dest_addr):
                        print("write {} {}".format(self.dest_addr, pub_val))
                except OSError as e:
                    print(e)
            time.sleep(self.PUBLISH_INTERVAL)


In [4]:
rate_pub = RatePub()
try:
    # run() loops forever; interrupting the kernel is the way to stop it.
    rate_pub.run()
except KeyboardInterrupt:
    # Treat the interrupt as a normal stop instead of leaving a traceback.
    print("publisher stopped")

write ('192.168.1.125', 8266) b'241'
write ('192.168.1.125', 8266) b'131'
write ('192.168.1.125', 8266) b'107'
write ('192.168.1.125', 8266) b'113'
write ('192.168.1.125', 8266) b'124'
write ('192.168.1.125', 8266) b'177'
write ('192.168.1.125', 8266) b'160'
write ('192.168.1.125', 8266) b'136'
write ('192.168.1.125', 8266) b'180'
write ('192.168.1.125', 8266) b'132'


In [None]:
# Look up the default gateway entry for IPv4 and show its first element
# (displayed below as an IP address string).
netifaces.gateways()['default'][netifaces.AF_INET][0]


'192.168.1.1'