# cli

In [1]:
#|default_exp cli

In [2]:
#|hide
import nbdev; nbdev.nbdev_export()

In [3]:
#|hide
from nbdev.showdoc import show_doc

In [4]:
#|export
import typer
from typer import Argument, Option
from typing_extensions import Annotated
from types import FunctionType
from typing import Callable, Union
import inspect
import re
from pathlib import Path
import tempfile
import importlib.resources as resources

from nblite.config import _find_config_file, read_config
from nblite.export import convert_nb
from nblite.const import format_to_file_exts

ModuleNotFoundError: No module named 'nblite'

In [5]:
import nblite.cli

# Helper functions

In [6]:
show_doc(nblite.cli.parse_docstring)

---

### parse_docstring

>      parse_docstring (docstring:str)

*Parses a docstring to extract argument descriptions and return value description.

Args:
    docstring: The docstring to parse.

Returns:
    A tuple containing three elements: 
    1. The function summary as a string.
    2. A dictionary of argument descriptions.
    3. The return value description as a string.*

In [7]:
#|exporti
def parse_docstring(docstring: str) -> tuple:
    """Parses a docstring to extract argument descriptions and return value description.

    Args:
        docstring: The docstring to parse.

    Returns:
        A tuple containing three elements: 
        1. The function summary as a string.
        2. A dictionary of argument descriptions.
        3. The return value description as a string.
    """
    _docstring = docstring.split('Args:', 1)
    func_summary, _docstring = _docstring if len(_docstring) == 2 else (docstring, '')
    arg_docstring, return_docstring = _docstring.split('Returns:', 1) if 'Returns:' in _docstring else (_docstring, '')
    
    # Use regex to find argument descriptions
    pattern = r'(\w+): (.+)'
    matches = re.findall(pattern, arg_docstring)
    args = {arg: desc.strip() for arg, desc in matches}
    
    return func_summary.strip(), args, return_docstring.strip()

In [8]:
func_summary, arg_docs, return_doc = parse_docstring(parse_docstring.__doc__)
print(func_summary)
print(arg_docs)
print(return_doc)

Parses a docstring to extract argument descriptions and return value description.
{'docstring': 'The docstring to parse.'}
A tuple containing three elements: 
        1. The function summary as a string.
        2. A dictionary of argument descriptions.
        3. The return value description as a string.


In [9]:
show_doc(nblite.cli.derive_cli_meta)

---

### derive_cli_meta

>      derive_cli_meta (source_func:function)

*A decorator factory that transfers docstring and argument annotations from a source functio and turns
them into a typer annotations for the target function.

Args:
    source_func: The function from which to derive the docstring and argument annotations.*

In [10]:
#|export
def derive_cli_meta(source_func: FunctionType) -> Callable:
    """
    A decorator factory that transfers docstring and argument annotations from a source functio and turns
    them into a typer annotations for the target function.

    Args:
        source_func: The function from which to derive the docstring and argument annotations.
    """
    def decorator(target_func: FunctionType) -> FunctionType:
        func_summary, arg_docs, return_doc = parse_docstring(source_func.__doc__)
        target_func.__doc__ = func_summary
        if return_doc.strip():
            target_func.__doc__ += f"\n\nReturns:\n{return_doc}"
        target_func.__doc__ = inspect.cleandoc("\n".join([l.strip() for l in target_func.__doc__.split("\n") if l.strip()]))
        typer_annotations = {
            arg_key: Annotated[arg_type, Argument(help=arg_docs[arg_key] if arg_key in arg_docs else '')]
            for arg_key, arg_type in source_func.__annotations__.items()
        }
        target_func.__annotations__.update(typer_annotations)
        return target_func
    return decorator

# Define CLI

In [11]:
#|export
app = typer.Typer(invoke_without_command=True)

@app.callback()
def entrypoint(ctx: typer.Context):
    # If no subcommand is provided, show the help
    if ctx.invoked_subcommand is None:
        typer.echo(ctx.get_help())
            
def main():
    app()

## `nbl export`

In [12]:
#|export
from nblite.export import export

In [13]:
#|export
@app.command(name='export')
@derive_cli_meta(export)
def cli_export(root_folder=None, config_path= None, export_pipeline= None):
    export(root_folder, config_path, export_pipeline)

## `nbl init`

