In [None]:
%%capture
%load_ext autoreload
%autoreload 2
%matplotlib inline

# Parser basics

In [None]:
#|default_exp nbx_parser
#|export
from collections import defaultdict


class Parser(object):
    """
    For us a parser is a callable with signature
    ```
        Iterable -> (a, Iterable) or None
    ```
    where `a` is the parser result, the second entry is the unconsumed 
    rest of the input, and `None` signals that the parser failed.
    Bindable functions have the signature 
    ```
        a -> Parser a
    ```

    Note that `==` and `!=` are overloaded to BUILD NEW PARSERS that
    check the result; they do not compare two parser objects.
    """
    def __init__(self, f = lambda line: (None, line)): 
        """Wraps a fn with signature `line -> (a, line)` into a parser."""
        self.func = f


    # Overriding `__eq__` sets `__hash__` to None, which would make parsers
    # unusable as dict keys or set members. `__eq__` never compares parsers,
    # so identity-based hashing is the right choice.
    __hash__ = object.__hash__


    def __call__(self, line): 
        """Runs the wrapped function on `line`."""
        return self.func(line)


    def bind(self, f): 
        """
        Monadic bind. Binds functions `a -> Parser a` to the parser.

        The new parser runs `self`; if that fails (returns `None` or 
        raises an `Exception`) it fails as well, otherwise it runs 
        `f(result)` on the rest.
        """
        def q(line):
            """The new parser func."""
            # Unpacking `None` raises, so a failed parser ends up in the 
            # except branch. Only `Exception` is caught so that
            # KeyboardInterrupt/SystemExit are not turned into a parse failure.
            try: b, rest = self(line)
            except Exception: return None
            return f(b)(rest)

        return Parser(q)


    def __rshift__(self, f):
        """Shorthand for 'bind'"""
        return self.bind(f)


    def __or__(self, q):
        """Creates a new parser that tries `q` if `self` fails."""
        def p(line):
            try: 
                a, rest = self(line)
            except Exception:
                return q(line)
            return (a, rest)

        return Parser(p)


    def __eq__(self, val):
        """
        Creates a new parser that checks if result equals a given value.

        If `val` is a dict, the result is indexed with every key of `val`
        and each entry has to equal the corresponding value.
        """
        def check(a, val):
            if isinstance(val, dict):
                for k, v in val.items(): 
                    if a[k] != v: return False
                return True
            else:
                return a == val

        return self.bind(lambda a: result(a) if check(a, val) else fail)


    def __ne__(self, val):
        """
        Checks if result does not equal a given value.

        If `val` is a dict, the check passes as soon as one entry of
        the result differs from the corresponding value in `val`.
        """
        def check(a, val):
            if isinstance(val, dict):
                for k, v in val.items(): 
                    if a[k] != v: return True
                return False
            else:
                return a != val

        return self.bind(lambda a: result(a) if check(a, val) else fail)


    def __matmul__(self, key):      
        """
        Returns a bindable function `f(a)` that applies 
        the parser `self` and updates the given result dict `a` 
        with `self`'s results.

        If `key` is `None` the parser still has to succeed, but 
        its result is not stored in `a`.
        """                
        def f(a):
            def p(line):
                try: b, rest = self(line)
                except Exception: return None
                if key is None: return (a, rest)
                a[key] = b
                return (a, rest)

            return p

        return f
         

# Atomic parsers that serve as building blocks for larger ones
def result(a):
    """Parser that consumes nothing and always yields `a`."""
    return Parser(lambda line: (a, line))


@Parser
def fail(line):
    """Parser that always fails."""
    return None


@Parser
def item(line):
    """Splits off the first element of a non-empty `line`."""
    if len(line) > 0:
        return (line[0], line[1:])
    return None


@Parser
def seed(line):
    """Consumes nothing and yields a fresh, empty result dict."""
    return ({}, line)

In [None]:
#|export
def result_or_none(y:"parser return"):
    """
    Extracts the result from a parser return value `(result, rest)`,
    dropping the rest. A failed parse (`None`) is passed through as `None`.
    """
    return None if y is None else y[0]

