In [None]:
# default_exp core

# MRtrix3notebook

> Develop MRtrix3-based pipelines using Jupyter notebooks

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
# export
import os
from pathlib import Path, PurePath

def exists(s, minsize_bytes=100, isdir=False, force=False):
    """Report whether path `s` exists, optionally raising instead of returning False.

    Parameters
    ----------
    s : str or pathlib.PurePath
        Path to test.
    minsize_bytes : int
        Minimum size required for image files, i.e. names ending in
        `.mif`, `.mif.gz`, `.nii` or `.nii.gz`. Other files are not size-checked.
    isdir : bool
        If True, test for a directory instead of a regular file.
    force : bool
        If True, raise `IOError` wherever False would otherwise be returned.

    Returns
    -------
    bool
        True if the directory/file exists (and, for image files, is large enough).
    """
    image_suffixes = ('.mif', '.mif.gz', '.nii', '.nii.gz')

    # directory mode: existence is the only criterion
    if isdir:
        found = os.path.isdir(s)
        if not found and force:
            raise IOError(s)
        return found

    # file mode: a missing file is reported before any size check
    if not os.path.isfile(s):
        if force:
            raise IOError(s)
        return False

    name = str(s) if isinstance(s, PurePath) else s
    if not name.endswith(image_suffixes):
        return True

    # image files additionally have to reach the minimum size
    n_bytes = Path(name).stat().st_size
    if n_bytes >= minsize_bytes:
        return True

    print(name, 'size', n_bytes, 'Bytes')
    if force:
        raise IOError(name)
    return False

# exists('/tmp')

In [None]:
# export
import os
import shutil
import logging
import subprocess
import shlex
from pathlib import Path
from collections import deque
import pprint

logging.basicConfig()

In [None]:
# def map_paths(cmds, source_target_map):
#     dirmap = source_target_map
#     keys = sorted(dirmap.keys())
#     for key in keys:
#         assert np.sum([key in k for k in keys]) == 1, f'keys must no be substrings of each other but "{key}" is'

#     import re 
#     import shlex


#     regex = re.compile("(%s)" % "|".join(map(re.escape, dirmap.keys())))


#     mapped_files = dict()
#     mapped_jobs = []
#     for cmd in cmds:
#         mapped = []
#         for arg in shlex.split(cmd):
#             newarg = regex.sub(lambda mo: dirmap[mo.string[mo.start():mo.end()]], arg)
#             if newarg not in ['|', '&&']:
#                 newarg = shlex.quote(newarg)
#             mapped += [newarg]
#             if arg != newarg:
#                 if arg in mapped_files:
#                     assert mapped_files[arg] == newarg
#                 mapped_files[arg] = newarg

#         mapped_jobs.append(' '.join(mapped))

#     return mapped_jobs, mapped_files

# mapped_jobs, mapped_files = map_paths(run.jobs, dirmap)

In [None]:
# export
from tqdm.auto import tqdm
import tempfile
import re

