In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

# Parsing the notebook

## Finding tags, filter cells

In [None]:
#export
#default_exp om2
import re
# Matches a `#tag` line: optional leading whitespace, `#`, then the tag name
# (letters/underscores only, captured as group 1) directly after the `#`.
# Whatever follows the name on the line is ignored.
_re_tag = re.compile(r"^\s*#([a-zA-Z_]+).*$")

In [None]:
#export
def extract_tag(line):
    """Return the tag name when `line` opens with a `#name` tag, else None.

    Whitespace before the `#` is allowed; whitespace between the `#` and
    the name is not.
    """
    found = _re_tag.match(line)
    return None if found is None else found.group(1)

In [None]:
assert extract_tag("#nbx ") == "nbx"
assert extract_tag("#nbx  something else ") == "nbx"
# A space between `#` and the name makes the line a plain comment, not a tag.
assert extract_tag("# nbx something else ") is None
assert extract_tag("#xarg ") == "xarg"

In [None]:
#export
def contains_tag(name):
    """Build a predicate telling whether a line carries the tag `name`."""
    def has_tag(line):
        return extract_tag(line) == name
    return has_tag


# Predicate for lines tagged `#nbx`.
is_nbx = contains_tag("nbx")

In [None]:
assert is_nbx("#nbx")
# `# nbx` (with a space) is an ordinary comment, not the tag.
assert not is_nbx("# nbx")
assert is_nbx(" #nbx")
assert is_nbx("  #nbx")

In [None]:
#export 
def is_nbx_cell(cell):
    """Tell whether `cell` is a non-empty code cell whose first source line is tagged `#nbx`."""
    if cell['cell_type'] == 'code' and cell['source']:
        return is_nbx(cell['source'][0])
    return False

When we create our Python script, we need to exclude Jupyter's *magic* functions and shell commands that can be used in a code cell.

In [None]:
#export
# Matches lines whose first non-whitespace character is `%` (line or cell
# magic, i.e. `%` / `%%`) or `!` (shell command).
_re_magic =  re.compile(r"^\s*%{1,2}|^\s*!")

In [None]:
#export
def is_magic_or_shell(line):
    """Tell whether `line` starts (after optional whitespace) with `%`, `%%` or `!`."""
    return _re_magic.match(line) is not None

In [None]:
assert is_magic_or_shell("%pwd ")
assert is_magic_or_shell("%%capture ")
assert is_magic_or_shell("!ls")
# Leading whitespace is tolerated; a `%` or `!` later in the line is ordinary code.
assert is_magic_or_shell("  %pwd")
assert not is_magic_or_shell("x = 5 % 2")
assert not is_magic_or_shell("print('hi!')")

## Parsing "xargs"

We need to parse the line below `#xarg`, and decompose it into a variable declaration and the parameter range for the sweep.

In [None]:
#export                
# Pattern for the declaration line that follows an `#xarg` tag, of the form
# `name = value ; sweep`:
#   group 1 -- everything before the first `=`
#   group 2 -- everything after that `=` up to the first `;` (or the end)
#   group 3 -- whatever follows the optional `;` (may be empty)
# The groups still carry surrounding whitespace.
_re_xarg = re.compile(r"""
# parses the line below an `xarg` tag:
^
([^=]+)
=
([^;]+)
;?
(.*)
$""", re.VERBOSE)

In [None]:
#export
def strip(s):
    """Return `s` without leading and trailing whitespace (function form, usable with `map`)."""
    trimmed = s.strip()
    return trimmed

def parse_xarg_expr(line):
    """Split an xarg declaration `name = value ; sweep` into its three parts.

    Returns:
        A `(name, value, sweep)` tuple of whitespace-stripped strings;
        `sweep` is the empty string when nothing follows the value.

    Raises:
        ValueError: if `line` does not have the form `name = value`.
    """
    m = _re_xarg.match(line)
    if m is None:
        # Without this guard a malformed line surfaces as an opaque
        # AttributeError on `None.groups()`.
        raise ValueError(f"Cannot parse xarg declaration: {line!r}")
    name, val, sweep = (part.strip() for part in m.groups())
    return name, val, sweep

In [None]:
parse_xarg_expr("x = f(a) ; [f(1),f(a),6,8]")

('x', 'f(a)', '[f(1),f(a),6,8]')

## Parsing "nbx" cells

First let's load the notebook

In [None]:
#export
import json
from argparse import Namespace

