In [1]:
import logging
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusIOException

class WenlenPLC:
    def __init__(self, ip_address, port, retries=2):
        self.retries = retries
        self.logger = logging.getLogger("WenlenPLC")
        self.logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        self.client = ModbusTcpClient(ip_address, port=port)

    def update_ip_and_port(self, new_ip_address, new_port):
        if self.connection_type != "TCP":
            self.logger.error("Cannot update IP and Port for non-TCP connection")
            return
        else:
            self.client = ModbusTcpClient(
                host=new_ip_address, port=new_port, auto_open=True, auto_close=True
            )

    def connect(self):
        try:
            if self.client.connect():
                self.logger.info("Connection successful")
                return True
            else:
                self.logger.error("Failed to connect")
                return False
        except ConnectionException as e:
            self.logger.error(f"Connection failed: {e}")
            return False

    def close_connection(self):
        self.client.close()
        self.logger.info("Connection closed")

    def read_coils(self, start_reg, count=1, unit=1):
        try:
            self.logger.debug(f"Attempting to read {count} coils starting at {start_reg} with unit {unit}")
            result = self.client.read_coils(start_reg, count, unit=unit)
            if not result.isError():
                return result.bits
            else:
                self.logger.error(f"Failed to read coils: {result}")
                return None
        except ModbusIOException as e:
            self.logger.error(f"Modbus IO Error: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Failed to read coils: {e}")
            return None

    def read_holding_registers(self, start_reg, length=1, unit=1):
        try:
            self.logger.debug(f"Attempting to read {length} holding registers starting at {start_reg} with unit {unit}")
            regs = self.client.read_holding_registers(start_reg, length, unit=unit)
            if not regs.isError():
                return regs.registers
            else:
                self.logger.error(f"Failed to read holding registers: {regs}")
                return None
        except ModbusIOException as e:
            self.logger.error(f"Modbus IO Error: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Failed to read holding registers: {e}")
            return None

    def read_input_registers(self, start_reg, length=1, unit=1):
        try:
            self.logger.debug(f"Attempting to read {length} input registers starting at {start_reg} with unit {unit}")
            regs = self.client.read_input_registers(start_reg, length, unit=unit)
            if not regs.isError():
                return regs.registers
            else:
                self.logger.error(f"Failed to read input registers: {regs}")
                return None
        except ModbusIOException as e:
            self.logger.error(f"Modbus IO Error: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Failed to read input registers: {e}")
            return None

    def read_discrete_inputs(self, start_reg, count=1, unit=1):
        try:
            self.logger.debug(f"Attempting to read {count} discrete inputs starting at {start_reg} with unit {unit}")
            result = self.client.read_discrete_inputs(start_reg, count, unit=unit)
            if not result.isError():
                return result.bits
            else:
                self.logger.error(f"Failed to read discrete inputs: {result}")
                return None
        except ModbusIOException as e:
            self.logger.error(f"Modbus IO Error: {e}")
            return None
        except Exception as e:
            self.logger.error(f"Failed to read discrete inputs: {e}")
            return None

    def read_boolean_registers(self, start_reg=10001, count=32):
        first_values = []
        for i in range(count):
            discrete_input = self.read_discrete_inputs(start_reg=i, count=8)
            if discrete_input is not None:
                first_values.append(discrete_input[0])
            else:
                first_values.append(None)
        return first_values
    
    def combine_registers_to_floats(self, registers):
        floats = []
        for i in range(0, len(registers), 2):
            if registers[i] is not None and registers[i+1] is not None:
                combined_value = float(f"{registers[i]}.{registers[i+1]:02d}")
                floats.append(combined_value)
            else:
                floats.append(None)
        return floats

    def read_holding_registers_values(self):
        holding_values = []
        for i in range(8):
            holding_register = self.read_holding_registers(start_reg=i, length=1)
            if holding_register is not None:
                holding_values.append(holding_register[0])
            else:
                holding_values.append(None)
        return holding_values
    
    def read_float_registers(self, start_reg=40001, count=4):
        holding_registers = plc.read_holding_registers_values()
        float_registers = self.combine_registers_to_floats(holding_registers)
        return float_registers

