# Utils
> Utility functions with wider use potential

In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
#| exporti
import json, os, warnings, math, inspect
import itertools as it
from collections import defaultdict

import numpy as np
import pandas as pd
import datetime as dt

import altair as alt
import matplotlib.colors as mpc
from copy import deepcopy
from hashlib import sha256

from typing import List, Tuple, Dict, Union, Optional

In [None]:
#| export
# Value or Default - returns the value stored under key k if k is in d, otherwise default (None unless given)
def vod(d,k,default=None):
    if k in d: return d[k]
    return default

In [None]:
#| export 

# Convenience wrapper around warnings.warn that gives a more useful stack frame:
# with stacklevel=3 the warning is reported two frames above this wrapper instead of inside it
def warn(msg, *args):
    return warnings.warn(msg, *args, stacklevel=3)

In [None]:
#| export

# Factorize series s against a predefined, ordered list of codes:
# every value is replaced by the position of that value in codes.
#   s     - pandas Series of values to encode
#   codes - sequence of allowed values; the position of a value in codes is its integer code
# Returns a numpy int array with one code per element of s.
# Raises an Exception listing the offending values if s contains anything that is not in codes.
def factorize_w_codes(s, codes):
    # Validate first, so unmatched values surface as a clear error before any work is done
    if not s.isin(codes).all():
        vals = set(s) - set(codes)
        raise Exception(f'Codes for {s.name} do not match all values: {vals}')

    code_of = dict(zip(codes,range(len(codes))))
    # Categoricals are mapped through plain objects so the result does not stay categorical
    values = s.astype(object) if isinstance(s.dtype, pd.CategoricalDtype) else s
    # map is one dictionary lookup per element, whereas Series.replace is a generic
    # (and much slower) substitution tool that was only used here as a lookup
    return values.map(code_of).to_numpy(dtype='int')

In [None]:
#| export

# Simple batching of an iterable: yields consecutive batches of at most n items
#   iterable - a sized, sliceable sequence (list, str, tuple, ...) or any plain iterator / generator
#   n        - maximum batch size, has to be at least 1
# Sized inputs are sliced, so each batch has whatever type slicing the input gives.
# Inputs without len() are consumed lazily and their batches are yielded as lists.
# The last batch can be shorter than n.
# Raises ValueError if n < 1 (on first iteration, as this is a generator).
def batch(iterable, n=1):
    if n < 1:
        raise ValueError(f'Batch size n has to be at least 1, got {n}')
    try:
        total = len(iterable)
    except TypeError:
        # No len(): pull n items at a time until the iterator runs dry (signalled by an empty list)
        source = iter(iterable)
        yield from iter(lambda: list(it.islice(source, n)), [])
        return
    for start in range(0, total, n):
        yield iterable[start:min(start + n, total)]

In [None]:
#| export

# Translate index labels (vals) into their integer positions within index
# If a label occurs in index more than once, its last position is the one used
def loc2iloc(index, vals):
    position_of = { label: pos for pos, label in enumerate(np.array(index)) }
    return [ position_of[v] for v in vals ]

In [None]:
#| export

# Round values to integers in a way that preserves the total sum:
# everything is floored, then the entries with the largest fractional parts are bumped up by one
def match_sum_round(s):
    vals = np.array(s)
    floored = np.floor(vals)
    n_bump = round(vals.sum()-floored.sum()) # number of units lost by flooring
    bump_inds = np.argsort(-(vals%1))[:n_bump]
    floored[bump_inds] += 1
    return floored.astype('int')

In [None]:
# TEST
# Fractions adding up to 2: the two largest get rounded up, the smallest one down
assert np.array_equal(match_sum_round([0.7,0.7,0.6]), [1,1,0])
# Integer input has to pass through unchanged
assert np.array_equal(match_sum_round([1,2,3]), [1,2,3])

In [None]:
#| export

# Find the minimum non-zero difference between two values in the array
# Returns 0 if there are fewer than two values or all of them are equal
def min_diff(arr):
    gaps = np.diff(np.sort(arr))
    if len(gaps)==0 or gaps.max()==0.0: return 0
    return gaps[gaps>0].min()