In [None]:
#|export
def seq(p, q): 
    """
    Chains two parsers: `p` runs first, `q` runs on what `p` left over.
    The combined parser yields both results as the list `[a, b]`.
    """
    def then_q(a):
        # `a` is the result of `p`; pair it with the result of `q`
        return q >> (lambda b: result([a, b]))

    return p >> then_q

def compr(*ps, f = lambda *x: list(x)):
    """
    Runs the parsers `ps` one after another, each on the rest 
    left over by its predecessor, and combines the collected 
    results with `f` (by default they are returned as a list).

    The combined parser fails (returns `None`) as soon as one 
    of the parsers fails.
    """
    def q(line):
        results = []
        rest = line
        for p in ps:
            # A failed parser returns `None`, which cannot be unpacked.
            # Only `Exception` is caught so that KeyboardInterrupt and
            # SystemExit still propagate.
            try: a, rest = p(rest)
            except Exception: return None
            results.append(a)
        return f(*results), rest
    return Parser(q)

In [None]:
p = seq(item, item)
line = "abcd"
assert seq  (item, item)      (line) == (['a', 'b'], 'cd')
assert compr(item, item, item)(line) == (['a', 'b', 'c'], 'd')

In [None]:
#|export
def many(q):
    """
    Apply `q` ZERO or MORE times.

    Yields the list of collected results together with the rest at 
    which `q` failed for the first time (or the exhausted input).
    This parser itself never fails.
    """
    def p(line):
        results = []
        rest = line
        # Iterate instead of recursing once per parsed item: with recursion 
        # a long input hits the interpreter's recursion limit, and the 
        # resulting error used to be swallowed as an ordinary parse failure, 
        # silently truncating the result.
        while len(rest) > 0:
            try: b, remaining = q(rest)
            except Exception: break
            # A parser that succeeds without consuming any input would 
            # succeed forever; stop instead of looping endlessly.
            if remaining == rest: break
            results.append(b)
            rest = remaining
        return (results, rest)

    return Parser(p)


def many1(q):
    """
    Apply `q` ONE or MORE times.

    Fails (returns `None`) if `q` does not succeed at least once; 
    otherwise behaves like `many(q)`.
    """
    def p(line):
        # Only `Exception` is caught so that KeyboardInterrupt and
        # SystemExit are not mistaken for a parse failure.
        try: first, rest_ = q(line)
        except Exception: return None
        others, rest = many(q)(rest_)

        return ([first] + others, rest) 

    return Parser(p)

In [None]:
q = item == "a"
assert many(q)("aabb")  == (["a","a"], "bb")
assert many(q)("bb")    == ([], "bb")
assert many1(q)("aabb") == many(q)("aabb")
assert many1(q)("bb")   == None

# Useful Parsers

## Hashtags `#tag`

In [None]:
#|export
import re
# Optional whitespace, `#`, the tag name (letters and underscores), the rest
tag_rx = re.compile(r"^\s*#([a-zA-Z_]+)\s*(.*)$")

@Parser
def parse_any_tag(line):
    """
    Parses a tag (eg. `#MyTAg`) at the beginning of the line 
    and yields `(tag, rest)`, with the tag given without its `#`. 
    Fails if the line does not start with a tag.
    """
    found = tag_rx.match(line)
    return None if found is None else found.groups()

In [None]:
parse_any_tag("#hi my name is")

('hi', 'my name is')

In [None]:
assert  parse_any_tag(         "#hi my name is")  == ('hi', 'my name is')
assert (parse_any_tag == "hi")("#hi  my name is") == ('hi', 'my name is')
assert (parse_any_tag == "hi")("#hey my name is") == None

In [None]:
#|export
def rx_parser(rx):
    """
    Turns a compiled regex into a parser. 

    On a match the parser yields the dict of the named groups; the 
    group named `rest` (empty string if there is none) is removed 
    from that dict and returned as the remaining input.
    """
    @Parser
    def p(line):
        found = rx.match(line)
        if found is None: 
            return None
        groups = found.groupdict()
        rest = groups.pop("rest", "")
        return (groups, rest)

    return p
        
    
p = rx_parser(re.compile(r"^\s*(#)(?P<tag>[a-zA-Z_]+)\s*(?P<rest>.*)$"))
p("#hi my name is")

({'tag': 'hi'}, 'my name is')

