In [None]:
import time


## General Utility 

In [None]:
import numpy as np
from collections import Counter
import math


# -----------------------------------------------
#   ARRAY MANIPULATION
# -----------------------------------------------
def is_unique(arr,rate=False):
    """
    Tell whether every element of `arr` is distinct.

    With `rate=False` (default) a boolean is returned; with `rate=True` the
    ratio (number of distinct values) / len(arr) is returned instead.
    """
    distinct_count = np.unique(arr).size
    total_count = len(arr)
    return distinct_count / total_count if rate else distinct_count == total_count

def overlap(*arrs):
    """
    Compute the intersection of all given arrays and return it as a NumPy array.
    The first array seeds the result; every further array narrows it down.
    """
    common = set(arrs[0])
    for other in arrs[1:]:
        common &= set(other)
    return np.array(list(common))

def union(*arrs):
    """
    Merge the distinct elements of all given arrays and return them as a NumPy array.
    """
    merged = set()
    for arr in arrs:
        merged.update(set(arr))
    return np.array(list(merged))

def difference(*arrs,how="outer"):
    """
    Return the difference elements in the form of an array

    Parameters:
        - how = `"outer"`: union of all arrays minus their common intersection,
          i.e. elements that do not appear in every array
        - how = `"left"`: elements of the left (first) array that do not appear in every array
        - how = `"right"`: elements of the right (last) array that do not appear in every array

    Raises:
        - IndexError if no array is given
        - KeyError if `how` is not one of the modes above
    """
    sets = [set(arr) for arr in arrs]
    # Elements shared by every array; computed once and reused by all modes.
    common = sets[0].intersection(*sets[1:])

    # Only the requested mode is evaluated.
    if how == "outer":
        base = set().union(*sets)
    elif how == "left":
        base = sets[0]
    elif how == "right":
        base = sets[-1]
    else:
        raise KeyError(how)
    return np.array(list(base - common))

def label_counts(arr,labels=None):
    """ 
    @Description: Return a dict of label_counts and labels of a list-like object
    @Parameters:
        - arr: list-like object (LLO); it is flattened before counting
        - labels: labels to be included in the counting, even those not in the LLO. Order-sensitive.
          When omitted, the sorted unique values of `arr` are used.
    @Returns: dict with keys `'counts'` (array aligned with `'labels'`), `'labels'` and `'num_classes'`
    """
    arr_ = np.array(arr).ravel() # Omitted dtype=object
    labels = np.unique(arr_) if labels is None else np.array(labels)
    # Count once up front instead of rebuilding the Counter for every label.
    tally = Counter(arr_)
    counts = np.array([tally[lab] for lab in labels])
    return {'counts':counts,'labels':labels,'num_classes':len(labels)}

# -----------------------------------------------
#   DICT MANIPULATION
# -----------------------------------------------

def dict_subset(dict_obj, keys):
    """
    Build a new dict holding only the entries of `dict_obj` whose keys are listed in `keys`.
    A key missing from `dict_obj` raises KeyError.
    """
    subset = {}
    for wanted in list(keys):
        subset[wanted] = dict_obj[wanted]
    return subset

# -----------------------------------------------
#   LOGIC FUNCTIONS
# -----------------------------------------------
def isNone(var):
    """
    @Description: Identity test against None. Unlike `var == None`, this never
    triggers element-wise comparison when var is a list/array.
    """
    return var is None

def default_value(var,default_val,default_trigger=None):
    """
    @Description: Substitute `default_val` for `var` when `var` counts as "unset".
        - Without a trigger: `var` is unset when it is None.
        - With a trigger: `var` is unset when its type differs from the trigger's
          type, or when it compares equal to the trigger.
    """
    if isNone(default_trigger):
        if isNone(var):
            return default_val
        return var

    # A type mismatch short-circuits before the equality test is evaluated.
    if type(var) != type(default_trigger):
        return default_val
    if var == default_trigger:
        return default_val
    return var

