In [None]:
#|default_exp utils.misc

# Miscellaneous

> Miscellaneous functions.

In [None]:
#|export
from fastcore.all import *
import numbers
import numpy as np
import polvo as pv

from PIL import Image

  from tqdm.autonotebook import tqdm


In [None]:
#|export
def flatten(x):
    "From https://stackoverflow.com/a/2158532/6772672"
    if isinstance(x, Iterable) and not isinstance(x, (str, bytes)): 
        return [a for i in x for a in flatten(i)]
    else: return [x]

In [None]:
test_eq(flatten([[2, 3], ['a', ['c', 4]]]), [2, 3, 'a', 'c', 4])

In [None]:
#|export
def sort_quadrilateral(points):
    ysorted_idxs = np.argsort(points[:, 1])
    top_idxs, bottom_idxs = ysorted_idxs[:2], ysorted_idxs[2:]
    xsorted_top_idxs = top_idxs[np.argsort(points[top_idxs][:, 0])]
    xsorted_bottom_idxs = bottom_idxs[np.argsort(points[bottom_idxs][:, 0])]
    # Combine the sorted points in the order: [top-left, top-right, bottom-left, bottom-right]
    sorted_points = np.vstack((points[xsorted_top_idxs], points[xsorted_bottom_idxs]))
    sorted_idxs = np.concatenate((xsorted_top_idxs, xsorted_bottom_idxs))
    return sorted_points, sorted_idxs

In [None]:
test_points = np.array([(1.8, 2.1), (2, 0.9), (1.1, 1.95), (1, 1)])
expected_points = np.array([(1, 1), (2, 0.9), (1.1, 1.95), (1.8, 2.1)])
expected_idxs = np.array([3, 1, 2, 0])
test_eq(sort_quadrilateral(test_points), (expected_points, expected_idxs))

In [None]:
test_points = np.array([[1368.4, 1856.], [270.5, 1815.3], [1436.6, 77.7], [340.2, 27.5]])
expected_points = np.array([[340.2, 27.5], [1436.6, 77.7], [270.5, 1815.3], [1368.4, 1856.]])
expected_idxs = np.array([3, 2, 1, 0])
test_eq(sort_quadrilateral(test_points), (expected_points, expected_idxs))

In [None]:
#|export
def kwargs_grid(**kwargs):
    "Returns a generator with all combinations of kwargs"
    return (dict(zip(kwargs.keys(), v)) for v in itertools.product(*kwargs.values()))

In [None]:
list(kwargs_grid(size=[(224, 224), (512, 512)], alpha=[0.3, 0.7, 0.9], apply_tfms=[False, True]))

[{'size': (224, 224), 'alpha': 0.3, 'apply_tfms': False},
 {'size': (224, 224), 'alpha': 0.3, 'apply_tfms': True},
 {'size': (224, 224), 'alpha': 0.7, 'apply_tfms': False},
 {'size': (224, 224), 'alpha': 0.7, 'apply_tfms': True},
 {'size': (224, 224), 'alpha': 0.9, 'apply_tfms': False},
 {'size': (224, 224), 'alpha': 0.9, 'apply_tfms': True},
 {'size': (512, 512), 'alpha': 0.3, 'apply_tfms': False},
 {'size': (512, 512), 'alpha': 0.3, 'apply_tfms': True},
 {'size': (512, 512), 'alpha': 0.7, 'apply_tfms': False},
 {'size': (512, 512), 'alpha': 0.7, 'apply_tfms': True},
 {'size': (512, 512), 'alpha': 0.9, 'apply_tfms': False},
 {'size': (512, 512), 'alpha': 0.9, 'apply_tfms': True}]

In [None]:
#|export
@functools.wraps(zip)
def safe_zip(*args, **kwargs):
    if len(set(map(len, args))) not in (0, 1):
        raise ValueError(f'All elements should have the same size, but got {[len(x) for x in args]}')
    return zip(*args, **kwargs)

In [None]:
test_eq(list(safe_zip([1, 2], [3, 4], [5, 6])), list(zip([1, 2], [3, 4], [5, 6])))

In [None]:
test_fail(lambda: safe_zip([1, 2], [3, 4], [5]), contains='same size')

