-
Notifications
You must be signed in to change notification settings - Fork 6.2k
/
device_abstract.py
94 lines (74 loc) · 2.53 KB
/
device_abstract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import abc
import logging
import os
from typing import Generator
from twister_harness.log_files.log_file import LogFile, NullLogFile
from twister_harness.twister_harness_config import DeviceConfig
logger = logging.getLogger(__name__)
class DeviceAbstract(abc.ABC):
"""Class defines an interface for all devices."""
def __init__(self, device_config: DeviceConfig, **kwargs) -> None:
"""
:param device_config: device configuration
"""
self.device_config: DeviceConfig = device_config
self.handler_log_file: LogFile = NullLogFile.create()
self.device_log_file: LogFile = NullLogFile.create()
def __repr__(self) -> str:
return f'{self.__class__.__name__}()'
@property
def env(self) -> dict[str, str]:
env = os.environ.copy()
return env
@abc.abstractmethod
def connect(self, timeout: float = 1) -> None:
"""Connect with the device (e.g. via UART)"""
@abc.abstractmethod
def disconnect(self) -> None:
"""Close a connection with the device"""
@abc.abstractmethod
def generate_command(self) -> None:
"""
Generate command which will be used during flashing or running device.
"""
def flash_and_run(self, timeout: float = 60.0) -> None:
"""
Flash and run application on a device.
:param timeout: time out in seconds
"""
@property
@abc.abstractmethod
def iter_stdout(self) -> Generator[str, None, None]:
"""Iterate stdout from a device."""
@abc.abstractmethod
def write(self, data: bytes) -> None:
"""Write data bytes to device"""
@abc.abstractmethod
def initialize_log_files(self):
"""
Initialize file to store logs.
"""
def stop(self) -> None:
"""Stop device."""
# @abc.abstractmethod
# def read(self, size=1) -> None:
# """Read size bytes from device"""
# def read_until(self, expected, size=None):
# """Read until an expected bytes sequence is found"""
# lenterm = len(expected)
# line = bytearray()
# while True:
# c = self.read(1)
# if c:
# line += c
# if line[-lenterm:] == expected:
# break
# if size is not None and len(line) >= size:
# break
# else:
# break
# return bytes(line)