diff --git a/.gitlab-ci.d/opentitan/qemu-ot.yml b/.gitlab-ci.d/opentitan/qemu-ot.yml index 54a62fd0a73a6..7e2886b7ac24c 100644 --- a/.gitlab-ci.d/opentitan/qemu-ot.yml +++ b/.gitlab-ci.d/opentitan/qemu-ot.yml @@ -1,5 +1,5 @@ variables: - BAREMETAL_REF: "a0-251104-1" + BAREMETAL_REF: "ot-251125-1" QEMU_BUILD_OPTS: "--disable-install-blobs" include: diff --git a/docs/opentitan/ot_spi_device.md b/docs/opentitan/ot_spi_device.md index ceafe9e370027..e5907a639b0fe 100644 --- a/docs/opentitan/ot_spi_device.md +++ b/docs/opentitan/ot_spi_device.md @@ -119,7 +119,7 @@ should release the /CS line, i.e. should be 0. - `p`: polarity, should match `CFG.CPOL` (not yet supported) - `a`: phase, should match `CFG.CPHA` (not yet supported) - `t`: tx order, see `CFG.TX_ORDER` (not yet supported) - - `r`: rx order, see `CFG.TX_ORDER` (not yet supported) + - `r`: rx order, see `CFG.RX_ORDER` (not yet supported) - `c`: whether to keep _/CS_ low (=1) or release _/CS_ (=0) when payload has been processed. Any SPI transaction should end with C=0 packet. However it is possible to use several SPI device CharDev packets to handle a single SPI transaction: example: JEDEC ID w/ continuation code, diff --git a/docs/opentitan/tools.md b/docs/opentitan/tools.md index 2dc8a7a57f080..ef250de7f2fad 100644 --- a/docs/opentitan/tools.md +++ b/docs/opentitan/tools.md @@ -4,7 +4,7 @@ All the OpenTitan tools and associated files are stored in the `scripts/opentita ## Installation -Most tools are implemented in Python language. They require Python 3.9 or newer. +Most tools are implemented in Python language. They require Python 3.10 or newer. It is recommended to install Python dependencies using a [virtual environment](https://virtualenv.pypa.io). diff --git a/docs/opentitan/verilate.md b/docs/opentitan/verilate.md index 7df7e3561acc5..e62682278c21f 100644 --- a/docs/opentitan/verilate.md +++ b/docs/opentitan/verilate.md @@ -8,8 +8,9 @@ binaries as QEMU and comparing the outcome of each simulation environment. ````text usage: verilate.py [-h] [-V VERILATOR] [-R FILE] [-M FILE] [-F FILE] [-O VMEM] - [-K] [-D TMP_DIR] [-c CFG] [-a PREFIX] [-C CYCLES] [-I] - [-k SECONDS] [-l] [-P FILE] [-w] [-x] [-v] [-d] [-G] + [-K] [-D TMP_DIR] [-c CFG] [-a PREFIX] [-b TCP_PORT] + [-C CYCLES] [-I] [-k SECONDS] [-l] [-P FILE] [-w] [-x] [-v] + [-d] [-G] [ELF ...] Verilator wrapper. @@ -36,6 +37,8 @@ Verilator: -a, --artifact-name PREFIX set an alternative artifact name (default is derived from the application name) + -b, --spi-device-bridge TCP_PORT + Create a SPI device bridge -C, --cycles CYCLES exit after the specified cycles -I, --show-init show initializable devices -k, --timeout SECONDS @@ -62,6 +65,12 @@ Extras: * `-a` / `--artifact` all artifact files (see `-l`, `-x` and `-w`) are named after the application name. This option specifies an alternative file prefix for all those artifacts. +* `-b` / `--spi-device-bridge` create a local server on the specified TCP port that accepts QEMU SPI + device CharDev compliant requests, translates and converts them into simplified Verilator SPI DPI + requests using its PTY SPI device channel. SDO output data received on the PTY channel are + converted back into QEMU CharDev compliant responses. This feature can be enabled to use SPI + device tests that have been designed for the QEMU VM. + * `-C` / `--cycles` abort Verilator execution after the specified count of cycles. See also the `-k` option. diff --git a/python/qemu/ot/spi/spi_device.py b/python/qemu/ot/spi/spi_device.py index 9f1e9ba49fa05..a1507c17d394a 100644 --- a/python/qemu/ot/spi/spi_device.py +++ b/python/qemu/ot/spi/spi_device.py @@ -81,6 +81,7 @@ class SpiDevice: 'RESET1': 0x66, 'RESET2': 0x99, 'CHECK_ANSWER': 0xca, + 'CUSTOM_COMMAND': 0xed, # reserved opcode for custom commands } """Supported *25 SPI data flash device commands.""" @@ -91,6 +92,7 @@ def __init__(self): self._4ben = False self._rev_rx = False self._rev_tx = False + self._timeout = self.TIMEOUT def connect(self, host: str, port: Optional[int] = None) -> None: """Open a connection to the remote host. @@ -120,7 +122,7 @@ def connect(self, host: str, port: Optional[int] = None) -> None: except ValueError as exc: raise ValueError('TCP port not specified') from exc self._socket = create_connection((host, port), - timeout=self.TIMEOUT) + timeout=self.CONN_TIMEOUT) self._socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) elif sock_args[0] == 'unix': self._socket = socket(AF_UNIX, SOCK_STREAM) @@ -143,6 +145,22 @@ def disconnect(self) -> None: self._socket.close() self._socket = None + @property + def exchange_timeout(self) -> float: + """Get the maximum allowed time to perform a SPI round trip. + + :return: the timeout in seconds + """ + return self._timeout + + @exchange_timeout.setter + def exchange_timeout(self, timeout: float) -> None: + """Set the maximum allowed time to perform a SPI round trip. + + :param timeout: the timeout in seconds + """ + self._timeout = timeout + quit = disconnect """Old API.""" @@ -207,7 +225,9 @@ def read_jedec_id(self) -> JedecId: def read_sfdp(self, address: int = 0) -> bytes: """Read out the flash device SFTP descriptor.""" - payload = spack('>I', address) + # SFTP command expects a dummy byte before the SFDP payload is actually + # streamed, address should be 3 byte long + payload = spack('>Ix', address)[1:] return self.transmit(self.COMMANDS['READ_SFDP'], payload, 256) def enable_write(self): @@ -264,10 +284,9 @@ def sector_erase(self, address: int): addr = addr[1:] self.transmit(self.COMMANDS['SECTOR_ERASE'], addr) - def reset(self): + def reset(self, alt: bool = False): """Reset the flash device.""" - # self.transmit(self.COMMANDS['RESET1']) - self.transmit(self.COMMANDS['RESET2']) + self.transmit(self.COMMANDS['RESET2' if not alt else 'RESET1']) def read(self, address: int, length: int, fast: bool = False, release: bool = True) -> bytes: @@ -301,6 +320,26 @@ def read_cont(self, length: int, release: bool = True) -> bytes: """ return self.transmit(None, None, length, release) + def custom_command(self, address: Optional[int] = None, + payload: Optional[bytes] = None) -> bytes: + """Execute a custom command. + + :param address: optional address, sent as 3 or 4 bytes, depending on + the current mode + :param payload: optional payload + """ + if address is not None: + byte_count = 4 if self.is_4b_addr else 3 + if address >= (1 << (byte_count * 8)): + raise ValueError('Cannot encode address') + addr = spack('>I', address) + if not self.is_4b_addr: + addr = addr[1:] + else: + addr = b'' + payload = b''.join((addr, payload or b'')) + return self.transmit(self.COMMANDS['CUSTOM_COMMAND'], addr) + def release(self) -> None: """Release /CS line.""" data = self._build_cs_header(0, True) @@ -385,7 +424,7 @@ def _send(self, buf: bytes, release: bool = True): def _receive(self, size: int) -> bytes: buf = bytearray() rem = size - timeout = now() + self.TIMEOUT + timeout = now() + self._timeout poller = spoll() poller.register(self._socket, POLLIN) while rem: diff --git a/python/qemu/ot/verilator/executer.py b/python/qemu/ot/verilator/executer.py index 11478e5ff1acb..5112b8bb6f102 100644 --- a/python/qemu/ot/verilator/executer.py +++ b/python/qemu/ot/verilator/executer.py @@ -1,21 +1,19 @@ -"""Verilator wrapper.""" - # Copyright (c) 2025 Rivos, Inc. # Copyright (c) 2025 lowRISC contributors. # SPDX-License-Identifier: Apache2 +"""Verilator wrapper.""" + from collections import deque -from fcntl import fcntl, F_GETFL, F_SETFL -from io import BufferedRandom, TextIOWrapper -from os import O_NONBLOCK, close, getcwd, rename, symlink, unlink +from io import TextIOWrapper +from os import close, environ, getcwd, rename, symlink, unlink from os.path import (abspath, basename, exists, isfile, islink, join as joinpath, realpath, splitext) -from select import POLLIN, POLLERR, POLLHUP, poll as sel_poll from shutil import copyfile from subprocess import Popen, PIPE, TimeoutExpired from sys import stderr from tempfile import mkstemp -from threading import Thread +from threading import Event, Thread from time import sleep, time as now from traceback import format_exc from typing import NamedTuple, Optional, Union @@ -26,24 +24,12 @@ from ot.rom.image import ROMImage from ot.util.elf import ElfBlob from ot.util.file import guess_file_type, make_vmem_from_elf -from ot.util.log import ColorLogFormatter from ot.util.misc import HexInt, split_map_join from . import DEFAULT_TIMEOUT from .filemgr import VtorFileManager - - -class VtorVcpDescriptor(NamedTuple): - """ Virtual communication port.""" - - vcpid: str - """VCP identifier.""" - pty: BufferedRandom - """Attached pseudo terminal.""" - buffer: bytearray - """Data buffer.""" - logger: logging.Logger - """Associated logger.""" +from .spi import VtorSpiBridge +from .vcp import VtorVcpManager class VtorMemRange(NamedTuple): @@ -61,9 +47,6 @@ class VtorExecuter: VMEM_OFFSET = 0 """Offset when converting BM test to VMEM file.""" - ANSI_CRE = re.compile(rb'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]') - """ANSI escape sequences.""" - DV_CRE = re.compile(r'^(\d+):\s\(../.*?\)\s(.*)$') """DV macro messages.""" @@ -76,6 +59,9 @@ class VtorExecuter: START_TIMEOUT = 2.0 """Initial timeout to load and query Verilator.""" + TERMINATION_TIMEOUT = 2.0 + """Delay for Verilator to quit on request, before being force-killed.""" + def __init__(self, vfm: VtorFileManager, verilator: str, profile: Optional[str], debug: bool = False): self._log = logging.getLogger('vtor.exec') @@ -85,15 +71,19 @@ def __init__(self, vfm: VtorFileManager, verilator: str, self._verilator = verilator self._fm = vfm self._artifact_name: Optional[str] = None + self._generate_spi_log = False self._save_xlog = False self._link_xlog = False self._gen_wave = False # parsed communication ports from Verilator self._ports: dict[str, Union[int, str]] = {} + self._vcp = VtorVcpManager() + self._spi_event = Event() + self._spi = VtorSpiBridge(debug) + # where Verilator stores the SPI device log file + self._spilog_path: Optional[str] = None # where Verilator stores the execution log file self._xlog_path: Optional[str] = None - self._vcps: dict[int, VtorVcpDescriptor] = {} - self._poller = sel_poll() self._resume = False self._threads: list[Thread] = [] self._devices: dict[str, VtorMemRange[int, int]] = {} @@ -179,10 +169,19 @@ def secret_file(self, file_path: str) -> None: raise FileNotFoundError(f'No such secret file {file_path}') self._secret_file = file_path + def create_spi_device_bridge(self, spi_port: int) -> None: + """Create a fake QEMU SPI device server to bridge Verilator SPI device. + + :param spi_port: optional TCP port to create a SPI device bridge + """ + self._spi.create(spi_port, self._spi_event) + self._generate_spi_log = True + def verilate(self, rom_files: list[str], ram_files: list[str], flash_files: list[str], app_files: list[str], otp: Optional[str], timeout: float = None, - cycles: Optional[int] = None) -> int: + cycles: Optional[int] = None, ) \ + -> int: """Execute a Verilator simulation. :param rom_files: optional list of files to load in ROMs @@ -191,7 +190,7 @@ def verilate(self, rom_files: list[str], ram_files: list[str], :param app_files: optional list of application ELF files to execute :param otp: optional file to load as OTP image :param timeout: optional max execution delay in seconds - :paran cycles: optional max execution cycles + :param cycles: optional max execution cycles """ workdir = self._fm.tmp_dir self._log.debug('Work dir: %s', workdir) @@ -240,9 +239,13 @@ def verilate(self, rom_files: list[str], ram_files: list[str], args.append(f'--trace={wave_name}') self._log.debug('Executing Verilator as %s', self._simplifly_cli(args)) + env = dict(environ) + if self._generate_spi_log: + # 'P': log SPI PTY protocol (do not use M': SPI "monitor") + env['VERILATOR_SPI_LOG'] = 'P' # pylint: disable=consider-using-with proc = Popen(args, - bufsize=1, cwd=workdir, + bufsize=1, cwd=workdir, env=env, stdout=PIPE, stderr=PIPE, encoding='utf-8', errors='ignore', text=True) try: @@ -253,10 +256,10 @@ def verilate(self, rom_files: list[str], ram_files: list[str], ret = proc.returncode self._log.error('Verilator bailed out: %d', ret) raise OSError() - # if execution starts and the execution log should be generated - # discards any previous file to avoid leaving a previous version of - # this file that would not match the current session - self._discard_exec_log() + # if execution starts and the log file should be generated, discards + # any previous file to avoid leaving a previous version of such file + # that would not match the current session + self._discard_logs() log_q = deque() self._resume = True for pos, stream in enumerate(('out', 'err')): @@ -269,6 +272,23 @@ def verilate(self, rom_files: list[str], ram_files: list[str], thread.start() abstimeout = float(timeout) + now() while now() < abstimeout: + if self._spi_event.is_set(): + match self._spi.exception: + case TimeoutError(): + reason = 'no response from Verilator' + case BrokenPipeError() | ConnectionResetError(): + reason = 'host disconnected' + case Exception(): + reason = str(self._spi.exception) + case _: + reason = 'unknown' + if not ret: + ret = proc.poll() + if not ret: + ret = 1 + self._log.error('Exiting on %s SPI bridge event, code %d', + reason, ret) + break while log_q: err, qline = log_q.popleft() if err: @@ -282,7 +302,16 @@ def verilate(self, rom_files: list[str], ram_files: list[str], if not err and not simulate: if qline.startswith('Simulation running, '): simulate = True - self._connect_vcps(2.0) + self._vcp.connect({ + k: v for k, v in self._ports.items() + if k.startswith('uart')}, 2.0) + spi_ptys = sorted([v for p, v in self._ports.items() + if p.startswith('spi')]) + if spi_ptys: + self._spi.connect_pty(spi_ptys[0]) + # only support a single SPI device bridge + if len(spi_ptys) > 1: + self._log.warning('Too many SPI devices') self._log.info('Simulation begins') else: self._parse_verilator_info(qline) @@ -295,8 +324,9 @@ def verilate(self, rom_files: list[str], ram_files: list[str], sleep(0.005) xret = proc.poll() if xret is None: - xret = self._process_vcps() + xret = self._vcp.process() if xret is not None: + self._log.debug('Verilator exited with code %d', xret) if xend is None: xend = now() ret = xret @@ -311,11 +341,12 @@ def verilate(self, rom_files: list[str], ram_files: list[str], else: ret = self.DEADLOCK finally: - self._disconnect_vcps() + self._spi.disconnect() + self._vcp.disconnect() if proc: # leave some for Verilator to cleanly complete and flush its # streams - wait = 0.5 + wait = self.TERMINATION_TIMEOUT if xend is None: xend = now() waited_time = now() @@ -365,11 +396,15 @@ def verilate(self, rom_files: list[str], ram_files: list[str], if log_q: # should never happen self._vlog.error('Lost traces') - self._save_exec_log() + self._save_logs() tmp_profile = joinpath(workdir, 'profile.vlt') if isfile(tmp_profile) and profile_file: self._log.info('Saving profile file as %s', profile_file) copyfile(tmp_profile, profile_file) + if ret: + self._log.error("Verilator failed: %s", ret) + else: + self._log.info("Success") return abs(ret or 0) def _vtor_logger(self, stream: TextIOWrapper, queue: deque, err: bool) \ @@ -525,6 +560,11 @@ def _parse_verilator_info(self, line: str) -> None: parts = line.split('.')[0].split(' ') self._ports[parts[-1]] = parts[-3] return + spi_prefix = 'SPI: PTY output file created at ' + if line.startswith(spi_prefix): + spi_log_line = line.removeprefix(spi_prefix) + self._spilog_path = spi_log_line.rsplit('.', 1)[0] + self._log.debug('SPI PTY log path: %s', self._spilog_path) def _parse_verilator_log(self, line: str) -> bool: """Parse verilator log mesage. @@ -571,111 +611,24 @@ def _parse_verilator_output(self, line: str, err: bool) -> str: return '' return line - def _connect_vcps(self, delay: float): - connect_map = {k: v for k, v in self._ports.items() - if k.startswith('uart')} - timeout = now() + delay - # ensure that QEMU starts and give some time for it to set up - # when multiple VCPs are set to 'wait', one VCP can be connected at - # a time, i.e. QEMU does not open all connections at once. - vcp_lognames = [] - vcplogname = 'vtor' - vcplognames = [] - while connect_map: - if now() > timeout: - minfo = ', '.join(f'{d} @ {r}' - for d, r in connect_map.items()) - raise TimeoutError(f'Cannot connect to Verilator VCPs: {minfo}') - connected = [] - for vcpid, ptyname in connect_map.items(): - try: - # pylint: disable=consider-using-with - vcp = open(ptyname, 'rb+', buffering=0) - flags = fcntl(vcp, F_GETFL) - fcntl(vcp, F_SETFL, flags | O_NONBLOCK) - connected.append(vcpid) - vcp_lognames.append(vcpid) - vcp_log = logging.getLogger(f'{vcplogname}.{vcpid}') - vcplognames.append(vcpid) - vcp_fno = vcp.fileno() - assert vcp_fno not in self._vcps - self._vcps[vcp_fno] = VtorVcpDescriptor(vcpid, vcp, - bytearray(), - vcp_log) - self._log.debug('VCP %s connected to pty %s', - vcpid, ptyname) - self._poller.register(vcp, POLLIN | POLLERR | POLLHUP) - except ConnectionRefusedError: - continue - except OSError as exc: - self._log.error('Cannot setup Verilator VCP connection %s: ' - '%s', vcpid, exc) - print(format_exc(chain=False), file=stderr) - raise - # removal from dictionary cannot be done while iterating it - for vcpid in connected: - del connect_map[vcpid] - self._colorize_vcp_log(vcplogname, vcplognames) - - def _disconnect_vcps(self): - for _, vcp, _, _ in self._vcps.values(): - self._poller.unregister(vcp) - vcp.close() - self._vcps.clear() - - def _process_vcps(self) -> Optional[int]: - ret = None - for vfd, event in self._poller.poll(0.01): - if event in (POLLERR, POLLHUP): - self._poller.modify(vfd, 0) + def _discard_logs(self) -> None: + if not self._artifact_name: + return + for log_suffix in ('', 'spi.'): + log_path = f'{self._artifact_name}.{log_suffix}log' + if not isfile(log_path): continue - _, vcp, vcp_buf, vcp_log = self._vcps[vfd] try: - data = vcp.read(256) - except TimeoutError: - self._log.error('Unexpected timeout w/ poll on %s', vcp) - continue - if not data: - continue - vcp_buf += data - lines = vcp_buf.split(b'\n') - vcp_buf[:] = bytearray(lines[-1]) - for line in lines[:-1]: - line = self.ANSI_CRE.sub(b'', line) - sline = line.decode('utf-8', errors='ignore').rstrip() - level = logging.INFO - vcp_log.log(level, sline) - if ret is not None: - # match for exit sequence on current VCP - break - return ret - - def _colorize_vcp_log(self, logbase: str, lognames: list[str]) -> None: - vlog = logging.getLogger(logbase) - clr_fmt = None - while vlog: - for hdlr in vlog.handlers: - if isinstance(hdlr.formatter, ColorLogFormatter): - clr_fmt = hdlr.formatter - break - vlog = vlog.parent - if not clr_fmt: - return - for color, logname in enumerate(sorted(lognames)): - clr_fmt.add_logger_colors(f'{logbase}.{logname}', color) + unlink(log_path) + self._log.debug('Old execution log file discarded: %s', + log_path) + except OSError as exc: + self._log.error('Cannot remove previous execution log file: %s', + exc) - def _discard_exec_log(self) -> None: - if not self._artifact_name: - return - log_path = f'{self._artifact_name}.log' - if log_path or not isfile(log_path): - return - try: - unlink(log_path) - self._log.debug('Old execution log file discarded') - except OSError as exc: - self._log.error('Cannot remove previous execution log file: %s', - exc) + def _save_logs(self) -> None: + self._save_exec_log() + self._save_spi_log() def _save_exec_log(self) -> None: if not self._save_xlog: @@ -694,6 +647,20 @@ def _save_exec_log(self) -> None: self._log.debug('Saving execution log as %s', log_path) copyfile(self._xlog_path, log_path) + def _save_spi_log(self) -> None: + if not self._generate_spi_log: + return + if not self._spilog_path: + self._log.error('No SPI log file found') + return + if not isfile(self._spilog_path): + self._log.error('Missing SPI log file') + return + assert self._artifact_name is not None + log_path = f'{self._artifact_name}.spi.log' + self._log.debug('Saving SPI log as %s', log_path) + copyfile(self._spilog_path, log_path) + def _convert_rom_file(self, file_kind: str, file_path: str, size: int, rom_idx: int) -> str: if file_kind in ('hex', 'svmem'): diff --git a/python/qemu/ot/verilator/filemgr.py b/python/qemu/ot/verilator/filemgr.py index bf7a0972198bd..9451eb0ac568a 100644 --- a/python/qemu/ot/verilator/filemgr.py +++ b/python/qemu/ot/verilator/filemgr.py @@ -1,8 +1,8 @@ -"""Verilator wrapper.""" - # Copyright (c) 2025 Rivos, Inc. # SPDX-License-Identifier: Apache2 +"""Verilator wrapper.""" + from atexit import register from logging import getLogger from os.path import isdir diff --git a/python/qemu/ot/verilator/spi.py b/python/qemu/ot/verilator/spi.py new file mode 100644 index 0000000000000..6ed7bfee602d2 --- /dev/null +++ b/python/qemu/ot/verilator/spi.py @@ -0,0 +1,375 @@ +# Copyright (c) 2025 Rivos, Inc. +# SPDX-License-Identifier: Apache2 + +"""Verilator SPI protocol bridge. + + Accept incoming requests using QEMU OT SPI device protocol (TCP socket) + Converts requests/responses to/from Verilator SPI DPI simplified protocol + (PTY socket). + + See https://github.com/lowRISC/opentitan/pull/28803 for SPI DPI protocol. +""" + +from binascii import hexlify +from collections import deque +from enum import IntFlag +from fcntl import fcntl, F_GETFL, F_SETFL +from os import O_NONBLOCK +from io import BufferedRandom +from socket import create_server, socket, SHUT_RDWR +from struct import unpack as sunpack +from threading import Event, Thread +from time import sleep, time as now +from traceback import format_exc +from typing import Optional, Union + +import logging +import sys + +from ot.spi.spi_device import SpiDevice + + +class VtorSpiInput(IntFlag): + """Bit assignment on Verilator SPI device input.""" + SCK = 0 + CSB = 1 + SDI = 2 + + +class VtorSpiOutput(IntFlag): + """Bit assignment on Verilator SPI device output.""" + SDO_EN = 0 + SDO = 1 + + +class VtorSpiBridge: + """SPI bridge. + + Expose a QEMU OT SPI device-compatible socket. + Relay SPI bus requests to Verilator SPI device port. + """ + + CONN_TIMETOUT = 0.25 + """Maximum time waiting for a connection before yielding + """ + + POLL_TIMEOUT = 0.05 + """Maximum time to wait on a blocking operation. + """ + + PTY_SYNC = 0.1 + """Time to poll for PTY connection. + """ + + SPI_BUS_TIMEOUT = 0.5 + """Maximum time to receive a response from Verilator PTY. + """ + + SCK_LOW = 0 + SCK_HIGH = 1 << VtorSpiInput.SCK + CSB_LOW = 0 + CSB_HIGH = 1 << VtorSpiInput.CSB + SDI_LOW = 0 + SDI_HIGH = 1 << VtorSpiInput.SDI + SDO_EN_HIGH = 1 << VtorSpiOutput.SDO_EN + SDO_HIGH = 1 << VtorSpiOutput.SDO + + # Each byte requires 16 PTY chars (two clock edges per bit, i.e. two chars). + PTY_CHAR_PER_BYTE = 16 + DEFAULT_PTY_BUF_SIZE = 512 # 1024 should be a safe value + + ASCII_ZERO = ord('0') + + def __init__(self, debug: bool = False): + self._log = logging.getLogger('vtor.spi') + self._slog = logging.getLogger('vtor.spi.server') + self._clog = logging.getLogger('vtor.spi.client') + self._debug = debug + self._server: Optional[Thread] = None + self._pty: Optional[BufferedRandom] = None + self._resume = False + self._cs = False # note: False means inactive (i.e. /CS high) + self._exception = Optional[Exception] + self._chunk_size = 0 + # initialize self._chunk_size with the property setter + self.pty_buffer_size = self.DEFAULT_PTY_BUF_SIZE + + def create(self, tcp_port: int, end_event: Event) -> None: + """Create fake QEMU SPI device server. + :param tcp_port: TCP port on local host to listen for incoming + connections. + :param end_event: event to signal on SPI bridge termination. + """ + if not self._server: + self._server = Thread(target=self._serve, name='spibridge', + daemon=True, args=(tcp_port, end_event)) + + @property + def pty_buffer_size(self) -> int: + """Get current PTY usable buffer size.""" + return self._chunk_size * self.PTY_CHAR_PER_BYTE + + @pty_buffer_size.setter + def pty_buffer_size(self, pty_buffer_size: int) -> None: + """Set PTY usable buffer size.""" + self._chunk_size = pty_buffer_size // self.PTY_CHAR_PER_BYTE + + def connect_pty(self, pty_name: str) -> None: + """Connect Verilator SPI device pseudo-terminal. + + :param pty_name: pseudo terminal device to use for the SPI device + """ + if self._pty: + raise RuntimeError('Cannot reconnect to PTY') + try: + # pylint: disable=consider-using-with + pty = open(pty_name, 'rb+', buffering=0) + flags = fcntl(pty, F_GETFL) + fcntl(pty, F_SETFL, flags | O_NONBLOCK) + self._log.debug('SPI device PTY %s connected', pty_name) + self._pty = pty + except (ConnectionError, OSError) as exc: + self._log.error('Cannot connect Verilator SPI device PTY %s: ' + '%s', pty_name, exc) + raise + if not self._server: + return + self._resume = True + self._server.start() + + def disconnect(self) -> None: + """Disconnect all managed VCPs.""" + self._resume = False + if self._server: + if self._server.is_alive(): + self._server.join() + self._server = None + if self._pty: + self._pty.close() + self._pty = None + + @property + def exception(self) -> Optional[Exception]: + """Return last exception, if any has been raised.""" + return self._exception + + def _serve(self, tcp_port: int, event: Event): + """Worker thread that bridge SPI device communication between a QEMU + SPI device client and Verilator bitbang SPI device PTY. + """ + try: + sock = create_server(('localhost', tcp_port), + backlog=1, reuse_port=True) + except OSError: + self._log.fatal('Cannot connect to :%d', tcp_port) + raise + sock.settimeout(self.CONN_TIMETOUT) + with sock: + try: + while self._resume: + try: + conn, addr = sock.accept() + except TimeoutError: + # check whether server should resume every CONN_TIMEOUT + # to avoid deadlocking when no client wants to connect + continue + self._log.debug('New SPI device connection %s:%s', *addr) + with conn: + self._serve_connection(conn) + except Exception as exc: + self._exception = exc + event.set() + raise + finally: + try: + sock.shutdown(SHUT_RDWR) + sock.close() + except OSError: + pass + + def _serve_connection(self, sock: socket) -> None: + buffer = bytearray() + length = 0 + pty_en = False + cfg = 0 + while self._resume: + if not self._pty: + # there is no point handling incoming SPI host requests while + # downstream PTY is not available + sleep(self.PTY_SYNC) + continue + if not pty_en: + # set IDLE state on SPI device connection: /CS high, SCK low + self._log.debug('SPI idle') + idle = bytes([self.CSB_HIGH | self.SCK_LOW]) + sync = 10 + while sync: + self._pty.write(idle) + data = self._pty.read(len(idle)) + if data: + break + sleep(0.1) + sync -= 1 + else: + raise TimeoutError('No reply from SPI device') + pty_en = True + try: + # arbitrary length to receive all buffered data at once. + data = sock.recv(1024) + if not data: + self._log.warning('SPI host disconnected') + return + buffer.extend(data) + if not buffer: + # wait for data + continue + if not length: + # wait for header + if len(buffer) < SpiDevice.CS_HEADER_SIZE: + continue + magic, version, cfg, length = \ + sunpack(SpiDevice.CS_HEADER_FMT, + buffer[:SpiDevice.CS_HEADER_SIZE]) + if magic != b'/CS': + self._slog.error('Invalid SPI magic: %s', + hexlify(magic).decode()) + if self._debug: + raise RuntimeError('SPI protocol error') + return + if version != 0: + self._slog.error('Invalid SPI protocol version: %d', + version) + if self._debug: + raise RuntimeError('SPI version error') + return + buffer = buffer[SpiDevice.CS_HEADER_SIZE:] + if len(buffer) < length: + # wait for SPI packet/payload + self._slog.debug('Expect %d bytes, got %d; %d to go', + length, len(buffer), length-len(buffer)) + continue + request, buffer = bytes(buffer[:length]), buffer[length:] + release = not bool(cfg >> 7) + cfg &= 0b1111 + if cfg: + # only support SPI mode 0, default order + self._slog.error('SPI mode/config not supported: %s', + f'0b{cfg:04b}') + sock.send(bytes([0xff] * length)) + length = 0 + continue + if len(request) <= 32: + self._slog.debug('SPI request > %s (%d)', + hexlify(request).decode(), len(request)) + else: + self._slog.debug('SPI request > %s (%d) ...', + hexlify(request[:32]).decode(), + len(request)) + response = self._handle_spi_request(request, release) + if len(request) <= 32: + self._slog.debug('SPI response < %s, %s /CS', + hexlify(response).decode(), + 'release' if release else 'hold') + else: + self._slog.debug('SPI response < %s, %s ... /CS', + hexlify(response[:32]).decode(), + 'release' if release else 'hold') + assert len(response) == length + length = 0 + if not sock.send(response): + self._log.warning('SPI host disconnected') + return + except (BrokenPipeError, ConnectionResetError): + self._log.warning('SPI host disconnected') + return + # pylint: disable=broad-except + except Exception as exc: + # connection shutdown may have been requested + if self._resume: + self._resume = False + self._log.fatal('Exception: %s', exc) + if self._debug: + print(format_exc(chain=False), file=sys.stderr) + raise + + def _exchange_pty(self, bit_seq: Union[bytes, bytearray]) -> bytearray: + self._pty.write(bit_seq) + return self._read_from_pty(len(bit_seq)) + + def _read_from_pty(self, size: int) -> bytearray: + mi_bit_seq = bytearray() + # increase a bit the timeout for large packets + timeout = now() + self.SPI_BUS_TIMEOUT * (1 + size / 20) + while now() < timeout: + data = self._pty.read(size-len(mi_bit_seq)) + if not data: + continue + mi_bit_seq.extend(data) + if len(mi_bit_seq) == size: + break + else: + raise TimeoutError(f'Not enough data received from SPI device: ' + f'{len(mi_bit_seq)} < {size}') + return mi_bit_seq + + def _handle_spi_request(self, request: bytes, release: bool) -> bytes: + # ensure CSB is low (if not already) + prolog = bytes([self.CSB_LOW | self.SCK_LOW]) + self._exchange_pty(prolog) + mo_bit_seq = bytearray() + mi_bit_que = deque() + # add ASCII '0' so it's easier to debug the comm channel: + # no need to hexlify, upper nibble is never printed out (0x01 -> '1') + # the remote peer (Verilator SPI DPI) only considers the 3 LSBs anyway. + pos = 0 + # PTY buffer size is hardcoded (OS setting, not configurable) + # we do not want to overshoot its capability, so split the request + # into manageable chunks. + for _ in range(0, len(request), self._chunk_size): + for _ in range(self._chunk_size): + byte = request[pos] + for _ in range(8): + bit = byte & 0x80 + byte <<= 1 + sdi = self.SDI_HIGH if bit else self.SDI_LOW + mo_bit_seq.append(self.ASCII_ZERO | sdi | + self.CSB_LOW | self.SCK_LOW) + mo_bit_seq.append(self.ASCII_ZERO | sdi | + self.CSB_LOW | self.SCK_HIGH) + pos += 1 + if pos == len(request): + break + self._clog.debug('MOSI %s', mo_bit_seq.decode()) + mi_bit_seq = self._exchange_pty(mo_bit_seq) + self._clog.debug('MISO %s', bytes(mi_bit_seq).decode()) + assert len(mo_bit_seq) == len(mi_bit_seq) + mo_bit_seq.clear() + mi_bit_que.extend(mi_bit_seq) + if release: + self._clog.debug('Release') + epilog = bytes([self.ASCII_ZERO | + self.SDI_LOW | self.CSB_HIGH | self.SCK_LOW]) + self._exchange_pty(epilog) + self._cs = not release + response = bytearray() + req_byte_count = len(request) + while len(response) < req_byte_count: + byte = 0 + for _ in range(8): + if len(mi_bit_que) < 2: + raise RuntimeError('Incoherent response length') + mi_bit_que.popleft() # sample (clock low) + bval = mi_bit_que.popleft() # sample (clock high / raise) + # ignore ASCII MSBs if any + # pylint: disable=superfluous-parens + if not (bval & self.SDO_EN_HIGH): + bit = 0 + else: + bit = int(bool(bval & self.SDO_HIGH)) + byte <<= 1 + byte |= bit + response.append(byte) + if mi_bit_que: + raise RuntimeError(f'Incoherent response length {len(mi_bit_que)} ' + f'/ {int(release)}') + return response diff --git a/python/qemu/ot/verilator/vcp.py b/python/qemu/ot/verilator/vcp.py new file mode 100644 index 0000000000000..f90a2b177b7e2 --- /dev/null +++ b/python/qemu/ot/verilator/vcp.py @@ -0,0 +1,150 @@ +# Copyright (c) 2025 Rivos, Inc. +# SPDX-License-Identifier: Apache2 + +"""Verilator virtual comm port management.""" + +from fcntl import fcntl, F_GETFL, F_SETFL +from os import O_NONBLOCK +from io import BufferedRandom +from select import POLLIN, POLLERR, POLLHUP, poll as sel_poll +from sys import stderr +from time import time as now +from traceback import format_exc +from typing import NamedTuple, Optional + +import logging +import re + +from ot.util.log import ColorLogFormatter + + +class VtorVcpDescriptor(NamedTuple): + """ Virtual communication port.""" + + vcpid: str + """VCP identifier.""" + pty: BufferedRandom + """Attached pseudo terminal.""" + buffer: bytearray + """Data buffer.""" + logger: logging.Logger + """Associated logger.""" + + +class VtorVcpManager: + """Virtual COM port manager. + + Handles messages emitted on serial port. + """ + + ANSI_CRE = re.compile(rb'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]') + """ANSI escape sequences.""" + + def __init__(self): + self._log = logging.getLogger('vtor.vcp') + self._vcps: dict[int, VtorVcpDescriptor] = {} + self._poller = sel_poll() + + def connect(self, vcp_map: dict[str, str], delay: float) -> None: + """Connect Verilator pseudo-terminals. + + :param vcp_map: a map of serial port, pseudo terminal device + :param delay: how long to wait for a successful connection, + in seconds + """ + vcp_map = dict(vcp_map) + timeout = now() + delay + # ensure that QEMU starts and give some time for it to set up + # when multiple VCPs are set to 'wait', one VCP can be connected at + # a time, i.e. QEMU does not open all connections at once. + vcp_lognames = [] + vcplogname = 'vtor' + vcplognames = [] + while vcp_map: + if now() > timeout: + minfo = ', '.join(f'{d} @ {r}' + for d, r in vcp_map.items()) + raise TimeoutError(f'Cannot connect to Verilator VCPs: {minfo}') + connected = [] + for vcpid, ptyname in vcp_map.items(): + try: + # pylint: disable=consider-using-with + vcp = open(ptyname, 'rb+', buffering=0) + flags = fcntl(vcp, F_GETFL) + fcntl(vcp, F_SETFL, flags | O_NONBLOCK) + connected.append(vcpid) + vcp_lognames.append(vcpid) + vcp_log = logging.getLogger(f'{vcplogname}.{vcpid}') + vcplognames.append(vcpid) + vcp_fno = vcp.fileno() + assert vcp_fno not in self._vcps + self._vcps[vcp_fno] = VtorVcpDescriptor(vcpid, vcp, + bytearray(), + vcp_log) + self._log.debug('VCP %s connected to pty %s', + vcpid, ptyname) + self._poller.register(vcp, POLLIN | POLLERR | POLLHUP) + except ConnectionRefusedError: + continue + except OSError as exc: + self._log.error('Cannot setup Verilator VCP connection %s: ' + '%s', vcpid, exc) + print(format_exc(chain=False), file=stderr) + raise + # removal from dictionary cannot be done while iterating it + for vcpid in connected: + del vcp_map[vcpid] + self._colorize_vcp_log(vcplogname, vcplognames) + + def disconnect(self) -> None: + """Disconnect all managed VCPs.""" + for _, vcp, _, _ in self._vcps.values(): + self._poller.unregister(vcp) + vcp.close() + self._vcps.clear() + + def process(self) -> Optional[int]: + """Handle any received message on VCPs. + + :return: an optional integer (exit code) to early abort execution. + not yet implemented, always return None. + """ + ret = None + for vfd, event in self._poller.poll(0.01): + if event in (POLLERR, POLLHUP): + self._poller.modify(vfd, 0) + continue + _, vcp, vcp_buf, vcp_log = self._vcps[vfd] + try: + data = vcp.read(256) + except TimeoutError: + self._log.error('Unexpected timeout w/ poll on %s', vcp) + continue + if not data: + continue + vcp_buf += data + lines = vcp_buf.split(b'\n') + vcp_buf[:] = bytearray(lines[-1]) + for line in lines[:-1]: + line = self.ANSI_CRE.sub(b'', line) + sline = line.decode('utf-8', errors='ignore').rstrip() + level = logging.INFO + vcp_log.log(level, sline) + if ret is not None: + # match for exit sequence on current VCP + break + return ret + + def _colorize_vcp_log(self, logbase: str, lognames: list[str]) -> None: + vlog = logging.getLogger(logbase) + clr_fmt = None + while vlog: + for hdlr in vlog.handlers: + if isinstance(hdlr.formatter, ColorLogFormatter): + clr_fmt = hdlr.formatter + break + vlog = vlog.parent + if not clr_fmt: + return + for color, logname in enumerate(sorted(lognames)): + clr_fmt.add_logger_colors(f'{logbase}.{logname}', color) diff --git a/scripts/opentitan/verilate.py b/scripts/opentitan/verilate.py index dce875db10714..b4304a60d56e1 100755 --- a/scripts/opentitan/verilate.py +++ b/scripts/opentitan/verilate.py @@ -30,7 +30,7 @@ def main(): """Main routine.""" - debug = False + debug = True try: desc = modules[__name__].__doc__.split('.', 1)[0].strip() argparser = ArgumentParser(description=f'{desc}.') @@ -63,6 +63,9 @@ def main(): veri.add_argument('-a', '--artifact-name', metavar='PREFIX', help='set an alternative artifact name ' '(default is derived from the application name)') + veri.add_argument('-b', '--spi-device-bridge', metavar='TCP_PORT', + type=int, + help='Create a SPI device bridge') veri.add_argument('-C', '--cycles', type=int, help='exit after the specified cycles') veri.add_argument('-I', '--show-init', action='store_true', @@ -88,8 +91,9 @@ def main(): args = argparser.parse_args() debug = args.debug - log = configure_loggers(args.verbose, 'vtor', -1, 'elf', -1, 'romimg', - name_width=12, ms=args.log_time)[0] + log = configure_loggers(args.verbose, 'vtor', + -1, 'vtor.spi.client', 'elf', -1, 'romimg', + name_width=20, ms=args.log_time)[0] if args.tmp_dir and not isdir(args.tmp_dir): argparser.error('Invalid directory for temporary files') @@ -135,6 +139,8 @@ def main(): vtor.artifact_name = args.artifact_name vtor.enable_exec_log(args.save_exec_log, args.link_exec_log) vtor.generate_wave(args.wave_gen) + if args.spi_device_bridge: + vtor.create_spi_device_bridge(args.spi_device_bridge) ret = vtor.verilate(args.rom, args.ram, args.flash, args.app, args.otp, timeout=args.timeout, cycles=args.cycles)