Skip to content

Commit

Permalink
Driver/Resource: Add Segger J-Link support
Browse files Browse the repository at this point in the history
Add a driver and resource for the Segger J-Link debug probe.
Export the J-Link resource over IP using the J-Link Remote Server.

Signed-off-by: Paul Vittorino <paul.vittorino@garmin.com>
  • Loading branch information
PaulVittorino committed May 18, 2023
1 parent c3253c4 commit 8ed0405
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ New Features in 23.1
- A new log level called ``CONSOLE`` has been added between the default
``INFO`` and ``DEBUG`` levels. This level will show all reads and writes made
to the serial console during testing.
- Add JLinkDriver and JLinkDevice resource.

Bug fixes in 23.1
~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -437,7 +438,7 @@ New and Updated Drivers
- The `SerialDriver` now supports using plain TCP instead of RFC 2217, which is
needed from some console servers.
- The `ShellDriver` has been improved:

- It supports configuring the various timeouts used during the login process.
- It can use xmodem to transfer file from and to the target.

Expand Down
30 changes: 30 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,21 @@ Arguments:
Used by:
- `OpenOCDDriver`_

JLinkDevice
~~~~~~~~~~~
A JLinkDevice resource describes a Segger J-Link Debug Probe.

.. code-block:: yaml
JLinkDevice:
match:
ID_SERIAL_SHORT: '000000123456'
- match (dict): key and value for a udev match, see `udev Matching`_

Used by:
- `JLinkDriver`_

SNMPEthernetPort
~~~~~~~~~~~~~~~~
A SNMPEthernetPort resource describes a port on an Ethernet switch, which is
Expand Down Expand Up @@ -1751,6 +1766,21 @@ Arguments:
- board_config (str): optional, board config in the ``openocd/scripts/board/`` directory
- load_commands (list of str): optional, load commands to use instead of ``init``, ``bootstrap {filename}``, ``shutdown``

JLinkDriver
~~~~~~~~~~~~~
A JLinkDriver provides access to a Segger J-Link Debug Probe via `pylink https://github.com/square/pylink`_

Binds to:
interface:
- `JLinkDevice`_
- `NetworkJLinkDevice`_

Implements:
- None

Arguments:
- None

QuartusHPSDriver
~~~~~~~~~~~~~~~~
A QuartusHPSDriver controls the "Quartus Prime Programmer and Tools" to flash
Expand Down
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@
from .usbtmcdriver import USBTMCDriver
from .deditecrelaisdriver import DeditecRelaisDriver
from .dediprogflashdriver import DediprogFlashDriver
from .jlinkdriver import JLinkDriver
43 changes: 43 additions & 0 deletions labgrid/driver/jlinkdriver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from importlib import import_module
import socket
import attr

from ..factory import target_factory
from ..resource.remote import NetworkJLinkDevice
from ..util.proxy import proxymanager
from .common import Driver

@target_factory.reg_driver
@attr.s(eq=False)
class JLinkDriver(Driver):
bindings = {"jlink_device": {"JLinkDevice", "NetworkJLinkDevice"}, }

def __attrs_post_init__(self):
super().__attrs_post_init__()
self._module = import_module('pylink')
self.jlink = None

def on_activate(self):
self.jlink = self._module.JLink()

if isinstance(self.jlink_device, NetworkJLinkDevice):
# we can only forward if the backend knows which port to use
host, port = proxymanager.get_host_and_port(self.jlink_device)
# The J-Link client software does not support host names
ip_addr = socket.gethostbyname(host)

# Workaround for Debian's /etc/hosts entry
# https://www.debian.org/doc/manuals/debian-reference/ch05.en.html#_the_hostname_resolution
if ip_addr == "127.0.1.1":
ip_addr = "127.0.0.1"
self.jlink.open(ip_addr=f"{ip_addr}:{port}")
else:
self.jlink.open(serial_no=self.jlink_device.serial)

def on_deactivate(self):
self.jlink.close()
self.jlink = None

@Driver.check_active
def get_interface(self):
return self.jlink
77 changes: 77 additions & 0 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
from pathlib import Path
from typing import Dict, Type
from socket import gethostname, getfqdn
from pexpect import TIMEOUT
import attr
from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession

from .config import ResourceConfig
from .common import ResourceEntry, enable_tcp_nodelay
from ..util import get_free_port, labgrid_version
from ..util import Timeout


