Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mypy typing support #57

Merged
merged 2 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions isotp/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from isotp.address import *
from isotp.errors import *
from isotp.protocol import CanMessage as CanMessage, CanStack as CanStack, TransportLayer as TransportLayer
from isotp.tpsock import socket as socket
52 changes: 52 additions & 0 deletions isotp/address.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Optional, Dict, Any


class AddressingMode:
Normal_11bits: int
Normal_29bits: int
NormalFixed_29bits: int
Extended_11bits: int
Extended_29bits: int
Mixed_11bits: int
Mixed_29bits: int
@classmethod
def get_name(cls,
num: int) -> str: ...

class TargetAddressType:
Physical: int
Functional: int

class Address:
addressing_mode: int
target_address: Optional[int]
source_address: Optional[int]
address_extension: Optional[int]
txid: Optional[int]
rxid: Optional[int]
is_29bits: bool
tx_arbitration_id_physical: Optional[int]
tx_arbitration_id_functional: Optional[int]
rx_arbitration_id_physical: Optional[int]
rx_arbitration_id_functional: Optional[int]
tx_payload_prefix: bytearray
rx_prefix_size: int
rxmask: Optional[int]
is_for_me: bool
def __init__(self,
addressing_mode: int=...,
txid: Optional[int] = ...,
rxid: Optional[int] = ...,
target_address: Optional[int] = ...,
source_address: Optional[int] = ...,
address_extension: Optional[int] = ...,
**kwargs: Dict[Any, Any]) -> None: ...
def validate(self) -> None: ...
def get_tx_arbitraton_id(self,
address_type:TargetAddressType=...) -> Optional[int]: ...
def get_rx_arbitraton_id(self,
address_type:TargetAddressType=...) -> Optional[int]: ...
def requires_extension_byte(self) -> bool: ...
def get_tx_extension_byte(self) -> Optional[int]: ...
def get_rx_extension_byte(self) -> Optional[int]: ...
def get_content_str(self) -> str: ...
22 changes: 22 additions & 0 deletions isotp/errors.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Tuple, Dict, Any

