In [None]:
#| hide
#| default_exp utils

# Utils

## Disk Cache

In [None]:
#| export
# dill is an improved version of pickle, using it to support namedtuples
import dill
from pathlib import Path
import inspect
import hashlib
from pyprojroot import here

In [None]:
#| hide
def a():
    # comment
    return 'a'
inspect.getsource(a)

"def a():\n    # comment\n    return 'a'\n"

In [None]:
#| export
cache_dir = here(".cache")

In [None]:
#| export
# inspired from https://gist.github.com/shantanuo/c6a376309d6bac6bd55bf77e3961b5fb
def cache_disk(base_file, rm_cache=False):
    "Decorator to cache function output to disk"
    base_file = Path(base_file)
    def decorator(original_func):
        
        f_hash = hashlib.md5(inspect.getsource(original_func).encode()).hexdigest()
        filename = base_file.parent / (base_file.stem + f_hash + ".pickle")
        
        if rm_cache: filename.unlink()
        
        try:
            cache = dill.load(open(filename, 'rb'))
        except (IOError, ValueError):
            cache = {}

        def save_data():
            dill.dump(cache, open(filename, "wb"))  

        def new_func(*args):
            if tuple(args) not in cache:
                cache[tuple(args)] = original_func(*args)
                save_data()
            return cache[args]

        return new_func

    return decorator

In [None]:
import time
from tempfile import tempdir

In [None]:
cp = Path(tempdir) / "test_cache"

In [None]:
@cache_disk(cp)
def slow_add(a,b):
    time.sleep(1)
    return a + b 

this time is the first time so not from the cache

In [None]:
%time slow_add(1,2)

CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 5.48 µs


3

now is much faster beacuse of the cache

In [None]:
%time slow_add(1,2)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.01 µs


3

adding comments change the hash, so the function is still cached

In [None]:
@cache_disk(cp)
def slow_add(a,b):
    time.sleep(1)
    # this is a comment
    return a + b 

In [None]:
%time slow_add(1,2)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 4.77 µs


3

In [None]:
%time slow_add(1,2)

CPU times: user 5 µs, sys: 1 µs, total: 6 µs
Wall time: 7.39 µs


3

In [None]:
#| export
import torch
import numpy as np

In [None]:
#| export
def reset_seed(seed=27):
    torch.manual_seed(seed)
    np.random.seed(seed)

In [None]:
#| export
def reset_seed(seed=27):
    torch.manual_seed(seed)
    np.random.seed(seed)

## Testing

In [None]:
#| export
from typing import Generator, Iterable
from functools import partial
from fastcore.test import test
from fastcore.basics import patch

In [None]:
#| exporti
def is_close(a,b,eps=1e-5):
    "Is `a` within `eps` of `b`"
    if hasattr(a, '__array__') or hasattr(b,'__array__'):
        a = torch.as_tensor(a)
        b = torch.as_tensor(b)
        return (abs(a-b)<eps).all()
    if isinstance(a, (Iterable,Generator)) or isinstance(b, (Iterable,Generator)):
        return all(is_close(a_, b_, eps) for a_,b_ in zip(a,b))
    return abs(a-b)<eps

In [None]:
#| export
def test_close(a,b,eps=1e-5):
    "`test` that `a` is within `eps` of `b`"
    test(a,b,partial(is_close,eps=eps),'close')

## Standard Scaler

make a standard scaler that can also inverse transfor standard deviations. see `Standardizer` for details of implementation

In [None]:
#| export
from collections import namedtuple
from fastcore.basics import patch
from sklearn.preprocessing import StandardScaler

In [None]:
reset_seed()
xx = np.random.random((4, 10))

In [None]:
s = StandardScaler().fit(xx)

In [None]:
s.transform(xx)