class Bunch(object):
    """Attribute-style container: every key of `adict` becomes an attribute."""

    def __init__(self, adict=None):
        # `None` replaces the mutable `{}` default, which is a single dict
        # object shared by all calls.
        if adict is not None:
            self.__dict__.update(adict)

    def __repr__(self):
        # Only the attribute names are shown, not their values.
        return str(self.__dict__.keys())

def load_nb(fname):
    """Load the notebook JSON file `fname` into a `Bunch`.

    The top-level JSON keys become attributes of the result, and the file
    name is stored on it as `name`.
    """
    # The context manager closes the file; the handle used to be left open.
    with open(fname, 'r', encoding="utf-8") as f:
        nbdict = json.load(f)
    nb = Bunch(nbdict)
    nb.name = fname
    return nb

In [None]:
nb = load_nb("om.ipynb")
nb

dict_keys(['cells', 'metadata', 'nbformat', 'nbformat_minor', 'name'])

In [None]:
#export
def parse_src_with_parse_dict(a, src, parse_dict):
    """Consume the source lines `src`, dispatching each leading line to a handler.

    Args:
        a: accumulator dict mapping a tag name (and 'none') to a list of lines.
        src: list of source lines of one cell.
        parse_dict: maps a tag name to a handler `(a, src) -> (a, rest)`.
            Lines without a tag, or with a tag missing from `parse_dict`,
            go to `parse_none`.

    Returns:
        `(a, [])` -- the filled accumulator and the (always empty) remainder.
    """
    rest = src
    # Loop instead of recursing once per line, so that long cells cannot
    # run into Python's recursion limit.
    while len(rest) > 0:
        tag = extract_tag(rest[0])
        if tag is None or tag not in parse_dict:
            handler = parse_none
        else:
            handler = parse_dict[tag]
        a, rest = handler(a, rest)
    return a, []


def parse_none(a, src):
    """Handle an untagged line: keep it under 'none' unless it is a magic or shell line.

    Returns the accumulator and the lines after the first one.
    """
    head = src[0]
    if is_magic_or_shell(head):
        return a, src[1:]
    a['none'].append(head)
    return a, src[1:]


def parse_nbx(a, src):
    """Record the first line of `src` under "nbx" and hand back the remaining lines."""
    head = src[0]
    a["nbx"].append(head)
    return a, src[1:]

def _take_line_after_tag(a, src, key):
    """Store the line that follows the tag line `src[0]` under `a[key]`.

    Returns `(a, rest)` where `rest` is `src` without the tag line and the
    stored line.

    Raises:
        IndexError: if the tag is the last line of `src`.
    """
    if len(src) < 2:
        # Same exception type as the bare `src[1]` lookup raised before,
        # but with a message that names the offending tag.
        raise IndexError(f"`#{key}` tag is not followed by a line to parse")
    a[key].append(src[1])
    return a, src[2:]


def parse_xarg(a, src):
    """Handle an `#xarg` tag: collect the declaration line below it under "xarg"."""
    return _take_line_after_tag(a, src, "xarg")


def parse_xuse(a, src):
    """Handle an `#xuse` tag: collect the line below it under "xuse"."""
    return _take_line_after_tag(a, src, "xuse")


# Default tag -> handler table for the lines of an nbx cell. Each handler takes
# `(accumulator, remaining_lines)` and returns the updated pair.
PARSE_DICT = {
    'xarg': parse_xarg,
    'xuse': parse_xuse}
    
def parse_nbx_cell_with_parse_dict(cell, parse_dict=PARSE_DICT):
    """Sort the source lines of `cell` into one list per tag of `parse_dict`, plus a 'none' list."""
    buckets = {tag: [] for tag in parse_dict.keys()}
    buckets['none'] = []

    parsed, _ = parse_src_with_parse_dict(buckets, cell['source'], parse_dict)
    return parsed


In [None]:
nb = load_nb("om.ipynb")
for cell in filter(is_nbx_cell, nb.cells):
    print("\n*****************\n** Parsed Cell **\n*****************\n")
    a = parse_nbx_cell_with_parse_dict(cell)
    for key, vals in a.items():
        print(f"--{'-'*len(key)}----\n   {key}   \n--{'-'*len(key)}----")
        # Plain loop: a list comprehension used only for its side effects
        # builds a throwaway list of `None`s.
        for v in vals:
            print(v, end="")


*****************
** Parsed Cell **
*****************

----------
   xarg   
----------
x = 0 ; [0,1,2,3,4]
y = 0 ;
task_id = 0
results_dir = "./"
----------
   xuse   
----------
----------
   none   
----------
#nbx