class IsoTpError(Exception):
def __init__(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> None: ...

class FlowControlTimeoutError(IsoTpError): ...
class ConsecutiveFrameTimeoutError(IsoTpError): ...
class InvalidCanDataError(IsoTpError): ...
class UnexpectedFlowControlError(IsoTpError): ...
class UnexpectedConsecutiveFrameError(IsoTpError): ...
class ReceptionInterruptedWithSingleFrameError(IsoTpError): ...
class ReceptionInterruptedWithFirstFrameError(IsoTpError): ...
class WrongSequenceNumberError(IsoTpError): ...
class UnsuportedWaitFrameError(IsoTpError): ...
class MaximumWaitFrameReachedError(IsoTpError): ...
class FrameTooLongError(IsoTpError): ...
class ChangingInvalidRXDLError(IsoTpError): ...
class MissingEscapeSequenceError(IsoTpError): ...
class InvalidCanFdFirstFrameRXDL(IsoTpError): ...
class OverflowError(IsoTpError): ...
156 changes: 156 additions & 0 deletions isotp/protocol.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from typing import Optional, Callable, Dict, Tuple, Union, Any
from logging import Logger
from queue import Queue

from .address import Address, TargetAddressType


class CanMessage:
arbitration_id: int
dlc: int
data: bytearray
is_extended_id: bool
is_fd: bool
bitrate_switch: bool
def __init__(self,
arbitration_id: Optional[int] = ...,
dlc: Optional[int] = ...,
data: Optional[bytearray] = ...,
extended_id: bool = ...,
is_fd: bool = ...,
bitrate_switch: bool = ...) -> None: ...

class TransportLayer:
LOGGER_NAME: str
class Params:
stmin: int
blocksize: int
squash_stmin_requirement: bool
rx_flowcontrol_timeout: int
rx_consecutive_frame_timeout: int
tx_padding: Optional[int]
wftmax: int
tx_data_length: int
tx_data_min_length: Optional[int]
max_frame_size: int
can_fd: bool
bitrate_switch: bool
target_address_type: Optional[TargetAddressType]
def __init__(self) -> None: ...
def set(self,
key: Optional[str],
val: Optional[str],
validate: bool = ...) -> None: ...
def validate(self) -> None: ...
class Timer:
start_time: Optional[float]
def __init__(self,
timeout: Optional[float | int]) -> None: ...
timeout: Optional[float | int]
def set_timeout(self,
timeout: Optional[float | int]) -> None: ...
def start(self, timeout: Optional[float | int] = ...) -> None: ...
def stop(self) -> None: ...
def elapsed(self) -> Union[float, int]: ...
def is_timed_out(self) -> bool: ...
def is_stopped(self) -> bool: ...
class RxState:
IDLE: int
WAIT_CF: int
class TxState:
IDLE: int
WAIT_FC: int
TRANSMIT_CF: int
params: Dict[Any, Any]
logger: Logger
remote_blocksize: Optional[int]
rxfn: Callable[[], Optional[CanMessage]]
txfn: Callable[[CanMessage], None]
tx_queue: Queue[bytearray]
rx_queue: Queue[bytearray]
rx_state: int
tx_state: int
rx_block_counter: int
last_seqnum: int
rx_frame_length: int
tx_frame_length: int
last_flow_control_frame: Optional[object]
tx_block_counter: int
tx_seqnum: int
wft_counter: int
pending_flow_control_tx: bool
timer_tx_stmin: Timer
timer_rx_fc: Timer
timer_rx_cf: Timer
error_handler: Callable[[Any], None]
actual_rxdl: Optional[int]
timings: Dict[Tuple[int, int], float]
def __init__(self,
rxfn: Optional[Callable[[], Optional[CanMessage]]],
txfn: Optional[Callable[[CanMessage], None]],
address: Optional[Address] = ...,
error_handler: Optional[Callable[[Any], None]] = ...,
params: Union[Dict[Any, Any], None] =
...) -> None: ...
def send(self,
data: bytearray,
target_address_type: Optional[TargetAddressType] = ...) -> None: ...
def recv(self) -> Optional[bytearray]: ...
def available(self) -> bool: ...
def transmitting(self) -> bool: ...
def process(self) -> None: ...
def check_timeouts_rx(self) -> None: ...
def process_rx(self,
msg: CanMessage) -> None: ...
tx_buffer: bytearray
def process_tx(self) -> None: ...
def set_sleep_timing(self,
idle: int,
wait_fc: int) -> None: ...
address: Address
def set_address(self,
address: Address) -> None: ...
def pad_message_data(self,
msg_data: bytearray) -> None: ...
rx_buffer: bytearray
def empty_rx_buffer(self) -> None: ...
def empty_tx_buffer(self) -> None: ...
def start_rx_fc_timer(self) -> None: ...
def start_rx_cf_timer(self) -> None: ...
def append_rx_data(self,
data: bytearray) -> None: ...
pending_flowcontrol_status: int
def request_tx_flowcontrol(self, status: int=...) -> None: ...
def stop_sending_flow_control(self) -> None: ...
def make_tx_msg(self,
arbitration_id: int,
data: bytearray) -> CanMessage: ...
def get_dlc(self,
data: bytearray,
validate_tx: bool = ...) -> None: ...
def get_nearest_can_fd_size(self,
size: int) -> int: ...
def make_flow_control(self,
flow_status: int=...,
blocksize: Optional[int] = ...,
stmin: Optional[int] = ...) -> CanMessage: ...
must_wait_for_flow_control: bool
def request_wait_flow_control(self) -> None: ...
def stop_sending(self) -> None: ...
def stop_receiving(self) -> None: ...
def start_reception_after_first_frame_if_valid(self,
pdu: object) -> None: ...
def trigger_error(self, error: Any) -> None: ...
def reset(self) -> None: ...
def sleep_time(self) -> float: ...

class CanStack(TransportLayer):
def rx_canbus(self) -> Optional[CanMessage]: ...
tx_canbus: Callable[[CanMessage], None]
def __init__(self,
bus: object, # BusABC
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> None: ...
bus: object # BusABC
def set_bus(self,
bus: object) -> None: ...
67 changes: 67 additions & 0 deletions isotp/tpsock/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any, Optional, Tuple, Dict

from isotp.address import Address
from isotp.tpsock.opts import linklayer, general, flowcontrol

mtu: int

def check_support() -> None: ...

class flags:
LISTEN_MODE: int
EXTEND_ADDR: int
TX_PADDING: int
RX_PADDING: int
CHK_PAD_LEN: int
CHK_PAD_DATA: int
HALF_DUPLEX: int
FORCE_TXSTMIN: int
FORCE_RXSTMIN: int
RX_EXT_ADDR: int
WAIT_TX_DONE: int

class LinkLayerProtocol:
CAN: int
CAN_FD: int

class socket:
flags: flags
LinkLayerProtocol: LinkLayerProtocol
interface: Optional[str]
address: Optional[Address]
bound: bool
closed: bool
def __init__(self,
timeout: float = ...) -> None: ...
def send(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> int: ...
def recv(self,
n:int=...) -> Optional[bytes]: ...
def set_ll_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> linklayer: ...
def set_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> general: ...
def set_fc_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> flowcontrol: ...
def get_ll_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> linklayer: ...
def get_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> general: ...
def get_fc_opts(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> flowcontrol: ...
def bind(self,
interface: str,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> None: ...
def fileno(self) -> int: ...
def close(self,
*args: Tuple[Any],
**kwargs: Dict[Any, Any]) -> None: ...
def __delete__(self) -> None: ...
70 changes: 70 additions & 0 deletions isotp/tpsock/opts.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from . import check_support as check_support, socket as socket
from typing import Any, Optional
from isotp.tpsock.opts import socket

flags: Any

def assert_is_socket(s: socket) -> None: ...

SOL_CAN_BASE: int
SOL_CAN_ISOTP: Optional[int]
CAN_ISOTP_OPTS: int
CAN_ISOTP_RECV_FC: int
CAN_ISOTP_TX_STMIN: int
CAN_ISOTP_RX_STMIN: int
CAN_ISOTP_LL_OPTS: int

class general:
struct_size: Any
optflag: Any
frame_txtime: Any
ext_address: Any
txpad: Any
rxpad: Any
rx_ext_address: Any
def __init__(self) -> None: ...
@classmethod
def read(cls,
s: socket) -> general: ...
@classmethod
def write(cls,
s: socket,
optflag: Optional[Any] = ...,
frame_txtime: Optional[Any] = ...,
ext_address: Optional[Any] = ...,
txpad: Optional[Any] = ...,
rxpad: Optional[Any] = ...,
rx_ext_address: Optional[Any] = ...,
tx_stmin: Optional[Any] = ...) -> general: ...

class flowcontrol:
struct_size: int
stmin: Any
bs: Any
wftmax: Any
def __init__(self) -> None: ...
@classmethod
def read(cls,
s: socket) -> flowcontrol: ...
@classmethod
def write(cls,
s: socket,
bs: Optional[Any] = ...,
stmin: Optional[Any] = ...,
wftmax: Optional[Any] = ...) -> flowcontrol: ...

class linklayer:
struct_size: int
mtu: Any
tx_dl: Any
tx_flags: Any
def __init__(self) -> None: ...
@classmethod
def read(cls,
s: socket) -> linklayer: ...
@classmethod
def write(cls,
s: socket,
mtu: Optional[Any] = ...,
tx_dl: Optional[Any] = ...,
tx_flags: Optional[Any] = ...) -> linklayer: ...