def converse(var,choices):
    """
    Return the "other" option of a two-element `choices` sequence: the first
    element when `var` equals the second, and the second element otherwise.
    """
    assert len(choices)==2, "The converse of more than 2 choices is ambiguous"
    if var == choices[1]:
        return choices[0]
    return choices[1]


# -----------------------------------------------
#   MATH FUNCTIONS
# -----------------------------------------------
def clamp(val,lower=0,upper=1,default_nan=0):
    """
    @Description: Clamp a numerical value between lower and upper. 
    @Parameters:
        - val: value to clamp
        - lower, upper: inclusive bounds
        - default_nan: value returned when `val` is NaN
    """
    # NaN is the only value that is not equal to itself; without this guard
    # the min/max chain returns `lower` purely by accident of comparison order.
    if val != val:
        return default_nan
    return max(lower,min(val,upper))


# -----------------------------------------------
#   STRING FORMATTING
# -----------------------------------------------

def fmt_time(seconds):
    """
    Convert time in seconds to a string representation of the format hh:mm:ss

    Fractional seconds are truncated. Hours are zero-padded to at least two
    digits and grow beyond that when needed (e.g. `100:00:00`). A negative
    duration is rendered as the absolute duration prefixed with `-`.
    """
    total = int(seconds)
    sign = "-" if total < 0 else ""
    # Integer divmod avoids the float division/floor round trip.
    hours, remainder = divmod(abs(total), 3600)
    minutes, secs = divmod(remainder, 60)
    return f'{sign}{hours:02d}:{minutes:02d}:{secs:02d}'


## Helper Classes

In [None]:
from .utils import *
import time

# -----------------------------------------------
#   DICT LIKE DUMMY OBJECT
# -----------------------------------------------
class MyObj():
    """
    @Description: Object that serves as a namespace for related attributes within other classes
    @Example:
        holder.var1 = MyObj()
        holder.var1.inputs = [1,2,3]
        print(holder.var1.inputs)
        >> [1, 2, 3]
        holder.var2 = MyObj({'a':1,'b':2})
        print(holder.var2.a,holder.var2.b)
        >> 1 2

    Only attributes passed to the constructor or to `update()` are tracked in
    `dict_`; attributes assigned directly (as `inputs` above) are not.
    """
    def __init__(self,__name__="My Object",**kwargs):
        super().__init__()
        # Support the documented `MyObj({'a': 1})` form: a dict given in the
        # name slot is treated as attributes, and the default name is kept.
        if isinstance(__name__,dict):
            kwargs = {**__name__,**kwargs}
            __name__ = "My Object"
        self.__name__ = __name__
        self.dict_ = {}
        self.update(**kwargs)

    def update(self,**kwargs):
        """
        Record the keyword arguments in `dict_` and expose each one as an attribute
        """
        if kwargs:
            self.dict_.update(kwargs)
            for key,value in kwargs.items():
                setattr(self,key,value)
    
    def dict(self,keys=None):
        """
        Return the dictionary for all or a subset of attributes

        With `keys=None` the internal dict itself (not a copy) is returned;
        otherwise a new dict restricted to `keys` is built (KeyError on unknown keys).
        """
        if keys is None:
            return self.dict_
        return {key:self.dict_[key] for key in list(keys)}

    def __str__(self):
        return f'{self.__name__} {self.dict_}'


# -----------------------------------------------
#   TIMER FOR PROCESSES/TASKS
# -----------------------------------------------