## Option flags `--flag`

In [None]:
#|export
import re
# Optional whitespace, one or more dashes, the flag name, an optional `=`, the rest
flagname_rx = re.compile(r"\s*(?:[\-]+)(?P<name>[a-zA-Z_\-]+)=?(.*)?$")

def parse_flagname(line):
    """
    Parses the name of a flag at the beginning of the line.

    Returns `(name, rest)`, where leading whitespace has been 
    removed from the rest, or `None` if the line does not 
    start with a flag.
    """
    found = flagname_rx.match(line)
    if found is None: 
        return None
    name = found.group("name")
    rest = found.group(2) or ""
    return (name, rest.lstrip())
    
def starts_with_flag(line):
    """Returns True iff `parse_flagname` finds a flag at the beginning of the line."""
    return parse_flagname(line) is not None

In [None]:
print(parse_flagname("  --name mirko"))
print(parse_flagname("--name=mirko"))
print(starts_with_flag("--name mirko"))

('name', 'mirko')
('name', 'mirko')
True


In [None]:
#|export
from collections import defaultdict
import shlex 

@Parser
def parse_flags(line):
    """
    Extracts flag keys and value pairs.

    The line is tokenized with `shlex.split`. A flag given as 
    `--key=value` is stored right away; a bare `--key` is stored 
    with value `None` and takes the next non-flag token, if any, 
    as its value. If a key appears more than once the last 
    occurrence wins.

    Yields `(flags, rest)`, where `rest` is the list of tokens 
    that are neither a flag nor the value of a flag.
    """
    flags   = dict()
    pending = None  # bare flag that is still waiting for its value
    rest    = []    # not a flag-key nor a value 
    for token in shlex.split(line):
        # Test the parse result explicitly instead of relying on a bare 
        # `except:` around a failing tuple unpacking.
        parsed = parse_flagname(token)
        if parsed is not None:
            name, value = parsed
            value = value.strip()
            if value == "":
                flags[name] = None
                pending = name
            else:
                flags[name] = value
                pending = None
        elif pending is not None: 
            flags[pending] = token
            pending = None
        else:
            rest.append(token)

    return (flags, rest)

In [None]:
parse_flags('#nbx --some-name=A B C --something_else=D --some-name=E --fname')

({'some-name': 'E', 'something_else': 'D', 'fname': None}, ['#nbx', 'B', 'C'])

## Tagged line with flags `#tag  --flags`

In [None]:
#|export
# Parser for a tagged line with flags: runs `parse_any_tag` and then 
# `parse_flags` on what is left, yielding `([tag, flags], rest)`.
parse_tagged_line = seq(parse_any_tag, parse_flags)

In [None]:
(tag, flags), rest = parse_tagged_line('#nbx --some-name=A B C --something_else=D --some-name=E --fname')
tag, flags, rest

('nbx', {'some-name': 'E', 'something_else': 'D', 'fname': None}, ['B', 'C'])

In [None]:
(tag, flags), rest = parse_tagged_line('#nbx no flag here')
tag, flags, rest

('nbx', {}, ['no', 'flag', 'here'])

# Notebook parsing and tokenizing

In [None]:
#|export
from typing import Union, List, Tuple
from nbx.utils import listmap
import json
from nbx.utils import Bunch, load_nb
from pathlib import Path

In [None]:
nb = load_nb("nbx_example/test_notebook.ipynb")
nb.cells[0]

{'cell_type': 'markdown',
 'id': '89c3991f',
 'metadata': {},
 'source': ['# Test Notebook for `nbx`']}

In [None]:
#|export 
# Parser that only succeeds on lines that start with the tag `#nbx`
is_nbx = parse_any_tag == "nbx"

def is_nbx_cell(cell):
    """
    Checks the first line of the cell source for the nbx tag.

    Returns `False` for non-code cells and for cells without 
    source; otherwise returns what `is_nbx` returns for the 
    first source line (`None` if that line has no nbx tag).
    """
    if cell['cell_type'] == 'code' and cell['source']:
        return is_nbx(cell['source'][0])
    return False


def get_nbx_cells(nb):
    """Returns the list of cells of `nb` for which `is_nbx_cell` holds."""
    return [cell for cell in nb.cells if is_nbx_cell(cell)]


