In [None]:
#|hide
#|default_exp shell

# shell

> A shell for running notebook code without a notebook server

In [None]:
#|export
from __future__ import annotations
from fastcore.basics import *
from fastcore.imports import *
from fastcore.script import call_parse

from base64 import b64encode
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.displayhook import DisplayHook
from IPython.core.displaypub import DisplayPublisher
from io import StringIO
from matplotlib_inline.config import InlineBackend
from matplotlib_inline.backend_inline import select_figure_formats
import tokenize

from execnb.fastshell import FastInteractiveShell
from execnb.nbio import *

from collections.abc import Callable

In [None]:
#|export
# IPython requires a DisplayHook and DisplayPublisher
# We override `__call__` and `publish` to save outputs instead of printing them

class _CaptureHook(DisplayHook):
    "Display hook that hands a cell's result to the shell instead of printing it"
    def quiet(self):
        "Is display suppressed because the code ends with ';'?"
        # Same idea as `DisplayHook.quiet`, but reads `shell._code` rather than `shell.history_manager.input_hist_parsed[-1]`
        ignorable = (tokenize.ENDMARKER, tokenize.NL, tokenize.NEWLINE, tokenize.COMMENT)
        toks = list(tokenize.generate_tokens(StringIO(self.shell._code).readline))
        # Last token that is neither layout nor a comment
        last = next((t for t in reversed(toks) if t[0] not in ignorable), None)
        if last is None: return None
        return last[0] == tokenize.OP and last[1] == ';'

    def __call__(self, result=None):
        "Pass `result` to the shell unless it is `None` or silenced by `quiet`"
        if result is None: return
        if self.quiet(): return
        self.fill_exec_result(result)
        self.shell._result(result)

class _CapturePub(DisplayPublisher):
    "Display publisher that stores published outputs on the shell"
    def publish(self, data, metadata=None, **kwargs):
        "Hand `data` and `metadata` to the shell as a `display_data` output; extra `kwargs` are not used"
        self.shell._add_out(data, metadata, typ='display_data')

In [None]:
#|export
# These are the standard notebook formats for exception and stream data (e.g. stdout)
def _out_exc(ename, evalue, traceback): return dict(ename=str(ename), evalue=str(evalue), output_type='error', traceback=traceback)
def _out_stream(text, name): return dict(name=name, output_type='stream', text=text.splitlines(True))

## CaptureShell -

