-
Notifications
You must be signed in to change notification settings - Fork 1k
Description
Versions
- Python: 3.7
- OS: Ubuntu 18.04
- Pymodbus: 2.2
- Modbus Hardware (if used): -
Pymodbus Specific
- Server: tcp/rtu/ascii - sync/
- Client: tcp/rtu/ascii - sync/
Description
What were you trying, what has happened, what went wrong, and what did you expect?
Hi, I'm trying to secure Modbus with tlslite-ng. First, I tested the TLS socket on its own and it works. Next, I put the pymodbus tcp/sync code inside a method; however, I got this error:
Logs
Test 0 - good X.509 (plus SNI)
Traceback (most recent call last):
File "client03_cert_sesion.py", line 82, in <module>
clientTestCmd(sys.argv[1:])
File "client03_cert_sesion.py", line 73, in clientTestCmd
testConnClient(connection)
File "client03_cert_sesion.py", line 53, in testConnClient
client = ModbusClient('192.168.127.104', port=8005)
File "/home/tsec/.local/lib/python3.6/site-packages/pymodbus/client/sync.py", line 430, in __init__
BaseModbusClient.__init__(self, self.__implementation(method, self),
File "/home/tsec/.local/lib/python3.6/site-packages/pymodbus/client/sync.py", line 466, in __implementation
raise ParameterException("Invalid framer method requested")
pymodbus.exceptions.ParameterException: Modbus Error: [Invalid Parameter] Invalid framer method requested
Note:
- To launch the client from a terminal I use: python3 client03_cert_sesion.py localhost:8005 ./tests
- I have included a private IP address in both the client and the server because I previously got the error: IP in use.
# client-------------------------------------------------------------------------------------------------
#
import sys
import os
import os.path
import socket
import time
import timeit
import getopt
from tempfile import mkstemp
import os
import socket
#import sys
from tlslite.api import *
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
import logging
# Root-logger setup for the client script: every record carries timestamp,
# thread name, level, module and line number, and DEBUG output is enabled so
# the Modbus request/response logging below is visible.
FORMAT = (
    "%(asctime)-15s %(threadName)-15s %(levelname)-8s "
    "%(module)-15s:%(lineno)-8s %(message)s"
)
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
def clientTestCmd(argv):
    """Run the client half of the "good X.509 (plus SNI)" test.

    Connects a synchronisation socket to ``port - 1`` on the given host,
    waits for one byte from the peer, opens a TLS connection to
    ``host:port``, performs the client handshake, runs a single Modbus coil
    read/write round trip and finally checks the negotiated TLS session.

    Args:
        argv: Command-line arguments; ``argv[0]`` must be ``host:port``.
            Any further items are ignored.

    Raises:
        ValueError: If ``argv`` is empty, ``argv[0]`` contains no ``:`` or
            the port is not an integer.
        ConnectionError: If the Modbus client cannot connect.
        AssertionError: If one of the Modbus or TLS session checks fails.
    """
    if not argv:
        raise ValueError("usage: client host:port")
    # Split address into hostname/port tuple
    host, sep, port_text = argv[0].rpartition(":")
    if not sep:
        raise ValueError(
            "address must be given as host:port, got {0!r}".format(argv[0]))
    address = (host, int(port_text))

    def connect():
        """Open a TCP connection to ``address`` and wrap it for TLS."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(15)
            sock.connect(address)
        except OSError:
            # Do not leak the descriptor when the peer is unreachable.
            sock.close()
            raise
        return TLSConnection(sock)

    def testConnClient(conn):
        """Write one coil and read it back through a Modbus TCP client.

        NOTE(review): the Modbus client opens its own TCP connection to a
        fixed endpoint and never touches ``conn``, so the Modbus traffic does
        not travel over the TLS session.
        """
        # The file-level ``ModbusClient`` alias refers to ModbusSerialClient,
        # which rejects a host/port call with "Invalid framer method
        # requested"; a TCP endpoint needs the TCP client class.
        from pymodbus.client.sync import ModbusTcpClient

        UNIT = 0x01
        client = ModbusTcpClient('192.168.127.104', port=8005)
        try:
            if not client.connect():
                raise ConnectionError(
                    "could not connect to Modbus server at "
                    "192.168.127.104:8005")
            print("conectado")

            log.debug("Reading Coils")
            rr = client.read_coils(1, 1, unit=UNIT)
            log.debug(rr)

            log.debug("Write to a Coil and read back")
            rq = client.write_coil(0, True, unit=UNIT)
            rr = client.read_coils(0, 1, unit=UNIT)
            assert not rq.isError()  # test that we are not an error
            assert rr.bits[0] == True
        finally:
            client.close()

    # open synchronisation FIFO
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as synchro:
        synchro.settimeout(60)
        synchro.connect((address[0], address[1] - 1))

        test_no = 0
        print("Test {0} - good X.509 (plus SNI)".format(test_no))
        # Wait for the peer's one-byte go-ahead before opening the TLS socket.
        synchro.recv(1)
        connection = connect()
        try:
            connection.handshakeClientCert(serverName=address[0])
            testConnClient(connection)
            assert isinstance(connection.session.serverCertChain,
                              X509CertChain)
            assert connection.session.serverName == address[0]
            assert (connection.session.cipherSuite
                    in constants.CipherSuite.aeadSuites)
            assert connection.encryptThenMAC == False
            assert connection.session.appProto is None
        finally:
            connection.close()


if __name__ == "__main__":
    clientTestCmd(sys.argv[1:])
# server------------------------------------------------------------------------------------------------
#
import sys
import os
import os.path
import socket
import time
import timeit
import getopt
from tempfile import mkstemp
import os
import socket
#import sys
from tlslite.api import *
from pymodbus.server.sync import StartTcpServer
from pymodbus.server.sync import StartUdpServer
from pymodbus.server.sync import StartSerialServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSparseDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
def serverTestCmd(argv):
    """Run the server half of the "good X.509 (plus SNI)" test.

    Listens on ``host:port`` for the TLS connection and on ``port - 1`` for
    the synchronisation socket, waits for the peer to connect to the latter,
    loads the certificate fixtures, signals readiness with a single byte,
    performs the server-side TLS handshake, checks the session and then
    starts a Modbus TCP server.

    Args:
        argv: Command-line arguments; ``argv[0]`` must be ``host:port`` and
            ``argv[1]`` the directory holding the PEM fixtures.

    Raises:
        ValueError: If fewer than two arguments are given, ``argv[0]``
            contains no ``:`` or the port is not an integer.
        AssertionError: If a fixture or TLS session check fails.
    """
    if len(argv) < 2:
        raise ValueError("usage: server host:port cert_dir")
    # Split address into hostname/port tuple
    host, sep, port_text = argv[0].rpartition(":")
    if not sep:
        raise ValueError(
            "address must be given as host:port, got {0!r}".format(argv[0]))
    address = (host, int(port_text))
    cert_dir = argv[1]

    def open_listener(bind_address, backlog):
        """Return a listening TCP socket bound to ``bind_address``."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(bind_address)
            listener.listen(backlog)
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            listener.close()
            raise
        return listener

    def read_pem(name):
        """Return the text of fixture ``name``, closing the file afterwards."""
        with open(os.path.join(cert_dir, name)) as f:
            return f.read()

    def testConnServer(connection):
        """Serve a Modbus datastore whose four tables each hold 100 x 17.

        NOTE(review): the Modbus server listens on its own plain TCP socket
        and never touches ``connection``, so the Modbus traffic does not
        travel over the TLS session.
        """
        store = ModbusSlaveContext(
            di=ModbusSequentialDataBlock(0, [17]*100),
            co=ModbusSequentialDataBlock(0, [17]*100),
            hr=ModbusSequentialDataBlock(0, [17]*100),
            ir=ModbusSequentialDataBlock(0, [17]*100))
        context = ModbusServerContext(slaves=store, single=True)
        identity = ModbusDeviceIdentification()
        # NOTE(review): StartTcpServer is expected to block while serving
        # (confirm against the pymodbus docs), so it is called once rather
        # than inside a loop that has no exit condition.
        # The interface is fixed instead of address[0] because the TLS
        # listener already occupies ``address`` on the same port.
        StartTcpServer(context, identity=identity,
                       address=('192.168.127.104', address[1]))

    # Create synchronisation FIFO and the TLS listening socket
    with open_listener((address[0], address[1] - 1), 2) as synchroSocket, \
            open_listener(address, 5) as lsock:

        def connect():
            """Accept one TCP connection and wrap it for TLS."""
            s = lsock.accept()[0]
            s.settimeout(15)
            return TLSConnection(s)

        # following is blocking until the other side doesn't open
        with synchroSocket.accept()[0] as synchro:
            x509Cert = X509().parse(read_pem("serverX509Cert.pem"))
            x509Chain = X509CertChain([x509Cert])
            x509Key = parsePEMKey(read_pem("serverX509Key.pem"), private=True)

            # The RSA-PSS fixtures are parsed and checked here but not
            # referenced by the test below.
            x509CertRSAPSSSig = X509().parse(
                read_pem("serverRSAPSSSigCert.pem"))
            x509ChainRSAPSSSig = X509CertChain([x509CertRSAPSSSig])
            x509KeyRSAPSSSig = parsePEMKey(read_pem("serverRSAPSSSigKey.pem"),
                                           private=True)

            x509CertRSAPSS = X509().parse(read_pem("serverRSAPSSCert.pem"))
            x509ChainRSAPSS = X509CertChain([x509CertRSAPSS])
            assert x509CertRSAPSS.certAlg == "rsa-pss"
            x509KeyRSAPSS = parsePEMKey(read_pem("serverRSAPSSKey.pem"),
                                        private=True,
                                        implementations=["python"])

            test_no = 0
            print("Test {0} - good X.509 (plus SNI)".format(test_no))
            # Tell the peer that the TLS listener is ready.
            synchro.send(b'R')
            connection = connect()
            try:
                connection.handshakeServer(certChain=x509Chain,
                                           privateKey=x509Key)
                assert connection.session.serverName == address[0]
                assert connection.extendedMasterSecret
                assert connection.session.appProto is None
                testConnServer(connection)
            finally:
                connection.close()


if __name__ == "__main__":
    serverTestCmd(sys.argv[1:])