# Turn a discretized variable into a smoother continuous one by adding gaussian noise
# whose standard deviation is half of the smallest gap between distinct values (see min_diff)
#   ar      - numpy array (or another object with .min()/.max() that supports boolean-mask assignment)
#   bounded - if True, noise that leaves [ar.min(), ar.max()] is reflected back into that range
# Returns a new object holding the noisy values; ar itself is not modified.
# The noise is drawn from the global numpy random state (np.random.normal).
def continify(ar, bounded=False):
    mi,ma = ar.min(), ar.max()
    noise = np.random.normal(0,0.5 * min_diff(ar),size=len(ar))
    res = ar + noise
    if bounded: # Reflect the noise on the boundaries
        # A single reflection is not always enough: if the range is only a couple of noise
        # sd-s wide (e.g. a binary variable), a value reflected off one boundary can overshoot
        # the opposite one. Every pass shrinks the overshoot by (ma-mi), so this terminates.
        over, under = res>ma, res<mi
        while over.any() or under.any():
            res[over] = ma - (res[over] - ma)
            res[under] = mi + (mi - res[under])
            over, under = res>ma, res<mi
    return res

In [None]:
# TEST
# Duplicates are ignored: the smallest strictly positive gap is the one between 1.5 and 2
assert min_diff([0,0,2,1.5,3,3]) == 0.5
# Degenerate inputs (nothing to compare, or all values equal) give 0
assert min_diff([]) == 0.0
assert min_diff([1,1]) == 0.0

# With bounded=True the noisy values have to stay within the range of the original
ar = np.array([0,2,4,1,5]+ [5]*10 + [-1]*10)
c_ar = continify(ar,bounded=True)
assert ar.min() <= c_ar.min() and c_ar.max() <= ar.max()

In [None]:
#| exporti
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment

In [None]:
#| export

# Match data1 with data2 on columns cols as closely as possible
# (pairs rows so that the total Mahalanobis distance over cols is minimal)
#   data1, data2 - DataFrames that both contain the columns in cols
#   cols         - list of numeric or categorical columns to match on;
#                  defaults to all columns of data1 that are also present in data2
# Rows with a missing value in any of cols are left out of the matching.
# Returns (ind1, ind2): index labels of data1 and data2, where ind1[i] is paired with ind2[i].
# Raises an Exception if a categorical column has categories unique to each of the two sides.
def match_data(data1,data2,cols=None):
    if cols is None: cols = [ c for c in data1.columns if c in data2.columns ]
    d1 = data1[cols].dropna()
    d2 = data2[cols].dropna()

    ccols = [c for c in cols if d1[c].dtype.name=='category']
    for c in ccols: # replace categories with their index. This is ok for ordered categories, not so great otherwise
        s1, s2 = set(d1[c].dtype.categories), set(d2[c].dtype.categories)
        if s1-s2 and s2-s1: # one-way imbalance is fine
            raise Exception(f"Categorical columns differ in their categories on: {s1-s2} vs {s2-s1}")
        
        md = d1 if len(s2-s1)==0 else d2 # the side whose categories cover the other one
        mdict = dict(zip(md[c].dtype.categories, range(len(md[c].dtype.categories))))
        # Map through plain objects to floats: Series.replace on a categorical column is deprecated
        # in recent pandas and leaves the column categorical, while cdist needs plain numbers
        d1[c] = d1[c].astype(object).map(mdict).astype(float)
        d2[c] = d2[c].astype(object).map(mdict).astype(float)

    dmat = cdist(d1.to_numpy(dtype=float), d2.to_numpy(dtype=float), 'mahalanobis')
    i1, i2 = linear_sum_assignment(dmat, maximize=False)
    ind1, ind2 = d1.index[i1], d2.index[i2]
    return ind1, ind2

In [None]:
# TEST
from salk_toolkit.io import process_annotated_data

# Use the first 10 rows as the group to be matched and the rest as the pool to match from
data = process_annotated_data('../data/master_meta.json')
data1,data2 = data.iloc[:10], data.iloc[10:]
cols = ['age','gender','education','nationality']

# Make sure everything except age (cols[0]) gets exactly matched
i1,i2 = match_data(data1,data2,cols)
matched1 = data1.loc[i1,cols[1:]].reset_index(drop=True)
matched2 = data2.loc[i2,cols[1:]].reset_index(drop=True)
assert (matched1 == matched2).all().all()

In [None]:
#| export