class Comp:
    """Queue of shell command lines that are executed one after another.

    Commands are added with ``comp += 'cmd arg'`` or ``comp += ['cmd', 'arg']``
    and executed in FIFO order by `run()` through
    ``subprocess.check_call(..., shell=True)``. Used as a context manager, the
    queue is run when the ``with`` block is left.

    Parameters
    ----------
    dry_run : bool
        If True, commands are only logged (INFO level) instead of executed.
    env : mapping or None
        Environment for the child processes; defaults to a copy of
        ``os.environ``. The mapping is copied, never modified. If it contains
        ``PATH``, the directory of the Python executable is prepended to it.
    loglevel : str or int
        Level passed to ``logger.setLevel``.
    progress : bool
        If True, `run()` shows a tqdm progress bar.
    nice : bool
        If True, child processes lower their own priority before exec
        (requires ``psutil``).
    dummy : bool
        If True, `run()` discards the queued jobs without executing or logging them.
    path_map_dict : dict or None
        Maps substrings to their replacements; applied to every argument of
        every added command. Keys must not be substrings of each other.
        ``None`` or an empty dict disables the mapping.

    Attributes set here: ``jobs`` (deque of command strings), ``mapped_files``
    (original argument -> mapped, shell-quoted argument), ``env``, ``logger``.
    """

    def __benice(self):
        """Set the niceness of the current process to 19 using psutil.

        Installed as ``preexec_fn`` when ``nice=True``. If psutil is missing, an
        install hint is printed and the priority is left unchanged.
        """
        try:
            import psutil
        except ImportError:
            print('pip install psutil')
            return

        pid = os.getpid()
        ps = psutil.Process(pid)
        ps.nice(19)

    def __init__(self, dry_run=False, env=None, loglevel='INFO', progress=True, nice=False, dummy=False, path_map_dict=None):
        import sys  # local import: only needed for the interpreter fallback below

        self.dry_run = dry_run
        self.dummy = dummy

        self.path_map = None
        # always defined so the attribute can be inspected even without a map
        self.path_map_dict = path_map_dict
        self.mapped_files = dict()
        # an empty dict would compile to the regex "()", which matches everywhere
        # and fails on lookup, so it is treated like "no mapping"
        if path_map_dict:
            keys = sorted(path_map_dict.keys())
            for key in keys:
                assert sum([key in k for k in keys]) == 1, f'keys must no be substrings of each other but "{key}" is'
            self.path_map = re.compile("(%s)" % "|".join(map(re.escape, path_map_dict.keys())))

        self.jobs = deque()

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(loglevel)

        # use directory of python binary; fall back to the running interpreter
        # when no executable called `python` is on PATH (e.g. only `python3`)
        python = shutil.which('python') or sys.executable
        if not python:
            raise ImportError("could not find python in Path")

        # work on a private copy so the caller's mapping is never modified
        env = os.environ.copy() if env is None else dict(env)
        if 'PATH' in env:
            env['PATH'] = os.pathsep.join([str(Path(python).parent), env.get('PATH')])
        self.logger.debug('env: '+pprint.pformat(env))
        self.env = env
        self.noprogress = not progress
        self.preexec_fn = None
        if nice:
            self.preexec_fn = self.__benice
        self.__tmp_dir = None

    def _execute(self, cmd):
        """Run a single command string through the shell, or only log it in dry-run mode.

        Raises whatever ``subprocess.check_call`` raises (e.g.
        ``CalledProcessError`` on a non-zero exit status) after logging a warning.
        """
        if self.dry_run:
            self.logger.info('dry_run: ' + cmd)
            return

        self.logger.debug('running: ' + cmd)
        try:
            subprocess.check_call(cmd, shell=True, env=self.env, preexec_fn=self.preexec_fn)
        except BaseException:
            # log every failure, including interrupts, then propagate unchanged
            self.logger.warning('failed: '+str(cmd))
            raise

    def run(self):
        """Execute all queued jobs in order, removing each from the queue as it starts.

        With ``dummy=True`` the queue is emptied without running anything. If a
        job fails, the exception propagates and the remaining jobs stay queued.
        """
        if not self.jobs:
            return

        if self.dummy:
            self.jobs = deque()
            return

        pbar = None if self.noprogress else tqdm(total=len(self.jobs))
        try:
            while self.jobs:
                self._execute(self.jobs.popleft())
                if pbar is not None:
                    pbar.update(1)
        finally:
            # close the bar even when a job raised
            if pbar is not None:
                pbar.close()

    def __map_paths(self, x):
        """Apply `path_map_dict` to every argument of command string `x`.

        The command is split with ``shlex.split``, each argument is substituted
        and re-quoted (``|`` and ``&&`` are left unquoted), and the result is
        joined with spaces. Arguments changed by the substitution are recorded
        in ``mapped_files``. Without a path map, `x` is returned unchanged.
        """
        if self.path_map is None:
            return x

        newx = []
        for arg in shlex.split(x):
            mapped = self.path_map.sub(lambda mo: self.path_map_dict[mo.group(0)], arg)
            newarg = mapped if mapped in ('|', '&&') else shlex.quote(mapped)
            newx.append(newarg)
            # compare before quoting: quoting alone must not count as a mapping
            if mapped != arg:
                if arg in self.mapped_files:
                    assert self.mapped_files[arg] == newarg
                self.mapped_files[arg] = newarg
        return ' '.join(newx)

    def __add__(self, x):
        """Append command `x` (a string or a list of arguments) to the queue.

        List elements are shell-quoted individually, except ``|`` and ``&&``;
        strings are taken as they are. Note that this modifies the queue in
        place and returns ``self``.

        Raises
        ------
        TypeError
            If `x` is neither a string nor a list.
        """
        if isinstance(x, list):
            # convert Pathlib objects to strings
            x = [str(e) if isinstance(e, PurePath) else e for e in x]
            # join elements by hand: shlex.join would also quote | and &&
            x = ' '.join([shlex.quote(w) if w != '|' and w != '&&' else w for w in x ])
        elif not isinstance(x, str):
            raise TypeError("required input is string or list of strings, received " + str(type(x)))

        x = self.__map_paths(x)

        self.logger.debug('add: '+x)
        self.jobs.append(x)
        return self

    def __iadd__(self, x):
        """``comp += x`` is the same as ``comp + x``."""
        return self.__add__(x)

    def __repr__(self):
        return 'computer %i jobs: %s' % (self.__len__(), str(self.jobs))

    def __str__(self):
        return self.__repr__()

    def __len__(self):
        """Number of jobs currently queued."""
        return len(self.jobs)

    def __enter__(self):
        self.logger.debug('Entering Comp')
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        """Run the queued jobs on leaving the ``with`` block.

        The queue is run whether or not the block raised; an exception from the
        block is not suppressed.
        """
        self.logger.debug('Exiting Comp')
        self.run()

    def tmp_dir(self, suffix=None, prefix=None, tmp='/tmp'):
        """Return a ``tempfile.TemporaryDirectory`` created inside directory `tmp`."""
        return tempfile.TemporaryDirectory(suffix=suffix, prefix=prefix, dir=tmp)
    


