In [31]:
# This is not going to be front facing so it's nice if we have
# general comments but odn't need documentation or doc strings

"""
Utility Reducing Functions for VASA Object
-------------------------------------------------------

Three types:
    * Reducing by count:
        Adding up the total number of times the
        county attained some LISA classification.

    * Reducing by recency:
        The last week number of the time a county
        attained some LISA classification.

    * Reducing by mode:
        The most often LISA classification of a county.
"""

from typing import List, Tuple, Callable
from functools import reduce
import numpy as np
from scipy.stats import mode

HC_List = List[Tuple[int, int]]
County_History_List = List[List[int]]
County_list = List[int]


def reduce_by_count(arr: County_History_List) -> HC_List:

    # Start with pairs of 0 for each county
    initial: HC_List = [(0, 0) for _ in range(len(arr[0]))]

    reducer: Callable[[HC_List, List[int]], HC_List] = lambda acc, curr: [
        (a[0] + (c == 1), a[1] + (c == 2)) for a, c in zip(acc, curr)
    ]

    hh_ll: HC_List = reduce(
        reducer,
        arr,
        initial
    )

    return hh_ll


def reduce_by_count_hh(arr: County_History_List) -> County_list:
    return reduce_by_count_equals(arr, 1)


def reduce_by_count_ll(arr: County_History_List) -> County_list:
    return reduce_by_count_equals(arr, 2)

#
# No tests since called from reduce_by_count_**
#
def reduce_by_count_equals(arr: County_History_List, val: int) -> County_list:
    return list(reduce(
        lambda acc, curr: np.array(acc) + (np.array(curr) == val),
        arr,
        np.zeros(len(arr[0]))
    ))


# ughhh this needs to be made better
# this should return a date for each classification....
def reduce_by_recency(arr: County_History_List) -> County_list:
    return [
        (hh if clas == 1 else (ll if clas == 2 else 0))
        for hh, ll, clas in zip(
            reduce_by_recency_hh(arr),
            reduce_by_recency_ll(arr),
            reduce_by_mode_sig(arr)
        )
    ]


def reduce_by_recency_hh(arr: County_History_List) -> County_list:
    return reduce_by_recency_equals(arr, 1)


def reduce_by_recency_ll(arr: County_History_List) -> County_list:
    return reduce_by_recency_equals(arr, 2)

#
# THIS MUST ADD 1 so that we can tell the difference between week 0 and no week at all
# This doesn't need tests because called from reduce_by_recency_**
#
def reduce_by_recency_equals(
    arr: County_History_List,
    val: int
) -> County_list:
    return [
        max([
            (idx + 1 if week[county_idx] == val else 0)
            for idx, week in enumerate(arr)
        ])
        for county_idx in range(len(arr[0]))
    ]


# not really mode b/c prefers sig over non-sig
# change to list for loop thingy
def reduce_by_mode_sig(arr: County_History_List) -> County_list:
    output: County_list = []

    for hh, ll in reduce_by_count(arr):
        region_class = 1 if hh > ll else 2
        output.append(region_class if max(hh, ll) > 0 else 0)

    return output


def reduce_by_mode(arr: County_History_List) -> County_list:
    return [
        mode([
            week[county_idx]
            for week in arr
        ])[0][0]  # argh
        for county_idx in range(len(arr[0]))
    ]


In [32]:
test = np.array([
    [1, 1, 1],
    [0, 1, 2],
    [0, 0, 2]
])

In [33]:
def reduce_by_count(arr: County_History_List)

def reduce_by_count_hh(arr: County_History_List)
def reduce_by_count_ll(arr: County_History_List)
def reduce_by_count_equals(arr: County_History_List, val: int)
def reduce_by_recency(arr: County_History_List)

def reduce_by_recency_hh(arr: County_History_List)
def reduce_by_recency_ll(arr: County_History_List)

def reduce_by_recency_equals(
    arr: County_History_List,
    val: int
) 

# not really mode b/c prefers sig over non-sig
# change to list for loop thingy
def reduce_by_mode_sig(arr: County_History_List)

def reduce_by_mode(arr: County_History_List) -> County_list:

SyntaxError: invalid syntax (<ipython-input-33-13d2b7a7d57b>, line 1)

In [34]:
reduce_by_count_hh(test)

[1.0, 2.0, 1.0]

In [35]:
countValue = [(1, 0), (2, 0), (1, 2)]

assert reduce_by_count(test) == countValue
assert reduce_by_count_hh(test) == [a[0] for a in countValue]
assert reduce_by_count_ll(test) == [a[1] for a in countValue]

In [39]:
assert reduce_by_recency(test) == [1, 2, 3]
assert reduce_by_recency_hh(test) == [1, 2, 1]
assert reduce_by_recency_ll(test) == [0, 0, 3]

In [42]:
assert reduce_by_mode_sig(test) == [1, 1, 2]
assert reduce_by_mode(test) == [0, 1, 2]