def get_nbx_cells_src(nb):
    """Returns the `source` entry of every cell of `nb` for which `is_nbx_cell` holds."""
    return [cell["source"] for cell in nb.cells if is_nbx_cell(cell)]

## Tokenize a cell source

In [None]:
#|export 
class Line(object):
    """
    A single source line. 

    Lines compare (`==`, `!=`) by their `name`, which is `None` 
    for a plain line, so that tokens can be tested against 
    a name directly, eg. `line == "nbx"`.
    """
    def __init__(self, src=""):
        self.src  = src
        self.name = None

    def __eq__(self, other): 
        return self.name == other

    def __ne__(self, other): 
        return self.name != other

    def __str__(self): 
        return "``{}''".format(self.src)

    def __repr__(self): 
        return "{}('{}')".format(self.__class__.__name__, self.src)
    
    
class TaggedLine(Line):
    """
    A line that starts with a tag. The tag serves as the 
    `name` of the line, `flags` holds the flags given on the line.
    """
    def __init__(self, tag, flags, src = ""): 
        super().__init__(src=src)
        self.flags = flags
        self.name  = tag

    def __str__(self): 
        rendered = []
        for key, value in self.flags.items():
            rendered.append(f" --{key}=``{value}''")
        return f"<{self.name}{''.join(rendered)}/>"

    def __repr__(self): 
        return "TaggedLine('{}', '{}')".format(self.name, self.src)
    
    
class EmptyLine(Line): 
    """A line without content; its `name` is `"empty"`."""
    def __init__(self): 
        super().__init__(src="")
        self.name = "empty"

    def __str__(self): 
        # rendered as the "empty set" symbol
        return "\u2205"

In [None]:
#|export 
def tokenize_src(src, tags=None):
    """
    Converts each line of the src 
    into their corresponding `Line` object.

    If the `tags` argument is set, only its keys are
    considered valid tags when forming tagged lines.

    Trailing whitespace is removed from every line. Lines that 
    cannot be parsed as a tagged line become an `EmptyLine` 
    if they are blank and a plain `Line` otherwise.
    """
    parsed = []
    for raw in src:
        line = raw.rstrip()
        try: 
            (tag, flags), _ = parse_tagged_line(line)
        except Exception: 
            # Not a tagged line: either there is no tag (the parser returned 
            # `None`, which cannot be unpacked) or parsing the rest of the 
            # line raised, eg. `shlex` on an unbalanced quote.
            if line.strip() == "": parsed.append(EmptyLine())
            else:                  parsed.append(Line(line))
            continue

        if tags is None or tag in tags:
            parsed.append(TaggedLine(tag, flags, line))
        else:
            parsed.append(Line(line))

    return parsed

In [None]:
nbx_cells = get_nbx_cells(nb)
parsed    = tokenize_src(nbx_cells[0]["source"])
for t in parsed: print(t)

<nbx/>
<nbx_meta --fname=``test_script.py'' --src=``./src''/>
∅
``print("This message is in the notebook and will be added")''
∅
<xarg/>
``task_id    = -1;''
``result_dir = "./";''
∅
<xarg/>
``xtask    = -1;''
``xdir = "./";''


## Simple notebook parser

In [None]:
#|export 
def parse_into_nbx_blocks(nbpath):
    """
    Groups the tokenized lines of all nbx cells of a notebook 
    into blocks, one per target file.

    A line `#nbx --fname=...` whose `fname` differs from the one 
    of the current block opens a new block; every line (including 
    the `#nbx` lines themselves) is appended to the `src` of the 
    block that is currently open.

    Returns a list of `Bunch(fname, flags, src)`.

    Raises `ValueError` if a line shows up before any block has 
    been opened, ie. if the first nbx cell does not start with 
    `#nbx --fname=...`.
    """
    nbpath = Path(nbpath)
    nb     = load_nb(nbpath)

    lines = []
    for cell in get_nbx_cells(nb):
        lines.extend(tokenize_src(cell["source"]))

    last_fname = None
    blocks = []
    for line in lines:
        if line == "nbx" and "fname" in line.flags:
            fname = line.flags["fname"]
            if fname != last_fname:
                blocks.append(Bunch(fname=fname, flags=line.flags, src=[]))
                last_fname = fname

        if not blocks:
            # Without this check the append below fails with 
            # an uninformative "list index out of range".
            raise ValueError(
                f"{nbpath}: found {line!r} before any `#nbx --fname=...` line; "
                "cannot tell which file it belongs to.")
        blocks[-1]["src"].append(line)

    return blocks