array([[ 0.07263978,  0.63279488, -0.9975139 ,  0.50899177,  0.15537652,
         1.45555506,  1.56629646, -1.60237369,  1.51674974,  1.29584745],
       [ 1.58579521,  0.83086419, -0.68281902,  0.51578245, -0.62395756,
        -1.19720248, -0.43000476,  1.1539719 , -0.74724819, -0.85525414],
       [-1.05809926, -1.69049694,  0.0895118 , -1.72684476, -1.08418417,
         0.32617669, -1.16657374,  0.2345773 ,  0.26525847,  0.64349108],
       [-0.60033573,  0.22683787,  1.59082112,  0.70207053,  1.55276521,
        -0.58452927,  0.03028204,  0.21382449, -1.03476002, -1.08408439]])

In [None]:
s.mean_

array([0.40358703, 0.6758362 , 0.77934606, 0.70748673, 0.34417949,
       0.62067044, 0.48500116, 0.54921643, 0.34604713, 0.3660338 ])

In [None]:
s.scale_

array([0.30471427, 0.21926148, 0.04405831, 0.31536161, 0.25229864,
       0.24649441, 0.26061043, 0.21187396, 0.26093989, 0.22927816])

In [None]:
#| export
@patch
def inverse_transform_std(self: StandardScaler, 
                         x_std # standard deviations
                        ):
    return x_std * self.scale_

## Info visualization

In [None]:
#| export
from torch import Tensor
from typing import Collection
import pandas as pd

from IPython.display import HTML
from IPython.display import display
from typing import Iterable

from fastcore.basics import *

In [None]:
#| export
def array2df(x: Tensor, # 2d tensor
             row_names: Collection[str]|None=None, # names for the row
             col_names: Collection[str]|None=None, # names for the columns
             row_var: str = '' # name of the first column (the one with row names). This should describe the values of `row_name`
            ):
    df = pd.DataFrame(x.detach().cpu().numpy(), columns=col_names)
    if row_names is not None: df.insert(0, row_var, row_names)
    return df

In [None]:
import numpy as np

In [None]:
a = np.random.rand(2,3,3)

In [None]:
display(HTML(f"<pre> {repr(a)} </pre>"))

In [None]:
a

array([[[0.96646567, 0.58332229, 0.09242191],
        [0.0136295 , 0.83693011, 0.9147879 ],
        [0.70458626, 0.3870066 , 0.7056939 ]],

       [[0.92331116, 0.28815289, 0.68401985],
        [0.5202925 , 0.87736578, 0.92388931],
        [0.48923016, 0.59621396, 0.26427542]]])

In [None]:
#| export
import inspect

In [None]:
#| export
# inspired from https://stackoverflow.com/questions/18425225/ 
def maybe_retrieve_callers_name(args):
    """Tries to retrieve the argument name in the call frame, if there are multiple matches name is ''"""
    names = []
    for arg in args:
        callers_local_vars = inspect.currentframe().f_back.f_back.f_locals.items()
        var_names = [var_name for var_name, var_val in callers_local_vars if var_val is arg and not var_name.startswith("_")]
        names.append(var_names[0] if len(var_names)==1 else '')
    return names

def retrieve_names(*args):
    """Tries to retrieve the argument name in the call frame, if there are multiple matches name is ''"""
    names = []
    for arg in args:
        callers_local_vars = inspect.currentframe().f_back.f_locals.items()
        var_names = [var_name for var_name, var_val in callers_local_vars if var_val is arg]
        names.append(var_names)
    return names

In [None]:
x, y, z = 1, 2, 3

def func(*args):
    return maybe_retrieve_callers_name(args)

print(func(x,y))

['x', 'y']


In [None]:
retrieve_names(a,a)

[['_', 'a', '_32'], ['_', 'a', '_32']]