In [14]:
#|export
@app.command(name='init')
def cli_init(
    module_name: Annotated[Union[str,None], Option(help="The name of the module to create")] = None,
    root_path: Annotated[Union[str,None], Option(help="The root path of the project")] = None,
    use_defaults: Annotated[bool, Option(help="Use default values for module name and root path")] = False,
):
    """
    Initialize a new nblite project.
    """
    if module_name is None:
        default_module_name = Path('.').resolve().name
        if not use_defaults:
            module_name = typer.prompt(f"Enter the name of the module to create", default=default_module_name)
        else:
            module_name = default_module_name
    
    if root_path is None:
        root_path = Path('.').resolve()
    
    nblite_toml_template = (resources.files("nblite") / "defaults" / "default_nblite.toml").read_text()
    nblite_toml_str = nblite_toml_template.format(module_name=module_name)
    
    toml_path = root_path / 'nblite.toml'
    if toml_path.exists():
        typer.echo(f"Error: {toml_path} already exists")
        raise typer.Abort()
    
    with open(toml_path, 'w') as f:
        f.write(nblite_toml_str)
        
    typer.echo(f"Created {toml_path}")

## `nbl new`

In [15]:
#|export
@app.command(name='new')
def cli_new(
    nb_path: Annotated[str, Argument(help="The notebook to create.")],
    mod_name: Annotated[Union[str,None], Option("-n", "--name", help="The name of the exported module. Defaults to the notebook path relative to the code location root.")] = None,
    nb_title: Annotated[Union[str,None], Option("-t", "--title", help="The display title of the notebook. Defaults to the notebook path stem.")] = None,
):
    "Create a new notebook in a code location."
    nb_path = Path(nb_path).resolve()
    config_path = _find_config_file(Path('.'))
    if config_path is None:
        typer.echo("Not inside an nblite project. No nblite.toml file found in the current directory or any parent directory.")
        raise typer.Abort()
    root_path = config_path.parent
    if nb_title is None:
        nb_title = nb_path.stem
    config = read_config(config_path)

    nb_format = None
    for loc in config.code_locations.values():
        if nb_path.is_relative_to(root_path / loc.path):
            nb_format = loc.format
            if not nb_path.name.endswith(format_to_file_exts[nb_format]):
                nb_path = Path(nb_path.as_posix() + '.' + format_to_file_exts[nb_format])
            if mod_name is None:
                rel_path = str(nb_path.relative_to(root_path / loc.path).parent)
                if rel_path == '.':
                    mod_name = nb_path.stem
                else:
                    mod_name = rel_path.replace('/', '.') + '.' + nb_path.stem
            break

    if nb_format is None:
        typer.echo(f"Error: '{nb_path}' is not inside any code location.")
        raise typer.Abort()

    if nb_path.exists():
        typer.echo(f"Error: '{nb_path}' already exists.")
        raise typer.Abort()

    with tempfile.NamedTemporaryFile(suffix='.pct.py') as tmp_nb:
        pct_content = (resources.files("nblite") / "defaults" / "default_nb.pct.py_").read_text().format(nb_title=nb_title, mod_name=mod_name)
        tmp_nb.write(pct_content.encode())
        tmp_nb.flush()
        nb_path.parent.mkdir(parents=True, exist_ok=True)
        convert_nb(tmp_nb.name, nb_path, nb_format="percent", dest_format=nb_format)
        
    typer.echo(f"Created {nb_path}")

## `nbl clean`

In [16]:
"--remove_outputs"

'--remove_outputs'

## `nbl fill`

In [None]:
def cli_fill(nb_path, cell_exec_timeout=None):
    import nbformat
    from nbconvert.preprocessors import ExecutePreprocessor

    if not nb_path.endswith('.ipynb'):
        typer.echo(f"Error: '{nb_path}' is not a Jupyter notebook file.")
        raise typer.Abort()

    # Load the notebook
    with open(nb_path) as f:
        nb = nbformat.read(f, as_version=4)

    # Remove outputs from each cell
    for cell in nb.cells:
        cell.outputs = {}

    # Parse directives for skipping cell evaluations
    skip_evals_mode = False
    skipped_cells = []
    for cell in nb.cells:
        skip_cell = False
        if cell['cell_type'] != 'code': continue
        for line in cell['source'].split('\n'):
            line = line.strip()
            if not line.startswith('#|'): continue
            directive = line.split('#|', 1)[1].strip()
            if directive == 'skip_evals':
                if skip_evals_mode:
                    raise ValueError("Already in skip_evals mode")
                skip_evals_mode = True
            elif directive == 'skip_evals_stop':
                if not skip_evals_mode:
                    raise ValueError("Not in skip_evals mode")
                skip_evals_mode = False
            elif directive.split(':', 1)[0].strip() == "eval":
                if directive.split(':', 1)[1].strip() == 'false':
                    skip_cell = True
            
            if skip_evals_mode or skip_cell:
                cell['cell_type'] = 'skip'
                skipped_cells.append(cell)

    # Create the execute preprocessor
    ep = ExecutePreprocessor(timeout=cell_exec_timeout, kernel_name="python3")

    # Initialize execution with an empty resources dictionary
    resources = {"metadata": {"path": "."}}

    # Start execution - this ensures the kernel is running
    ep.preprocess(nb, resources)

    # Restore the cell types of skipped code cells
    for cell in skipped_cells:
        cell['cell_type'] = 'code'

    # Remove metadata from each cell
    for cell in nb.cells:
        if cell['cell_type'] == 'code':
            cell['execution_count'] = 0 if cell in skipped_cells else 1
        cell.metadata = {}

    # Save the executed notebook with outputs
    with open(output_notebook, "w") as f:
        nbformat.write(nb, f)

    print(f"Executed notebook saved as {output_notebook}")

