# utils

> Helper functions

In [None]:
# | default_exp utils

In [None]:
# | hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# | hide
from fastcore.test import *
from nbdev.showdoc import *

In [None]:
# | export


from datetime import *
import os
import pprint
import random
import re
import sys
from typing import *

from fastcore.basics import *
from fastcore.basics import patch
from hydra import *
import numpy as np
from omegaconf import *
import torch

In [None]:
# | export


def dt_from_ts(ts: int) -> np.datetime64:
    """Convert an epoch timestamp in microseconds to a ``numpy.datetime64``.

    Args:
        ts: Number of microseconds since 1970-01-01T00:00:00 UTC.

    Returns:
        A ``datetime64[us]`` value holding the corresponding UTC wall-clock
        time (``numpy.datetime64`` itself carries no timezone).
    """
    # Build the value straight from the integer microsecond count instead of
    # going through a timezone-aware ``datetime``: NumPy cannot represent
    # timezones and warns when handed an aware datetime, and skipping the
    # float division keeps the conversion exact and platform independent.
    return np.datetime64(int(ts), "us")

In [None]:
# | export


def ts_from_dt(dt: datetime) -> int:
    """Return the POSIX timestamp of ``dt`` as a whole number of seconds.

    The fractional part of ``dt.timestamp()`` is dropped by ``int`` (truncation
    toward zero).

    NOTE(review): the result is in seconds, whereas ``dt_from_ts`` in this
    module divides its input by 1000000 (i.e. expects microseconds), so the two
    helpers are not inverses of each other -- confirm this is intended.
    """
    epoch_seconds = dt.timestamp()
    return int(epoch_seconds)

In [None]:
# | export


def dict_depth(d: Any) -> int:
    """Return how many levels of nested dictionaries ``d`` contains.

    Anything that is not a ``dict`` has depth 0, an empty ``dict`` has depth 1,
    and a non-empty ``dict`` is one level deeper than its deepest value.
    """
    if not isinstance(d, dict):
        return 0

    # ``default=0`` covers the empty-dict case, where there is nothing to recurse into.
    deepest_child = max((dict_depth(child) for child in d.values()), default=0)
    return deepest_child + 1

In [None]:
# | export


def dict_flatten(d: dict, root: str = "") -> dict:
    """Flatten a nested dictionary into a single level with dotted keys.

    Args:
        d: Dictionary to flatten. Keys are concatenated onto ``root`` with
            ``+``, so they have to be strings.
        root: Prefix prepended to every produced key.

    Returns:
        A new dictionary in which each non-dict value of ``d`` (at any nesting
        level) is stored under the ``"."``-joined path of keys leading to it.
        Nested dictionaries that are empty contribute no entries.
    """
    flat = {}
    for name, value in d.items():
        path = root + name
        if not isinstance(value, dict):
            flat[path] = value
            continue

        # Recurse with the current path as prefix and merge the leaves in order.
        for sub_path, leaf in dict_flatten(value, path + ".").items():
            flat[sub_path] = leaf

    return flat

In [None]:
# | export


def walk_folders(
    path_input: str, depth: int = 1, exclude: Union[List[str], None] = None
) -> List[str]:
    """Collect the directories located exactly ``depth`` levels below a path.

    Args:
        path_input: Directory to start walking from (level 0).
        depth: Level of the directories to return; ``0`` returns
            ``path_input`` itself, ``1`` its immediate sub-directories, etc.
        exclude: Substrings; any directory whose name contains one of them is
            neither returned nor descended into. ``path_input`` itself is
            never filtered.

    Returns:
        The matching directory paths, as produced by ``os.walk(path_input)``.

    Raises:
        FileNotFoundError: If ``path_input`` does not exist.
    """
    abs_path = os.path.abspath(path_input)
    if not os.path.exists(abs_path):
        raise FileNotFoundError(f"Path {abs_path} does not exist")

    patterns = exclude or []
    folders = []

    for root, dirs, _ in os.walk(path_input, topdown=True):
        # Derive the level from the path relative to the start directory.
        # Counting separators in the raw strings miscounts as soon as
        # ``path_input`` ends with a separator (or is the filesystem root).
        rel = os.path.relpath(root, path_input)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1

        if level == depth:
            folders.append(root)

        if level >= depth:
            # Nothing below the requested level can match: stop descending.
            dirs.clear()
        else:
            # In-place assignment so that os.walk skips the excluded directories.
            dirs[:] = [d for d in dirs if not any(p in d for p in patterns)]

    return folders