# Crear una instancia del PLC
plc = WenlenPLC(ip_address='140.50.0.5', port=502)

# Conectar al PLC
if plc.connect():
    # Leer primeros valores de entradas discretas
    boolean_registers = plc.read_boolean_registers(start_reg=10001, count=32)
    print("Boolean Registers (10001-10032):", boolean_registers)

    float_registers = plc.read_float_registers(start_reg=40001, count=4)
    print("Float Registers (40001-40008):", float_registers)

    # Cerrar la conexión
    plc.close_connection()
else:
    print("Failed to connect to the PLC")



2024-07-04 17:53:47,012 - WenlenPLC - INFO - Connection successful
2024-07-04 17:53:47,013 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 0 with unit 1
2024-07-04 17:53:47,019 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 1 with unit 1
2024-07-04 17:53:47,023 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 2 with unit 1
2024-07-04 17:53:47,028 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 3 with unit 1
2024-07-04 17:53:47,032 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 4 with unit 1
2024-07-04 17:53:47,036 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 5 with unit 1
2024-07-04 17:53:47,040 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 6 with unit 1
2024-07-04 17:53:47,043 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 7 with unit 1
2024-07-04 17:53:47,047 - WenlenPLC - DEBUG - Attempting to read 1 coils starting at 8 with unit 1
2024-07-04 17:53:47,051 - WenlenPLC - DEBU

Prueba de Coils:
Coil 0: [False, False, False, False, False, False, False, False]
Coil 1: [False, False, False, False, False, False, False, False]
Coil 2: [False, False, False, False, False, False, False, False]
Coil 3: [False, False, False, False, False, False, False, False]
Coil 4: [False, False, False, False, False, False, False, False]
Coil 5: [False, False, False, False, False, False, False, False]
Coil 6: [False, False, False, False, False, False, False, False]
Coil 7: [False, False, False, False, False, False, False, False]
Coil 8: [False, False, False, False, False, False, False, False]
Coil 9: [False, False, False, False, False, False, False, False]

Prueba de Holding Registers:
Holding Register 0: [0]
Holding Register 1: [0]
Holding Register 2: [0]
Holding Register 3: [0]

Prueba de Input Registers:
Input Register 0: [0]
Input Register 1: [0]
Input Register 2: [0]
Input Register 3: [0]