__version__ = labgrid_version()
Expand Down Expand Up @@ -500,6 +502,80 @@ def __attrs_post_init__(self):
super().__attrs_post_init__()
self.data['cls'] = f"Remote{self.cls}".replace("Network", "")

class USBJLinkExport(USBGenericExport):
"""Export J-Link device using the J-Link Remote Server"""

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.child = None
self.port = None
self.tool = '/opt/SEGGER/JLink/JLinkRemoteServer'

def _get_params(self):
"""Helper function to return parameters"""
return {
'host': self.host,
'port': self.port,
'busnum': self.local.busnum,
'devnum': self.local.devnum,
'path': self.local.path,
'vendor_id': self.local.vendor_id,
'model_id': self.local.model_id,
}

def __del__(self):
if self.child is not None:
self.stop()

def _start(self, start_params):
"""Start ``JLinkRemoteServer`` subprocess"""
assert self.local.avail
assert self.child is None
self.port = get_free_port()

cmd = [
self.tool,
"-Port",
f"{self.port}",
"-select",
f"USB={self.local.serial}",
]
self.logger.info("Starting JLinkRemoteServer with: %s", " ".join(cmd))
self.child = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)

# Wait for the server to be ready for incoming connections
# Waiting to open the socket with Python does not work
timeout = Timeout(10.0)
while not timeout.expired:
line = self.child.stdout.readline().rstrip()
self.logger.debug(line)
if "Waiting for client connections..." in line:
break

if timeout.expired:
raise TIMEOUT(
f"Timeout of {timeout.timeout} seconds exceeded during waiting for J-Link Remote Server startup"
)
self.logger.info("started JLinkRemoteServer for %s on port %d", self.local.serial, self.port)

def _stop(self, start_params):
"""Stop ``JLinkRemoteServer`` subprocess"""
assert self.child
child = self.child
self.child = None
port = self.port
self.port = None
child.terminate()
try:
child.wait(2.0) # JLinkRemoteServer takes about a second to react
except subprocess.TimeoutExpired:
self.logger.warning("JLinkRemoteServer for %s still running after SIGTERM", self.local.serial)
log_subprocess_kernel_stack(self.logger, child)
child.kill()
child.wait(1.0)
self.logger.info("stopped JLinkRemoteServer for %s on port %d", self.local.serial, port)


exports["AndroidFastboot"] = USBGenericExport
exports["AndroidUSBFastboot"] = USBGenericRemoteExport
exports["DFUDevice"] = USBGenericExport
Expand All @@ -512,6 +588,7 @@ def __attrs_post_init__(self):
exports["USBSDMuxDevice"] = USBSDMuxExport
exports["USBSDWireDevice"] = USBSDWireExport
exports["USBDebugger"] = USBGenericExport
exports["JLinkDevice"] = USBJLinkExport

exports["USBMassStorage"] = USBGenericExport
exports["USBVideo"] = USBGenericExport
Expand Down
12 changes: 12 additions & 0 deletions labgrid/resource/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ def __attrs_post_init__(self):
super().__attrs_post_init__()


@target_factory.reg_resource
@attr.s(eq=False)
class NetworkJLinkDevice(RemoteUSBResource):
"""The NetworkJLinkDevice describes a remotely accessible USB Segger J-Link device"""

port = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(int)))

def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()


@target_factory.reg_resource
@attr.s(eq=False)
class NetworkDeditecRelais8(RemoteUSBResource):
Expand Down
2 changes: 2 additions & 0 deletions labgrid/resource/suggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
HIDRelay,
USBDebugger,
USBPowerPort,
JLinkDevice,
)
from ..util import dump

Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(self, args):
self.resources.append(HIDRelay(**args))
self.resources.append(USBDebugger(**args))
self.resources.append(USBPowerPort(**args, index=0))
self.resources.append(JLinkDevice(**args))

def suggest_callback(self, resource, meta, suggestions):
cls = type(resource).__name__
Expand Down
16 changes: 16 additions & 0 deletions labgrid/resource/udev.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,3 +696,19 @@ def filter_match(self, device):
return False

return super().filter_match(device)

@target_factory.reg_resource
@attr.s(eq=False)
class JLinkDevice(USBResource):
"""The JLinkDevice describes an attached Segger J-Link device,
it is identified via USB using udev
"""