In [None]:
#| export
def show_as_row(*os: Iterable, names: Iterable[str]=None, **kwargs):
    """Shows a interable of tensors on a row"""
    if names is None: names = maybe_retrieve_callers_name(os)
    kwargs.update(dict(zip(names, os)))
    columns = [f"<div><p style='font-size: 1.2rem;'>{title}</p> <pre>{repr(o)}</pre> </div>" for title, o in kwargs.items()]
    out = f"<div style=\"display: flex; column-gap: 20px; flex-wrap: wrap;\" class='table table-striped table-sm'> {''.join(columns)}</div>"
    display(HTML(out))

In [None]:
show_as_row(a,a)

In [None]:
func(a,a)

['a', 'a']

In [None]:
show_as_row(a, names='b')

In [None]:
show_as_row(c=a)

In [None]:
#| export
def _style_df(df):
    """style dataframe for better printing """
    return df.style.hide(axis="index").format(precision = 4)

def row_dfs(dfs: dict[str, pd.DataFrame], title="", styler=_style_df):
    out = []
    for df_title, df in dfs.items():
        df_html = _style_df(df).to_html()
        out.append(f"<div> <p style='font-size: 1.3rem;'>{df_title}</p> {df_html} </div>")
    out = f"<div style=\"display: flex; column-gap: 20px; flex-wrap: wrap;\" class='table table-striped table-sm'> {''.join(out)}</div>"
    return f"<p style='font-size: 1.5rem; font-decoration: bold'>{title}<p>" + "".join(out)
def display_as_row(dfs: dict[str, pd.DataFrame], title="", styler=_style_df):
    """display multiple dataframes in the same row"""
    display(HTML(row_dfs(dfs, title, styler)))

In [None]:
a = HTML(pd.DataFrame([1,2]).to_html(notebook=True))

In [None]:
display_as_row({"test": pd.DataFrame([1,2])}, "hello")

In [None]:
display_as_row({f"test{i}": pd.DataFrame([1,2]) for i in range(10)})

## Tensor shapes

In [None]:
#| export
def array1d(X):
    """Returns at least 1-d array with data from X"""
    return torch.atleast_1d(torch.as_tensor(X))

def array2d(X):
    """Returns at least 2-d array with data from X"""
    return torch.atleast_2d(torch.as_tensor(X))


In [None]:
#| export
def determine_dimensionality(variables, default):
    """Derive the dimensionality of the state space

    Parameters
    ----------
    variables : list of ({None, array}, conversion function, index)
        variables, functions to convert them to arrays, and indices in those
        arrays to derive dimensionality from.
        
    Returns
    -------
    dim : int
        dimensionality of state space as derived from variables or default.
    """
    # gather possible values based on the variables
    candidates = []
    for (v, converter, idx) in variables:
        if v is not None:
            v = converter(v)
            candidates.append(v.shape[idx])
    
     # also use the manually specified default
    if default is not None:
        candidates.append(default)
    
    # ensure consistency of all derived values
    if len(candidates) == 0:
        return 1
    else:
        if not torch.all(torch.tensor(candidates) == candidates[0]):
            raise ValueError(
                "The shape of all " +
                "parameters is not consistent.  " +
                "Please re-check their values."
            )
        return candidates[0]


def last_dims(X: Tensor, t: int, ndims: int=2):
    """Extract the final dimensions of `X`

    Extract the final `ndim` dimensions at index `t` if `X` has >= `ndim` + 1
    dimensions, otherwise return `X`.

    Parameters
    ----------
    X : Tensor with at least dimension `ndims`
    t : int
        index to use for the `ndims` + 1th dimension
    ndims : int, optional
        number of dimensions in the array desired

    Returns
    -------
    Y : array with dimension `ndims`
        the final `ndims` dimensions indexed by `t`
    """
    if len(X.shape) == ndims + 1:
        return X[t]
    elif len(X.shape) == ndims:
        return X
    else:
        raise ValueError(("X only has %d dimensions when %d" +
                " or more are required") % (len(X.shape), ndims))

## Export

In [None]:
#| hide
from nbdev import nbdev_export
nbdev_export()