# nbd

> Extra command line utilities for nbdev

In [None]:
#| default_exp nbd

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from pathlib import Path
from fastcore.all import *
import itertools as it
import os, time
from ghapi.all import *

import configparser
from pathlib import Path

from oztools.core import *
from oztools.gh import *

import subprocess

import json
from fastcore.net import HTTP422UnprocessableEntityError

from glob import glob
import re, yaml, shutil
import importlib.resources as res

import asyncio
from asyncinotify import Inotify, Event, Mask
from pathlib import Path
from datetime import datetime

import hashlib

In [None]:
path = Path("../..")
Path.BASE_PATH = path

In [None]:
#| export
def make_things_pretty():
    "Polish the generated `nbs` folder: clickable source links, light/dark themes and a favicon"
    # Full-width `h3` headers make the "source" link in the docs unclickable,
    # so shrink them to the width of their content
    with open("./nbs/styles.css", 'a') as css:
        css.write("\nh3 {\n  width: fit-content;\n}")

    # Add a dark theme and pick a light theme whose colorscheme is compatible with it
    with open("./nbs/_quarto.yml", 'r') as cfg:
        quarto = yaml.safe_load(cfg)
    quarto['format']['html']['theme'] = {'light': 'united', 'dark': 'superhero'}
    quarto['website']['favicon'] = 'favicon.png'
    with open("./nbs/_quarto.yml", 'w') as cfg:
        cfg.write(yaml.dump(quarto))

    # Copy the favicon bundled with oztools next to the notebooks
    with res.as_file(res.files("oztools")/"data/favicon.png") as favicon:
        shutil.copy(favicon, "./nbs")

In [None]:
#| export

def nbd_new_fn(name:str, description:str, license:str="Apache-2.0", private:bool=False):
    "Create a new nbdev project and setup github repo for it"
    gh_repo, local_repo = gh_new_repo_fn(name, description, license, private)
    # Setting up the pages branch right away makes github run CI twice (not nice)
    #setup_pages_branch(local_repo, gh_repo.name)
    os.chdir(gh_repo.name)

    # TODO: use .__wrapped__ property to extract original function
    for tool in ("nbdev_new", "nbdev_install_hooks"):
        subprocess.run([tool])
    make_things_pretty()
    for tool in ("nbdev_prepare", "nbdev_clean"):
        subprocess.run([tool])

    commit_and_push(local_repo, "Initial commit")
    # Hopefully the pip install plus the sleeps below give github enough time
    # to be ready for enabling the pages branch
    subprocess.run(["pip", "install", "-e", ".[dev]"])
    print("Waiting for the github to finish build before enabling pages")
    print("Feel free to start working on the project")

    # TODO: maybe check github api if build is ready
    pages_enabled = False
    while not pages_enabled:
        time.sleep(20)
        try:
            setup_pages_branch_location(local_repo, gh_repo.name)
        except HTTP422UnprocessableEntityError:
            # Request rejected: wait and try again
            continue
        pages_enabled = True

In [None]:
#| export
@call_parse
def nbd_new(name:str, description:str, license:str="Apache-2.0", private:bool=False):
    "Create a new nbdev project and setup github repo for it"
    # Command line entry point: the actual work lives in `nbd_new_fn`
    nbd_new_fn(name=name, description=description, license=license, private=private)

In [None]:
#| export

def new_notebook_template(name, description):
    "Build the JSON-serialisable skeleton of a new nbdev notebook called `name`"
    def code_cell(*lines):
        # Every code cell starts out unexecuted and without outputs
        return {'cell_type': 'code', 'execution_count': None, 'metadata': {}, 'outputs': [],
                'source': list(lines)}

    # Title and one-line description, in the markdown layout nbdev expects
    header = {'cell_type': 'markdown', 'metadata': {},
              'source': [f'# {name}\n', '\n', f'> {description}']}

    cells = [
        header,
        code_cell(f'#| default_exp {name}'),
        code_cell('#| hide\n', 'from nbdev.showdoc import *'),
        code_cell('#| export\n', 'from fastcore.all import *'),
        # nbdev_e**ort is a forbidden word, hence the split literal
        code_cell('#| hide\n', 'import nbdev; nbdev.nbdev_e'+'xport()'),
    ]
    kernelspec = {'display_name': 'python3', 'language': 'python', 'name': 'python3'}
    return {'cells': cells, 'metadata': {'kernelspec': kernelspec}, 'nbformat': 4, 'nbformat_minor': 4}