In [None]:
#|export
class CaptureShell(FastInteractiveShell):
    "Execute the IPython/Jupyter source code"
    def __init__(self,
                 path:str|Path=None): # Add `path` to python path
        super().__init__(displayhook_class=_CaptureHook, display_pub_class=_CapturePub)
        # Store this shell on `InteractiveShell._instance`
        InteractiveShell._instance = self
        # `out` collects notebook-format outputs; `count` is the next execution count
        self.out,self.count = [],1
        self.exc = self.result = self._fname = self._cell_idx = None
        # Inline plotting support is optional: a missing module is ignored
        try: self.enable_matplotlib('inline')
        except ModuleNotFoundError: pass
        if path: self.set_path(path)

    def set_path(self, path):
        "Add `path` to python path, or `path.parent` if it's a file"
        path = Path(path)
        if path.is_file(): path = path.parent
        # `!r` renders a valid string literal, so backslashes (e.g. Windows paths) or quotes
        # in `path` cannot corrupt the code that is run
        self.run_cell(f"import sys; sys.path.insert(0, {str(path)!r})")

    def enable_gui(self, gui=None):
        "Disable GUI (over-ridden; called by IPython)"
        pass

    def enable_matplotlib(self, gui=None):
        "Enable matplotlib via the parent class, then apply the `InlineBackend` figure formats to this shell"
        gui, backend = super().enable_matplotlib(gui)
        # Adapted from matplotlib_inline.backend_inline.configure_inline_support
        cfg = InlineBackend.instance(parent=self)
        select_figure_formats(self, cfg.figure_formats, **cfg.print_figure_kwargs)
        return gui, backend

    def _showtraceback(self, etype, evalue, stb: list):
        "Store an exception as an `error` output and as `self.exc`; `stb` is the traceback as a sequence of lines"
        self.out.append(_out_exc(etype, evalue, stb))
        self.exc = (etype, evalue, '\n'.join(stb))

    def _add_out(self, data, meta, typ='execute_result', **kwargs):
        "Append a `typ` output built from mime bundle `data` and `meta`; `kwargs` become extra output fields"
        # Flush captured stream text first so that it precedes this output
        self._stream()
        def _format(k, v):
            # Text mime types are stored as a list of lines
            if k.startswith('text/'): return v.splitlines(True)
            # Raw image bytes are stored as newline-terminated base64 text
            if k.startswith('image/') and isinstance(v, bytes):
                v = b64encode(v).decode()
                if not v.endswith('\n'): v+='\n'
                return v
            return v
        data = {k: _format(k,v) for k,v in data.items()}
        self.out.append(dict(data=data, metadata=meta, output_type=typ, **kwargs))

    def _add_exec(self, result, meta, typ='execute_result'):
        "Append `result` as a `typ` output tagged with the current execution count, then advance the count"
        # `typ` is forwarded; previously it was accepted but silently ignored
        self._add_out(result, meta, typ=typ, execution_count=self.count)
        self.count += 1

    def _result(self, result):
        "Remember `result` and store its formatted representations as an output"
        self.result = result
        self._add_exec(*self.display_formatter.format(result))

    def _stream(self):
        "Move any captured stdout/stderr text into `self.out` as `stream` outputs"
        for nm in ('stdout','stderr'):
            if hasattr(self, nm):
                std = getattr(self, nm)
                text = std.getvalue()
                if text:
                    self.out.append(_out_stream(text, nm))
                    # Empty the buffer in place rather than swapping in a new `StringIO`:
                    # `sys.stdout`/`sys.stderr` may still reference this same object, so text
                    # written after a mid-cell flush keeps landing in a buffer that is read later
                    std.seek(0)
                    std.truncate()

In [None]:
s = CaptureShell()

### Cells -

In [None]:
#|export
@patch
def run(self:CaptureShell,
        code:str, # Python/IPython code to run
        stdout=True, # Capture stdout and save as output?
        stderr=True): # Capture stderr and save as output?
    "Execute `code` and return a new list of its outputs in Jupyter notebook format"
    self._code,self.exc = code,False
    self.out.clear()
    # Remember the real streams so they are restored even if execution fails
    saved = sys.stdout,sys.stderr
    if stdout: self.stdout = sys.stdout = StringIO()
    if stderr: self.stderr = sys.stderr = StringIO()
    try: self.run_cell(code)
    finally: sys.stdout,sys.stderr = saved
    # Pick up any stream text written after the last output was stored
    self._stream()
    return list(self.out)

In [None]:
s.run("print(1)")

[{'name': 'stdout', 'output_type': 'stream', 'text': ['1\n']}]

Code can include magics and `!` shell commands:

In [None]:
s.run("%time 1+1")

[{'name': 'stdout',
  'output_type': 'stream',
  'text': ['CPU times: user 2 us, sys: 1 us, total: 3 us\n',
   'Wall time: 5.01 us\n']},
 {'data': {'text/plain': ['2']},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 1}]

The result of the last successful execution is stored in `result`:

In [None]:
s.result

2

If an exception is raised then the exception type, object, and stacktrace are stored in `exc`:

In [None]:
# The failure is recorded in `s.exc` as (type, value, traceback text)
s.run('raise Exception("Oops")')
typ,obj,st = s.exc
typ,obj

(Exception, Exception('Oops'))

In [None]:
print(st)