class ProcessTimer:
    """
    Track elapsed time for one or more jobs, each identified by a job id.

    For every job the timer keeps the start reading, the most recent reading
    (`curr_`) and the reading before that (`prev_`). Readings come from
    `time.perf_counter()`, so they are only meaningful as differences.
    """
    def __init__(self) -> None:
        self.start_ = {}
        self.prev_ = {}
        self.curr_ = {}
        self.NEW_ID = 0

    def start(self,job_id=None):
        """
        Start (or restart) timing a job and return its id.
        When `job_id` is None the current value of `NEW_ID` is used as the id.
        """
        job_id = self.NEW_ID if job_id is None else job_id
        now = time.perf_counter()
        self.start_[job_id] = now
        self.curr_[job_id] = now
        self.prev_[job_id] = -1  # sentinel: no step has been recorded yet
        self.NEW_ID += 1
        return job_id

    def record(self,job_id=0):
        """
        Take a new reading for the job; the previous reading becomes `prev_`.
        Raises KeyError if the job was never started.
        """
        self.prev_[job_id] = self.curr_[job_id]
        self.curr_[job_id] = time.perf_counter()

    def execute(self,func,job_id=-1,**func_args):
        """
        Record the time for executing a function.

        Return 
        ---------
        The elapsed time alone when `func` returns None, otherwise the tuple
        (elapsed time, return value of `func`).
        """
        self.start(job_id)
        return_val = func(**func_args)
        self.record(job_id)
        if return_val is None:
            return self.time_elapsed(job_id)
        return self.time_elapsed(job_id),return_val

    def step_elapsed(self,job_id=0):
        """
        Time between the last two readings of the job, or -1 when the job is
        unknown or no step has been recorded since it was started.
        """
        if job_id not in self.prev_ or job_id not in self.curr_:
            return -1
        previous = self.prev_[job_id]
        # Without this guard the -1 sentinel would be subtracted as if it were a reading.
        if previous == -1:
            return -1
        return self.curr_[job_id] - previous

    def time_elapsed(self,job_id=0):
        """
        Time between the start of the job and its latest reading, or -1 when the job is unknown.
        """
        if job_id not in self.start_ or job_id not in self.curr_:
            return -1
        return self.curr_[job_id] - self.start_[job_id]
        




## Pandas Extension

In [None]:
from typing import Iterable
import numpy as np
import pandas as pd
from .utils import *

#----------------------------------------
#   EXTRACT DATA FRAME INFORMATION
#----------------------------------------

def summary(df:pd.DataFrame|pd.Series) -> pd.DataFrame:
    """
    Summary table for each columns in the dataframe
    """
    df_sum = pd.DataFrame(index=df.columns,columns=["dtypes","length","unique","samples","mode","range","mean","std","fill"])
    df_sum.index.name = "columns"
    for c in df.columns:
        df_c_notna = df[c][df[c].notna()]
        df_sum.loc[c,"dtypes"] = df[c].dtypes
        df_sum.loc[c,"samples"] = (list(df[c].value_counts(ascending=False).index.values[:5]))
        df_sum.loc[c,"length"] = len(df[c])
        df_sum.loc[c,"unique"] = len(df[c].unique())
        df_sum.loc[c,"mode"] = df_c_notna.mode()[0]
        df_sum.loc[c,"fill"] = np.round(len(df_c_notna)/len(df[c]),2)
        if isinstance(df[c].dtype,(type(np.dtype("float64")),type(np.dtype("int64")))):
            df_sum.loc[c,"range"] = ([df_c_notna.min(),df_c_notna.max()])
            df_sum.loc[c,"mean"] = df_c_notna.mean().round(2)
            df_sum.loc[c,"std"] = df_c_notna.std().round(2)
    return df_sum

def compare(df1 : pd.DataFrame|pd.Series, df2: pd.DataFrame|pd.Series,numeric: bool=False) -> pd.DataFrame:
    """
    Compare the 2 DataFrames along the same indexes and columns. 
        - For numeric columns, return the difference. 
        - For object columns, return whether the values match
    """
    overlapped_index = overlap(df1.index,df2.index)
    overlapped_columns = overlap(df1.columns,df2.columns)
    index = df1.index if set(df1.index) == set(overlapped_index) else overlapped_index
    columns = df1.columns if set(df1.columns) == set(overlapped_columns) else overlapped_columns
    df1_comp = df1.loc[index,columns]
    df2_comp = df2.loc[index,columns]
    df_comp = pd.DataFrame(index=index,columns=columns)

    if numeric:
        numeric_cols = []
        for c in columns:
            if is_np_numeric(df1[c]) and is_np_numeric(df2[c]): 
                numeric_cols.append(c)
                df_comp.loc[index,c] = (df1_comp.loc[index,c] - df2_comp.loc[index,c])
        return df_comp.loc[index,numeric_cols]

    else:
        for c in columns:
            df_comp.loc[index,c] = (df1_comp.loc[index,c] == df2_comp.loc[index,c]) | ((df1_comp.loc[index,c] != df2_comp.loc[index,c]) 
                                                                                        & (df1_comp.loc[index,c].isna() == df2_comp.loc[index,c].isna())
                                                                                        & df1_comp.loc[index,c].isna() 
                                                                                        & df2_comp.loc[index,c].isna())
        return df_comp.loc[index,columns]