In [None]:
nbpath = "nbx_example/julia_test.ipynb"
blocks = parse_into_nbx_blocks(nbpath)
for block in blocks:
    print("\n", block.src[0], "\n")
    for l in block.src[1:]: print("\t",l)


 <nbx --fname=``_test.jl'' --src=``_src''/> 

	 ``include("src/empty_import.jl")''
	 ``using Plots''
	 ∅
	 ``plot(1:10)''
	 <nbx/>
	 ``println("This IS going into the first file...")''

 <nbx --fname=``_test_2.jl'' --src=``_src''/> 

	 ∅
	 ``println("This IS ALSO going in, but into ANOTHER file!")''


In [None]:
#|export 
def parse_nbx(nbpath):
    """
    Collects the source of all cells of a notebook whose 
    first line is tagged with `#nbx`.

    Returns a `Bunch` with the notebook `path` and `name`, 
    the merged flags of all `#nbx` lines in `meta`, the source 
    lines per cell (without the `#nbx` line) in `cells`, 
    and all of these lines concatenated in `src`.
    """
    nbpath = Path(nbpath)
    nb = load_nb(nbpath)

    pnb = Bunch(path  = str(nbpath),
                name  = nbpath.name,
                meta  = {},
                cells = [], 
                src   = [])

    for cell in nb.cells:
        tokens = tokenize_src(cell["source"], tags={"nbx"})
        if not tokens or tokens[0] != "nbx": 
            continue

        # The first token is the `#nbx` line; its flags were 
        # already parsed while tokenizing.
        header, body = tokens[0], [token.src for token in tokens[1:]]
        pnb.meta.update(header.flags)
        pnb.cells.append(body)
        pnb.src.extend(body)

    return pnb

In [None]:
parse_nbx("nbx_example/julia_test.ipynb")

{'path': 'nbx_example/julia_test.ipynb',
 'name': 'julia_test.ipynb',
 'meta': {'fname': '_test.jl', 'src': '_src'},
 'cells': [['include("src/empty_import.jl")',
   'using Plots',
   '',
   'plot(1:10)']],
 'src': ['include("src/empty_import.jl")', 'using Plots', '', 'plot(1:10)']}

In [None]:
#|export 
from nbx.utils import nbx_lib
from nbx.templ import *
import os

TEMPLATES = nbx_lib()/"templates"

In [None]:
TEMPLATES

PosixPath('/Users/mirko/Workspace/nbx/nbx/templates')

In [None]:
#|export 
def create_script(fname, tpl, vdict=None):
    """
    Create script from template and value dict.

    `tpl` is the name of a template inside `TEMPLATES`, `fname` 
    and `vdict` are handed on to `create_file_from_template`. 
    Without `vdict` an empty dict is used.
    """ 
    # A `{}` default would be one dict shared by all calls
    if vdict is None: vdict = {}
    return create_file_from_template(TEMPLATES/tpl, fname, vdict)

# Julia

In [None]:
#|export
import re
# `include("<path>")` at the start of the line, followed by arbitrary text
include_rx = re.compile(r"^include\(\"([a-zA-Z0-9_.\/-]+)\"\)(.*)")

@Parser
def parse_jl_include(line):
    """
    Parses a Julia `include("...")` statement at the beginning 
    of the line. Yields the included path (passed through `Path`) 
    and whatever follows the statement; fails on any other line.
    """
    found = include_rx.match(line)
    if found is None: 
        return None
    path, trailing = found.groups()
    return (str(Path(path)), trailing)

In [None]:
#|export
def is_jl_include(line):
    """Returns True iff `parse_jl_include` succeeds on the line."""
    return parse_jl_include(line) is not None