# Allow 'constants' entries in the dict to provide replacement mappings
# This leads to much more readable jsons as repetitions can be avoided
#   d         - nested structure of dicts and lists (e.g. a parsed json)
#   constants - mapping of constant name -> replacement value that is already in effect
#   inplace   - if True, d is modified directly instead of working on a deep copy of it
# Every string value (dict value or list element) that equals a constant name is replaced by
# a copy of that constant's value. A dict can define constants under its 'constants' key:
# those apply to that dict and everything nested in it (overriding outer definitions of the
# same name), and the 'constants' key itself is removed from the result.
# Dict keys and the inserted replacement values are not searched for constants.
# Returns the processed structure; anything that is not a dict or a list is returned as is.
def replace_constants(d, constants=None, inplace=False):
    if constants is None: constants = {}
    if not isinstance(d,(dict,list)): return d # Nothing to look into
    if not inplace: d = deepcopy(d)
    if isinstance(d,dict) and 'constants' in d:
        constants = constants.copy() # Otherwise it would propagate back up through recursion
        constants.update(d['constants'])
        del d['constants']

    for k, v in (d.items() if isinstance(d,dict) else enumerate(d)):
        if isinstance(v,str) and v in constants:
            # Copy, so that the places using the same constant do not end up sharing one object
            d[k] = deepcopy(constants[v])
        elif isinstance(v,(dict,list)):
            replace_constants(v,constants, inplace=True)
            
    return d

In [None]:
# TEST

# Test replace_constants: constants get substituted on every nesting level, and constants
# defined inside a nested dict (test5) do not leak out to the entries after it (test6)
d = {
    'constants': { 'a': {'a':1}, 'b':['b'] },
    'test1': 'a',
    'test2': [1,'b'],
    'test3': { 'xy': 'a' },
    'test4': { 'xy': [2, 'b'] },
    'test5': { 'constants': {'a': ['a'] }, 'x':'a' },
    'test6': 'a'
}
dr = replace_constants(d)
expected = {
    'test1': {'a': 1},
    'test2': [1, ['b']],
    'test3': {'xy': {'a': 1}},
    'test4': {'xy': [2, ['b']]},
    'test5': {'x': ['a']},
    'test6': {'a': 1},
}
assert dr == expected

In [None]:
#| export

# JSON encoder hook that turns pandas indices into plain lists so they can be serialized
# Anything that is not a pd.Index is rejected with a TypeError
def index_encoder(z):
    if not isinstance(z, pd.Index):
        type_name = z.__class__.__name__
        raise TypeError(f"Object of type {type_name} is not serializable")
    return list(z)

In [None]:
#| export

default_color = 'lightgrey' # Something that stands out so it is easy to notice a missing color

# Helper function to turn a dictionary (value -> color) into an Altair scale, or None into alt.Undefined
# Anything else is passed through untouched
# Also: preserving order matters because scale order overrides sort argument
# NOTE: entries of order that are missing from scale are kept in the domain and get default_color
def to_alt_scale(scale, order=None):
    if scale is None: return alt.Undefined
    if not isinstance(scale,dict): return scale
    if order is None: order = scale.keys()
    domain = list(order)
    colors = [ scale.get(c, default_color) for c in order ]
    return alt.Scale(domain=domain,range=colors)

In [None]:
#| export

