In [None]:
#default_exp core

In [None]:
#export
from local.test import *
from local.imports import *
from local.notebook.showdoc import *

# Core

> Basic functions used in the fastai library

In [None]:
# export
defaults = SimpleNamespace()

## Metaclasses

See this [blog post](https://realpython.com/python-metaclasses/) for more information about metaclasses. 
- `PrePostInitMeta` ensures that the classes defined with it run `__pre_init__` and `__post_init__` (without having to write `self.__pre_init__()` and `self.__post_init__()`  in the actual `init`
- `NewChkMeta` gives the `PrePostInitMeta` functionality and ensures classes defined with it don't re-create an object of their type whenever it's passed to the constructor
- `BypassNewMeta` ensures classes defined with it can easily be casted form objects they subclass.

In [None]:
#export
class FixSigMeta(type):
    "A metaclass that fixes the signature on classes that override __new__"
    def __new__(cls, name, bases, dict):
        res = super().__new__(cls, name, bases, dict)
        if res.__init__ is not object.__init__: res.__signature__ = inspect.signature(res.__init__)
        return res

In [None]:
#export
class PrePostInitMeta(FixSigMeta):
    "A metaclass that calls optional `__pre_init__` and `__post_init__` methods"
    def __call__(cls, *args, **kwargs):
        res = cls.__new__(cls)
        if type(res)==cls:
            if hasattr(res,'__pre_init__'): res.__pre_init__(*args,**kwargs)
            res.__init__(*args,**kwargs)
            if hasattr(res,'__post_init__'): res.__post_init__(*args,**kwargs)
        return res

In [None]:
show_doc(PrePostInitMeta, title_level=3)

<h3 id="PrePostInitMeta" class="doc_header"><code>class</code> <code>PrePostInitMeta</code><a href="" class="source_link" style="float:right">[source]</a></h3>

> <code>PrePostInitMeta</code>(**`name`**, **`bases`**, **`dict`**) :: [`FixSigMeta`](/core.html#FixSigMeta)

A metaclass that calls optional `__pre_init__` and `__post_init__` methods

In [None]:
class _T(metaclass=PrePostInitMeta):
    def __pre_init__(self):  self.a  = 0; assert self.a==0
    def __init__(self,b=0):  self.a += 1; assert self.a==1
    def __post_init__(self): self.a += 1; assert self.a==2

t = _T()
test_eq(t.a, 2)

In [None]:
#export
class NewChkMeta(FixSigMeta):
    "Metaclass to avoid recreating object passed to constructor"
    def __call__(cls, x=None, *args, **kwargs):
        if not args and not kwargs and x is not None and isinstance(x,cls):
            x._newchk = 1
            return x

        res = super().__call__(*((x,) + args), **kwargs)
        res._newchk = 0
        return res

In [None]:
class _T(metaclass=NewChkMeta):
    "Testing"
    def __init__(self, o=None, b=1):
        self.foo = getattr(o,'foo',0) + 1
        self.b = b

In [None]:
class _T2():
    def __init__(self, o): self.foo = getattr(o,'foo',0) + 1

t = _T(1)
test_eq(t.foo,1)
t2 = _T(t)
test_eq(t2.foo,1)
test_is(t,t2)
t3 = _T(t, b=2)
test_eq(t3.b, 2)
assert not t3 is t

t = _T2(1)
test_eq(t.foo,1)
t2 = _T2(t)
test_eq(t2.foo,2)

test_eq(_T.__doc__, "Testing")
# TODO: this shouldn't have "self, "
test_eq(str(inspect.signature(_T)), '(self, o=None, b=1)')

In [None]:
#export
class BypassNewMeta(FixSigMeta):
    "Metaclass: casts `x` to this class if it's of type `cls._bypass_type`, initializing with `_new_meta` if available"
    def __call__(cls, x=None, *args, **kwargs):
        if hasattr(cls, '_new_meta'): x = cls._new_meta(x, *args, **kwargs)
        elif not isinstance(x,getattr(cls,'_bypass_type',object)) or len(args) or len(kwargs):
            x = super().__call__(*((x,)+args), **kwargs)
        if cls!=x.__class__: x.__class__ = cls
        return x

In [None]:
class T0: pass
class _T(T0, metaclass=BypassNewMeta):
    _bypass_type=T0
    def __init__(self,x): self.x=x

t = T0()
t.a = 1
t2 = _T(t)
test_eq(type(t2), _T)
test_eq(t2.a,1)
test_is(t2,t)
t = _T(2)
t.x = 2

## Foundational functions

In [None]:
#export
def copy_func(f):
    "Copy a non-builtin function (NB `copy.copy` does not work for this)"
    if not isinstance(f,types.FunctionType): return copy(f)
    fn = types.FunctionType(f.__code__, f.__globals__, f.__name__, f.__defaults__, f.__closure__)
    fn.__dict__.update(f.__dict__) 
    return fn

In [None]:
#export
def patch_to(cls, as_prop=False):
    "Decorator: add `f` to `cls`"
    if not isinstance(cls, (tuple,list)): cls=(cls,)
    def _inner(f):
        for c_ in cls:
            nf = copy_func(f)
            # `functools.update_wrapper` when passing patched function to `Pipeline`, so we do it manually
            for o in functools.WRAPPER_ASSIGNMENTS: setattr(nf, o, getattr(f,o))
            nf.__qualname__ = f"{c_.__name__}.{f.__name__}"
            setattr(c_, f.__name__, property(nf) if as_prop else nf)
        return f
    return _inner

In [None]:
class _T3(int): pass

@patch_to(_T3)
def func1(x, a): return x+a

t = _T3(1)
test_eq(t.func1(2), 3)

If `cls` is a tuple, `f` is added to all types in the tuple.

In [None]:
class _T4(int): pass
@patch_to((_T3,_T4))
def func2(x, a): return x+2*a

t = _T3(1)
test_eq(t.func2(1), 3)
t = _T4(1)
test_eq(t.func2(1), 3)

In [None]:
#export
def patch(f):
    "Decorator: add `f` to the first parameter's class (based on f's type annotations)"
    cls = next(iter(f.__annotations__.values()))
    return patch_to(cls)(f)

In [None]:
@patch
def func(x:_T3, a):
    "test"
    return x+2

t = _T3(1)
test_eq(t.func(2), 3)
test_eq(t.func.__qualname__, '_T3.func')

If annotation is a tuple, the function is added to all types in the tuple.

In [None]:
@patch
def func3(x:(_T3,_T4), a):
    "test"
    return x+2*a

t = _T3(1)
test_eq(t.func3(2), 5)
test_eq(t.func3.__qualname__, '_T3.func3')
t = _T4(1)
test_eq(t.func3(2), 5)
test_eq(t.func3.__qualname__, '_T4.func3')

In [None]:
#export
def patch_property(f):
    "Decorator: add `f` as a property to the first parameter's class (based on f's type annotations)"
    cls = next(iter(f.__annotations__.values()))
    return patch_to(cls, as_prop=True)(f)

In [None]:
@patch_property
def prop(x:_T3): return x+1

t = _T3(1)
test_eq(t.prop, 2)

In [None]:
#export
def _mk_param(n,d=None): return inspect.Parameter(n, inspect.Parameter.KEYWORD_ONLY, default=d)

In [None]:
def test_sig(f, b): test_eq(str(inspect.signature(f)), b)

In [None]:
#export
def use_kwargs(names, keep=False):
    "Decorator: replace `**kwargs` in signature with `names` params"
    def _f(f):
        sig = inspect.signature(f)
        sigd = dict(sig.parameters)
        k = sigd.pop('kwargs')
        s2 = {n:_mk_param(n) for n in names if n not in sigd}
        sigd.update(s2)
        if keep: sigd['kwargs'] = k
        f.__signature__ = sig.replace(parameters=sigd.values())
        return f
    return _f

In [None]:
@use_kwargs(['y', 'z'])
def foo(a, b=1, **kwargs): pass
test_sig(foo, '(a, b=1, *, y=None, z=None)')

@use_kwargs(['y', 'z'], keep=True)
def foo(a, *args, b=1, **kwargs): pass
test_sig(foo, '(a, *args, b=1, y=None, z=None, **kwargs)')

In [None]:
#export
def delegates(to=None, keep=False):
    "Decorator: replace `**kwargs` in signature with params from `to`"
    def _f(f):
        if to is None: to_f,from_f = f.__base__.__init__,f.__init__
        else:          to_f,from_f = to,f
        from_f = getattr(from_f,'__func__',from_f)
        if hasattr(from_f,'__delwrap__'): return f
        sig = inspect.signature(from_f)
        sigd = dict(sig.parameters)
        k = sigd.pop('kwargs')
        s2 = {k:v for k,v in inspect.signature(to_f).parameters.items()
              if v.default != inspect.Parameter.empty and k not in sigd}
        sigd.update(s2)
        if keep: sigd['kwargs'] = k
        from_f.__signature__ = sig.replace(parameters=sigd.values())
        from_f.__delwrap__ = to_f
        return f
    return _f

In [None]:
def basefoo(e, c=2): pass

@delegates(basefoo)
def foo(a, b=1, **kwargs): pass
test_sig(foo, '(a, b=1, c=2)')

@delegates(basefoo, keep=True)
def foo(a, b=1, **kwargs): pass
test_sig(foo, '(a, b=1, c=2, **kwargs)')

In [None]:
class BaseFoo:
    def __init__(self, e, c=2): pass

@delegates()
class Foo(BaseFoo):
    def __init__(self, a, b=1, **kwargs): super().__init__(**kwargs)

test_sig(Foo, '(a, b=1, c=2)')

In [None]:
#export
def funcs_kwargs(cls):
    "Replace methods in `self._methods` with those from `kwargs`"
    old_init = cls.__init__
    def _init(self, *args, **kwargs):
        for k in cls._methods:
            arg = kwargs.pop(k,None)
            if arg is not None:
                if isinstance(arg,types.MethodType): arg = types.MethodType(arg.__func__, self)
                setattr(self, k, arg)
        old_init(self, *args, **kwargs)
    functools.update_wrapper(_init, old_init)
    cls.__init__ = use_kwargs(cls._methods)(_init)
    return cls

In [None]:
#export
def method(f):
    "Mark `f` as a method"
    # `1` is a dummy instance since Py3 doesn't allow `None` any more
    return types.MethodType(f, 1)

In [None]:
@funcs_kwargs
class T:
    _methods=['b']
    def __init__(self, f=1, **kwargs): assert not kwargs
    def a(self): return 1
    def b(self): return 2
    
t = T()
test_eq(t.a(), 1)
test_eq(t.b(), 2)
t = T(b = lambda:3)
test_eq(t.b(), 3)
test_sig(T, '(f=1, *, b=None)')
test_fail(lambda: T(a = lambda:3))

@method
def _f(self,a=1): return a+1
t = T(b = _f)
test_eq(t.b(2), 3)

class T2(T):
    def __init__(self,a):
        super().__init__(b = lambda:3)
        self.a=a
t = T2(a=1)
test_eq(t.b(), 3)
test_sig(T2, '(a)')

def _g(a=1): return a+1
class T3(T): b = staticmethod(_g)
t = T3()
test_eq(t.b(2), 3)

Runtime type checking is handy, so let's make it easy!

In [None]:
@contextmanager
def working_directory(path):
    "Change working directory to `path` and return to previous on exit."
    prev_cwd = Path.cwd()
    os.chdir(path)
    try: yield
    finally: os.chdir(prev_cwd)

In [None]:
#def is_listy(x): return isinstance(x,(list,tuple,Generator))

In [None]:
#export core
def add_docs(cls, cls_doc=None, **docs):
    "Copy values from `docs` to `cls` docstrings, and confirm all public methods are documented"
    if cls_doc is not None: cls.__doc__ = cls_doc
    for k,v in docs.items():
        f = getattr(cls,k)
        if hasattr(f,'__func__'): f = f.__func__ # required for class methods
        f.__doc__ = v
    # List of public callables without docstring
    nodoc = [c for n,c in vars(cls).items() if callable(c)
             and not n.startswith('_') and c.__doc__ is None]
    assert not nodoc, f"Missing docs: {nodoc}"
    assert cls.__doc__ is not None, f"Missing class docs: {cls}"

In [None]:
#export core
def docs(cls):
    "Decorator version of `add_docs`, using `_docs` dict"
    add_docs(cls, **cls._docs)
    return cls

In [None]:
class _T:
    def f(self): pass
    @classmethod
    def g(cls): pass
add_docs(_T, "a", f="f", g="g")

test_eq(_T.__doc__, "a")
test_eq(_T.f.__doc__, "f")
test_eq(_T.g.__doc__, "g")

In [None]:
#export
def custom_dir(c, add:list):
    "Implement custom `__dir__`, adding `add` to `cls`"
    return dir(type(c)) + list(c.__dict__.keys()) + add

In [None]:
show_doc(is_iter)

<h4 id="is_iter" class="doc_header"><code>is_iter</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/local/imports.py#L42" class="source_link" style="float:right">[source]</a></h4>

> <code>is_iter</code>(**`o`**)

Test whether `o` can be used in a `for` loop

In [None]:
assert is_iter([1])
assert not is_iter(array(1))
assert is_iter(array([1,2]))
assert (o for o in range(3))

In [None]:
#export
class _Arg:
    def __init__(self,i): self.i = i
_0,_1,_2,_3,_4 = _Arg(0),_Arg(1),_Arg(2),_Arg(3),_Arg(4)

In [None]:
#export
_all_ = ['_0', '_1', '_2', '_3', '_4']

In [None]:
#export
class bind:
    "Same as `partial`, except you can use `_0` `_1` etc param placeholders"
    def __init__(self, fn, *pargs, **pkwargs):
        self.fn,self.pargs,self.pkwargs = fn,pargs,pkwargs
        self.maxi = max((x.i for x in pargs if isinstance(x, _Arg)), default=-1)

    def __call__(self, *args, **kwargs):
        args = list(args)
        kwargs = {**self.pkwargs,**kwargs}
        for k,v in kwargs.items():
            if isinstance(v,_Arg): kwargs[k] = args.pop(v.i)
        fargs = [args[x.i] if isinstance(x, _Arg) else x for x in self.pargs] + args[self.maxi+1:]
        return self.fn(*fargs, **kwargs)

In [None]:
def myfn(a,b,c,d=1,e=2): return(a,b,c,d,e)
test_eq(bind(myfn, _1, 17, _0, e=3)(19,14), (14,17,19,1,3))
test_eq(bind(myfn, 17, _0, e=3)(19,14), (17,19,14,1,3))
test_eq(bind(myfn, 17, e=3)(19,14), (17,19,14,1,3))
test_eq(bind(myfn)(17,19,14), (17,19,14,1,2))
test_eq(bind(myfn, 17,19,14,e=_0)(3), (17,19,14,1,3))

## GetAttr -

In [None]:
#export
class GetAttr:
    "Inherit from this to have all attr accesses in `self._xtra` passed down to `self.default`"
    _default='default'
    @property
    def _xtra(self): return [o for o in dir(getattr(self,self._default)) if not o.startswith('_')]
    def __getattr__(self,k):
        if k not in ('_xtra',self._default) and (self._xtra is None or k in self._xtra): return getattr(getattr(self,self._default), k)
        raise AttributeError(k)
    def __dir__(self): return custom_dir(self, self._xtra)
    def __setstate__(self,data): self.__dict__.update(data)

In [None]:
class _C(GetAttr):
    _xtra = ['lower']
    def __init__(self,a): self.default = a
    def foo(self): noop

t = _C('Hi')
test_eq(t.lower(), 'hi')
test_fail(lambda: t.upper())
assert 'lower' in dir(t)

In [None]:
#export
def delegate_attr(self, k, to):
    "Use in `__getattr__` to delegate to attr `to` without inheriting from `GetAttr`"
    if k.startswith('_') or k==to: raise AttributeError(k)
    try: return getattr(getattr(self,to), k)
    except AttributeError: raise AttributeError(k) from None

In [None]:
class _C:
    f = 'Hi'
    def __getattr__(self, k): return delegate_attr(self, k, 'f')

t = _C()
test_eq(t.lower(), 'hi')

## L -

In [None]:
#export
def _is_array(x): return hasattr(x,'__array__') or hasattr(x,'iloc')

def _listify(o):
    if o is None: return []
    if isinstance(o, list): return o
    if isinstance(o, str) or _is_array(o): return [o]
    if is_iter(o): return list(o)
    return [o]

In [None]:
# export
def coll_repr(c, max_n=10):
    "String repr of up to `max_n` items of (possibly lazy) collection `c`"
    return f'(#{len(c)}) [' + ','.join(itertools.islice(map(str,c), max_n)) + (
        '...' if len(c)>10 else '') + ']'

In [None]:
test_eq(coll_repr(range(1000), 5), '(#1000) [0,1,2,3,4...]')

In [None]:
# export
def mask2idxs(mask):
    "Convert bool mask or index list to index `L`"
    if isinstance(mask,slice): return mask
    mask = list(mask)
    if len(mask)==0: return []
    if isinstance(mask[0],(bool,NoneType)): return [i for i,m in enumerate(mask) if m]
    return [int(i) for i in mask]

In [None]:
test_eq(mask2idxs([False,True,False,True]), [1,3])
test_eq(mask2idxs(array([1,2,3])), [1,2,3])

In [None]:
#export
listable_types = typing.Collection,Generator,map,filter,zip

In [None]:
#export
class CollBase:
    "Base class for composing a list of `items`"
    def __init__(self, items): self.items = items
    def __len__(self): return len(self.items)
    def __getitem__(self, k): return self.items[k]
    def __setitem__(self, k, v): self.items[list(k) if isinstance(k,CollBase) else k] = v
    def __delitem__(self, i): del(self.items[i])
    def __repr__(self): return self.items.__repr__()
    def __iter__(self): return self.items.__iter__()

In [None]:
#export
def cycle(o):
    "Like `itertools.cycle` except creates list of `None`s if `o` is empty"
    o = _listify(o)
    return itertools.cycle(o) if o is not None and len(o) > 0 else itertools.cycle([None])

In [None]:
test_eq(itertools.islice(cycle([1,2,3]),5), [1,2,3,1,2])
test_eq(itertools.islice(cycle([]),3), [None]*3)
test_eq(itertools.islice(cycle(None),3), [None]*3)
test_eq(itertools.islice(cycle(1),3), [1,1,1])

In [None]:
#export
def zip_cycle(x, *args):
    "Like `itertools.zip_longest` but `cycle`s through elements of all but first argument"
    return zip(x, *map(cycle,args))

In [None]:
test_eq(zip_cycle([1,2,3,4],list('abc')), [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'a')])

In [None]:
#export
def is_indexer(idx):
    "Test whether `idx` will index a single item in a list"
    return isinstance(idx,int) or not getattr(idx,'ndim',1)

In [None]:
#export
class L(CollBase, GetAttr, metaclass=NewChkMeta):
    "Behaves like a list of `items` but can also index with list of indices or masks"
    _default='items'
    def __init__(self, items=None, *rest, use_list=False, match=None):
        if rest: items = (items,)+rest
        if items is None: items = []
        if (use_list is not None) or not _is_array(items):
            items = list(items) if use_list else _listify(items)
        if match is not None:
            if len(items)==1: items = items*len(match)
            else: assert len(items)==len(match), 'Match length mismatch'
        super().__init__(items)

    def _new(self, items, *args, **kwargs): return type(self)(items, *args, use_list=None, **kwargs)
    def __getitem__(self, idx): return self._get(idx) if is_indexer(idx) else L(self._get(idx), use_list=None)

    def _get(self, i):
        if is_indexer(i) or isinstance(i,slice): return getattr(self.items,'iloc',self.items)[i]
        i = mask2idxs(i)
        return (self.items.iloc[list(i)] if hasattr(self.items,'iloc')
                else self.items.__array__()[(i,)] if hasattr(self.items,'__array__')
                else [self.items[i_] for i_ in i])

    def __setitem__(self, idx, o):
        "Set `idx` (can be list of indices, or mask, or int) items to `o` (which is broadcast if not iterable)"
        idx = idx if isinstance(idx,L) else _listify(idx)
        if not is_iter(o): o = [o]*len(idx)
        for i,o_ in zip(idx,o): self.items[i] = o_

    def __iter__(self): return iter(self.items.itertuples() if hasattr(self.items,'iloc') else self.items)
    def __contains__(self,b): return b in self.items
    def __invert__(self): return self._new(not i for i in self)
    def __eq__(self,b): return False if isinstance(b, (str,dict,set)) else all_equal(b,self)
    def __repr__(self): return repr(self.items) if _is_array(self.items) else coll_repr(self)
    def __mul__ (a,b): return a._new(a.items*b)
    def __add__ (a,b): return a._new(a.items+_listify(b))
    def __radd__(a,b): return a._new(b)+a
    def __addi__(a,b):
        a.items += list(b)
        return a

    def sorted(self, key=None, reverse=False):
        if isinstance(key,str):   k=lambda o:getattr(o,key,0)
        elif isinstance(key,int): k=itemgetter(key)
        else: k=key
        return self._new(sorted(self.items, key=k, reverse=reverse))

    @classmethod
    def split(cls, s, sep=None, maxsplit=-1): return cls(s.split(sep,maxsplit))

    @classmethod
    def range(cls, a, b=None, step=None):
        if is_coll(a): a = len(a)
        return cls(range(a,b,step) if step is not None else range(a,b) if b is not None else range(a))

    def map(self, f, *args, **kwargs):
        g = (bind(f,*args,**kwargs) if callable(f)
             else f.format if isinstance(f,str)
             else f.__getitem__)
        return self._new(map(g, self))
    
    def unique(self): return L(dict.fromkeys(self).keys())
    def enumerate(self): return L(enumerate(self))
    def val2idx(self): return {v:k for k,v in self.enumerate()}
    def itemgot(self, idx): return self.map(itemgetter(idx))
    def attrgot(self, k, default=None): return self.map(lambda o:getattr(o,k,default))
    def cycle(self): return cycle(self)
    def filter(self, f, *args, **kwargs): return self._new(filter(partial(f,*args,**kwargs), self))
    def map_dict(self, f=noop, *args, **kwargs): return {k:f(k, *args,**kwargs) for k in self}
    def starmap(self, f, *args, **kwargs): return self._new(itertools.starmap(partial(f,*args,**kwargs), self))
    def zip(self, cycled=False): return self._new((zip_cycle if cycled else zip)(*self))
    def zipwith(self, *rest, cycled=False): return self._new([self, *rest]).zip(cycled=cycled)
    def map_zip(self, f, *args, cycled=False, **kwargs): return self.zip(cycled=cycled).starmap(f, *args, **kwargs)
    def map_zipwith(self, f, *rest, cycled=False, **kwargs): return self.zipwith(*rest, cycled=cycled).starmap(f, **kwargs)
    def concat(self): return self._new(itertools.chain.from_iterable(self.map(L)))
    def shuffle(self):
        it = copy(self.items)
        random.shuffle(it)
        return self._new(it)

In [None]:
#export
add_docs(L,
         __getitem__="Retrieve `idx` (can be list of indices, or mask, or int) items",
         range="Same as builtin `range`, but returns an `L`. Can pass a collection for `a`, to use `len(a)`",
         split="Same as builtin `str.split`, but returns an `L`",
         sorted="New `L` sorted by `key`. If key is str then use `attrgetter`. If key is int then use `itemgetter`",
         unique="Unique items, in stable order",
         val2idx="Dict from value to index",
         filter="Create new `L` filtered by predicate `f`, passing `args` and `kwargs` to `f`",
         map="Create new `L` with `f` applied to all `items`, passing `args` and `kwargs` to `f`",
         map_dict="Like `map`, but creates a dict from `items` to function results",
         starmap="Like `map`, but use `itertools.starmap`",
         itemgot="Create new `L` with item `idx` of all `items`",
         attrgot="Create new `L` with attr `k` of all `items`",
         cycle="Same as `itertools.cycle`",
         enumerate="Same as `enumerate`",
         zip="Create new `L` with `zip(*items)`",
         zipwith="Create new `L` with `self` zip with each of `*rest`",
         map_zip="Combine `zip` and `starmap`",
         map_zipwith="Combine `zipwith` and `starmap`",
         concat="Concatenate all elements of list",
         shuffle="Same as `random.shuffle`, but not inplace")

You can create an `L` from an existing iterable (e.g. a list, range, etc) and access or modify it with an int list/tuple index, mask, int, or slice. All `list` methods can also be used with `L`.

In [None]:
t = L(range(12))
test_eq(t, list(range(12)))
test_ne(t, list(range(11)))
t.reverse()
test_eq(t[0], 11)
t[3] = "h"
test_eq(t[3], "h")
t[3,5] = ("j","k")
test_eq(t[3,5], ["j","k"])
test_eq(t, L(t))
test_eq(L(L(1,2),[3,4]), ([1,2],[3,4]))
t

(#12) [11,10,9,j,7,k,5,4,3,2...]

There are optimized indexers for arrays, tensors, and DataFrames.

In [None]:
arr = np.arange(9).reshape(3,3)
t = L(arr, use_list=None)
test_eq(t[1,2], arr[[1,2]])

arr = np.arange(9).reshape(3,3)
t = L(arr, use_list=None)
test_eq(t[1,2], arr[[1,2]])

df = pd.DataFrame({'a':[1,2,3]})
t = L(df, use_list=None)
test_eq(t[1,2], L(pd.DataFrame({'a':[2,3]}, index=[1,2]), use_list=None))

You can also modify an `L` with `append`, `+`, and `*`.

In [None]:
t = L()
test_eq(t, [])
t.append(1)
test_eq(t, [1])
t += [3,2]
test_eq(t, [1,3,2])
t = t + [4]
test_eq(t, [1,3,2,4])
t = 5 + t
test_eq(t, [5,1,3,2,4])
test_eq(L(1,2,3), [1,2,3])
test_eq(L(1,2,3), L(1,2,3))
t = L(1)*5
t = t.map(operator.neg)
test_eq(t,[-1]*5)
test_eq(~L([True,False,False]), L([False,True,True]))
t = L(range(4))
test_eq(zip(t, L(1).cycle()), zip(range(4),(1,1,1,1)))
t = L.range(100)
test_shuffled(t,t.shuffle())

In [None]:
def _f(x,a=0): return x+a
t = L(1)*5
test_eq(t.map(_f), t)
test_eq(t.map(_f,1), [2]*5)
test_eq(t.map(_f,a=2), [3]*5)

An `L` can be constructed from anything iterable, although tensors and arrays will not be iterated over on construction, unless you pass `use_list` to the constructor.

In [None]:
test_eq(L([1,2,3]),[1,2,3])
test_eq(L(L([1,2,3])),[1,2,3])
test_ne(L([1,2,3]),[1,2,])
test_eq(L('abc'),['abc'])
test_eq(L(range(0,3)),[0,1,2])
test_eq(L(o for o in range(0,3)),[0,1,2])
test_eq(L(array(0)),[array(0)])
test_eq(L([array(0),array(1)]),[array(0),array(1)])
test_eq(L(array([0.,1.1]))[0],array([0.,1.1]))
test_eq(L(array([0.,1.1]), use_list=True), [array(0.),array(1.1)])  # `use_list=True` to unwrap arrays/arrays

If `match` is not `None` then the created list is same len as `match`, either by:

- If `len(items)==1` then `items` is replicated,
- Otherwise an error is raised if `match` and `items` are not already the same size.

In [None]:
test_eq(L(1,match=[1,2,3]),[1,1,1])
test_eq(L([1,2],match=[2,3]),[1,2])
test_fail(lambda: L([1,2],match=[1,2,3]))

If you create an `L` from an existing `L` then you'll get back the original object (since `L` uses the `NewChkMeta` metaclass).

In [None]:
test_is(L(t), t)

An `L` is considred equal to a list if they have the same elements. It's never considered equal to a `str` a `set` or a `dict` even if they have the same elements/keys.

In [None]:
test_eq(L(['a', 'b']), ['a', 'b'])
test_ne(L(['a', 'b']), 'ab')
test_ne(L(['a', 'b']), {'a', 'b'})
test_ne(L(['a', 'b']), {'a':1, 'b':2})

### Methods

In [None]:
show_doc(L.__getitem__)

<h4 id="L.__getitem__" class="doc_header"><code>L.__getitem__</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L16" class="source_link" style="float:right">[source]</a></h4>

> <code>L.__getitem__</code>(**`idx`**)

Retrieve `idx` (can be list of indices, or mask, or int) items

In [None]:
t = L(range(12))
test_eq(t[1,2], [1,2])                # implicit tuple
test_eq(t[[1,2]], [1,2])              # list
test_eq(t[:3], [0,1,2])               # slice
test_eq(t[[False]*11 + [True]], [11]) # mask
test_eq(t[array(3)], 3)

In [None]:
show_doc(L.__setitem__)

<h4 id="L.__setitem__" class="doc_header"><code>L.__setitem__</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L25" class="source_link" style="float:right">[source]</a></h4>

> <code>L.__setitem__</code>(**`idx`**, **`o`**)

Set `idx` (can be list of indices, or mask, or int) items to `o` (which is broadcast if not iterable)

In [None]:
t[4,6] = 0
test_eq(t[4,6], [0,0])
t[4,6] = [1,2]
test_eq(t[4,6], [1,2])

In [None]:
show_doc(L.unique)

<h4 id="L.unique" class="doc_header"><code>L.unique</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L63" class="source_link" style="float:right">[source]</a></h4>

> <code>L.unique</code>()

Unique items, in stable order

In [None]:
test_eq(L(1,2,3,4,4).unique(), [1,2,3,4])

In [None]:
show_doc(L.val2idx)

<h4 id="L.val2idx" class="doc_header"><code>L.val2idx</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L65" class="source_link" style="float:right">[source]</a></h4>

> <code>L.val2idx</code>()

Dict from value to index

In [None]:
test_eq(L(1,2,3).val2idx(), {3:2,1:0,2:1})

In [None]:
show_doc(L.filter)

<h4 id="L.filter" class="doc_header"><code>L.filter</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L69" class="source_link" style="float:right">[source]</a></h4>

> <code>L.filter</code>(**`f`**, **\*`args`**, **\*\*`kwargs`**)

Create new [`L`](/core.html#L) filtered by predicate `f`, passing `args` and `kwargs` to `f`

In [None]:
test_eq(t.filter(lambda o:o<5), [0,1,2,3,1,2])

In [None]:
show_doc(L.map)

<h4 id="L.map" class="doc_header"><code>L.map</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L57" class="source_link" style="float:right">[source]</a></h4>

> <code>L.map</code>(**`f`**, **\*`args`**, **\*\*`kwargs`**)

Create new [`L`](/core.html#L) with `f` applied to all `items`, passing `args` and `kwargs` to `f`

In [None]:
test_eq(L.range(4).map(operator.neg), [0,-1,-2,-3])

If `f` is a string then it is treated as a format string to create the mapping:

In [None]:
test_eq(L.range(4).map('#{}#'), ['#0#','#1#','#2#','#3#'])

If `f` is a dictionary (or anything supporting `__getitem__`) then it is indexed to create the mapping:

In [None]:
test_eq(L.range(4).map(list('abcd')), list('abcd'))

If the special argument `_arg` is passed, then that is the kwarg used in the map.

In [None]:
L.range(4).map(f, b=_0)

(#4) [0,1,2,3]

In [None]:
def f(a=None,b=None): return b
test_eq(L.range(4).map(f, b=_0), range(4))

In [None]:
show_doc(L.map_dict)

<h4 id="L.map_dict" class="doc_header"><code>L.map_dict</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L70" class="source_link" style="float:right">[source]</a></h4>

> <code>L.map_dict</code>(**`f`**=*`'noop'`*, **\*`args`**, **\*\*`kwargs`**)

Like `map`, but creates a dict from `items` to function results

In [None]:
test_eq(L(range(1,5)).map_dict(), {1:1, 2:2, 3:3, 4:4})
test_eq(L(range(1,5)).map_dict(operator.neg), {1:-1, 2:-2, 3:-3, 4:-4})

In [None]:
show_doc(L.zip)

<h4 id="L.zip" class="doc_header"><code>L.zip</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L72" class="source_link" style="float:right">[source]</a></h4>

> <code>L.zip</code>(**`cycled`**=*`False`*)

Create new [`L`](/core.html#L) with `zip(*items)`

In [None]:
t = L([[1,2,3],'abc'])
test_eq(t.zip(), [(1, 'a'),(2, 'b'),(3, 'c')])

In [None]:
t = L([[1,2,3,4],['a','b','c']])
test_eq(t.zip(cycled=True ), [(1, 'a'),(2, 'b'),(3, 'c'),(4, 'a')])
test_eq(t.zip(cycled=False), [(1, 'a'),(2, 'b'),(3, 'c')])

In [None]:
show_doc(L.map_zip)

<h4 id="L.map_zip" class="doc_header"><code>L.map_zip</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L74" class="source_link" style="float:right">[source]</a></h4>

> <code>L.map_zip</code>(**`f`**, **\*`args`**, **`cycled`**=*`False`*, **\*\*`kwargs`**)

Combine `zip` and `starmap`

In [None]:
t = L([1,2,3],[2,3,4])
test_eq(t.map_zip(operator.mul), [2,6,12])

In [None]:
show_doc(L.zipwith)

<h4 id="L.zipwith" class="doc_header"><code>L.zipwith</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L73" class="source_link" style="float:right">[source]</a></h4>

> <code>L.zipwith</code>(**\*`rest`**, **`cycled`**=*`False`*)

Create new [`L`](/core.html#L) with `self` zip with each of `*rest`

In [None]:
b = [[0],[1],[2,2]]
t = L([1,2,3]).zipwith(b)
test_eq(t, [(1,[0]), (2,[1]), (3,[2,2])])

In [None]:
show_doc(L.map_zipwith)

<h4 id="L.map_zipwith" class="doc_header"><code>L.map_zipwith</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L75" class="source_link" style="float:right">[source]</a></h4>

> <code>L.map_zipwith</code>(**`f`**, **\*`rest`**, **`cycled`**=*`False`*, **\*\*`kwargs`**)

Combine `zipwith` and `starmap`

In [None]:
test_eq(L(1,2,3).map_zipwith(operator.mul, [2,3,4]), [2,6,12])

In [None]:
show_doc(L.itemgot)

<h4 id="L.itemgot" class="doc_header"><code>L.itemgot</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L66" class="source_link" style="float:right">[source]</a></h4>

> <code>L.itemgot</code>(**`idx`**)

Create new [`L`](/core.html#L) with item `idx` of all `items`

In [None]:
test_eq(t.itemgot(1), b)

In [None]:
show_doc(L.attrgot)

<h4 id="L.attrgot" class="doc_header"><code>L.attrgot</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L67" class="source_link" style="float:right">[source]</a></h4>

> <code>L.attrgot</code>(**`k`**, **`default`**=*`None`*)

Create new [`L`](/core.html#L) with attr `k` of all `items`

In [None]:
a = [SimpleNamespace(a=3,b=4),SimpleNamespace(a=1,b=2)]
test_eq(L(a).attrgot('b'), [4,2])

In [None]:
show_doc(L.sorted)

<h4 id="L.sorted" class="doc_header"><code>L.sorted</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L43" class="source_link" style="float:right">[source]</a></h4>

> <code>L.sorted</code>(**`key`**=*`None`*, **`reverse`**=*`False`*)

New [`L`](/core.html#L) sorted by `key`. If key is str then use `attrgetter`. If key is int then use `itemgetter`

In [None]:
test_eq(L(a).sorted('a').attrgot('b'), [2,4])

In [None]:
show_doc(L.split)

<h4 id="L.split" class="doc_header"><code>L.split</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L49" class="source_link" style="float:right">[source]</a></h4>

> <code>L.split</code>(**`s`**, **`sep`**=*`None`*, **`maxsplit`**=*`-1`*)

Same as builtin `str.split`, but returns an [`L`](/core.html#L)

In [None]:
test_eq(L.split('a b c'), list('abc'))

In [None]:
show_doc(L.range)

<h4 id="L.range" class="doc_header"><code>L.range</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L52" class="source_link" style="float:right">[source]</a></h4>

> <code>L.range</code>(**`a`**, **`b`**=*`None`*, **`step`**=*`None`*)

Same as builtin `range`, but returns an [`L`](/core.html#L). Can pass a collection for `a`, to use `len(a)`

In [None]:
test_eq_type(L.range([1,1,1]), L(range(3)))
test_eq_type(L.range(5,2,2), L(range(5,2,2)))

In [None]:
show_doc(L.concat)

<h4 id="L.concat" class="doc_header"><code>L.concat</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/__main__.py#L76" class="source_link" style="float:right">[source]</a></h4>

> <code>L.concat</code>()

Concatenate all elements of list

In [None]:
test_eq(L([0,1,2,3],4,L(5,6)).concat(), range(7))

## Utility functions

### Basics

In [None]:
# export
def ifnone(a, b):
    "`b` if `a` is None else `a`"
    return b if a is None else a

Since `b if a is None else a` is such a common pattern, we wrap it in a function. However, be careful, because python will evaluate *both* `a` and `b` when calling `ifnone` (which it doesn't do if using the `if` version directly).

In [None]:
test_eq(ifnone(None,1), 1)
test_eq(ifnone(2   ,1), 2)

In [None]:
#export
def get_class(nm, *fld_names, sup=None, doc=None, funcs=None, **flds):
    "Dynamically create a class, optionally inheriting from `sup`, containing `fld_names`"
    attrs = {}
    for f in fld_names: attrs[f] = None
    for f in L(funcs): attrs[f.__name__] = f
    for k,v in flds.items(): attrs[k] = v
    sup = ifnone(sup, ())
    if not isinstance(sup, tuple): sup=(sup,)

    def _init(self, *args, **kwargs):
        for i,v in enumerate(args): setattr(self, list(attrs.keys())[i], v)
        for k,v in kwargs.items(): setattr(self,k,v)

    def _repr(self):
        return '\n'.join(f'{o}: {getattr(self,o)}' for o in set(dir(self))
                         if not o.startswith('_') and not isinstance(getattr(self,o), types.MethodType))

    if not sup: attrs['__repr__'] = _repr
    attrs['__init__'] = _init
    res = type(nm, sup, attrs)
    if doc is not None: res.__doc__ = doc
    return res

In [None]:
_t = get_class('_t', 'a', b=2)
t = _t()
test_eq(t.a, None)
test_eq(t.b, 2)
t = _t(1, b=3)
test_eq(t.a, 1)
test_eq(t.b, 3)
t = _t(1, 3)
test_eq(t.a, 1)
test_eq(t.b, 3)

Most often you'll want to call `mk_class`, since it adds the class to your module. See `mk_class` for more details and examples of use (which also apply to `get_class`).

In [None]:
#export
def mk_class(nm, *fld_names, sup=None, doc=None, funcs=None, mod=None, **flds):
    "Create a class using `get_class` and add to the caller's module"
    if mod is None: mod = inspect.currentframe().f_back.f_locals
    res = get_class(nm, *fld_names, sup=sup, doc=doc, funcs=funcs, **flds)
    mod[nm] = res

Any `kwargs` will be added as class attributes, and `sup` is an optional (tuple of) base classes.

In [None]:
mk_class('_t', a=1, sup=GetAttr)
t = _t()
test_eq(t.a, 1)
assert(isinstance(t,GetAttr))

A `__init__` is provided that sets attrs for any `kwargs`, and for any `args` (matching by position to fields), along with a `__repr__` which prints all attrs. The docstring is set to `doc`. You can pass `funcs` which will be added as attrs with the function names.

In [None]:
def foo(self): return 1
mk_class('_t', 'a', sup=GetAttr, doc='test doc', funcs=foo)

t = _t(3, b=2)
test_eq(t.a, 3)
test_eq(t.b, 2)
test_eq(t.foo(), 1)
test_eq(t.__doc__, 'test doc')
t

<__main__._t at 0x7f567470fcc0>

In [None]:
#export
def wrap_class(nm, *fld_names, sup=None, doc=None, funcs=None, **flds):
    "Decorator: makes function a method of a new class `nm` passing parameters to `mk_class`"
    def _inner(f):
        mk_class(nm, *fld_names, sup=sup, doc=doc, funcs=L(funcs)+f, mod=f.__globals__, **flds)
        return f
    return _inner

In [None]:
@wrap_class('_t', a=2)
def bar(self,x): return x+1

t = _t()
test_eq(t.a, 2)
test_eq(t.bar(3), 4)

In [None]:
show_doc(noop)

<h4 id="noop" class="doc_header"><code>noop</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/local/imports.py#L56" class="source_link" style="float:right">[source]</a></h4>

> <code>noop</code>(**`x`**=*`None`*, **\*`args`**, **\*\*`kwargs`**)

Do nothing

In [None]:
noop()
test_eq(noop(1),1)

In [None]:
show_doc(noops)

<h4 id="noops" class="doc_header"><code>noops</code><a href="https://github.com/fastai/fastai_dev/tree/master/dev/local/imports.py#L60" class="source_link" style="float:right">[source]</a></h4>

> <code>noops</code>(**`x`**=*`None`*, **\*`args`**, **\*\*`kwargs`**)

Do nothing (method)

In [None]:
mk_class('_t', foo=noops)
test_eq(_t().foo(1),1)

In [None]:
#export
def store_attr(self, nms):
    "Store params named in comma-separated `nms` from calling context into attrs in `self`"
    mod = inspect.currentframe().f_back.f_locals
    for n in re.split(', *', nms): setattr(self,n,mod[n])

In [None]:
class T:
    def __init__(self, a,b,c): store_attr(self, 'a,b, c')

t = T(1,c=2,b=3)
assert t.a==1 and t.b==3 and t.c==2

In [None]:
#export
def attrdict(o, *ks):
    "Dict from each `k` in `ks` to `getattr(o,k)`"
    return {k:getattr(o,k) for k in ks}

In [None]:
test_eq(attrdict(t,'b','c'), {'b':3, 'c':2})

In [None]:
#export
def properties(cls, *ps):
    "Change attrs in `cls` with names in `ps` to properties"
    for p in ps: setattr(cls,p,property(getattr(cls,p)))

In [None]:
class T:
    def a(self): return 1
    def b(self): return 2
properties(T,'a')

test_eq(T().a,1)
test_eq(T().b(),2)

### Collection functions

In [None]:
#export
def tuplify(o, use_list=False, match=None):
    "Make `o` a tuple"
    return tuple(L(o, use_list=use_list, match=match))

In [None]:
test_eq(tuplify(None),())
test_eq(tuplify([1,2,3]),(1,2,3))
test_eq(tuplify(1,match=[1,2,3]),(1,1,1))

In [None]:
#export
def replicate(item,match):
    "Create tuple of `item` copied `len(match)` times"
    return (item,)*len(match)

In [None]:
t = [1,1]
test_eq(replicate([1,2], t),([1,2],[1,2]))
test_eq(replicate(1, t),(1,1))

In [None]:
#export
def uniqueify(x, sort=False, bidir=False, start=None):
    "Return the unique elements in `x`, optionally `sort`-ed, optionally return the reverse correspondance."
    res = L(x).unique()
    if start is not None: res = start+res
    if sort: res.sort()
    if bidir: return res, res.val2idx()
    return res

In [None]:
# test
test_eq(set(uniqueify([1,1,0,5,0,3])),{0,1,3,5})
test_eq(uniqueify([1,1,0,5,0,3], sort=True),[0,1,3,5])
v,o = uniqueify([1,1,0,5,0,3], bidir=True)
test_eq(v,[1,0,5,3])
test_eq(o,{1:0, 0: 1, 5: 2, 3: 3})
v,o = uniqueify([1,1,0,5,0,3], sort=True, bidir=True)
test_eq(v,[0,1,3,5])
test_eq(o,{0:0, 1: 1, 3: 2, 5: 3})

In [None]:
# export
def setify(o): return o if isinstance(o,set) else set(L(o))

In [None]:
# test
test_eq(setify(None),set())
test_eq(setify('abc'),{'abc'})
test_eq(setify([1,2,2]),{1,2})
test_eq(setify(range(0,3)),{0,1,2})
test_eq(setify({1,2}),{1,2})

In [None]:
#export
def is_listy(x):
    "`isinstance(x, (tuple,list,L))`"
    return isinstance(x, (tuple,list,L,slice,Generator))

In [None]:
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

In [None]:
#export
def range_of(x):
    "All indices of collection `x` (i.e. `list(range(len(x)))`)"
    return list(range(len(x)))

In [None]:
test_eq(range_of([1,1,1,1]), [0,1,2,3])

In [None]:
#export
def groupby(x, key):
    "Like `itertools.groupby` but doesn't need to be sorted, and isn't lazy"
    res = {}
    for o in x: res.setdefault(key(o), []).append(o)
    return res

In [None]:
test_eq(groupby('aa ab bb'.split(), itemgetter(0)), {'a':['aa','ab'], 'b':['bb']})

In [None]:
#export
def merge(*ds):
    "Merge all dictionaries in `ds`"
    return {k:v for d in ds if d is not None for k,v in d.items()}

In [None]:
test_eq(merge(), {})
test_eq(merge(dict(a=1,b=2)), dict(a=1,b=2))
test_eq(merge(dict(a=1,b=2), dict(b=3,c=4), None), dict(a=1, b=3, c=4))

In [None]:
#export
def shufflish(x, pct=0.04):
    "Randomly relocate items of `x` up to `pct` of `len(x)` from their starting location"
    n = len(x)
    return L(x[i] for i in sorted(range_of(x), key=lambda o: o+n*(1+random.random()*pct)))

In [None]:
l = list(range(100))
l2 = array(shufflish(l))
test_close(l2[:50 ].mean(), 25, eps=5)
test_close(l2[-50:].mean(), 75, eps=5)
test_ne(l,l2)

In [None]:
#export
class IterLen:
    "Base class to add iteration to anything supporting `len` and `__getitem__`"
    def __iter__(self): return (self[i] for i in range_of(self))

In [None]:
#export
@docs
class ReindexCollection(GetAttr, IterLen):
    "Reindexes collection `coll` with indices `idxs` and optional LRU cache of size `cache`"
    _default='coll'
    def __init__(self, coll, idxs=None, cache=None):
        self.coll,self.idxs,self.cache = coll,ifnone(idxs,L.range(coll)),cache
        def _get(self, i): return self.coll[i]
        self._get = types.MethodType(_get,self)
        if cache is not None: self._get = functools.lru_cache(maxsize=cache)(self._get)

    def __getitem__(self, i): return self._get(self.idxs[i])
    def __len__(self): return len(self.coll)
    def reindex(self, idxs): self.idxs = idxs
    def shuffle(self): random.shuffle(self.idxs)
    def cache_clear(self): self._get.cache_clear()

    _docs = dict(reindex="Replace `self.idxs` with idxs",
                shuffle="Randomly shuffle indices",
                cache_clear="Clear LRU cache")

In [None]:
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

In [None]:
#export
def _oper(op,a,b=None): return (lambda o:op(o,a)) if b is None else op(a,b)

def _mk_op(nm, mod=None):
    "Create an operator using `oper` and add to the caller's module"
    if mod is None: mod = inspect.currentframe().f_back.f_locals
    op = getattr(operator,nm)
    def _inner(a,b=None): return _oper(op, a,b)
    _inner.__name__ = _inner.__qualname__ = nm
    _inner.__doc__ = f'Same as `operator.{nm}`, or returns partial if 1 arg'
    mod[nm] = _inner

In [None]:
#export
_all_ = ['lt', 'gt', 'le', 'ge', 'eq', 'ne', 'add', 'sub', 'mul', 'truediv']

In [None]:
#export
for op in 'lt gt le ge eq ne add sub mul truediv'.split(): _mk_op(op)

The following functions are provided matching the behavior of the equivalent versions in `operator`:

 - *lt gt le ge eq ne add sub mul truediv*

In [None]:
lt(3,5),gt(3,5)

(True, False)

However, they also have additional functionality: if you only pass one param, they return a partial function that passes that param as the second positional parameter.

In [None]:
lt(5)(3),gt(5)(3)

(True, False)

In [None]:
#export
class _InfMeta(type):
    @property
    def count(self): return itertools.count()
    @property
    def zeros(self): return itertools.cycle([0])
    @property
    def ones(self):  return itertools.cycle([1])
    @property
    def nones(self): return itertools.cycle([None])

In [None]:
#export
class Inf(metaclass=_InfMeta):
    "Infinite lists"
    pass

`Inf` defines the following properties:
    
- `count: itertools.count()`
- `zeros: itertools.cycle([0])`
- `ones : itertools.cycle([1])`
- `nones: itertools.cycle([None])`

In [None]:
test_eq([o for i,o in zip(range(5), Inf.count)],
        [0, 1, 2, 3, 4])

test_eq([o for i,o in zip(range(5), Inf.zeros)],
        [0, 0, 0, 0, 0])

In [None]:
#export
def true(*args, **kwargs):
    "Predicate: always `True`"
    return True

In [None]:
#export
def stop(e=StopIteration):
    "Raises exception `e` (by default `StopException`) even if in an expression"
    raise e

In [None]:
#export
def gen(func, seq, cond=true):
    "Like `(func(o) for o in seq if cond(func(o)))` but handles `StopIteration`"
    return itertools.takewhile(cond, map(func,seq))

In [None]:
test_eq(gen(noop, Inf.count, lt(5)),
        range(5))
test_eq(gen(operator.neg, Inf.count, gt(-5)),
        [0,-1,-2,-3,-4])
test_eq(gen(lambda o:o if o<5 else stop(), Inf.count),
        range(5))

In [None]:
#export
def chunked(it, cs, drop_last=False):
    if not isinstance(it, Iterator): it = iter(it)
    while True:
        res = list(itertools.islice(it, cs))
        if res and (len(res)==cs or not drop_last): yield res
        if len(res)<cs: return

In [None]:
t = L.range(10)
test_eq(chunked(t,3),      [[0,1,2], [3,4,5], [6,7,8], [9]])
test_eq(chunked(t,3,True), [[0,1,2], [3,4,5], [6,7,8],    ])

t = map(lambda o:stop() if o==6 else o, Inf.count)
test_eq(chunked(t,3), [[0, 1, 2], [3, 4, 5]])
t = map(lambda o:stop() if o==7 else o, Inf.count)
test_eq(chunked(t,3), [[0, 1, 2], [3, 4, 5], [6]])

t = np.arange(10)
test_eq(chunked(t,3),      L([0,1,2], [3,4,5], [6,7,8], [9]))
test_eq(chunked(t,3,True), L([0,1,2], [3,4,5], [6,7,8],    ))

In [None]:
#export
def retain_type(new, old=None, typ=None):
    "Cast `new` to type of `old` if it's a superclass"
    # e.g. old is TensorImage, new is Tensor - if not subclass then do nothing
    if new is None: return new
    assert old is not None or typ is not None
    if typ is None:
        if not isinstance(old, type(new)): return new
        typ = old if isinstance(old,type) else type(old)
    # Do nothing the new type is already an instance of requested type (i.e. same type)
    return typ(new) if typ!=NoneType and not isinstance(new, typ) else new

In [None]:
class _T(tuple): pass
a = _T((1,2))
b = tuple((1,2))
test_eq_type(retain_type(b, typ=_T), a)

In [None]:
#export
def retain_types(new, old=None, typs=None):
    "Cast each item of `new` to type of matching item in `old` if it's a superclass"
    if not is_listy(new): return retain_type(new, old, typs)
    return type(new)(L(new, old, typs).map_zip(retain_type, cycled=True))

In [None]:
class T(tuple): pass

t1,t2 = retain_types((1,(1,)), (2,T((2,))))
test_eq_type(t1, 1)
test_eq_type(t2, T((1,)))

### Simple types

In [None]:
#export
def show_title(o, ax=None, ctx=None, label=None, **kwargs):
    "Set title of `ax` to `o`, or print `o` if `ax` is `None`"
    ax = ifnone(ax,ctx)
    if ax is None: print(o)
    elif hasattr(ax, 'set_title'): ax.set_title(o)
    elif isinstance(ax, pd.Series):
        while label in ax: label += '_'
        ax = ax.append(pd.Series({label: o}))
    return ax

In [None]:
test_stdout(lambda: show_title("title"), "title")
# ensure that col names are unique when showing to a pandas series
assert show_title("title", ctx=pd.Series(dict(a=1)), label='a').equals(pd.Series(dict(a=1,a_='title')))

In [None]:
#export
class ShowTitle:
    "Base class that adds a simple `show`"
    _show_args = {'label': 'text'}
    def show(self, ctx=None, **kwargs): return show_title(str(self), ctx=ctx, **merge(self._show_args, kwargs))

class Int(int, ShowTitle): pass
class Float(float, ShowTitle): pass
class Str(str, ShowTitle): pass
add_docs(Int, "An `int` with `show`"); add_docs(Str, "An `str` with `show`"); add_docs(Float, "An `float` with `show`")

In [None]:
show_doc(Int, title_level=3)

<h3 id="Int" class="doc_header"><code>class</code> <code>Int</code><a href="" class="source_link" style="float:right">[source]</a></h3>

> <code>Int</code>() :: `int`

An `int` with `show`

In [None]:
show_doc(Str, title_level=3)

<h3 id="Str" class="doc_header"><code>class</code> <code>Str</code><a href="" class="source_link" style="float:right">[source]</a></h3>

> <code>Str</code>() :: `str`

An `str` with `show`

In [None]:
show_doc(Float, title_level=3)

<h3 id="Float" class="doc_header"><code>class</code> <code>Float</code><a href="" class="source_link" style="float:right">[source]</a></h3>

> <code>Float</code>(**`x`**=*`0`*) :: `float`

An `float` with `show`

In [None]:
test_stdout(lambda: Str('s').show(), 's')
test_stdout(lambda: Int(1).show(), '1')

In [None]:
#export
num_methods = """
    __add__ __sub__ __mul__ __matmul__ __truediv__ __floordiv__ __mod__ __divmod__ __pow__
    __lshift__ __rshift__ __and__ __xor__ __or__ __neg__ __pos__ __abs__
""".split()
rnum_methods = """
    __radd__ __rsub__ __rmul__ __rmatmul__ __rtruediv__ __rfloordiv__ __rmod__ __rdivmod__
    __rpow__ __rlshift__ __rrshift__ __rand__ __rxor__ __ror__
""".split()
inum_methods = """
    __iadd__ __isub__ __imul__ __imatmul__ __itruediv__
    __ifloordiv__ __imod__ __ipow__ __ilshift__ __irshift__ __iand__ __ixor__ __ior__ 
""".split()

In [None]:
#export
class Tuple(tuple):
    "A `tuple` with elementwise ops and more friendly __init__ behavior"
    def __new__(cls, x=None, *rest):
        if x is None: x = ()
        if not isinstance(x,tuple):
            if len(rest): x = (x,)
            else:
                try: x = tuple(iter(x))
                except TypeError: x = (x,)
        return super().__new__(cls, x+rest if rest else x)

    def _op(self,op,*args):
        if not isinstance(self,Tuple): self = Tuple(self)
        return type(self)(map(op,self,*map(cycle, args)))
    
    def mul(self,*args):
        "`*` is already defined in `tuple` for replicating, so use `mul` instead"
        return Tuple._op(self, operator.mul,*args)
    
    def add(self,*args):
        "`+` is already defined in `tuple` for concat, so use `add` instead"
        return Tuple._op(self, operator.add,*args)

def _get_op(op):
    if isinstance(op,str): op = getattr(operator,op)
    def _f(self,*args): return self._op(op,*args)
    return _f

for n in num_methods:
    if not hasattr(Tuple, n) and hasattr(operator,n): setattr(Tuple,n,_get_op(n))

for n in 'eq ne lt le gt ge'.split(): setattr(Tuple,n,_get_op(n))
setattr(Tuple,'__invert__',_get_op('__not__'))
setattr(Tuple,'max',_get_op(max))
setattr(Tuple,'min',_get_op(min))

In [None]:
test_eq(Tuple(1), (1,))
test_eq(type(Tuple(1)), Tuple)
test_eq_type(Tuple(1,2), Tuple(1,2))
test_ne(Tuple(1,2), Tuple(1,3))
test_eq(Tuple(), ())
test_eq(Tuple((1,2)), (1,2))
test_eq(-Tuple(1,2), (-1,-2))
test_eq(Tuple(1,1)-Tuple(2,2), (-1,-1))
test_eq(Tuple.add((1,1),(2,2)), (3,3))
test_eq(Tuple(1,1).add((2,2)), Tuple(3,3))
test_eq(Tuple('1','2').add('2'), Tuple('12','22'))
test_eq_type(Tuple(1,1).add(2), Tuple(3,3))
test_eq_type(Tuple(1,1).mul(2), Tuple(2,2))
test_eq(Tuple(3,1).le(1), (False, True))
test_eq(Tuple(3,1).eq(1), (False, True))
test_eq(Tuple(3,1).gt(1), (True, False))
test_eq(Tuple(3,1).min(2), (2,1))
test_eq(~Tuple(1,0,1), (False,True,False))

In [None]:
#export
class TupleTitled(Tuple, ShowTitle):
    "A `Tuple` with `show`"
    pass

### Functions on functions

In [None]:
#export
def trace(f):
    "Add `set_trace` to an existing function `f`"
    def _inner(*args,**kwargs):
        set_trace()
        return f(*args,**kwargs)
    return _inner

In [None]:
# export
def compose(*funcs, order=None):
    "Create a function that composes all functions in `funcs`, passing along remaining `*args` and `**kwargs` to all"
    funcs = L(funcs)
    if order is not None: funcs = funcs.sorted(order)
    def _inner(x, *args, **kwargs):
        for f in L(funcs): x = f(x, *args, **kwargs)
        return x
    return _inner

In [None]:
f1 = lambda o,p=0: (o*2)+p
f2 = lambda o,p=1: (o+1)/p
test_eq(f2(f1(3)), compose(f1,f2)(3))
test_eq(f2(f1(3,p=3),p=3), compose(f1,f2)(3,p=3))
test_eq(f2(f1(3,  3),  3), compose(f1,f2)(3,  3))

f1.order = 1
test_eq(f1(f2(3)), compose(f1,f2, order="order")(3))

In [None]:
#export
def maps(*args, retain=noop):
    "Like `map`, except funcs are composed first"
    f = compose(*args[:-1])
    def _f(b): return retain(f(b), b)
    return map(_f, args[-1])

In [None]:
test_eq(maps([1]), [1])
test_eq(maps(operator.neg, [1,2]), [-1,-2])
test_eq(maps(operator.neg, operator.neg, [1,2]), [1,2])

test_eq_type(list(maps(operator.neg, [Tuple((1,)), 2], retain=retain_type)), 
             [Tuple((-1,)), -2])

In [None]:
#export
def partialler(f, *args, order=None, **kwargs):
    "Like `functools.partial` but also copies over docstring"
    fnew = partial(f,*args,**kwargs)
    fnew.__doc__ = f.__doc__
    if order is not None: fnew.order=order
    elif hasattr(f,'order'): fnew.order=f.order
    return fnew

In [None]:
def _f(x,a=1):
    "test func"
    return x+a
_f.order=1

f = partialler(_f, a=2)
test_eq(f.order, 1)
f = partialler(_f, a=2, order=3)
test_eq(f.__doc__, "test func")
test_eq(f.order, 3)
test_eq(f(3), _f(3,2))

In [None]:
#export
def mapped(f, it):
    "map `f` over `it`, unless it's not listy, in which case return `f(it)`"
    return L(it).map(f) if is_listy(it) else f(it)

In [None]:
test_eq(mapped(_f,1),2)
test_eq(mapped(_f,[1,2]),[2,3])
test_eq(mapped(_f,(1,)),(2,))

In [None]:
#export
def instantiate(t):
    "Instantiate `t` if it's a type, otherwise do nothing"
    return t() if isinstance(t, type) else t

In [None]:
test_eq_type(instantiate(int), 0)
test_eq_type(instantiate(1), 1)

In [None]:
#export
class _Self:
    "An alternative to `lambda` for calling methods on passed object."
    def __init__(self): self.nms,self.args,self.kwargs,self.ready = [],[],[],True
    def __repr__(self): return f'self: {self.nms}({self.args}, {self.kwargs})'

    def __call__(self, *args, **kwargs):
        if self.ready:
            x = args[0]
            for n,a,k in zip(self.nms,self.args,self.kwargs):
                x = getattr(x,n)
                if callable(x) and a is not None: x = x(*a, **k)
            return x
        else:
            self.args.append(args)
            self.kwargs.append(kwargs)
            self.ready = True
            return self

    def __getattr__(self,k):
        if not self.ready:
            self.args.append(None)
            self.kwargs.append(None)
        self.nms.append(k)
        self.ready = False
        return self

class _SelfCls:
    def __getattr__(self,k): return getattr(_Self(),k)

Self = _SelfCls()

In [None]:
#export
_all_ = ['Self']

#### Self

fastai provides a concise way to create lambdas that are calling methods on an object, which is to use `Self` (note the capitalization!) `Self.sum()`, for instance, is a shortcut for `lambda o: o.sum()`.

In [None]:
f = Self.sum()
x = array([3.,1])
test_eq(f(x), 4.)

# This is equivalent to above
f = lambda o: o.sum()
x = array([3.,1])
test_eq(f(x), 4.)

f = Self.sum().is_integer()
x = array([3.,1])
test_eq(f(x), True)

f = Self.sum().real.is_integer()
x = array([3.,1])
test_eq(f(x), True)

f = Self.imag()
test_eq(f(3), 0)

### File and network functions

In [None]:
#export
#NB: Please don't move this to a different line or module, since it's used in testing `get_source_link`
@patch
def ls(self:Path, file_type=None, file_exts=None):
    "Contents of path as a list"
    extns=L(file_exts)
    if file_type: extns += L(k for k,v in mimetypes.types_map.items() if v.startswith(file_type+'/'))
    return L(self.iterdir()).filter(lambda x: len(extns)==0 or x.suffix in extns)

We add an `ls()` method to `pathlib.Path` which is simply defined as `list(Path.iterdir())`, mainly for convenience in REPL environments such as notebooks.

In [None]:
path = Path()
t = path.ls()
assert len(t)>0
t[0]

PosixPath('18_callback_fp16.ipynb')

You can also pass an optional `file_type` MIME prefix and/or a list of file extensions.

In [None]:
txt_files=path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]

(PosixPath('train_wt2.py'), PosixPath('18_callback_fp16.ipynb'))

In [None]:
#hide
pkl = pickle.dumps(path)
p2 =pickle.loads(pkl)
test_eq(path.ls()[0], p2.ls()[0])

In [None]:
#export
def bunzip(fn):
    "bunzip `fn`, raising exception if output already exists"
    fn = Path(fn)
    assert fn.exists(), f"{fn} doesn't exist"
    out_fn = fn.with_suffix('')
    assert not out_fn.exists(), f"{out_fn} already exists"
    with bz2.BZ2File(fn, 'rb') as src, out_fn.open('wb') as dst:
        for d in iter(lambda: src.read(1024*1024), b''): dst.write(d)

In [None]:
f = Path('files/test.txt')
if f.exists(): f.unlink()
bunzip('files/test.txt.bz2')
t = f.open().readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

In [None]:
#export
def join_path_file(file, path, ext=''):
    "Return `path/file` if file is a string or a `Path`, file otherwise"
    if not isinstance(file, (str, Path)): return file
    path.mkdir(parents=True, exist_ok=True)
    return path/f'{file}{ext}'

In [None]:
path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

### Sorting objects from before/after

Transforms and callbacks will have run_after/run_before attributes, this function will sort them to respect those requirements (if it's possible). Also, sometimes we want a tranform/callback to be run at the end, but still be able to use run_after/run_before behaviors. For those, the function checks for a toward_end attribute (that needs to be True).

In [None]:
#export
def _is_instance(f, gs):
    tst = [g if type(g) in [type, 'function'] else g.__class__ for g in gs]
    for g in tst:
        if isinstance(f, g) or f==g: return True
    return False

def _is_first(f, gs):
    for o in L(getattr(f, 'run_after', None)):
        if _is_instance(o, gs): return False
    for g in gs:
        if _is_instance(f, L(getattr(g, 'run_before', None))): return False
    return True

def sort_by_run(fs):
    end = L(fs).attrgot('toward_end')
    inp,res = L(fs)[~end] + L(fs)[end], L()
    while len(inp):
        for i,o in enumerate(inp):
            if _is_first(o, inp):
                res.append(inp.pop(i))
                break
        else: raise Exception("Impossible to sort")
    return res

In [None]:
class Tst(): pass    
class Tst1():
    run_before=[Tst]
class Tst2():
    run_before=Tst
    run_after=Tst1
    
tsts = [Tst(), Tst1(), Tst2()]
test_eq(sort_by_run(tsts), [tsts[1], tsts[2], tsts[0]])

Tst2.run_before,Tst2.run_after = Tst1,Tst
test_fail(lambda: sort_by_run([Tst(), Tst1(), Tst2()]))

def tst1(x): return x
tst1.run_before = Tst
test_eq(sort_by_run([tsts[0], tst1]), [tst1, tsts[0]])
    
class Tst1():
    toward_end=True
class Tst2():
    toward_end=True
    run_before=Tst1
tsts = [Tst(), Tst1(), Tst2()]
test_eq(sort_by_run(tsts), [tsts[0], tsts[2], tsts[1]])

### Other helpers

In [None]:
#export core
def display_df(df):
    "Display `df` in a notebook or defaults to print"
    try: from IPython.display import display, HTML
    except: return print(df)
    display(HTML(df.to_html()))

In [None]:
#export
def round_multiple(x, mult, round_down=False):
    "Round `x` to nearest multiple of `mult`"
    def _f(x_): return (int if round_down else round)(x_/mult)*mult
    res = L(x).map(_f)
    return res if is_listy(x) else res[0]

In [None]:
test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

In [None]:
#export
def even_mults(start, stop, n):
    "Build log-stepped array from `start` to `stop` in `n` steps."
    if n==1: return stop
    mult = stop/start
    step = mult**(1/(n-1))
    return np.array([start*(step**i) for i in range(n)])

In [None]:
test_eq(even_mults(2,8,3), [2,4,8])
test_eq(even_mults(2,32,5), [2,4,8,16,32])
test_eq(even_mults(2,8,1), 8)

In [None]:
#export
def num_cpus():
    "Get number of cpus"
    try:                   return len(os.sched_getaffinity(0))
    except AttributeError: return os.cpu_count()

defaults.cpus = num_cpus()

In [None]:
#export
def add_props(f, n=2):
    "Create properties passing each of `range(n)` to f"
    return (property(partial(f,i)) for i in range(n))

In [None]:
class _T(): a,b = add_props(lambda i,x:i*2)

t = _T()
test_eq(t.a,0)
test_eq(t.b,2)

# Export -

In [None]:
#hide
from local.notebook.export import notebook2script
notebook2script(all_fs=True)

Converted 00_test.ipynb.
Converted 01_core.ipynb.
Converted 01a_torch_core.ipynb.
Converted 02_script.ipynb.
Converted 03_dataloader.ipynb.
Converted 04_transform.ipynb.
Converted 05_data_core.ipynb.
Converted 06_data_transforms.ipynb.
Converted 07_vision_core.ipynb.
Converted 08_pets_tutorial.ipynb.
Converted 09_vision_augment.ipynb.
Converted 11_layers.ipynb.
Converted 11a_vision_models_xresnet.ipynb.
Converted 12_optimizer.ipynb.
Converted 13_learner.ipynb.
Converted 14_callback_schedule.ipynb.
Converted 15_callback_hook.ipynb.
Converted 16_callback_progress.ipynb.
Converted 17_callback_tracker.ipynb.
Converted 18_callback_fp16.ipynb.
Converted 19_callback_mixup.ipynb.
Converted 20_metrics.ipynb.
Converted 21_tutorial_imagenette.ipynb.
Converted 22_vision_learner.ipynb.
Converted 23_tutorial_transfer_learning.ipynb.
Converted 30_text_core.ipynb.
Converted 31_text_data.ipynb.
Converted 32_text_models_awdlstm.ipynb.
Converted 33_text_models_core.ipynb.
Converted 34_callback_rnn.ipynb.