FIXME: Cell below results in an error for some reason

In [None]:
template = new_notebook_template("foo", "Makes foo using bar")
print('\n---\n\n'.join(L(template['cells']).attrgot('source').map(''.join)))

# foo

> Makes foo using bar
---

#| default_exp foo
---

#| hide
from nbdev.showdoc import *
---

#| export
from fastcore.all import *
---

#| hide
import nbdev; nbdev.nbdev_export()


In [None]:
m = re.match(r'(\d+).*\.ipynb', '99ab_asd.ipynb')
m.group(1)

'99'

In [None]:
#| export
def zero_pad(num):
    "Render `num` as a string, prefixed with one `0` when it is at most one character long"
    text = str(num)
    if len(text) > 1:
        return text
    return "0" + text

In [None]:
test_eq(zero_pad(9), '09')
test_eq(zero_pad(32), '32')

In [None]:
#| export
@call_parse
def nbd_add(name:str, description:str,
            at:Optional[int] = None # If specified, insert new notebook at a specific position
           ):
    "Add new notebook to the project"
    # NOTE: `at` is accepted but not honoured yet: the notebook is always
    # numbered one past the highest existing notebook in the current directory.

    # Notebooks are expected to be named `<number>...ipynb`
    prev_nbs = L(glob("[0-9]*?*.ipynb"))
    prev_ids = prev_nbs.map(lambda x: int(re.match(r'(\d+).*\.ipynb', x).group(1)))
    # `default=-1` lets a folder without numbered notebooks start at `00`
    # instead of failing with `ValueError: max() arg is an empty sequence`
    prev_id = max(prev_ids, default=-1)

    new_id = zero_pad(prev_id + 1)
    template = new_notebook_template(name, description)

    with open(f"{new_id}_{name}.ipynb", 'w') as f:
        json.dump(template, f)

## nbd_watch

Adapted from https://github.com/ProCern/asyncinotify/blob/master/examples/recursivewatch.py

In [None]:
#| export
def get_directories_recursive(path: Path):
    "Lazily produce `path` and every directory below it as `Path` objects"
    walker = os.walk(path)
    # Only the directory component of each `os.walk` entry is of interest
    return (Path(dirpath) for dirpath, _subdirs, _files in walker)

In [None]:
list(get_directories_recursive(Path('.')))

[Path('.'),
 Path('.ipynb_checkpoints'),
 Path('02_nbd'),
 Path('02_nbd/favicons'),
 Path('02_nbd/favicons/arlantr'),
 Path('02_nbd/favicons/food-ocal'),
 Path('02_nbd/favicons/food-ocal/vegetable'),
 Path('02_nbd/favicons/food-ocal/treat'),
 Path('02_nbd/favicons/food-ocal/pasta'),
 Path('02_nbd/favicons/food-ocal/fruit'),
 Path('02_nbd/favicons/food-ocal/drink'),
 Path('02_nbd/favicons/food-ocal/dairy'),
 Path('02_nbd/favicons/food-ocal/meat')]

Algorithm:
1. Watch forever until an event happens.
2. After that, keep watching for the debounce interval (0.5s by default). If no more events fire, yield the recorded events as one batch and go back to step 1.
3. If more events fire within that interval, record those events and go back to step 2.

Extra features:
1. If new directories are added, mark them for watching.
2. Skip events we are not interested in.