In [None]:
#|export
class skip_error:
    def __init__(self, fn, log=True):
        'Returns the error instead of raising it.'
        store_attr()
        
    def __call__(self, *args, **kwargs):
        try: return self.fn(*args, **kwargs)
        except Exception as e: 
            if self.log: print(f'{e} {(args, kwargs)}')
            return e

In [None]:
skip_error(lambda: 'test'.get(2))()

'str' object has no attribute 'get' ((), {})


AttributeError("'str' object has no attribute 'get'")

In [None]:
#|export
class Cache:
    def __init__(self, func, save_dir=None, debug=False):
        self.func = func
        self.save_dir = Path(save_dir or Path.home()/'.cache/polvo')
        self.debug = debug
        
    def get(self, name, use_cache=True, **func_kwargs):
        path = self.save_dir/name
        try: 
            if use_cache: 
                o = pv.open_pickle(path)
                if self.debug: print('Using cache.')
                return o
        except FileNotFoundError: 
            pass
        
        if self.debug: print('Not using cache.')
        x = self.func(**func_kwargs)
        pv.mkdir(path.parent, exist_ok=True)
        pv.save_pickle(x, path)
        return x
    
    def list_cached(self, **kwargs):
        return pv.get_files(self.save_dir, **kwargs)

In [None]:
cache = Cache(lambda x: x+1, debug=True)
cache.get('test_cache.pkl', x=4, use_cache=True)

Using cache.


5

In [None]:
cache.list_cached()

(#6) [Path('/home/lgvaz/.cache/polvo/test_cache444.pkl'),Path('/home/lgvaz/.cache/polvo/OpenAIEncoder-the_loop_manual_de_regras_em_portugues_211955.pdf-chunks.pkl'),Path('/home/lgvaz/.cache/polvo/test_cache.pkl'),Path('/home/lgvaz/.cache/polvo/test_cache2.pkl'),Path('/home/lgvaz/.cache/polvo/test_cache'),Path('/home/lgvaz/.cache/polvo/OpenAIEncoder/the_loop_manual_de_regras_em_portugues_211955.pdf-embeds.pkl')]

In [None]:
#|export
def transparent_cmap():
    from matplotlib.colors import LinearSegmentedColormap
    colors = [(0, 0, 0, 0) for _ in range(256)]
    return LinearSegmentedColormap.from_list('transparent_cmap', colors, N=256)

In [None]:
# FIND CORRECT PLACE TO PLACE THIS
def wandb_upload(project:str, artifact_name:str, *path:str, type='dataset'):
    "Uploads files or dir to wandb"
    import wandb
    run = wandb.init(project=project, job_type="data-upload")
    artifact = wandb.Artifact(artifact_name, type=type)
    
    def _add_file(p): artifact.add_file(p, p)
    for p in path:
        if Path(p).is_file(): _add_file(p)
        else:
            for f in pv.get_files(p): _add_file(f)

    run.log_artifact(artifact)

In [None]:
class ParameterIterator:
    def __call__(self, f):
        sig = inspect.signature(f)
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            for k, v in bound.arguments.items():
                self.apply(k, v)
            return f(*args, **kwargs)
        return wrapper

In [None]:
def _save_image(image, save_dir): return pv.save_image(image, save_dir)

class save_params(ParameterIterator):
    _save_fns = {
        Image: pv.save_image,
        np.ndarray: pv.save_image,
        numbers.Number: lambda s: pv.save_txt(s+'\n', append=True),
        str: lambda s: pv.save_txt(s+'\n', append=True),
    }
    
    def __init__(
        self,
        save_dir,
        save_fns=None, # Dictionary of {<type>: save_fn}. `save_fn` first parameter should be the object, and second the path.
    ):
        "Save all parameters of decorated function"
        self.save_dir = pv.mkdir(save_dir, exist_ok=True)
        self.save_fns = save_fns or self._save_fns.copy()
        
    def apply(self, k, v):
        save_fns[type(k)](v, self.save_dir)

In [None]:
# @save_params('foo_params')
# def foo(a, b, c=2, d='test'):
#     return

In [None]:
#|hide
from nbdev import nbdev_export
nbdev_export()