# Turn a question with multiple variants, each of which is in a distinct column, into two columns:
# one with the response, the other with which question variant (column) the response came from
#   df               - DataFrame with one column per question variant
#   cols             - list of the variant columns; if None, all columns starting with col_prefix are used
#   col_prefix       - prefix identifying the variant columns (only used when cols is None)
#   reverse_cols     - columns whose responses are on a reversed scale and have to be flipped
#   reverse_suffixes - alternative to reverse_cols: flip every column in cols ending in one of these
#   cat_order        - ordered list of responses; flipping maps cat_order[i] to cat_order[-1-i]
#   vals_name        - name of the created response column
#   cats_name        - name of the created variant column
#   inplace          - if True, df itself is changed (new columns added, reverse_cols flipped)
# For every row the first non-missing column of cols is used. A row with no response at all gets
# a missing value in vals_name, but still the name of the first column in cats_name.
# Returns the DataFrame with the two columns added.
# Raises ValueError if no columns can be determined or if flipping is requested without cat_order.
def multicol_to_vals_cats(df, cols=None, col_prefix=None, reverse_cols=None, reverse_suffixes=None, cat_order=None, vals_name='vals', cats_name='cats', inplace=False):
    if cols is None:
        if col_prefix is None: raise ValueError("Either cols or col_prefix has to be provided")
        cols = [ c for c in df.columns if isinstance(c,str) and c.startswith(col_prefix) ]
    if len(cols)==0: raise ValueError("No columns to combine")

    if not inplace: df = df.copy()

    reverse_cols = [] if reverse_cols is None else list(reverse_cols)
    if not reverse_cols and reverse_suffixes is not None:
        # Iterating over cols (rather than building a set) keeps the column order deterministic
        reverse_cols = [ c for c in cols if c.endswith(tuple(reverse_suffixes)) ]

    if reverse_cols:
        if cat_order is None: raise ValueError("cat_order is needed to reverse columns")
        remap = dict(zip(cat_order,reversed(cat_order)))
        df.loc[:,reverse_cols] = df.loc[:,reverse_cols].replace(remap)

    tdf = df[cols]
    # Position of the first non-missing column in each row (0 if the entire row is missing)
    cinds = tdf.notna().to_numpy().argmax(axis=1)
    df.loc[:,vals_name] = tdf.to_numpy()[np.arange(len(tdf)),cinds]
    df.loc[:,cats_name] = np.array(tdf.columns)[cinds]
    return df

In [None]:
# Test

# q1b is the reversed variant of q1, so its answers have to come out flipped (c -> a, a -> c)
df = pd.DataFrame({
    'q1': ['a','b','c',None,None,None],
    'q1b': [None,None,None,'c','b','a'],
})
ndf = multicol_to_vals_cats(df,col_prefix='q1',reverse_suffixes=['1b'],cat_order=['a','b','c'])
assert list(ndf['vals']) == ['a','b','c','a','b','c']

In [None]:
#| export

# Sample num_colors evenly spaced colors (as hex strings) from a gradient
# Grad is a list of colors that the gradient passes through
def gradient_to_discrete_color_scale( grad, num_colors):
    cmap = mpc.LinearSegmentedColormap.from_list('grad',grad)
    positions = np.linspace(0, 1, num_colors)
    return [ mpc.to_hex(rgba) for rgba in map(cmap, positions) ]

In [None]:
# A red -> yellow -> green gradient sampled at 4 evenly spaced points
red_yellow_green = ['#ff0000','#ffff00','#00ff00']
assert gradient_to_discrete_color_scale(red_yellow_green,4) == ['#ff0000', '#ffaa00', '#aaff00', '#00ff00']

In [None]:
#| export
# Check if col holds dates: either it already has a datetime64 dtype, or it is a
# str/object column in which at least one value can be parsed as a date
def is_datetime(col):
    with warnings.catch_warnings():
        # Silence UserWarnings for the duration of the check (presumably the ones from date parsing)
        warnings.simplefilter(action='ignore', category=UserWarning)
        if pd.api.types.is_datetime64_any_dtype(col): return True
        if col.dtype.name not in ['str','object']: return False
        parsed = pd.to_datetime(col,errors='coerce') # unparseable values become NaT
        return parsed.notna().any()

In [None]:
#| export

# Convert a series of wave indices and a series of survey dates into a time series usable by our gp model
#   ws  - wave identifier of each response
#   dts - survey date of each response (anything pd.to_datetime can convert)
#   dt0 - reference date that corresponds to t=0; defaults to the latest of the per-wave median dates
# The time of a wave is the number of whole days from dt0 to the median date of that wave,
# divided by 30. Waves before dt0 get negative times.
# Returns a Series named 't' that holds the time of the respective wave for every response.
def rel_wave_times(ws, dts, dt0=None):
    df = pd.DataFrame({'wave':ws, 'dt': pd.to_datetime(dts)})
    adf = df.groupby('wave')['dt'].median()
    if dt0 is None: dt0 = adf.max() # use last wave date as the reference
    
    w_to_time = dict(((adf - dt0).dt.days/30).items())
    
    # map is a plain per-row lookup of the wave. Unlike the generic substitution done by
    # Series.replace, it does not depend on how overlapping keys and values are handled
    # (times like 1.0 or 2.0 can coincide with wave ids)
    return df['wave'].map(w_to_time).rename('t')

In [None]:
from salk_toolkit.io import process_annotated_data
data = process_annotated_data('../data/master_meta.json')