In [None]:
#| export
def watch_new_directories(inotify, event, mask=None):
    "Add watches for a directory (and its subdirectories) that `event` reports as created"
    if mask is None:
        # Default to the same set of events `_inotify_watch` subscribes to
        mask = (Mask.CREATE | Mask.MODIFY | Mask.MOVE | Mask.CLOSE_WRITE
                | Mask.DELETE | Mask.DELETE_SELF | Mask.ATTRIB)
    if Mask.CREATE in event.mask and event.path is not None and event.path.is_dir():
        # A whole directory tree can appear at once, so watch everything below it too
        for directory in get_directories_recursive(event.path): inotify.add_watch(directory, mask)

In [None]:
#| export
def extract_exports(file):
    "Join the source of every code cell of notebook `file` that mentions `export`"
    exported = []
    for cell in file.read_json()['cells']:
        if cell['cell_type'] != 'code':
            continue
        source = ''.join(cell['source'])
        if 'export' in source:
            exported.append(source)
    return '\n'.join(exported)

In [None]:
#| export
def get_hash(s):
    "SHA-256 digest (raw bytes) of the UTF-8 encoding of string `s`"
    encoded = s.encode("utf-8")
    return hashlib.sha256(encoded).digest()

In [None]:
get_hash("hello, world!")

b'h\xe6V\xb2Q\xe6~\x83X\xbe\xf8H:\xb0\xd5\x1cf\x19\xf3\xe7\xa1\xa9\xf0\xe7X8\xd4\x1f\xf3h\xf7('

In [None]:
#| export
def setup_tracking(path):
    "Map every notebook found under `path` to the hash of its exported code"
    tracked = {}
    for folder in get_directories_recursive(path):
        # Checkpoint copies are not real notebooks
        if folder.name != '.ipynb_checkpoints':
            # The lookahead rejects temporary files whose name starts with `.~`
            notebooks = [nb for nb in folder.ls() if re.match(r'^(?!\.\~).+\.ipynb$', nb.name)]
            for nb in notebooks:
                tracked[nb] = get_hash(extract_exports(nb))
    return tracked

In [None]:
tracked_files = setup_tracking(Path('.'))

In [None]:
list(get_directories_recursive(Path('.')))[1].name

'.ipynb_checkpoints'

In [None]:
tracked_files.keys()

dict_keys([Path('00_core.ipynb'), Path('01_gh.ipynb'), Path('03_kgl.ipynb'), Path('02_nbd.ipynb'), Path('04_format.ipynb')])

In [None]:
s = "hello"

In [None]:
s.encode("utf-8")

b'hello'

In [None]:
re.match(r'^(?!\.\~).+\.ipynb$', '.~00_core.ipynb')

In [None]:
f = list(get_directories_recursive(Path('.')))[0].ls()[0]

In [None]:
print('\n'.join([''.join(x['source']) for x in f.read_json()['cells'] if x['cell_type'] == 'code' and 'export' in ''.join(x['source'])]))

#| export
from fastcore.all import *
#| export
def pad(s: str, pad_to: int):
    "Pad `s` with spaces to the right"
    return s + " " * max(0, (pad_to - len(s)))
#| export
def attrkey(attr):
    "Create a function that fetches `attr` of its input"
    return lambda x: getattr(x, attr)
#| export
def str_enumerate(lst: list,
                  start: int = 0 # enumerate from what number
                 ) -> Iterable[str]:
    "Create aligned sequence of numbered strings for strings in `lst`"
    return map(lambda x: f"  {pad(str(x[0]),2)}  {x[1]}", enumerate(lst, start))
#| export
def cz(*funcs):
    "Compose functions together"
    def fn(x):
        for fn in funcs:
            x = fn(x)
        return x
    return fn
#| hide
import nbdev; nbdev.nbdev_export()