In [99]:
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor

cell_exec_timeout = None

# Define notebook filenames
input_notebook = "../../test_proj/nbs/notebook1.ipynb"
output_notebook = "output_notebook.ipynb"

# Load the notebook
with open(input_notebook) as f:
    nb = nbformat.read(f, as_version=4)

# Remove outputs from each cell
for cell in nb.cells:
    cell.outputs = {}

# Parse directives for skipping cell evaluations
skip_evals_mode = False
skipped_cells = []
for cell in nb.cells:
    skip_cell = False
    if cell['cell_type'] != 'code': continue
    for line in cell['source'].split('\n'):
        line = line.strip()
        if not line.startswith('#|'): continue
        directive = line.split('#|', 1)[1].strip()
        if directive == 'skip_evals':
            if skip_evals_mode:
                raise ValueError("Already in skip_evals mode")
            skip_evals_mode = True
        elif directive == 'skip_evals_stop':
            if not skip_evals_mode:
                raise ValueError("Not in skip_evals mode")
            skip_evals_mode = False
        elif directive.split(':', 1)[0].strip() == "eval":
            if directive.split(':', 1)[1].strip() == 'false':
                skip_cell = True
        
        if skip_evals_mode or skip_cell:
            cell['cell_type'] = 'skip'
            skipped_cells.append(cell)

# Create the execute preprocessor
ep = ExecutePreprocessor(timeout=cell_exec_timeout, kernel_name="python3")

# Initialize execution with an empty resources dictionary
resources = {"metadata": {"path": "."}}

# Start execution - this ensures the kernel is running
ep.preprocess(nb, resources)

# Restore the cell types of skipped code cells
for cell in skipped_cells:
    cell['cell_type'] = 'code'

# Remove metadata from each cell
for cell in nb.cells:
    if cell['cell_type'] == 'code':
        cell['execution_count'] = 0 if cell in skipped_cells else 1
    cell.metadata = {}

# Save the executed notebook with outputs
with open(output_notebook, "w") as f:
    nbformat.write(nb, f)

print(f"Executed notebook saved as {output_notebook}")

Executed notebook saved as output_notebook.ipynb


In [None]:
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
import traceback

cell_exec_timeout = None

# Define notebook filenames
input_notebook = "../../test_proj/nbs/notebook1.ipynb"
output_notebook = "output_notebook.ipynb"

# Load the notebook
with open(input_notebook) as f:
    nb = nbformat.read(f, as_version=4)

# Create the execute preprocessor
ep = ExecutePreprocessor(timeout=cell_exec_timeout, kernel_name="python3")

# Initialize execution with an empty resources dictionary
resources = {"metadata": {"path": "."}}

# Execute each cell one-by-one
for index, cell in enumerate(nb.cells):
    if cell.cell_type == 'code':
        try:
            # Execute the cell
            ep.preprocess(nb, resources)
            print(f"Executed cell {index}")
        except Exception as e:
            print(f"Error executing cell {index}: {e}")
            traceback.print_exc()

# Save the executed notebook with outputs
with open(output_notebook, "w") as f:
    nbformat.write(nb, f)

print(f"Executed notebook saved as {output_notebook}")

Executed cell 1
Executed cell 2
Executed cell 3
Executed cell 4
Executed cell 5
Executed cell 6
Executed notebook saved as output_notebook.ipynb


In [24]:
ep.preprocess?

[0;31mSignature:[0m
[0mep[0m[0;34m.[0m[0mpreprocess[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mnb[0m[0;34m:[0m [0;34m'NotebookNode'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mresources[0m[0;34m:[0m [0;34m't.Any'[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mkm[0m[0;34m:[0m [0;34m'KernelManager | None'[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m [0;34m->[0m [0;34m'tuple[NotebookNode, dict[str, t.Any]]'[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Preprocess notebook executing each code cell.

The input argument *nb* is modified in-place.

Note that this function recalls NotebookClient.__init__, which may look wrong.
However since the preprocess call acts line an init on execution state it's expected.
Therefore, we need to capture it here again to properly reset because traitlet
assignments are not passed. There is a risk if traitlets apply any side effects for
dual init.
The risk should b