In [None]:
#|export
def transform_jl_include(line, src):
    """
    Rewrites a Julia `include("<src>/<path>")` statement so that 
    the path is given relative to the directory `src`, ie. 
    as `include("./<path>")`. Text that follows the statement 
    on the same line is kept.

    Lines that are not an include statement are returned unchanged, 
    and so are includes of absolute paths or of paths that do 
    not lie inside `src`.
    """
    parsed = parse_jl_include(line)
    if parsed is None: return line
    file, rest = parsed

    file_path = Path(file)
    if file_path.is_absolute(): return line

    # Compare whole path components. `str.lstrip(str(src))` would strip 
    # any leading run of the CHARACTERS in `src` rather than the prefix, 
    # eg. turning "scripts/a.jl" into "ipts/a.jl" for src="src".
    src_parts  = Path(src).parts
    file_parts = file_path.parts
    if file_parts[:len(src_parts)] != src_parts: return line

    relative = "/".join(file_parts[len(src_parts):])
    if relative == "": return line

    return f"include(\"./{relative}\"){rest}"

In [None]:
line = 'include("src/test.jl")'
print(parse_jl_include(line))
print(transform_jl_include(line, src="src"))

('src/test.jl', '')
include("./test.jl")


In [None]:
#|export
def prepare_jl_nb_for_export(notebook_path, target_path=None):
    """
    Parses the notebook and prepares its nbx source for 
    the export to a Julia file.

    The export directory is `target_path/<src>`, where `src` is 
    taken from the notebook's nbx flags and `target_path` defaults 
    to the folder of the notebook. The directory is created 
    if it does not exist.

    Returns the path of the file to write (named after the nbx 
    flag `fname`) and the source lines, with each line passed 
    through `transform_jl_include`.
    """
    if target_path is None: 
        target_path = Path(notebook_path).parent

    pnb = parse_nbx(notebook_path)
    script_name = Path(pnb.meta["fname"])
    src         = Path(pnb.meta["src"])

    export_dir = target_path/src
    export_dir.mkdir(parents=True, exist_ok=True)

    lines = [transform_jl_include(line, src) for line in pnb.src]

    return export_dir/script_name, lines

(PosixPath('nbx_example/_src/_test.jl'),
 ['include("./empty_import.jl")', 'using Plots', '', 'plot(1:10)'])

In [None]:
nbpath = "nbx_example/julia_test.ipynb"

pnb = parse_nbx(nbpath)
fname, lines = prepare_jl_nb_for_export(nbpath)

print(pnb.meta)
print(pnb.src)
print(lines)
print(fname)

{'fname': '_test.jl', 'src': '_src'}
['include("src/empty_import.jl")', 'using Plots', '', 'plot(1:10)']
['include("./empty_import.jl")', 'using Plots', '', 'plot(1:10)']
nbx_example/_src/_test.jl


In [None]:
def create_jl_from_nb(nb_path, target_path=None, tpl="nbxlines.tpl"):
    """
    Prepares the export of a notebook's nbx source to a Julia file.

    Creates the directory `target_path/<src>` (`src` is taken from 
    the notebook's nbx flags, `target_path` defaults to the folder 
    of the notebook), prints the source lines before and after 
    passing them through `transform_jl_include`, and returns the 
    path of the target file.

    NOTE: the call that writes the file is commented out, 
    so nothing is written yet and `tpl` is not used.
    """
    nb_path = Path(nb_path)
    if target_path is None: target_path = nb_path.parent

    # Parse the notebook that was passed in. This used to read the global
    # `nbpath`, which is only defined by other notebook cells.
    pnb = parse_nbx(nb_path)
    print(pnb.src)

    fname = Path(pnb.meta["fname"])
    src   = Path(pnb.meta["src"])
    target_path = target_path/src
    target_path.mkdir(parents=True, exist_ok=True)
    fname = target_path/fname

    lines = [transform_jl_include(line, src) for line in pnb.src]

    print(lines)
#     create_script(fname=fname, tpl=tpl, vdict=dict(lines=lines, nbname=pnb.name))

    return fname
    

In [None]:
nbpath = "nbx_example/julia_test.ipynb"
create_jl_from_nb(nbpath)

['include("src/empty_import.jl")', 'using Plots', '', 'plot(1:10)']
['include("./empty_import.jl")', 'using Plots', '', 'plot(1:10)']


PosixPath('nbx_example/_src/_test.jl')