def __attrs_post_init__(self):
self.match["ID_VENDOR_ID"] = "1366"
self.match["ID_MODEL_ID"] = "0101"
super().__attrs_post_init__()

def update(self):
super().update()
self.serial = self.device.properties.get('ID_SERIAL_SHORT')
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dynamic = ["version"] # via setuptools_scm
doc = ["sphinx_rtd_theme>=1.0.0"]
docker = ["docker>=5.0.2"]
graph = ["graphviz>=0.17.0"]
jlink = ["pylink-square>=1.0.0"]
kasa = ["python-kasa>=0.4.0"]
modbus = ["pyModbusTCP>=0.1.10"]
modbusrtu = ["minimalmodbus>=1.0.2"]
Expand Down Expand Up @@ -91,6 +92,9 @@ dev = [
# labgrid[graph]
"graphviz>=0.17.0",

# labgrid[jlink]
"pylink-square>=1.0.0",

# labgrid[kasa]
"python-kasa>=0.4.0",

Expand Down
104 changes: 104 additions & 0 deletions tests/test_jlink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import contextlib
import io
import pytest
import subprocess
from unittest.mock import MagicMock
from unittest.mock import Mock
from unittest.mock import patch

from labgrid.remote.exporter import USBJLinkExport
from labgrid.resource.remote import NetworkJLinkDevice
from labgrid.resource.udev import JLinkDevice
from labgrid.driver.jlinkdriver import JLinkDriver

FAKE_SERIAL = 123456789
MATCH = {"ID_SERIAL_SHORT": f"000{FAKE_SERIAL}"}


class Popen_mock():
"""Mock of Popen object to mimmic JLinkRemoteServer output"""

def __init__(self, args, **kwargs):
assert "JLinkRemoteServer" in args[0]
assert args[1] == "-Port"
# Since args[2] is dynamic do not check it
assert args[3] == "-select"
assert args[4] == f"USB={FAKE_SERIAL}"
self.wait_called = False

stdout = io.StringIO(
"SEGGER J-Link Remote Server V7.84a\n"
"Compiled Dec 22 2022 16:13:52\n"
"\n"
"'q' to quit '?' for help\n"
"\n"
f"Connected to J-Link with S/N {FAKE_SERIAL}\n"
"\n"
"Waiting for client connections...\n"
)

def kill(self):
pass

def poll(self):
return 0

def terminate(self):
pass

def wait(self, timeout=None):
# Only timeout on the first call to exercise the error handling code.
if not self.wait_called:
self.wait_called = True
raise subprocess.TimeoutExpired("JLinkRemoteServer", timeout)


def test_jlink_resource(target):
r = JLinkDevice(target, name=None, match=MATCH)


@patch('subprocess.Popen', Popen_mock)
def test_jlink_export_start(target):
config = {'avail': True, 'cls': "JLinkDevice", 'params': {'match': MATCH}, }
e = USBJLinkExport(config)
e.local.avail = True
e.local.serial = FAKE_SERIAL

e.start()
# Exercise the __del__ method which also exercises stop()
del e


@patch('subprocess.Popen', Popen_mock)
def test_jlink_driver(target):
pytest.importorskip("pylink")
device = JLinkDevice(target, name=None, match=MATCH)
device.avail = True
device.serial = FAKE_SERIAL
driver = JLinkDriver(target, name=None)

with patch('pylink.JLink') as JLinkMock:
instance = JLinkMock.return_value
target.activate(driver)
instance.open.assert_called_once_with(serial_no=FAKE_SERIAL)
intf = driver.get_interface()
assert(isinstance(intf, Mock))
target.deactivate(driver)
instance.close.assert_called_once_with()


@patch('subprocess.Popen', Popen_mock)
def test_jlink_driver_network_device(target):
pytest.importorskip("pylink")
device = NetworkJLinkDevice(target, None, host='127.0.1.1', port=12345, busnum=0, devnum=1, path='0:1', vendor_id=0x0, model_id=0x0,)
device.avail = True
driver = JLinkDriver(target, name=None)
assert (isinstance(driver, JLinkDriver))

with patch('pylink.JLink') as JLinkMock:
instance = JLinkMock.return_value
# Call on_activate directly since activating the driver via the target does not work during testing
driver.on_activate()
instance.open.assert_called_once_with(ip_addr='127.0.0.1:12345')
driver.on_deactivate()
instance.close.assert_called_once_with()

0 comments on commit 8ed0405

Please sign in to comment.