# some comment
z = 1
*****************
** Parsed Cell **
*****************

----------
   xarg   
----------
----------
   xuse   
----------
----------
   none   
----------
#nbx

print("some result")

Let's see how it works

In [None]:
#nbx

#xarg 
x = 0 ; [0,1,2,3,4]

#xarg 
y = 0 ;

#xarg
task_id = 0
#xarg
results_dir = "./"

# some comment
z = 1

In [None]:
#nbx

print("some result")

some result


## Parsing the whole thing

In [None]:
#export
from functools import reduce

def concat(list1, list2):
    """Return `list1 + list2`."""
    joined = list1 + list2
    return joined


def unzip(zipped):
    """Inverse of `zip`: turn an iterable of tuples into an iterator of columns."""
    columns = zip(*zipped)
    return columns


def negate(func):
    """Wrap `func` so that the wrapper returns `not func(x)`."""
    def negated(x):
        return not func(x)
    return negated


def is_constarg(a):
    """Tell whether the parsed xarg triple `a` has an empty third (sweep) entry."""
    sweep = a[2]
    return len(sweep) == 0


# Complement of `is_constarg`.
not_constarg = negate(is_constarg)


def get_item(i):
    """Build a getter returning `x[i]`."""
    def getter(x):
        return x[i]
    return getter


def get_items(*I):
    """Build a getter returning the tuple `(x[i] for each i in I)`."""
    def getter(x):
        picked = []
        for i in I:
            picked.append(x[i])
        return tuple(picked)
    return getter

In [None]:
#export


def parse_nb_with_parse_dict(nb, parse_dict=PARSE_DICT):
    """Collect the tagged lines and the function body of all `#nbx` cells of `nb`.

    Returns:
        A dict with one list per tag in `parse_dict`, plus:
            'func_body'  -- the untagged lines of all nbx cells, in order
            'xarg'       -- the xarg lines parsed into `(name, value, sweep)`
            'args'       -- `(name, value)` for every xarg
            'const_args' -- `(name, value)` for xargs with an empty sweep
            'sweep_args' -- `(name, sweep)` for xargs with a non-empty sweep
            'name'       -- `nb.name`
    """
    A = {k: [] for k in parse_dict.keys()}
    A['func_body'] = []
    for cell in filter(is_nbx_cell, nb.cells):
        a = parse_nbx_cell_with_parse_dict(cell, parse_dict)

        for k in parse_dict.keys():
            A[k].extend(a[k])
        A['func_body'].extend(a['none'])

    # `.get` keeps a custom `parse_dict` without an 'xarg' handler from
    # failing with a KeyError here.
    A['xarg'] = [parse_xarg_expr(line) for line in A.get('xarg', [])]
    A['args'] = [(x[0], x[1]) for x in A['xarg']]
    A['const_args'] = [(x[0], x[1]) for x in A['xarg'] if len(x[2]) == 0]
    A['sweep_args'] = [(x[0], x[2]) for x in A['xarg'] if len(x[2]) != 0]
    A['name'] = nb.name

    return A
        

In [None]:
nb = load_nb("om.ipynb")
# Separate name for the parse result: `nb` stays the loaded notebook instead
# of being rebound to a dict.
parsed_nb = parse_nb_with_parse_dict(nb, parse_dict=PARSE_DICT)
parsed_nb

{'xarg': [('x', '0', '[0,1,2,3,4]'),
  ('y', '0', ''),
  ('task_id', '0', ''),
  ('results_dir', '"./"', '')],
 'xuse': [],
 'func_body': ['#nbx\n',
  '\n',
  '\n',
  '\n',
  '\n',
  '# some comment\n',
  'z = 1',
  '#nbx\n',
  '\n',
  'print("some result")'],
 'args': [('x', '0'), ('y', '0'), ('task_id', '0'), ('results_dir', '"./"')],
 'const_args': [('y', '0'), ('task_id', '0'), ('results_dir', '"./"')],
 'sweep_args': [('x', '[0,1,2,3,4]')],
 'name': 'om.ipynb'}

# Creating the file bundle 