In [None]:
#| export
def _inotify_watch(inotify, path, debounce_interval):
    "Yield batches (lists) of events seen under `path`, grouping events separated by less than `debounce_interval`"
    mask = (Mask.CREATE | Mask.MODIFY | Mask.MOVE | Mask.CLOSE_WRITE
            | Mask.DELETE | Mask.DELETE_SELF | Mask.ATTRIB)
    for directory in get_directories_recursive(path): inotify.add_watch(directory, mask)

    combined_event = []
    def process_event(event):
        "Record `event` in the current batch; return False for events outside `mask`"
        watch_new_directories(inotify, event)
        if not event.mask & mask: return False
        combined_event.append(event)
        return True

    while True:
        inotify.sync_timeout = -1 # Watch forever
        for event in inotify:
            if not process_event(event): continue

            # Something interesting happened: keep collecting until no event
            # arrives for `debounce_interval`
            inotify.sync_timeout = debounce_interval
            # Each follow-up event must be recorded itself; passing the stale
            # `event` here would fill the batch with copies of the first event
            for follow_up in inotify: process_event(follow_up)

            # No more events seen in sync_timeout: yield events seen in this event sequence
            if combined_event:
                yield combined_event
                # Rebind instead of clearing so the list handed to the caller stays intact
                combined_event = []

In [None]:
#| export
def inotify_watch(path, debounce_interval=0.5):
    "Yield debounced batches of events for `path`; the `Inotify` instance is closed when the generator finishes"
    with Inotify() as watcher:
        batches = _inotify_watch(watcher, path, debounce_interval)
        yield from batches

In [None]:
s = '.~04_core.ipynb'
s.startswith('.~')

True

In [None]:
Path('/user/home/.~04_core.ipynb').name

'.~04_core.ipynb'

Right now, it seems the best way to figure out
when the user actually saves is to count the number of events:
- Saving a file manually results in at least 13 events,
  while an autosave only produces 9 events (sometimes 10).

In [None]:
k = re.match(r'\.\~(.*.ipynb)', '.~04_core.ipynb')
k.group(1)

'04_core.ipynb'

In [None]:
#| export
def get_files_updated(events):
    "Sorted list of notebooks whose temporary `.~<name>.ipynb` file appears in `events`"
    # Events may come without a file name; there is nothing to match for those
    unique = {(e.watch.path, e.name.name) for e in events if e.name is not None}
    # The dot before `ipynb` is escaped so that only a real `.ipynb` suffix matches
    temp_notebook = re.compile(r'\.\~(.*\.ipynb)')
    matches = ((path, temp_notebook.match(name)) for path, name in unique)
    # Map each temporary file back to the notebook it belongs to; sorting makes
    # the order independent of set iteration
    return sorted(path/m.group(1) for path, m in matches if m)

In [None]:
p = Path('nbs/api/02_nbd.ipynb')
p.parent

Path('nbs/api')

In [None]:
p.relative_to(Path('nbs')).as_posix()

'api/02_nbd.ipynb'

In [None]:
#| export
def anything_updated(tracked_files, events):
    "Return notebooks touched by `events` whose exported code changed, refreshing their hashes in `tracked_files`"
    updated = []
    for filename in get_files_updated(events):
        new_hash = get_hash(extract_exports(filename))
        # `.get`: a notebook created after tracking started has no stored hash
        # yet; treat it as updated instead of raising KeyError
        if new_hash != tracked_files.get(filename):
            updated.append(filename)
            tracked_files[filename] = new_hash
    return updated

In [None]:
#| export
def nbd_watch():
    "Watch `nbs` folder and automatically run `nbdev_export` on file change"
    nbs = Path('nbs')
    hashes = setup_tracking(nbs)

    for batch in inotify_watch(nbs):
        # Small batches are treated as temporary (automatic) saves: nothing is
        # refreshed unless the user explicitly saved a file
        if len(batch) >= 13:
            changed = anything_updated(hashes, batch)
            if changed:
                stamp = datetime.now()
                names = ' '.join(nb.relative_to(Path('nbs')).as_posix() for nb in changed)
                print(f"[{stamp}] {names}")
                subprocess.run(["nbdev_export"])

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()