# The reconstructed times have to agree with the 't' column of the data up to a (nearly) constant offset
t_diff = rel_wave_times(data['wave'],data['date']) - data['t']
assert t_diff.std() < 0.1

In [None]:
#| export

# Generate a shuffled array of n draw indices (values 0 .. n_draws-1, repeated to fill n slots)
# that is deterministic in n, n_draws and uid
def stable_draws(n, n_draws, uid):
    # Seed the random generator with the sha256 hash of uid
    digest = sha256(str(uid).encode("utf-8")).digest()
    seed_words = np.frombuffer(digest, dtype='uint32')
    gen = np.random.Generator(np.random.SFC64(seed_words))

    # Repeat 0 .. n_draws-1 often enough to cover n values, cut to length and shuffle
    n_repeats = int(math.ceil(n/n_draws))
    draws = (list(range(n_draws))*n_repeats)[:n]
    return gen.permuted(draws)

# Use the stable_draws function to deterministically assign shuffled draws to a df
# The draws are put into the 'draw' column of df itself (df is changed and also returned).
# They are generated for n_total rows (len(df) by default) and labelled 0 .. n_total-1
def deterministic_draws(df, n_draws, uid, n_total=None):
    total = len(df) if n_total is None else n_total
    draws = stable_draws(total, n_draws, uid)
    df.loc[:,'draw'] = pd.Series(draws, index = np.arange(total))
    return df

In [None]:
# Pin the exact output, so that any change in the seeding or shuffling gets noticed
expected_draws = np.array([1, 2, 3, 3, 2, 3, 2, 2, 0, 0, 0, 3, 4, 4, 1, 1, 1, 0, 4, 4])
assert (stable_draws(20,5,'test') == expected_draws).all()

In [None]:
#| export

# Clean kwargs leaving only parameters fn can digest
#   fn     - the function (or other callable inspect can analyze) the kwargs are meant for
#   kwargs - dict of keyword arguments
# Returns kwargs itself if fn takes **kwargs. Otherwise returns a new dict with only the
# entries whose key is a named parameter of fn (regular or keyword-only).
def clean_kwargs(fn, kwargs):
    aspec = inspect.getfullargspec(fn)
    if aspec.varkw is not None: return kwargs # fn accepts any keyword
    # args alone would miss keyword-only parameters (the ones after * or *args)
    accepted = set(aspec.args) | set(aspec.kwonlyargs)
    return { k:v for k,v in kwargs.items() if k in accepted }

In [None]:
#| export

# Copy of dict d without the keys listed in vs
def censor_dict(d,vs):
    kept = {}
    for k,v in d.items():
        if k in vs: continue
        kept[k] = v
    return kept

In [None]:
#| export

# A nicer behaving wrapper around pd.cut
#   s      - values to bin
#   breaks - increasing sequence of bin edges; bins are closed on the left: [breaks[i], breaks[i+1])
#   ints   - if True, the upper end in a label is the last integer of the bin ('20 - 29' instead of '20 - 30')
# The breaks get extended on either side if the data does not fit, so that every non-missing
# value lands in a bin. The last bin is labelled as open-ended (e.g. '70+').
# Returns the result of pd.cut with these labels. The breaks passed in are left untouched.
def cut_nice(s, breaks, ints=True):
    s = np.array(s)
    breaks = list(breaks) # Work on a copy: extending with += would modify the list of the caller
    
    # Extend breaks if needed
    if s.size>0:
        # >= because bins are right-open: a value equal to the last break would otherwise fall out
        if s.max()>=breaks[-1]:
            breaks = breaks + [s.max()+1]
        if s.min()<breaks[0]:
            breaks = [s.min()] + breaks
    
    labels = [ f'{breaks[i]} - {breaks[i+1] + (-1 if ints else 0)}' for i in range(len(breaks)-2) ] + [f'{breaks[-2]}+']
    
    return pd.cut(s,breaks,right=False,labels=labels)
    

In [None]:
# 199 lies beyond the last break, so it has to end up in the open-ended '70+' bin
age_groups = cut_nice([20,29,30,39,199],[0,20,30,40,50,60,70])
assert list(age_groups) == ['20 - 29', '20 - 29', '30 - 39', '30 - 39', '70+']

In [None]:
#| hide
import nbdev
nbdev.nbdev_export() # Run the nbdev export for this project