Skip to content

Commit

Permalink
Merge pull request #14 from chrisflesher/subclasses
Browse files Browse the repository at this point in the history
Subclasses
  • Loading branch information
jfjlaros committed Jan 2, 2021
2 parents cd14123 + 73f9cb6 commit df6a447
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 53 deletions.
2 changes: 1 addition & 1 deletion simple_rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from configparser import ConfigParser

from .simple_rpc import Interface
from .simple_rpc import Interface, SerialInterface, SocketInterface
from .extras import dict_to_object, object_to_dict


Expand Down
154 changes: 102 additions & 52 deletions simple_rpc/simple_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,34 @@
_list_req = 0xff


class Interface(object):
def __init__(self, device, baudrate=9600, wait=1, autoconnect=True):
"""Initialise the class.
class _Interface(object):
"""Generic simpleRPC interface."""
def __init__(self, device, baudrate=9600):
"""
:arg str device: Device name.
:arg int baudrate: Baud rate.
:arg int wait: Time in seconds before communication starts.
:arg bool autoconnect: Automatically connect.
"""
self._wait = wait

self._connection = serial_for_url(
device, do_not_open=True, baudrate=baudrate)
self._is_socket = isinstance(self._connection, socket_serial)
self._version = (0, 0, 0)
self._endianness = b'<'
self._size_t = b'H'
self.methods = {}

if autoconnect:
self.open()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def _open(self):
if not self._connection.isOpen():
try:
self._connection.open()
except SerialException as error:
raise IOError(error.strerror.split(':')[0])
try:
self._connection.open()
except SerialException as error:
raise IOError(error.strerror.split(':')[0])

def _close(self):
if self._connection.isOpen():
self._connection.close()

def _auto_open(f):
"""Decorator for automatic opening and closing of ethernet sockets."""
def _auto_open_wrapper(self, *args, **kwargs):
if self._is_socket:
self._open()

result = f(self, *args, **kwargs)

if self._is_socket:
self._close()

return result

return _auto_open_wrapper

self._connection.close()

def _select(self, index):
"""Initiate a remote procedure call, select the method.
Expand All @@ -84,8 +58,7 @@ def _write(self, obj_type, obj):
:arg bytes obj_type: Type of the parameter.
:arg any obj: Value of the parameter.
"""
write(
self._connection, self._endianness, self._size_t, obj_type, obj)
write(self._connection, self._endianness, self._size_t, obj_type, obj)

def _read_byte_string(self):
return read_byte_string(self._connection)
Expand All @@ -97,10 +70,8 @@ def _read(self, obj_type):
:returns any: Return value.
"""
return read(
self._connection, self._endianness, self._size_t, obj_type)
return read(self._connection, self._endianness, self._size_t, obj_type)

@_auto_open
def _get_methods(self):
"""Get remote procedure call methods.
Expand Down Expand Up @@ -131,9 +102,6 @@ def _get_methods(self):

def open(self):
"""Connect to device."""
self._open()
sleep(self._wait)

self.methods = self._get_methods()
for method in self.methods.values():
setattr(
Expand All @@ -145,15 +113,6 @@ def close(self):
delattr(self, method)
self.methods.clear()

self._close()

def is_open(self):
"""Query interface state."""
if self._is_socket:
return bool(self.methods)
return self._connection.isOpen()

@_auto_open
def call_method(self, name, *args):
"""Execute a method.
Expand Down Expand Up @@ -184,3 +143,94 @@ def call_method(self, name, *args):
if method['return']['fmt']:
return self._read(method['return']['fmt'])
return None


class SerialInterface(_Interface):
"""Serial simpleRPC interface."""
def __init__(self, device, baudrate=9600, wait=2, autoconnect=True):
"""
:arg str device: Device name.
:arg int baudrate: Baud rate.
:arg int wait: Time in seconds before communication starts.
:arg bool autoconnect: Automatically connect.
"""
super().__init__(device, baudrate)
self._wait = wait

if autoconnect:
self.open()

def is_open(self):
"""Query interface state."""
return self._connection.isOpen()

def open(self):
"""Connect to device."""
self._open()
sleep(self._wait)
super().open()

def close(self):
"""Disconnect from device."""
super().close()
self._close()


class SocketInterface(_Interface):
"""Socket simpleRPC interface."""
def __init__(self, device, baudrate=9600, autoconnect=True):
"""
:arg str device: Device name.
:arg int baudrate: Baud rate.
:arg bool autoconnect: Automatically connect.
"""
super().__init__(device, baudrate)

if autoconnect:
self.open()

def _auto_open(f):
"""Decorator for automatic opening and closing of ethernet sockets."""
def _auto_open_wrapper(self, *args, **kwargs):
self._open()
result = f(self, *args, **kwargs)
self._close()

return result

return _auto_open_wrapper

def is_open(self):
"""Query interface state."""
return len(self.methods) > 0

@_auto_open
def open(self):
"""Connect to device."""
super().open()

@_auto_open
def call_method(self, name, *args):
"""Execute a method.
:arg str name: Method name.
:arg list *args: Method parameters.
:returns any: Return value of the method.
"""
return super().call_method(name, *args)


class Interface(object):
"""Generic simpleRPC interface wrapper."""
def __new__(cls, device, *args, **kwargs):
"""
:arg str device: Device name.
:arg list *args: Interface positional arguments.
:arg list **kwargs: Interface keyword arguments.
:returns object: simpleRPC interface.
"""
if ':' in device:
return SocketInterface(device, *args, **kwargs)
return SerialInterface(device, *args, **kwargs)

0 comments on commit df6a447

Please sign in to comment.