#----------------------------------------
# DATA FRAME MANIPULATION & TRANSFORMATION
#----------------------------------------

def filter(df: pd.DataFrame|pd.Series, condition: pd.DataFrame|pd.Series, filter_columns: bool=False) -> pd.DataFrame|pd.Seires:
    """
    @Description: Filter a `DataFrame` object based on a match over one or multiple columns. Results can be filtered by rows or both rows and columns.
    @Parameters:
        - condition: boolean Series, typically a DataFrame expression involving one or more conditions. E.g., `df['A'] == 1` or `df[['B','C']] == [2,3]`
        - filter_columns: When filter_columns is False, only filter by rows, otherwise filter by both rows and columns
    """
    df_ = pd.DataFrame(df)
    filter_ = pd.DataFrame(condition)
    # Ensure that condition works over multiple columns matching
    match_all_columns = (pd.DataFrame(condition).sum(axis=1) == len(filter_.columns.values))
    condition =  match_all_columns if len(filter_.columns.values) >= 1 else condition 

    # Select columns to keep and rows to display based on condition
    columns_filt = filter_.columns.values if filter_columns else df_.columns.values
    index_filt = df_.loc[:,columns_filt][condition].dropna().index    
    return df_.loc[index_filt,columns_filt]

def per_class_sample(inputs: pd.DataFrame|pd.Series, targets: pd.DataFrame|pd.Series, 
                        sampling_dist: str|int|float|Iterable='min',random_state: int=None) -> tuple[pd.DataFrame|pd.Series, pd.DataFrame|pd.Series]:
    """
    ### Description: 
    Sample inputs based on a distribution of target labels. By default will attempt to sample all classes equally according to the least populous class.

    ### Parameters:
    - inputs, targets: objects sharing the same index; rows are selected from both by the sampled index labels.
    - sampling_dist:
        - If sampling_dist is `None`: Sample all members of all classes.
        - If sampling_dist is `'min'`: Attempt to sample all classes equally according to the least populous class.
        - If sampling_dist is type `int`: Attempt to sample up to that many members from every class.
        - If sampling_dist is type `float` (within [0,1)): Sample that proportion of every class.
        - If sampling_dist is type `list`: Attempt to sample classes based on the distribution specified,
          one entry per class in the order of the sorted unique labels.
            - If a class distribution is `None`, all members of that class is sampled
            - If a class distribution is a fraction (within [0,1)), it will be understood as the class proportion
            - If a class distribution is an integer (>=1), it will be understood as class counts (capped at the class size)
    - random_state: forwarded to `DataFrame.sample` for every class.

    ### Returns:
    Tuple `(inputs, targets)` restricted to the sampled index labels.

    ### Raises:
    `ValueError` if sampling_dist is a string other than `'min'`.
    """
    # Convert sampling_dist into list of distribution if not already is
    counts,labels,_ = label_counts(targets).values()
    if isNone(sampling_dist):
        sampling_dist = counts
    elif isinstance(sampling_dist,str):
        # `and`-style logic is required here: `isinstance(...) & x == 'min'`
        # parses as `(isinstance(...) & x) == 'min'` and raises for strings.
        if sampling_dist != 'min':
            raise ValueError(f"Unknown sampling_dist '{sampling_dist}'; the only supported string is 'min'")
        # Cast to a plain int so the scalar check below recognises it.
        sampling_dist = int(min(counts))
    if isinstance(sampling_dist,(int,float,np.integer,np.floating)):
        sampling_dist = np.full(shape = labels.shape,fill_value = sampling_dist)

    sampled_index = pd.Index([])
    for label_i,count_i,dist_spec_i in zip(labels,counts,sampling_dist):
        # Convert distribution values to actual class counts
        if isNone(dist_spec_i): 
            n_samples_i = int(count_i)
        elif 0 <= dist_spec_i < 1:
            n_samples_i = int(count_i * dist_spec_i)
        else:
            n_samples_i = int(clamp(dist_spec_i,0,count_i))
        # Obtain samples with the label_i   
        sampled_targets_i = filter(targets,targets==label_i).sample(n_samples_i,random_state=random_state)
        sampled_index = sampled_index.append(sampled_targets_i.index)

    return inputs.loc[sampled_index], targets.loc[sampled_index]