[0;31m---------------------------------------------------------------------------[0m
[0;31mException[0m                                 Traceback (most recent call last)
Input [0;32mIn [1][0m, in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0m [38;5;28;01mraise[39;00m [38;5;167;01mException[39;00m([38;5;124m"[39m[38;5;124mOops[39m[38;5;124m"[39m)

[0;31mException[0m: Oops


In [None]:
#|export
@patch
def cell(self:CaptureShell, cell, stdout=True, stderr=True):
    "Run `cell`, skipping if not code, and store outputs back in cell; `stdout`/`stderr` are passed to `run`"
    if cell.cell_type!='code': return
    # 1-based cell position, used by `prettytb` in its message
    self._cell_idx = cell.idx_ + 1
    # Forward the capture flags; previously they were accepted but never reached `run`
    outs = self.run(cell.source, stdout=stdout, stderr=stderr)
    if outs:
        cell.outputs = outs
        for o in outs:
            if 'execution_count' in o: cell['execution_count'] = o['execution_count']

In [None]:
# Read the test notebook and take its second cell (index 1) as the example
clean = Path('../tests/clean.ipynb')
nb = read_nb(clean)
c = nb.cells[1]
c

```json
{ 'cell_type': 'code',
  'execution_count': None,
  'id': 'b123d6d0',
  'idx_': 1,
  'metadata': {},
  'outputs': [],
  'source': 'print(1)\n2'}
```

In [None]:
s.cell(c)
c.outputs

[{'name': 'stdout', 'output_type': 'stream', 'text': ['1\n']},
 {'data': {'text/plain': ['2']},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 2}]

### NBs -

In [None]:
#|export
def _false(o): return False

@patch
def run_all(self:CaptureShell,
            nb, # A notebook read with `nbclient` or `read_nb`
            exc_stop:bool=False, # Stop on exceptions?
            preproc:Callable=_false, # Called before each cell is executed
            postproc:Callable=_false, # Called after each cell is executed
            inject_code:str|None=None, # Code to inject into a cell
            inject_idx:int=0 # Cell to replace with `inject_code`
           ):
    "Run all cells in `nb`, stopping at first exception if `exc_stop`"
    if inject_code is not None: nb.cells[inject_idx].source = inject_code
    # Forget any exception left by an earlier run. Only `run` resets `self.exc`, so if the first
    # cells are non-code or skipped by `preproc`, a stale exception would be re-raised below
    self.exc = False
    for cell in nb.cells:
        # A truthy return from `preproc` means the cell is skipped (and `postproc` is not called)
        if not preproc(cell):
            self.cell(cell)
            postproc(cell)
        # `self.exc[1]` is the exception object stored by `_showtraceback`
        if self.exc and exc_stop: raise self.exc[1] from None

In [None]:
nb.cells[2].outputs

(#0) []

In [None]:
s.run_all(nb)
nb.cells[2].outputs

[{'data': {'text/plain': ['<IPython.core.display.Markdown object>'],
   'text/markdown': ["This is *bold*. Here's a [link](https://www.fast.ai)."]},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 5}]

With `exc_stop=False` (the default), execution continues after exceptions, and exception details are stored into the appropriate cell's output:

In [None]:
nb.cells[-1].source

'raise Exception("Oopsie!")'

In [None]:
nb.cells[-1].outputs

[{'ename': "<class 'Exception'>",
  'evalue': 'Oopsie!',
  'output_type': 'error',
  'traceback': ['\x1b[0;31m---------------------------------------------------------------------------\x1b[0m',
   '\x1b[0;31mException\x1b[0m                                 Traceback (most recent call last)',
   'Input \x1b[0;32mIn [1]\x1b[0m, in \x1b[0;36m<cell line: 1>\x1b[0;34m()\x1b[0m\n\x1b[0;32m----> 1\x1b[0m \x1b[38;5;28;01mraise\x1b[39;00m \x1b[38;5;167;01mException\x1b[39;00m(\x1b[38;5;124m"\x1b[39m\x1b[38;5;124mOopsie!\x1b[39m\x1b[38;5;124m"\x1b[39m)\n',
   '\x1b[0;31mException\x1b[0m: Oopsie!']}]

With `exc_stop=True`, exceptions in a cell are raised and no further processing occurs:

In [None]:
# With `exc_stop=True`, `run_all` raises the exception object stored for the failing cell
try: s.run_all(nb, exc_stop=True)
except Exception as e: print(f"got exception: {e}")

got exception: Oopsie!


We can pass a function to `preproc` to have it run on every cell. It can modify the cell as needed. If the function returns `True`, then that cell will not be executed. For instance, to skip the cell which raises an exception:

In [None]:
# `preproc` returns True for any cell whose source contains 'raise', so those cells are not run
nb = read_nb(clean)
s.run_all(nb, preproc=lambda c: 'raise' in c.source)

This cell will contain no output, since it was skipped.

In [None]:
nb.cells[-1].outputs

(#0) []

In [None]:
nb.cells[1].outputs

[{'name': 'stdout', 'output_type': 'stream', 'text': ['1\n']},
 {'data': {'text/plain': ['2']},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 10}]

You can also pass a function to `postproc` to modify a cell after it is executed.

In [None]:
#|export
@patch
def execute(self:CaptureShell,
            src:str|Path, # Notebook path to read from
            dest:str|None=None, # Notebook path to write to
            exc_stop:bool=False, # Stop on exceptions?
            preproc:Callable=_false, # Called before each cell is executed
            postproc:Callable=_false, # Called after each cell is executed
            inject_code:str|None=None, # Code to inject into a cell
            inject_path:str|Path|None=None, # Path to file containing code to inject into a cell
            inject_idx:int=0 # Cell to replace with `inject_code`
):
    "Execute notebook from `src` and save with outputs to `dest`"
    nb = read_nb(src)
    self._fname = src
    # Put the notebook's own directory on the python path
    nb_dir = Path(src).parent.resolve()
    self.set_path(nb_dir)
    # When given, the contents of `inject_path` replace `inject_code`
    if inject_path is not None: inject_code = Path(inject_path).read_text()
    opts = dict(exc_stop=exc_stop, preproc=preproc, postproc=postproc,
                inject_code=inject_code, inject_idx=inject_idx)
    self.run_all(nb, **opts)
    # Nothing is written when `dest` is empty or `None`
    if dest: write_nb(nb, dest)

This is a shortcut for the combination of `read_nb`, `run_all`, and `write_nb`.

In [None]:
s = CaptureShell()
try:
    s.execute(clean, 'tmp.ipynb')
    print(read_nb('tmp.ipynb').cells[1].outputs)
finally:
    # Only remove the file if it was written, so that a failure in `execute`
    # is not masked by a `FileNotFoundError` from `unlink`
    tmp = Path('tmp.ipynb')
    if tmp.exists(): tmp.unlink()

[{'name': 'stdout', 'output_type': 'stream', 'text': ['1\n']}, {'data': {'text/plain': ['2']}, 'execution_count': 2, 'metadata': {}, 'output_type': 'execute_result'}]


In [None]:
#|export
@patch
def prettytb(self:CaptureShell, 
             fname:str|Path=None): # filename to print alongside the traceback
    "Format the stored exception as a readable traceback, naming `fname` (or the last executed notebook) if known"
    fname = fname or self._fname
    rule = '='*75
    # Mention the cell number only when one has been recorded
    if self._cell_idx: intro = f"While Executing Cell #{self._cell_idx}:"
    else: intro = "While Executing:"
    tb = self.exc[-1]
    location = f' in {fname}' if fname else ''
    exc_name = type(self.exc[1]).__name__
    return f"{exc_name}{location}:\n{rule}\n\n{intro}\n{tb}\n"

If an error occurs while running a notebook, you can retrieve a pretty version of the error with the `prettytb` method: 

In [None]:
s = CaptureShell()
try:
    s.execute('../tests/error.ipynb', exc_stop=True)
# Catch `Exception` rather than using a bare `except`, so `KeyboardInterrupt`/`SystemExit` still propagate
except Exception:
    print(s.prettytb())

AssertionError in ../tests/error.ipynb:

While Executing Cell #2:
[0;31m---------------------------------------------------------------------------[0m
[0;31mAssertionError[0m                            Traceback (most recent call last)
Input [0;32mIn [1][0m, in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      1[0m [38;5;66;03m# some comments[39;00m
[1;32m      2[0m [38;5;28mprint[39m([38;5;124m'[39m[38;5;124mhello[39m[38;5;124m'[39m)
[0;32m----> 3[0m [43mfoo[49m[43m([49m[43m)[49m

File [0;32m~/code/execnb/tests/err.py:2[0m, in [0;36mfoo[0;34m()[0m
[1;32m      1[0m [38;5;28;01mdef[39;00m [38;5;21mfoo[39m():
[0;32m----> 2[0m     [38;5;28;01massert[39;00m [38;5;241m13[39m [38;5;241m==[39m [38;5;241m98[39m

[0;31mAssertionError[0m: 



## Params -

If you pass `inject_code` to `CaptureShell.execute` or `CaptureShell.run_all`, the source of `nb.cells[inject_idx]` will be replaced with `inject_code`. By default, the first cell is replaced. For instance consider this notebook:

In [None]:
# Print the source of every cell in the parameter example notebook
nb = read_nb('../tests/params.ipynb')
for c in nb.cells: print('- ',c.source)

-  a=1
-  print(a)


We can replace the first cell with `a=2` by passing that as `inject_code`, and the notebook will run with that change:

In [None]:
# `inject_idx` defaults to 0, so the source of the first cell is replaced with `a=2` before running
nb = read_nb('../tests/params.ipynb')
s.run_all(nb, inject_code="a=2")
list(nb.cells)

[{'cell_type': 'code',
  'execution_count': None,
  'id': 'a63ce885',
  'metadata': {},
  'outputs': (#0) [],
  'source': 'a=2',
  'idx_': 0},
 {'cell_type': 'code',
  'execution_count': None,
  'id': 'ea528db5',
  'metadata': {},
  'outputs': [{'name': 'stdout', 'output_type': 'stream', 'text': ['2\n']}],
  'source': 'print(a)',
  'idx_': 1}]

This can be used with `CaptureShell.execute` to parameterise runs of models in notebooks. Place any defaults for configuration code needed in the first cell, and then when running `execute` pass in new parameters as needed in `inject_code`. To replace only some of the defaults, leave an empty cell as the second cell, and inject code using `inject_idx=1` to replace the empty second cell with code that overrides some of the defaults set in the first cell. When using `execute` you can pass `inject_path` instead of `inject_code` to read the injected code from a file.

## cli -

In [None]:
#|export
@call_parse
def exec_nb(
    src:str, # Notebook path to read from
    dest:str='', # Notebook path to write to
    exc_stop:bool=False, # Stop on exceptions?
    inject_code:str=None, # Code to inject into a cell
    inject_path:str=None, # Path to file containing code to inject into a cell
    inject_idx:int=0 # Cell to replace with `inject_code`
):
    "Execute notebook from `src` and save with outputs to `dest"
    # Each invocation runs the notebook in a newly created shell
    shell = CaptureShell()
    shell.execute(src, dest, exc_stop=exc_stop, inject_code=inject_code,
                  inject_path=inject_path, inject_idx=inject_idx)

This is the command-line version of `CaptureShell.execute`. Run `exec_nb -h` from the command line to see how to pass arguments. If you don't pass `dest` then the output notebook won't be saved; this is mainly useful for running tests.

## export -

In [None]:
#|hide
#|eval: false
from nbprocess.doclinks import nbprocess_export
nbprocess_export()