In [None]:
# | export


def get_config(
    config_path: str, config_name: str, target_path: str, overrides: List[str]
) -> DictConfig:
    """Compose a Hydra configuration, pretty-print it and return it.

    Args:
        config_path: Passed to ``initialize`` as ``config_path``.
        config_name: Passed to ``compose`` as ``config_name``.
        target_path: Directory whose absolute path is appended to ``sys.path``
            (presumably so that objects referenced by the config can be
            imported -- confirm with callers).
        overrides: Passed to ``compose`` as ``overrides``.

    Returns:
        The composed configuration object.
    """
    target_dir = os.path.abspath(target_path)
    sys.path.append(target_dir)

    with initialize(version_base=None, config_path=config_path):
        cfg = compose(config_name=config_name, overrides=overrides)

    # Dump the resolved configuration as a plain container for inspection.
    as_container = OmegaConf.to_container(cfg)
    pprint.PrettyPrinter(indent=4).pprint(as_container)

    return cfg

In [None]:
# | export


def get_resample_code(interval: str, frequency: int) -> str:
    """Build a resampling code such as ``"5T"`` from an interval name.

    Args:
        interval: Name of the time unit, e.g. ``"day"`` or ``"minute"``. Both
            the singular and the plural spelling of each unit are accepted.
        frequency: Multiplier placed in front of the unit code.

    Returns:
        ``frequency`` followed by the single-letter code of the unit.

    Raises:
        ValueError: If ``interval`` does not name a known unit.

    NOTE(review): the codes look like pandas offset aliases; recent pandas
    releases deprecate the upper-case ``H``/``T``/``S``/``L``/``U``/``N``
    spellings -- confirm which pandas version consumes these values.
    """
    codes_dict = {
        "day": "D",
        "hour": "H",
        "minute": "T",
        "second": "S",
        "millisecond": "L",
        "microseconds": "U",
        "nanoseconds": "N",
    }

    code = None
    if isinstance(interval, str):
        code = codes_dict.get(interval)
        if code is None:
            # The table mixes singular and plural names, so also try the other
            # spelling. A substring test is not enough here: "days" contains
            # "day" but is not itself a key, which used to end in a KeyError.
            alternate = interval[:-1] if interval.endswith("s") else interval + "s"
            code = codes_dict.get(alternate)

    if code is None:
        raise ValueError(f"Invalid interval. Must be one of {list(codes_dict.keys())}.")

    return f"{frequency}{code}"

In [None]:
# | export


def set_seed(seed: int) -> None:
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Value handed to ``random.seed``, ``np.random.seed`` and
            ``torch.manual_seed``; when CUDA is available it is also passed to
            ``torch.cuda.manual_seed_all``.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(seed)

In [None]:
# | export


def list_regex(
    lst: List[str],
    regex_match: str,
    regex_sort: Union[str, None] = None,
) -> List[str]:
    """Filter a list of strings by a regex and optionally sort it numerically.

    Args:
        lst: Strings to process.
        regex_match: Pattern applied with ``re.match`` (anchored at the start
            of each string); only matching strings are kept. ``None`` skips
            the filtering step.
        regex_sort: Pattern applied with ``re.search``; the full match is
            converted with ``int`` and used as the sort key. ``None`` skips
            the sorting step.

    Returns:
        The filtered (and, if requested, sorted) list.

    Raises:
        ValueError: If ``regex_sort`` is given and some remaining string does
            not match it.
    """
    if regex_match is not None:
        lst = [item for item in lst if re.match(regex_match, item)]

    if regex_sort is None:
        return lst

    hits = [re.search(regex_sort, item) for item in lst]
    if any(hit is None for hit in hits):
        raise ValueError("Not all values in list match sorting regex.")

    # Sort positions rather than values so each string keeps its own match;
    # sorted() is stable, so equal keys keep their input order.
    order = sorted(range(len(lst)), key=lambda pos: int(hits[pos].group(0)))
    return [lst[pos] for pos in order]

In [None]:
# | hide
import nbdev

# Invoke nbdev's export entry point. Presumably this writes the cells marked
# `# | export` in this notebook out to the `utils` module named by
# `default_exp` -- confirm against the project's nbdev settings.
nbdev.nbdev_export()