In [None]:
# with Comp(dry_run=False) as run:
#     run += 'mrinfo --version && mrregister --version'.split()
#     run += 'mrinfo --version && mrregister --version'

In [None]:
# Queue a few commands; they are executed in order when the `with` block exits.
with Comp(dry_run=False) as run:
    run.logger.info('start')
    run += 'which mrinfo'
    run += 'mrinfo --version'
    run += 'sleep 3'
    run += ['dcminfo', '--version']
    # Logger.warn is a deprecated alias of Logger.warning
    run.logger.warning('done')

INFO:__main__:start
  run.logger.warn('done')


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4.0), HTML(value='')))

/Users/mp/anaconda3/envs/notebook/bin/mrinfo
== mrinfo 3.0.3 ==
64 bit release version, built Jul 19 2021, using Eigen 3.3.7
Author(s): J-Donald Tournier (d.tournier@brain.org.au) and Robert E. Smith (robert.smith@florey.edu.au)
Copyright (c) 2008-2021 the MRtrix3 contributors.

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.

Covered Software is provided under this License on an "as is"
basis, without warranty of any kind, either expressed, implied, or
statutory, including, without limitation, warranties that the
Covered Software is free of defects, merchantable, fit for a
particular purpose or non-infringing.
See the Mozilla Public License v. 2.0 for more details.

For more details, see http://www.mrtrix.org/.

== dcminfo 3.0.3 ==
64 bit release version, built Jul 19 2021, using Eigen 3.3.7
Author(s): J-Donald Tournier (jdtournier@gmail.com)

['mrinfo', '--version', '&&', 'mrregister', '--version']


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1.0), HTML(value='')))

INFO:__main__:dry_run: mrinfo --version '&&' mrregister --version





In [None]:
# class DummyRun:
#     def __init__(self):
#         self.queue = []
        
#     def __add__(self, cmd):
#         self.queue.append(cmd)
        
#     def __iadd__(self, cmd):
#         self.queue.append(cmd)
#         return self
    
#     def __len__(self):
#         return len(self.jobs)
    
#     def __repr__(self):
#         return 'computer %i jobs: %s' % (self.__len__(), str(self.jobs))
    
#     def __str__(self):
#         return self.__repr__()
    
#     def __enter__(self):
#         return self

#     def __exit__(self, exc_type, exc, exc_tb):
#         self.queue = []

In [None]:
# Dry run: the queued commands are only logged when the `with` block exits,
# nothing is executed. Commands can be given as a string or as a list of arguments.
with Comp(dry_run=True) as run:
    run.logger.info('start')
    run += 'mrinfo --version'
    run += ['dcminfo', '--version']
    run.logger.info('done')

INFO:__main__:start
INFO:__main__:done


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=2.0), HTML(value='')))

INFO:__main__:dry_run: mrinfo --version
INFO:__main__:dry_run: dcminfo --version





In [None]:
# nice=True makes child processes lower their priority before running (needs psutil);
# the extra variable is handed to the child processes through `env`
# (presumably it limits the number of MRtrix3 threads -- confirm).
c = Comp(loglevel='INFO', nice=True, env={**os.environ, 'MRTRIX_NTHREADS':'4'})
# tmp_dir() returns a tempfile.TemporaryDirectory: the directory exists inside
# the `with` block and is removed when the block is left.
with c.tmp_dir() as tmp:
    print(tmp)
    assert exists(tmp, isdir=True), tmp
    c += 'dcminfo --version'
    c.run()
