diff --git a/nbclient/cli.py b/nbclient/cli.py index 2151095..4cb2edf 100644 --- a/nbclient/cli.py +++ b/nbclient/cli.py @@ -2,9 +2,9 @@ from __future__ import annotations import logging -import pathlib import sys import typing +from pathlib import Path from textwrap import dedent import nbformat @@ -22,6 +22,7 @@ "timeout": "NbClientApp.timeout", "startup_timeout": "NbClientApp.startup_timeout", "kernel_name": "NbClientApp.kernel_name", + "output": "NbClientApp.output_base", } nbclient_flags: dict[str, typing.Any] = { @@ -33,6 +34,14 @@ }, "Errors are ignored and execution is continued until the end of the notebook.", ), + "inplace": ( + { + "NbClientApp": { + "inplace": True, + }, + }, + "Overwrite input notebook with executed results.", + ), } @@ -98,6 +107,29 @@ class NbClientApp(JupyterApp): """ ), ).tag(config=True) + inplace = Bool( + False, + help=dedent( + """ + Default is execute notebook without writing the newly executed notebook. + If this flag is provided, the newly generated notebook will + overwrite the input notebook. + """ + ), + ).tag(config=True) + output_base = Unicode( + None, + allow_none=True, + help=dedent( + """ + Write executed notebook to this file base name. + Supports pattern replacements ``'{notebook_name}'``, + the name of the input notebook file without extension. + Note that output is always relative to the parent directory of the + input notebook. + """ + ), + ).tag(config=True) @default("log_level") def _log_level_default(self) -> int: @@ -115,6 +147,15 @@ def initialize(self, argv: list[str] | None = None) -> None: if not self.notebooks: sys.exit(-1) + # If output, must have single notebook + if len(self.notebooks) > 1 and self.output_base is not None: + if "{notebook_name}" not in self.output_base: + msg = ( + "If passing multiple notebooks with `--output=output` option, " + "output string must contain {notebook_name}" + ) + raise ValueError(msg) + # Loop and run them one by one for path in self.notebooks: self.run_notebook(path) @@ -136,16 +177,27 @@ def run_notebook(self, notebook_path: str) -> None: # Log it self.log.info(f"Executing {notebook_path}") - name = notebook_path.replace(".ipynb", "") + input_path = Path(notebook_path).with_suffix(".ipynb") # Get its parent directory so we can add it to the $PATH - path = pathlib.Path(notebook_path).parent.absolute() + path = input_path.parent.absolute() + + # Optional output of executed notebook + if self.inplace: + output_path = input_path + elif self.output_base: + output_path = input_path.parent.joinpath( + self.output_base.format(notebook_name=input_path.with_suffix("").name) + ).with_suffix(".ipynb") + else: + output_path = None - # Set the input file paths - input_path = f"{name}.ipynb" + if output_path and not output_path.parent.is_dir(): + msg = f"Cannot write to directory={output_path.parent} that does not exist" + raise ValueError(msg) # Open up the notebook we're going to run - with open(input_path) as f: + with input_path.open() as f: nb = nbformat.read(f, as_version=4) # Configure nbclient to run the notebook @@ -162,5 +214,10 @@ def run_notebook(self, notebook_path: str) -> None: # Run it client.execute() + # Save it + if output_path: + self.log.info(f"Save executed results to {output_path}") + nbformat.write(nb, output_path) + main = NbClientApp.launch_instance diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..04b9887 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,178 @@ +from pathlib import Path +from subprocess import CalledProcessError, check_output +from unittest.mock import call, mock_open, patch + +import pytest + +from nbclient.cli import NbClientApp + +current_dir = Path(__file__).parent.absolute() + + +@pytest.fixture() +def jupyterapp(): + with patch("nbclient.cli.JupyterApp.initialize") as mocked: + yield mocked + + +@pytest.fixture() +def client(): + with patch("nbclient.cli.NotebookClient", autospec=True) as mocked: + yield mocked + + +@pytest.fixture() +def writer(): + with patch("nbformat.write", autospec=True) as mocked: + yield mocked + + +@pytest.fixture() +def reader(): + with patch("nbformat.read", autospec=True, return_value="nb") as mocked: + yield mocked + + +@pytest.fixture() +def path_open(): + opener = mock_open() + + def mocked_open(self, *args, **kwargs): + return opener(self, *args, **kwargs) + + with patch("nbclient.cli.Path.open", mocked_open): + yield opener + + +@pytest.mark.parametrize( + "input_names", [("Other Comms",), ("Other Comms.ipynb",), ("Other Comms", "HelloWorld.ipynb")] +) +@pytest.mark.parametrize("relative", [False, True]) +@pytest.mark.parametrize("inplace", [False, True]) +def test_mult(input_names, relative, inplace, jupyterapp, client, reader, writer, path_open): + paths = [current_dir / "files" / name for name in input_names] + if relative: + paths = [p.relative_to(Path.cwd()) for p in paths] + + c = NbClientApp(notebooks=[str(p) for p in paths], kernel_name="python3", inplace=inplace) + c.initialize() + + # add suffix if needed + paths = [p.with_suffix(".ipynb") for p in paths] + + assert path_open.mock_calls[::3] == [call(p) for p in paths] + assert reader.call_count == len(paths) + # assert reader.mock_calls == [call(p, as_version=4) for p in paths] + + expected = [] + for p in paths: + expected.extend( + [ + call( + "nb", + timeout=c.timeout, + startup_timeout=c.startup_timeout, + skip_cells_with_tag=c.skip_cells_with_tag, + allow_errors=c.allow_errors, + kernel_name=c.kernel_name, + resources={"metadata": {"path": p.parent.absolute()}}, + ), + call().execute(), + ] + ) + + assert client.mock_calls == expected + + if inplace: + assert writer.mock_calls == [call("nb", p) for p in paths] + else: + writer.assert_not_called() + + +@pytest.mark.parametrize( + "input_names", [("Other Comms",), ("Other Comms.ipynb",), ("Other Comms", "HelloWorld.ipynb")] +) +@pytest.mark.parametrize("relative", [False, True]) +@pytest.mark.parametrize("output_base", ["thing", "thing.ipynb", "{notebook_name}-new.ipynb"]) +def test_output(input_names, relative, output_base, jupyterapp, client, reader, writer, path_open): + paths = [current_dir / "files" / name for name in input_names] + if relative: + paths = [p.relative_to(Path.cwd()) for p in paths] + + c = NbClientApp( + notebooks=[str(p) for p in paths], kernel_name="python3", output_base=output_base + ) + + if len(paths) != 1 and "{notebook_name}" not in output_base: + with pytest.raises(ValueError) as e: + c.initialize() + assert "If passing multiple" in str(e.value) + return + + c.initialize() + + # add suffix if needed + paths = [p.with_suffix(".ipynb") for p in paths] + + assert path_open.mock_calls[::3] == [call(p) for p in paths] + assert reader.call_count == len(paths) + + expected = [] + for p in paths: + expected.extend( + [ + call( + "nb", + timeout=c.timeout, + startup_timeout=c.startup_timeout, + skip_cells_with_tag=c.skip_cells_with_tag, + allow_errors=c.allow_errors, + kernel_name=c.kernel_name, + resources={"metadata": {"path": p.parent.absolute()}}, + ), + call().execute(), + ] + ) + + assert client.mock_calls == expected + + assert writer.mock_calls == [ + call( + "nb", + (p.parent / output_base.format(notebook_name=p.with_suffix("").name)).with_suffix( + ".ipynb" + ), + ) + for p in paths + ] + + +def test_bad_output_dir(jupyterapp, client, reader, writer, path_open): + input_names = ["Other Comms"] + output_base = "thing/thing" + + paths = [current_dir / "files" / name for name in input_names] + + c = NbClientApp( + notebooks=[str(p) for p in paths], kernel_name="python3", output_base=output_base + ) + + with pytest.raises(ValueError) as e: + c.initialize() + + assert "Cannot write to directory" in str(e.value) + + +# simple runner from command line +def test_cli_simple(): + path = current_dir / "files" / "Other Comms" + + with pytest.raises(CalledProcessError): + check_output(["jupyter-execute", "--output", "thing/thing", str(path)]) # noqa: S603, S607 + + +def test_no_notebooks(jupyterapp): + c = NbClientApp(notebooks=[], kernel_name="python3") + + with pytest.raises(SystemExit): + c.initialize()