Prueba de Discrete Inputs:
Discrete Input 0: [False, False, False, False, False, False, Fa

In [2]:
import logging
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian

class WenlenPLC:
    def __init__(self, ip_address, port, retries=2):
        self.retries = retries
        self.logger = logging.getLogger("WenlenPLC")
        self.logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        self.client = ModbusTcpClient(ip_address, port=port)

    def connect(self):
        try:
            if self.client.connect():
                self.logger.info("Connection successful")
                return True
            else:
                self.logger.error("Failed to connect")
                return False
        except ConnectionException as e:
            self.logger.error(f"Connection failed: {e}")
            return False

    def close_connection(self):
        self.client.close()
        self.logger.info("Connection closed")

    def read_holding_registers(self, start_reg, length=2, unit=1):
        try:
            self.logger.debug(f"Attempting to read {length} holding registers starting at {start_reg} with unit {unit}")
            response = self.client.read_holding_registers(start_reg, length, unit=unit)
            if not response.isError():
                self.logger.debug(f"Read holding registers response: {response.registers}")
                return response.registers
            else:
                self.logger.error(f"Failed to read holding registers: {response}")
                return None
        except Exception as e:
            self.logger.error(f"Exception occurred while reading holding registers: {e}")
            return None

    def decode_registers(self, registers):
        decoder = BinaryPayloadDecoder.fromRegisters(registers, byteorder=Endian.Big, wordorder=Endian.Little)
        decoded_values = {
            'uint16': [],
            'int16': [],
            'uint32': [],
            'int32': [],
            'float32': []
        }

        try:
            decoded_values['uint16'] = [decoder.decode_16bit_uint() for _ in range(len(registers))]
        except Exception as e:
            self.logger.error(f"Failed to decode uint16: {e}")
        
        try:
            decoder.reset()
            decoded_values['int16'] = [decoder.decode_16bit_int() for _ in range(len(registers))]
        except Exception as e:
            self.logger.error(f"Failed to decode int16: {e}")
        
        try:
            if len(registers) >= 2:
                decoder.reset()
                decoded_values['uint32'] = [decoder.decode_32bit_uint() for _ in range(len(registers) // 2)]
        except Exception as e:
            self.logger.error(f"Failed to decode uint32: {e}")
        
        try:
            if len(registers) >= 2:
                decoder.reset()
                decoded_values['int32'] = [decoder.decode_32bit_int() for _ in range(len(registers) // 2)]
        except Exception as e:
            self.logger.error(f"Failed to decode int32: {e}")
        
        try:
            if len(registers) >= 2:
                decoder.reset()
                decoded_values['float32'] = [decoder.decode_32bit_float() for _ in range(len(registers) // 2)]
        except Exception as e:
            self.logger.error(f"Failed to decode float32: {e}")
        
        return decoded_values

    def read_pressure_values(self):
        pressure_addresses = {
            "PRESION_BAR": 40000,
            "PRESION_PSI": 40002
        }
        
        results = {}
        if self.connect():
            for name, address in pressure_addresses.items():
                holding_registers = self.read_holding_registers(start_reg=address, length=2, unit=1)
                if holding_registers:
                    decoded_values = self.decode_registers(holding_registers)
                    results[name] = decoded_values
                else:
                    self.logger.warning(f"Failed to read or decode {name}")
                    results[name] = None
            self.close_connection()
        return results


# Ejemplo de uso
if __name__ == "__main__":
    plc = WenlenPLC("192.168.0.155", 5020)
    pressure_values = plc.read_pressure_values()
    for key, value in pressure_values.items():
        print(f"{key}: {value}")


2024-07-04 17:56:44,210 - WenlenPLC - INFO - Connection successful
2024-07-04 17:56:44,210 - WenlenPLC - INFO - Connection successful
2024-07-04 17:56:44,211 - WenlenPLC - DEBUG - Attempting to read 2 holding registers starting at 40000 with unit 1
2024-07-04 17:56:44,211 - WenlenPLC - DEBUG - Attempting to read 2 holding registers starting at 40000 with unit 1
2024-07-04 17:56:44,220 - WenlenPLC - DEBUG - Read holding registers response: [47186, 16882]
2024-07-04 17:56:44,220 - WenlenPLC - DEBUG - Read holding registers response: [47186, 16882]
2024-07-04 17:56:44,221 - WenlenPLC - DEBUG - Attempting to read 2 holding registers starting at 40002 with unit 1
2024-07-04 17:56:44,221 - WenlenPLC - DEBUG - Attempting to read 2 holding registers starting at 40002 with unit 1
2024-07-04 17:56:44,224 - WenlenPLC - DEBUG - Read holding registers response: [31457, 16684]
2024-07-04 17:56:44,224 - WenlenPLC - DEBUG - Read holding registers response: [31457, 16684]
2024-07-04 17:56:44,225 - Wenl

PRESION_BAR: {'uint16': [47186, 16882], 'int16': [-18350, 16882], 'uint32': [1106425938], 'int32': [1106425938], 'float32': [30.34000015258789]}
PRESION_PSI: {'uint16': [31457, 16684], 'int16': [31457, 16684], 'uint32': [1093434081], 'int32': [1093434081], 'float32': [10.779999732971191]}