def match(df_in: pd.DataFrame|pd.Series, oper: str ,values: Iterable[int],strict: bool=False) -> pd.DataFrame|pd.Series:
    """
    Apply a comparison operation to a list of values and return all results that matches all elements in the list.

    In strict mode, only keep rows that satisfy all matching requirements 
    """
    mult_result = (df_in != df_in) if oper == "==" else (df_in == df_in)
    for val in np.array(values):
        mult_result = {
            "<=": mult_result & (df_in <= val), "<"  : mult_result & (df_in < val),
            ">=": mult_result & (df_in >= val), ">"  : mult_result & (df_in > val),
            "==": mult_result | (df_in == val), "!=" : mult_result & (df_in != val),
        } [oper]
    df_out = df_in[mult_result].dropna(how={True:"any",False:"all"}[strict])
    return df_out

#----------------------------------------
#   DATA TYPES
#----------------------------------------

def is_np_dtypes(series: pd.Series ,dtypes : str|tuple|Iterable) -> bool:
    """
    Check if a series is of one or multiple numpy dtypes
    - For `int` dtype, use `"int64"`
    - For `float` dtype, use  `"float64"`
    - For `object` dtype, use  `"O"`
    """
    return isinstance(series.dtypes,tuple([type(np.dtype(typ)) for typ in dtypes]))

def is_np_numeric(series: pd.Series) -> bool:
    """
    Tell whether a series has one of the numeric numpy dtypes checked here,
    namely `float64` or `int64`.
    """
    numeric_dtype_names = ("float64","int64")
    return is_np_dtypes(series,numeric_dtype_names)







## Summary Function

In [None]:
# Load the sample frame and discard the saved positional-index column.
df = pd.read_csv("datasets/random_dataframe/random_dataframe_1.csv")
df = df.drop(columns="Unnamed: 0")
# Keep only the cells greater than 5; every other cell becomes NaN.
df_in = df.where(df > 5)
df_in

Unnamed: 0,c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14
0,37.0,43,12.0,8,9.0,11.0,,15.0,,16.0,,12.0,7,45.0,6.0
1,25.0,20,37.0,18,20.0,11.0,42.0,28.0,29.0,14.0,,23.0,23,41.0,49.0
2,30.0,32,22.0,13,41.0,9.0,7.0,22.0,,,17.0,8.0,24,13.0,47.0
3,42.0,8,30.0,7,,6.0,21.0,49.0,,,24.0,49.0,43,12.0,26.0
4,16.0,45,41.0,18,15.0,,,25.0,47.0,34.0,23.0,7.0,26,25.0,40.0
5,22.0,9,,39,23.0,36.0,27.0,37.0,19.0,38.0,8.0,32.0,34,10.0,23.0
6,15.0,47,23.0,25,7.0,28.0,10.0,46.0,32.0,24.0,23.0,,49,13.0,
7,,13,6.0,21,6.0,,12.0,27.0,21.0,11.0,7.0,13.0,8,11.0,12.0
8,43.0,20,30.0,36,39.0,7.0,45.0,,48.0,18.0,32.0,13.0,10,23.0,17.0
9,7.0,24,10.0,28,20.0,32.0,12.0,,30.0,41.0,24.0,18.0,33,,44.0
