# 0.2 Utils

Useful functions, whether general or specific to the problem at hand, normally live in a package. During rapid, exploratory development, however, it is convenient to have every function available in each notebook without the extra effort of building one. If the project later aims to deliver a pipeline or a program that performs a certain type of analysis, creating a formal package would be the more appropriate choice. For now, this notebook defines the utility functions that all the other notebooks will use.

## Imports

In [1]:
import collections
import functools
import itertools
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import pathlib

from pyprojroot import here
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Set,
    Union,
)

## Function declarations

### Paths

For this project, the `here` function is used to establish the root of the project and maintain a more orderly working environment.  
> **Note:** `here` works like `file.path` in R or `pathlib.Path` in Python, but with the path root implicitly set to **"the path to the top-level of my current project"**. It looks at the working directory, checks a criterion and, if it is not satisfied, moves up to the parent directory and checks again. Lather, rinse, repeat. See [here](https://github.com/jennybc/here_here) for **R** or [here](https://pypi.org/project/pyprojroot/) for **Python**.


In [2]:
def auto_directories(prefix: str = "_") -> Dict[str, pathlib.Path]:
    """Automatically identify directories in the current project.

    The `here()` function uses reasonable heuristics to find your project's files,
    based on the current working directory at the time when the package is loaded.

    Starting from the identification of a project, it proceeds to identify all the
    non-hidden subdirectories.

    Args:
        prefix: A string to use as prefix for the keys of the dictionary.

    Returns:
        A dict mapping prefixed directory names to the paths of the folders found in the project.

        A current limitation is that subdirectories sharing the same name collide:
        only one of them is kept. The function can be updated to deal with it.
    """

    directories = {
        # Keys are the directory names with `prefix` prepended.
        prefix + path.stem: path
        for path in here().rglob('**')
        # Skip anything located inside a hidden (dot-prefixed) top-level directory.
        if path.is_dir() and (path.anchor + '.') not in str(path)
    }

    return directories
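
To make the shape of the returned mapping concrete, here is a minimal sketch that builds the same kind of dictionary against a throwaway directory tree. The helper `list_directories`, its explicit `root` argument, and the folder layout are hypothetical stand-ins for `auto_directories` and `here()`, so the snippet runs without a real project.

```python
import pathlib
import tempfile


def list_directories(root: pathlib.Path, prefix: str = "_") -> dict:
    """Map prefixed directory names to paths, skipping hidden directories."""
    return {
        prefix + path.name: path
        for path in sorted(root.rglob("*"))
        if path.is_dir()
        and not any(part.startswith(".") for part in path.relative_to(root).parts)
    }


with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    # Hypothetical layout: two visible folders (one nested) and one hidden folder.
    for sub in ("data/raw", "notebooks", ".git/hooks"):
        (root / sub).mkdir(parents=True)

    directory_keys = sorted(list_directories(root))

print(directory_keys)  # ['_data', '_notebooks', '_raw']
```

Note that the nested folder `data/raw` is keyed only by its own name (`_raw`), which is why two folders with the same name would collide.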

In [3]:
def make_dir_function(dirname: Union[str, Iterable[str]]) -> Callable[..., pathlib.Path]:
    """Generates a function that builds paths inside a subdirectory of the project.

    Args:
        dirname: Name of the subdirectory (or subdirectories) that extends the path of the main project.
            If an iterable of strings is passed as an argument, its elements are joined into a
            single path with the separator of the operating system.

    Returns:
        Function that accepts any number of path components and returns the path
        `<project root>/<dirname>/<components...>`.
    """

    def dir_path(*args: str) -> pathlib.Path:
        # `pathlib.Path` is the portable annotation; `PosixPath` only exists as a result on POSIX systems.
        if isinstance(dirname, str):
            return here().joinpath(dirname, *args)
        return here().joinpath(*dirname, *args)

    return dir_path
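
The closure pattern used above can be sketched in isolation. In this illustration the name `make_dir_function_sketch` and the fixed `PROJECT_ROOT` are assumptions that replace `here()`, and a pure POSIX path is used so the result is the same on every platform.

```python
import pathlib
from typing import Callable, Iterable, Union

# Hypothetical project root; in the notebook this role is played by `here()`.
PROJECT_ROOT = pathlib.PurePosixPath("/project")


def make_dir_function_sketch(
    dirname: Union[str, Iterable[str]]
) -> Callable[..., pathlib.PurePosixPath]:
    """Return a function that appends components to PROJECT_ROOT/dirname."""
    parts = [dirname] if isinstance(dirname, str) else list(dirname)

    def dir_path(*args: str) -> pathlib.PurePosixPath:
        return PROJECT_ROOT.joinpath(*parts, *args)

    return dir_path


# One path builder per folder keeps the notebooks free of hard-coded paths.
raw_data_dir = make_dir_function_sketch(["data", "raw"])
csv_path = raw_data_dir("2020", "samples.csv")

print(csv_path)  # /project/data/raw/2020/samples.csv
```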

### Plots

Before you start plotting the data to explore how it relates, it is worth settling on a consistent style so that all figures look unified. This section therefore configures the default values for the charts. See [here](https://matplotlib.org/3.3.1/tutorials/introductory/customizing.html) for more information about the parameters that can be modified.

In [4]:
def set_plot_defaults(defaults: dict = None, style: str = None) -> None:
    """Updates the default plotting parameters in matplotlib.

    Args:
        defaults: Optional; Dictionary whose keys are matplotlib rc parameters (see `plt.rcParams` for
            possibilities). If not specified, a small set of defaults is applied.
        style: Optional; Name of a matplotlib style sheet, which uses the same parameters as a matplotlib
            `rc file` (read at startup to configure Matplotlib). Defaults to 'seaborn-whitegrid'.

    Returns:
        None
    """

    if style is None:
        # Note: Matplotlib 3.6 renamed this style to 'seaborn-v0_8-whitegrid'.
        plt.style.use('seaborn-whitegrid')
    else:
        plt.style.use(style)
        
    if defaults is None:
        defaults = {
            'legend.loc': 'best',
            'font.family': ['sans-serif'],
            'figure.figsize': [10.0, 8.0],
            'legend.shadow': True,
            'axes.labelsize': 14,
        }

    for key, value in defaults.items():
        plt.rcParams[key] = value
        
    return None

### Text

In [5]:
def text_split_to_dict(text: str, sep: str = ',', to_set: bool = True) -> Dict[int, Union[set, list]]:
    """Splits a text into a dictionary of sets or lists keyed by position.

    Args:
        text: Text string that will be split into a dictionary.
        sep: String used to split the text.
        to_set: If true, each value is stored in a set; otherwise in a list.

    Returns:
        A dictionary (a `collections.defaultdict`) whose keys are the 1-based positions of the
        pieces in the text and whose values hold the corresponding piece in a set or a list.
    """

    kind = set if to_set else list

    out_dict = collections.defaultdict(kind)

    for i, text_split in enumerate(text.split(sep), 1):
        if to_set:
            out_dict[i].add(text_split)
        else:
            out_dict[i].append(text_split)

    return out_dict
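
As a quick illustration of the output shape, the sketch below reproduces the `to_set=True` behaviour with a compact stand-in. The name `split_by_position` and the sample text are hypothetical.

```python
import collections


def split_by_position(text: str, sep: str = ",") -> dict:
    """Compact stand-in for `text_split_to_dict` with `to_set=True`."""
    out = collections.defaultdict(set)
    # Positions start at 1, matching `enumerate(..., 1)` in the function above.
    for position, piece in enumerate(text.split(sep), 1):
        out[position].add(piece)
    return dict(out)


result = split_by_position("red,green,blue")
print(result)  # {1: {'red'}, 2: {'green'}, 3: {'blue'}}
```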

### Combinations

In [6]:
def product_entries(A: Iterable, B: Iterable = None, labels: Iterable[str] = None) -> pd.DataFrame:
    """Create a dataframe with the cartesian product of the supplied values.
    
    Args:
        A: Values to use.
        B: Optional; If specified, the values of B are used in conjunction with those of A.
        labels: Optional; Names for column A and B. If not specified then the output columns
            will be named A and B.
        
    Returns:
        Dataframe with the cartesian product of the supplied values.
    """
    
    entries = itertools.product(A, A) if B is None else itertools.product(A, B)
    
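    # `sort_values` expects a list of column names, so any other iterable is converted.
    labels = ["A", "B"] if labels is None else list(labels)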
    
    df = (
        pd.DataFrame(entries).
        rename(columns={0: labels[0], 1: labels[1]}).
        sort_values(by=labels, ignore_index=True)
    )
    
    return df
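
The rows that end up in the dataframe can be previewed with `itertools.product` alone. This is only a sketch of the pairing step, with hypothetical sample values, and it leaves out the pandas wrapping and column renaming.

```python
import itertools

A = ["x", "y"]
B = [1, 2]

# With B omitted, the function pairs A with itself.
self_pairs = sorted(itertools.product(A, A))
# With B supplied, every element of A is paired with every element of B.
cross_pairs = sorted(itertools.product(A, B))

print(self_pairs)   # [('x', 'x'), ('x', 'y'), ('y', 'x'), ('y', 'y')]
print(cross_pairs)  # [('x', 1), ('x', 2), ('y', 1), ('y', 2)]
```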

### Numeric

#### Jaccard calculations

In [7]:
def jaccard(
    A: set,
    B: set,
    error_return: float = 0.0,
    simmetric: bool = True,
    modified: bool = False
) -> float:
    """Returns the Jaccard index of sets A and B.

    Args:
        A: Set A.
        B: Set B.
        error_return: Optional; Value returned when the denominator is zero.
        simmetric: Optional; If true, the size of the intersection is divided by the size of the
            union of both sets; otherwise it is divided by the size of set A.
        modified: Optional; If true, divide by the size of the smallest set and ignore the `simmetric` parameter.

    Returns:
        Jaccard index of sets A and B.
    """
    
    numerator = float(len(A.intersection(B)))
    
    if modified:
        denominator = min([len(A), len(B)])
    else:
        denominator = len(A.union(B)) if simmetric else len(A)

    try:
        return numerator / denominator
    except ZeroDivisionError:
        return error_return
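
A small worked example shows how the three denominators change the score. The sets are hypothetical, and the arithmetic is written out directly rather than calling `jaccard`, so the snippet runs on its own.

```python
A = {1, 2, 3, 4}
B = {3, 4, 5}

shared = len(A & B)  # 2 elements in common: {3, 4}

symmetric = shared / len(A | B)          # simmetric=True:  2 / 5
asymmetric = shared / len(A)             # simmetric=False: 2 / 4
modified = shared / min(len(A), len(B))  # modified=True:   2 / 3

print(symmetric, asymmetric, round(modified, 3))  # 0.4 0.5 0.667
```

The asymmetric variant depends on the argument order, so swapping A and B here would give 2 / 3 instead of 2 / 4.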

In [8]:
def nested_jaccard(
    A: Dict[Any, set],
    B: Dict[Any, set],
    not_key_return: Any=-1,
    global_label: str=None,
    **kwargs
) -> Dict[Any, float]:
    """Returns the Jaccard index for each pair of sets stored under the same key in A and B.

    Args:
        A: Dictionary of sets A.
        B: Dictionary of sets B.
        not_key_return: Value used when a key is not shared by both dictionaries
            (or when its value is not a set).
        global_label: Optional; If specified, an extra entry with this key stores the Jaccard
            index between the union of all sets in A and the union of all sets in B.
        kwargs: Arguments passed to the `jaccard` function.

    Returns:
        Dictionary where the keys correspond to the union of keys of dictionaries A and B
        and the values are the Jaccard index for each entry.
    """

    score_dict = {}

    if global_label is not None:
        # `set().union(*...)` also handles empty dictionaries, where `functools.reduce` would raise.
        set_A, set_B = [set().union(*current_dict.values()) for current_dict in [A, B]]
        score_dict[global_label] = jaccard(set_A, set_B, **kwargs)

    for key in set(A) | set(B):

        set_A, set_B = A.get(key), B.get(key)

        if not isinstance(set_A, set) or not isinstance(set_B, set):
            score_dict[key] = not_key_return
        else:
            score_dict[key] = jaccard(set_A, set_B, **kwargs)

    return score_dict
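
The per-key behaviour can be illustrated with two small dictionaries. This sketch uses a hypothetical `jaccard_index` helper (symmetric variant only) and hypothetical data instead of the functions above, so it is self-contained.

```python
def jaccard_index(a: set, b: set) -> float:
    """Symmetric Jaccard index; returns 0.0 when both sets are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0


A = {"x": {1, 2, 3}, "y": {4}}
B = {"x": {2, 3, 4}, "z": {5}}

# Keys present in only one dictionary get the placeholder value (-1 here).
scores = {
    key: jaccard_index(A[key], B[key]) if key in A and key in B else -1
    for key in sorted(A.keys() | B.keys())
}

# Pooled score over all sets, which is the kind of entry `global_label` adds.
pooled = jaccard_index(set().union(*A.values()), set().union(*B.values()))

print(scores)  # {'x': 0.5, 'y': -1, 'z': -1}
print(pooled)  # 0.6
```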

#### Jaccard applied with Dataframes

In [9]:
def jaccard_df(
    df: pd.DataFrame,
    set_column: str,
    matrix: bool=False,
    B=None,
    labels: Iterable[str] = None,
    **kwargs
) -> pd.DataFrame:
    """Returns the pairwise Jaccard index between the rows of an indexed dataframe, using a column of sets.

    Args:
        df: Indexed dataframe with a column of sets to use.
        set_column: Name of the column with the sets to use for the Jaccard index.
        matrix: Optional; If true, the result is pivoted into a matrix (rows A, columns B);
            otherwise it is returned in long (tidy) format.
        B: Optional; Index labels to combine with the dataframe index. If not specified,
            the Cartesian product of the index with itself is used.
        labels: Optional; Names for column A and B. If not specified then the output columns
            will be named A and B.
        kwargs: Extra arguments to pass to the `jaccard` function.
        
    Returns:
        Dataframe with the cartesian product of the supplied values and their respective jaccard index values.
    """
    
    if labels is None:
        labels = ["A", "B"]
    
    df_score = (
        product_entries(A=df.index, B=B, labels=labels).
        assign(
            A_set=lambda x: df.loc[x[labels[0]], set_column].values,
            B_set=lambda x: df.loc[x[labels[1]], set_column].values,
            score=lambda x: x.apply(lambda row: jaccard(row["A_set"], row["B_set"], **kwargs), axis=1)
        ).
        drop(columns=["A_set", "B_set"])
    )
    
    return df_score if not matrix else df_score.pivot_table(index=[labels[0]], columns=[labels[1]])["score"]

In [10]:
def nested_jaccard_df(
    df: pd.DataFrame,
    set_column: str,
    matrix: bool=False,
    B=None,
    labels: Iterable[str] = None,
    **kwargs
) -> pd.DataFrame:
    """Returns the nested Jaccard index between the rows of an indexed dataframe, using a column of dictionaries of sets.

    Args:
        df: Indexed dataframe with a column of dictionaries of sets to use.
        set_column: Name of the column with the dictionaries of sets to use for the Jaccard index.
        matrix: Optional; If true, the result is pivoted into a matrix (rows A, columns B);
            otherwise it is returned in long (tidy) format.
        B: Optional; Index labels to combine with the dataframe index. If not specified,
            the Cartesian product of the index with itself is used.
        labels: Optional; Names for column A and B. If not specified then the output columns
            will be named A and B.
        kwargs: Extra arguments to pass to the `nested_jaccard` function.

    Returns:
        Dataframe with the cartesian product of the supplied values and one Jaccard index column per key.
    """
    
    if labels is None:
        labels = ["A", "B"]
    
    df_score = (
        product_entries(A=df.index, B=B, labels=labels).
        assign(
            A_set=lambda x: df.loc[x[labels[0]], set_column].values,
            B_set=lambda x: df.loc[x[labels[1]], set_column].values,
            tmp_scores=lambda x: x.apply(lambda row: nested_jaccard(row["A_set"], row["B_set"], **kwargs), axis=1)
        ).
        pipe(lambda scores: pd.concat([scores, pd.json_normalize(scores.tmp_scores)], axis=1)).
        drop(columns=["A_set", "B_set", "tmp_scores"])
    )
    
    if "not_key_return" in kwargs:
        df_score.fillna(value=kwargs["not_key_return"], inplace=True)
    
    return df_score if not matrix else df_score.pivot_table(index=[labels[0]], columns=[labels[1]])

### Graphs

In [11]:
def reduce_nodes(
    nodes: Union[list, set],
    G: nx.MultiDiGraph,
    inmediate: bool=False,
    use_edges: str="in"
) -> set:
    """Keeps the non-redundant nodes in one direction (i.e. in, out) of a directed acyclic graph.

    Args:
        nodes: Nodes to reduce.
        G: Graph to search.
        inmediate: Optional; If true, only the nodes with a direct connection are used for the filtering,
            otherwise all the reachable nodes are used.
        use_edges: Optional; Filtering direction. If the value is `in`, the ancestors of the nodes are
            removed, preserving the non-redundant child nodes; otherwise the descendants are removed,
            preserving the non-redundant parent nodes.

    Returns:
        Set of non-redundant nodes.
    """

    if use_edges not in ("in", "out"):
        raise ValueError("use_edges must be in ('in', 'out').")    

    nodes = set(nodes)

    if inmediate:
        
        edges, use = (G.in_edges, 0) if use_edges == "in" else (G.out_edges, 1)
        related_nodes = set(edge[use] for edge in edges(nodes))

    else:

        connection_function = nx.ancestors if use_edges == "in" else nx.descendants
        # `set().union(*...)` returns an empty set for empty input, where `functools.reduce` would raise.
        related_nodes = set().union(*(connection_function(G=G, source=node) for node in nodes))

    return nodes - related_nodes
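
The filtering idea (`use_edges="in"` with all reachable nodes) can be sketched without networkx. The dictionary-based graph, the node names, and the `ancestors` helper below are hypothetical; in the function above that role is played by `nx.ancestors`.

```python
# Hypothetical DAG stored as child -> parents (edges point from parent to child).
parents = {
    "root": set(),
    "a": {"root"},
    "b": {"root"},
    "a1": {"a"},
}


def ancestors(node: str) -> set:
    """Collect every node from which `node` can be reached."""
    found = set()
    stack = list(parents[node])
    while stack:
        current = stack.pop()
        if current not in found:
            found.add(current)
            stack.extend(parents[current])
    return found


nodes = {"root", "a", "a1", "b"}

# Any node that is an ancestor of another selected node is redundant.
related = set().union(*(ancestors(node) for node in nodes))
most_specific = nodes - related

print(sorted(most_specific))  # ['a1', 'b']
```

Here `root` and `a` are dropped because they are implied by their descendants `a1` and `b`.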

### Dictionaries

In [12]:
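def merge_iter_dicts(iter_dicts: Iterable[Dict[Any, Any]], to_set: bool = True) -> Dict[Any, Union[set, list]]:
    """Merges dictionaries, collecting the values of common keys in a set (or a list).

    Args:
        iter_dicts: Dictionaries to merge. Values may be single items, sets, or lists.
        to_set: If true, merged values are stored in sets; otherwise in lists.

    Returns:
        Dictionary with the union of all keys and the collected values for each key.
    """

    kind = set if to_set else list

    def merge_dictionaries(dict_1, dict_2):
        """Merges two dictionaries into a new `defaultdict` of sets or lists."""
        merged_dictionary = collections.defaultdict(kind)
        for key, value in itertools.chain(dict_1.items(), dict_2.items()):
            # Wrap single items so they can be added like any other collection.
            value = value if isinstance(value, (set, list)) else [value]
            if to_set:
                merged_dictionary[key].update(value)
            else:
                merged_dictionary[key].extend(value)

        return merged_dictionary

    # The empty initial dictionary normalises single-element and empty inputs.
    return functools.reduce(merge_dictionaries, iter_dicts, {})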
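
The expected result of a set-based merge is easiest to see on a small input. The sketch below uses hypothetical data and a plain loop rather than `functools.reduce`, which is a simplification of the function above and not a replacement for it.

```python
import collections

dicts = [
    {"a": {1}, "b": {2}},
    {"a": {3}},
    {"b": {2, 4}, "c": {5}},
]

merged = collections.defaultdict(set)
for current in dicts:
    for key, values in current.items():
        # Values of repeated keys accumulate in the same set.
        merged[key].update(values)

merged = dict(merged)
print(merged)  # {'a': {1, 3}, 'b': {2, 4}, 'c': {5}}
```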