Skip to content
Permalink
Browse files

scripts: west_commands: decouple runners pkg from west

I've had some requests to be able to use code in the runners package
without having west installed.

It turns out to be pretty easy to make this happen, as west is
currently only used for west.log and some trivial helper methods:

- To replace west log, use the standard logging module
- Add an appropriate handler for each runner's logger in
  run_common.py which delegates to west.log, to keep
  output working as expected.

Signed-off-by: Marti Bolivar <marti.bolivar@nordicsemi.no>
  • Loading branch information...
mbolivar authored and carlescufi committed Jun 3, 2019
1 parent d176cc3 commit ddce583ca21aa7b42c48b932a4d8d1849bceb434
@@ -6,6 +6,7 @@
'''

import argparse
import logging
from os import getcwd, path
from subprocess import CalledProcessError
import textwrap
@@ -25,6 +26,44 @@
# Don't change this, or output from argparse won't match up.
INDENT = ' ' * 2

if log.VERBOSE >= log.VERBOSE_NORMAL:
# Using level 1 allows sub-DEBUG levels of verbosity. The
# west.log module decides whether or not to actually print the
# message.
#
# https://docs.python.org/3.7/library/logging.html#logging-levels.
LOG_LEVEL = 1
else:
LOG_LEVEL = logging.INFO


class WestLogFormatter(logging.Formatter):

def __init__(self):
super().__init__(fmt='%(message)s')

class WestLogHandler(logging.Handler):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setFormatter(WestLogFormatter())
self.setLevel(LOG_LEVEL)

def emit(self, record):
fmt = self.format(record)
lvl = record.levelno
if lvl > logging.CRITICAL:
log.die(fmt)
elif lvl >= logging.ERROR:
log.err(fmt)
elif lvl >= logging.WARNING:
log.wrn(fmt)
elif lvl >= logging.INFO:
log.inf(fmt)
elif lvl >= logging.DEBUG:
log.dbg(fmt)
else:
log.dbg(fmt, level=log.VERBOSE_EXTREME)

def add_parser_common(parser_adder, command):
parser = parser_adder.add_parser(
@@ -201,9 +240,13 @@ def do_run_common(command, args, runner_args, cached_runner_var):
# At this point, the common options above are already parsed in
# 'args', and unrecognized arguments are in 'runner_args'.
#
# - Set up runner logging to delegate to west.
# - Pull the RunnerConfig out of the cache
# - Override cached values with applicable command-line options

logger = logging.getLogger('runners')
logger.setLevel(LOG_LEVEL)
logger.addHandler(WestLogHandler())
cfg = cached_runner_config(build_dir, cache)
_override_config_from_namespace(cfg, args)

@@ -14,19 +14,21 @@
import abc
import argparse
import errno
import logging
import os
import platform
import shlex
import shutil
import signal
import subprocess

from west import log
from west.util import quote_sh_list
# Turn on to enable just logging the commands that would be run (at
# info rather than debug level), without actually running them. This
# can break runners that are expecting output or if one command
# depends on another, so it's just for debugging.
_DRY_RUN = False

# Turn on to enable just printing the commands that would be run,
# without actually running them. This can break runners that are expecting
# output or if one command depends on another, so it's just for debugging.
JUST_PRINT = False
_logger = logging.getLogger('runners')


class _DebugDummyPopen:
@@ -313,6 +315,17 @@ class ZephyrBinaryRunner(abc.ABC):
3. Give your runner's name to the Zephyr build system in your
board's board.cmake.
Some advice on input and output:
- If you need to ask the user something (e.g. using input()), do it
in your create() classmethod, not do_run(). That ensures your
__init__() really has everything it needs to call do_run(), and also
avoids calling input() when not instantiating within a command line
application.
- Use self.logger to log messages using the standard library's
logging API; your logger is named "runner.<your-runner-name()>"
For command-line invocation from the Zephyr build system, runners
define their own argparse-based interface through the common
add_parser() (and runner-specific do_add_parser() it delegates
@@ -330,6 +343,10 @@ def __init__(self, cfg):
``cfg`` is a RunnerConfig instance.'''
self.cfg = cfg
'''RunnerConfig for this instance.'''

self.logger = logging.getLogger('runners.{}'.format(self.name()))
'''logging.Logger for this instance.'''