In [None]:
#export
def get_arrays(num, m=1000):
    """Split the task ids 1..num into consecutive `[first, last]` ranges of at most `m` ids.

    When `num < m` the single range `[1, num]` is returned.
    """
    if num < m:
        return [[1, num]]

    # All full-size chunks first ...
    chunks = [[k*m + 1, (k + 1)*m] for k in range(num//m)]
    # ... then the shorter remainder, if any ids are left over.
    covered = chunks[-1][1]
    if covered < num:
        chunks.append([covered + 1, num])

    return chunks

In [None]:
get_arrays(1543)

[[1, 1000], [1001, 1543]]

In [None]:
#export
def init_job(start, end, step):
    """Shell line submitting the first job array and storing sbatch's 4th output field in `job_0`."""
    return f"job_0=`sbatch --array={start}-{end}%{step} job.sh | awk '{{ print $4 }}'`"


def cont_job(j, start, end, step):
    """Shell line submitting job array `j`, made dependent (afterok) on `$job_{j-1}`."""
    return f"job_{j}=`sbatch --array={start}-{end}%{step} --dependency=afterok:$job_{j-1} job.sh | awk '{{ print $4 }}'`"


def chain_jobs(arrays, step):
    """Build the submission lines for all `[start, end]` ranges in `arrays`.

    The first range uses `init_job`, every later one `cont_job`; each line
    is terminated by a newline.
    """
    lines = []
    for idx, arr in enumerate(arrays):
        if idx > 0:
            lines.append(cont_job(idx, arr[0], arr[1], step))
        else:
            lines.append(init_job(arr[0], arr[1], step))
    return "".join(line + "\n" for line in lines)

In [None]:
print(chain_jobs(get_arrays(1543), step=1))

job_0=`sbatch --array=1-1000%1 job.sh | awk '{ print $4 }'`
job_1=`sbatch --array=1001-1543%1 --dependency=afterok:$job_0 job.sh | awk '{ print $4 }'`



## Bundle functions

In [None]:
#export
def add_if_necessary(d, k, v):
    """Set `d[k] = v` only when `k` is not yet a key of `d`; existing values are kept."""
    d.setdefault(k, v)
    
    
def create_script(tpath, tname, fname, vars):
    """Announce and create the file `fname` from the template `tname` located in `tpath`.

    `vars` is handed on unchanged to `create_file_from_template`.
    """
    print(f"Creating... {fname} \n\tfrom {tname}")
    template = tpath/tname
    create_file_from_template(template, fname, vars)
    

# Location of the `templates/` resource, looked up via `pkg_resources` relative
# to `__name__`; the script templates (`*.tpl`) are read from here.
# NOTE(review): `Path`, `os`, `pkg_resources` and the template helpers
# (`create_file_from_template`, `render_template_from_string`) are not imported
# in any cell visible here -- confirm they are brought into scope elsewhere.
tpath = Path(pkg_resources.resource_filename(__name__, "templates/"))


def create_om_files(target_dir, lang, num_jobs, simg, job_header, arr_size=1000, step=20):
    """
    Create the bundle folder `target_dir` with the scripts needed to run
    an experiment on OM, then print the instructions for using the bundle.

    The work is delegated, in this order, to:
        create_folders(target_dir, lang)
        create_run_script(target_dir, num_jobs, arr_size, step)
        create_job_script(target_dir, lang, simg, job_header)

    Example usage (`create_experiment_script` then adds the experiment
    script itself to the bundle):

    >> create_om_files(
            target_dir = "./_EXAMPLE_BUNDLE",
            lang = "py",
            num_jobs = 10,
            simg = "pytorch.simg",
            job_header = {
                "--time": "01:20:00",
                "--partition": "fiete",
                "--mem": "32gb",
                "--cpus-per-task": 4,
                "--mail-user": "me@somewhere.com"})

    >> create_experiment_script(
            nbname = "my_notebook.ipynb",
            target_dir = "./_EXAMPLE_BUNDLE",
            lang = "py")

    """
    print(f"Creating om ... files...\n\tfrom {tpath}")

    create_folders(path=target_dir, lang=lang)
    create_run_script(target_dir=target_dir, num_jobs=num_jobs, arr_size=arr_size, step=step)
    create_job_script(target_dir=target_dir, lang=lang, simg=simg, job_header=job_header)

    instructions = render_template_from_string(INSTRUCTIONS, {"path": target_dir})
    print(instructions)
    
    
# Message template printed at the end of `create_om_files`, which renders it
# with `path` set to the bundle's target directory.
INSTRUCTIONS = """
** Instructions: **
    Copy to remote, run, and pull the results:
    - `!scp -r {{path}} $om:$omx` 
    - `!ssh $om sbatch -D $omx/{{path}} $omx/{{path}}/run.sh`
    - `!scp -r $om:$omx/{{path}}/results ./results`
    
    For this to work you have to set a few environment variables...
"""


def create_folders(path, lang):
    """Create the bundle folder layout under `path`.

    Ensures that `path`, `path/io` and `path/results` exist, copies the
    contents of `./src` and `./data` (when present in the working
    directory) into the bundle, and for Python bundles touches
    `path/__init__.py`.

    Args:
        path: bundle directory (str or Path).
        lang: language suffix of the bundle, e.g. "py" (".py" is accepted too).
    """
    import shlex

    path = Path(path)

    # `exist_ok` also (re)creates `io`/`results` inside a bundle directory
    # that already exists; they used to be skipped in that case.
    for folder in (path, path/'io', path/'results'):
        os.makedirs(folder, exist_ok=True)

    for name in ('src', 'data'):
        if os.path.exists(f'./{name}'):
            target = path/name
            os.makedirs(target, exist_ok=True)
            # Quote the target so bundle paths containing spaces or shell
            # metacharacters survive the shell; the glob must stay unquoted.
            os.system(f"cp -r {name}/* {shlex.quote(str(target))}")

    # Callers pass the language without a dot ("py"), so comparing against
    # ".py" alone never matched and `__init__.py` was never created.
    if lang in ("py", ".py"):
        open(path/'__init__.py', 'a').close()
                           

def create_run_script(target_dir, num_jobs, arr_size, step):
    """Write `run.sh` into `target_dir`: a shell header followed by the chained sbatch submissions.

    `num_jobs` tasks are split into arrays of at most `arr_size` (which must
    not exceed 1000); `step` is passed on to `chain_jobs`.
    """
    assert arr_size <= 1000, "Maximum number of queued jobs on OM is 1000"
    header = (
        "#!/bin/sh\n\n",
        "#SBATCH --out=io/runner_out__%A\n",
        "#SBATCH --error=io/runner_err__%A\n\n",
    )
    script_path = Path(target_dir)/'run.sh'
    # newline="\n" keeps Unix line endings regardless of the local platform.
    with open(script_path, "w", newline="\n") as out:
        out.writelines(header)
        out.write(chain_jobs(get_arrays(num_jobs, arr_size), step))


def create_job_script(target_dir, lang, simg, job_header):
    """Render `job_<lang>.tpl` into `target_dir/job.sh`.

    Reads the environment variables `omsimg` (folder the image `simg` is
    resolved against) and `omx`; a missing variable raises KeyError.
    Header entries for `--out`, `--error`, `--mail-type` and `--exclude`
    are filled in with defaults unless `job_header` already sets them.

    Args:
        target_dir: bundle directory that receives `job.sh`.
        lang: selects the template `job_<lang>.tpl`.
        simg: image file name, joined onto `$omsimg`.
        job_header: dict of sbatch options; it is not modified.
    """
    simg        = Path(os.environ['omsimg'])/simg
    nbx_folder  = Path(os.environ['omx'])
    results_dir = Path("./results")

    # Work on a copy so the defaults below do not leak into the caller's dict.
    job_header = dict(job_header)
    add_if_necessary(job_header, "--out", "io/out_%a")
    add_if_necessary(job_header, "--error", "io/err_%a")
    add_if_necessary(job_header, "--mail-type", "END")
    add_if_necessary(job_header, "--exclude", "node030,node016,node015")

    tname = f"job_{lang}.tpl"
    fname = Path(target_dir)/'job.sh'
    create_script(tpath, tname, fname, {
        'job_header': job_header.items(),
        'nbx_folder': nbx_folder,
        'simg': simg,
        'results_dir': results_dir
    })

    
def check_nb(pnb):
    """Ensure the parsed notebook `pnb` declares the args `task_id` and `results_dir`.

    Raises KeyError for the first of the two that is missing from `pnb['args']`.
    """
    declared = [arg[0] for arg in pnb['args']]
    if "task_id" not in declared:
        raise KeyError("You didn't specify `task_id`!!")
    if "results_dir" not in declared:
        raise KeyError("You didn't specify `results_dir`!!")
    
                  
def create_experiment_script(nbname, target_dir, lang="py"):
    """Parse the notebook `nbname` and render it into `target_dir/experiment.<lang>`.

    The parsed notebook must pass `check_nb`; it is then used as the
    variables for the template `experiment_<lang>.tpl`.
    """
    parsed = parse_nb_with_parse_dict(load_nb(nbname), parse_dict=PARSE_DICT)
    check_nb(parsed)

    template_name = f"experiment_{lang}.tpl"
    out_file = Path(target_dir)/f"experiment.{lang}"
    create_script(tpath, template_name, out_file, parsed)
                   
    
        