assert not exists(tmp, isdir=True), tmp

/tmp/tmpent1x3vh


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1.0), HTML(value='')))




In [None]:
# this is our descriptor object
class Bar:
    """Descriptor that keeps a single value on itself and reports every access.

    The value is stored on the descriptor object, not on the owning instance.
    """

    def __init__(self):
        # start out with an empty string as the stored value
        self.value = ''

    def __get__(self, instance, owner):
        """Announce the read and hand back the stored value."""
        print("returned from descriptor object")
        return self.value

    def __set__(self, instance, value):
        """Announce the write and store `value` on the descriptor."""
        print("set in descriptor object")
        self.value = value

    def __delete__(self, instance):
        """Announce the deletion and drop the stored value."""
        print("deleted in descriptor object")
        del self.value

# Owner class: attribute access to `bar` on instances is routed through the Bar descriptor.
class Foo(object):
    bar = Bar()

f = Foo()
f.bar = 10       # goes through Bar.__set__
print( f.bar)    # goes through Bar.__get__
# A fresh instance; note that Bar keeps its value on the descriptor object,
# which is a class attribute of Foo and therefore shared by all instances.
f = Foo()

set in descriptor object
returned from descriptor object
10


In [None]:
# export 

def aprint(array, prefix=''):
    """Pretty-print an array-like, preceded by a name.

    adapted from https://stackoverflow.com/a/38434232/2389450

    Parameters
    ----------
    array : array-like
        Converted with ``np.asanyarray`` and formatted with ``np.array2string``
        (precision 6, ``', '`` separator, line width 100); square brackets are
        replaced by spaces.
    prefix : object
        Name printed as ``'<prefix> = '`` in front of the values. Any object
        with a string representation is accepted. If empty, the function tries
        to recover the argument text of the ``aprint(...)`` call by inspecting
        the source of an outer frame; if that fails, no prefix is printed.

    Continuation lines are indented by the length of the prefix.
    """
    import re
    import numpy as np
    if prefix:
        # convert first, then append, so that non-string prefixes work as well
        prefix = str(prefix) + ' = '
    else:
        try:
            import inspect, ast

            frame = inspect.currentframe()
            frame = inspect.getouterframes(frame)[2]
            string = inspect.findsource(frame[0])[0]

            nodes = ast.parse(''.join(string))

            # first top-level statement whose value is a call to `aprint`
            i_expr = -1
            for (i, node) in enumerate(nodes.body):
                if hasattr(node, 'value') and isinstance(node.value, ast.Call) and hasattr(node.value.func, 'id') and node.value.func.id == 'aprint':
                    i_expr = i
                    break

            # the call text spans from its first line up to the line of the next statement
            i_expr_next = min(i_expr + 1, len(nodes.body)-1)
            lineno_start = nodes.body[i_expr].lineno
            lineno_end = nodes.body[i_expr_next].lineno if i_expr_next != i_expr else len(string)

            str_func_call = ''.join([line.strip() for line in string[lineno_start - 1: lineno_end]])
            params = str_func_call[len('aprint('):-1]
            prefix = params + ' ='
        except Exception:
            # best effort only: fall back to no prefix, but let
            # KeyboardInterrupt / SystemExit propagate
            prefix = ''
    value_str = re.sub(r'[\[\]]', ' ', np.array2string(np.asanyarray(array), precision=6, separator=', ', max_line_width=100))
    value_str = value_str.replace('\n', '\n' + (' '*len(prefix)))

    print(prefix + value_str)

In [None]:
# Without a `with` block the queue has to be run explicitly;
# printing before and after shows that run() empties the queue.
c = Comp(loglevel='INFO')
c += ['mrinfo', '-version']
print(c)
c.run()
print(c)

computer 1 jobs: deque(['mrinfo -version'])
computer 0 jobs: deque([])


In [None]:
#! rsync -aP ~/Dropbox/mrtrix3nb k1465906@nanlnx1.iop.kcl.ac.uk:/home/k1465906/src/

nbdev_build_lib
Converted 00_core.ipynb.
Converted index.ipynb.
Converted mif.ipynb.
Converted reg.ipynb.
Converted vis.ipynb.
touch mrtrix3nb
nbdev_build_docs
converting: /Users/mp/Dropbox/mrtrix3nb/00_core.ipynb
converting /Users/mp/Dropbox/mrtrix3nb/index.ipynb to README.md
touch docs