@staticmethod
def get_runners():
@@ -465,20 +482,23 @@ def run_server_and_client(self, server, client):
server_proc.terminate()
server_proc.wait()

def _log_cmd(self, cmd):
escaped = ' '.join(shlex.quote(s) for s in cmd)
if not _DRY_RUN:
self.logger.debug(escaped)
else:
self.logger.info(escaped)

def call(self, cmd):
'''Subclass subprocess.call() wrapper.
Subclasses should use this method to run command in a
subprocess and get its return code, rather than
using subprocess directly, to keep accurate debug logs.
'''
quoted = quote_sh_list(cmd)

if JUST_PRINT:
log.inf(quoted)
self._log_cmd(cmd)
if _DRY_RUN:
return 0

log.dbg(quoted)
return subprocess.call(cmd)

def check_call(self, cmd):
@@ -488,13 +508,9 @@ def check_call(self, cmd):
subprocess and check that it executed correctly, rather than
using subprocess directly, to keep accurate debug logs.
'''
quoted = quote_sh_list(cmd)

if JUST_PRINT:
log.inf(quoted)
self._log_cmd(cmd)
if _DRY_RUN:
return

log.dbg(quoted)
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
@@ -507,13 +523,9 @@ def check_output(self, cmd):
subprocess and check that it executed correctly, rather than
using subprocess directly, to keep accurate debug logs.
'''
quoted = quote_sh_list(cmd)

if JUST_PRINT:
log.inf(quoted)
self._log_cmd(cmd)
if _DRY_RUN:
return b''

log.dbg(quoted)
try:
return subprocess.check_output(cmd)
except subprocess.CalledProcessError:
@@ -526,16 +538,14 @@ def popen_ignore_int(self, cmd):
cflags = 0
preexec = None
system = platform.system()
quoted = quote_sh_list(cmd)

if system == 'Windows':
cflags |= subprocess.CREATE_NEW_PROCESS_GROUP
elif system in {'Linux', 'Darwin'}:
preexec = os.setsid

if JUST_PRINT:
log.inf(quoted)
self._log_cmd(cmd)
if _DRY_RUN:
return _DebugDummyPopen()

log.dbg(quoted)
return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec)
@@ -8,8 +8,6 @@
import sys
import time

from west import log

from runners.core import ZephyrBinaryRunner, RunnerCaps, \
BuildConfiguration

@@ -36,6 +34,7 @@ def __init__(self, cfg, pid, alt, img, exe='dfu-util',
else:
self.dfuse = True
self.dfuse_config = dfuse_config
self.reset = False

@classmethod
def name(cls):
@@ -80,8 +79,17 @@ def create(cls, cfg, args):
else:
dcfg = None

return DfuUtilBinaryRunner(cfg, args.pid, args.alt, args.img,
exe=args.dfu_util, dfuse_config=dcfg)
ret = DfuUtilBinaryRunner(cfg, args.pid, args.alt, args.img,
exe=args.dfu_util, dfuse_config=dcfg)
ret.ensure_device()
return ret

def ensure_device(self):
if not self.find_device():
self.reset = True
print('Please reset your board to switch to DFU mode...')
while not self.find_device():
time.sleep(0.1)

def find_device(self):
cmd = list(self.cmd) + ['-l']
@@ -91,17 +99,9 @@ def find_device(self):

def do_run(self, command, **kwargs):
self.require(self.cmd[0])
reset = False

if not self.find_device():
reset = True
log.dbg('Device not found, waiting for it',
level=log.VERBOSE_EXTREME)
# Use of print() here is advised. We don't want to lose
# this information in a separate log -- this is
# interactive and requires a terminal.
print('Please reset your board to switch to DFU mode...')
while not self.find_device():
time.sleep(0.1)
raise RuntimeError('device not found')

cmd = list(self.cmd)
if self.dfuse:
@@ -118,6 +118,6 @@ def do_run(self, command, **kwargs):
#
# DfuSe targets do as well, except when 'leave' is given
# as an option.
reset = False
if reset:
self.reset = False
if self.reset:
print('Now reset your board again to switch back to runtime mode.')
@@ -6,8 +6,6 @@

from os import path

from west import log

from runners.core import ZephyrBinaryRunner, RunnerCaps


@@ -95,8 +93,9 @@ def do_run(self, command, **kwargs):
else:
cmd_flash.extend(['0x1000', bin_name])

log.inf("Converting ELF to BIN")
self.logger.info("Converting ELF to BIN")
self.check_call(cmd_convert)

log.inf("Flashing ESP32 on {} ({}bps)".format(self.device, self.baud))
self.logger.info("Flashing ESP32 on {} ({}bps)".
format(self.device, self.baud))
self.check_call(cmd_flash)
@@ -7,7 +7,6 @@
from os import path
import time
import signal
from west import log

from runners.core import ZephyrBinaryRunner

@@ -84,7 +83,7 @@ def flash(self, **kwargs):
jtag_instr_file = kwargs['ocd-jtag-instr']
gdb_flash_file = kwargs['gdb-flash-file']

self.print_gdbserver_message(self.gdb_port)
self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir,
'-c', topology_file,
'-I', jtag_instr_file]
@@ -122,7 +121,7 @@ def do_debug(self, **kwargs):
topology_file = kwargs['ocd-topology']
jtag_instr_file = kwargs['ocd-jtag-instr']

self.print_gdbserver_message(self.gdb_port)
self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir,
'-c', topology_file,
'-I', jtag_instr_file]
@@ -152,14 +151,11 @@ def do_debug(self, **kwargs):
server_proc.terminate()
server_proc.wait()

def print_gdbserver_message(self, gdb_port):
log.inf('Intel S1000 GDB server running on port {}'.format(gdb_port))

def debugserver(self, **kwargs):
topology_file = kwargs['ocd-topology']
jtag_instr_file = kwargs['ocd-jtag-instr']

self.print_gdbserver_message(self.gdb_port)
self.log_gdbserver_message(self.gdb_port)
server_cmd = [self.xt_ocd_dir,
'-c', topology_file,
'-I', jtag_instr_file]
@@ -170,3 +166,7 @@ def debugserver(self, **kwargs):
time.sleep(6)
server_proc.terminate()
self.check_call(server_cmd)

def log_gdbserver_message(self, gdb_port):
self.logger.info('Intel S1000 GDB server running on port {}'.
format(gdb_port))
@@ -7,11 +7,9 @@
import argparse
import os
import shlex
import shutil
import sys
import tempfile

from west import log
from runners.core import ZephyrBinaryRunner, RunnerCaps, \
BuildConfiguration

@@ -105,7 +103,8 @@ def create(cls, cfg, args):
tui=args.tui, tool_opt=args.tool_opt)

def print_gdbserver_message(self):
log.inf('J-Link GDB server running on port {}'.format(self.gdb_port))
self.logger.info('J-Link GDB server running on port {}'.
format(self.gdb_port))

def do_run(self, command, **kwargs):
server_cmd = ([self.gdbserver] +
@@ -162,8 +161,8 @@ def flash(self, **kwargs):
lines.append('g') # Start the CPU
lines.append('q') # Close the connection and quit

log.dbg('JLink commander script:')
log.dbg('\n'.join(lines))
self.logger.debug('JLink commander script:')
self.logger.debug('\n'.join(lines))

# Don't use NamedTemporaryFile: the resulting file can't be
# opened again on Windows.
@@ -179,5 +178,5 @@ def flash(self, **kwargs):
'-CommanderScript', fname] +
self.tool_opt)

log.inf('Flashing Target Device')
self.logger.info('Flashing Target Device')
self.check_call(cmd)
@@ -4,7 +4,6 @@

'''Runner for NIOS II, based on quartus-flash.py and GDB.'''

from west import log
from runners.core import ZephyrBinaryRunner, NetworkPortHelper


@@ -65,7 +64,8 @@ def flash(self, **kwargs):
self.check_call(cmd)

def print_gdbserver_message(self, gdb_port):
log.inf('Nios II GDB server running on port {}'.format(gdb_port))
self.logger.info('Nios II GDB server running on port {}'.
format(gdb_port))

def debug_debugserver(self, command, **kwargs):
# Per comments in the shell script, the NIOSII GDB server

0 comments on commit ddce583

Please sign in to comment.
You can’t perform that action at this time.