In [None]:
# default_exp core

# core

> Low level utilities.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
from decision_tree.imports import *

## Utilities

In [None]:
#exports
def split_array(data, idx):
    "split `data` at position `idx`; returns the items before `idx` and the items from `idx` onwards"
    head = data[:idx]
    tail = data[idx:]
    return head, tail

In [None]:
#export
def r3(x):
    """Round `x` to 3 decimal places.

    If `round(x, 3)` is supported, its result is returned directly. Otherwise `x` is
    treated as an iterable and every value is rounded individually, giving a new
    tuple when `x` is a tuple and a new list for any other iterable.
    """
    try: return round(x, 3)
    # `round` raises TypeError for objects without `__round__` (e.g. lists and tuples);
    # only that case falls through to element-wise rounding. A bare `except` would also
    # swallow KeyboardInterrupt/SystemExit and hide unrelated errors.
    except TypeError: pass
    res = [round(v, 3) for v in x]
    return tuple(res) if isinstance(x, tuple) else res

In [None]:
# a list comes back as a new list with every value rounded to 3dp
test_in = [3.3, 1.1, 2.02, 3.003, 4.0004]
test_result = r3(test_in)
assert test_result is not test_in
assert test_result == [3.3, 1.1, 2.02, 3.003, 4.0]
# a tuple comes back as a new tuple
test_in = tuple(test_in)
test_result = r3(test_in)
assert test_result is not test_in
assert test_result == (3.3, 1.1, 2.02, 3.003, 4.0)

For efficiency, we need to be able to calculate standard deviations without processing all data with `np.std`.

By keeping track of:
- count of items `c`
- sum of items `s`
- sum of items squared `s2`

we can calculate the variance with: `(s2/c) - (s/c)**2`

Note: we have to clamp `var` at zero. When working with small numbers, the computed variance sometimes ends up slightly negative due to numerical instability (floating-point rounding).
It might be interesting to do a moving average implementation like: https://github.com/pete88b/data-science/blob/master/pytorch-things/calculating-variance.ipynb

In [None]:
# exporti
def agg_var(c, s, s2):
    "return the variance of `c` items with sum `s` and sum of squares `s2` - i.e. one side of `y`"
    # mean of squares minus squared mean; rounding can push this slightly below zero,
    # so clamp at zero to keep the later square root valid
    return max((s2/c) - (s/c)**2, 0)
def agg_std(c, s, s2):
    "return the standard deviation of `c` items with sum `s` and sum of squares `s2`"
    return np.sqrt(agg_var(c, s, s2))
def agg_score(c, s, s2):
    "return the score (standard deviation times count) of `c`, `s` and `s2`; 0 when `c` is 0 or 1"
    # an empty side has nothing to score and would divide by zero in `agg_var`;
    # a single item has no spread, so both cases score zero
    if c == 0 or c == 1: return 0
    return agg_std(c, s, s2)*c

In [None]:
#export
class Aggs():
    "keeps track of `c`, `s` and `s2` and provides access to `score`"
    def __init__(self, y):
        "create a new `Aggs` assuming you're going to iterate over `y`"
        # running count / sum / sum of squares of the values seen so far; `upd` advances these
        self.c = 0
        self.s = 0.
        self.s2 = 0.
        # totals over all of `y`, computed once and never modified afterwards
        self._c = len(y)
        self._s = y.sum()
        self._s2 = (y**2).sum()
    def upd(self, yi):
        "update `c`, `s` and `s2` values with the next `y` value"
        self.s2 += yi**2
        self.s += yi
        self.c += 1
    def score(self):
        "return the sum of the standard deviation for both sides of `y`"
        # one side: the values passed to `upd` so far
        seen = (self.c, self.s, self.s2)
        # other side: what remains once the running values are taken off the totals
        rest = (self._c - self.c, self._s - self.s, self._s2 - self.s2)
        return agg_score(*seen) + agg_score(*rest)

In [None]:
# render the signature and docstring of `Aggs.__init__` as documentation output
show_doc(Aggs.__init__)

<h4 id="Aggs.__init__" class="doc_header"><code>Aggs.__init__</code><a href="__main__.py#L4" class="source_link" style="float:right">[source]</a></h4>

> <code>Aggs.__init__</code>(**`y`**)

create a new [`Aggs`](/decision_tree/core#Aggs) assuming you're going to iterate over `y`

In [None]:
# render the signature and docstring of `Aggs.upd` as documentation output
show_doc(Aggs.upd)

<h4 id="Aggs.upd" class="doc_header"><code>Aggs.upd</code><a href="__main__.py#L8" class="source_link" style="float:right">[source]</a></h4>

> <code>Aggs.upd</code>(**`yi`**)

update [`c`](/decision_tree/test_show_doc#c), `s` and `s2` values with the next `y` value

In [None]:
# render the signature and docstring of `Aggs.score` as documentation output
show_doc(Aggs.score)

<h4 id="Aggs.score" class="doc_header"><code>Aggs.score</code><a href="__main__.py#L13" class="source_link" style="float:right">[source]</a></h4>

> <code>Aggs.score</code>()

return the sum of the standard deviation for both sides of `y`

In [None]:
def np_score(y):
    "reference score computed directly with numpy: `np.std` of `y` scaled by the number of items"
    return len(y) * np.std(y)
y = np.linspace(-2.5, 2.5, 11)
aggs = Aggs(y)
assert 11 == aggs._c
# feed the values one at a time (stopping before the last, so neither side is empty)
# and compare the incremental score with np.std computed on each side
for i in range(len(y)-1):
    aggs.upd(y[i])
    y_le, y_gt = split_array(y, i+1)
    assert aggs.c == len(y_le)
    assert r3(np_score(y_le) + np_score(y_gt)) == r3(aggs.score())
assert r3(aggs.score()) == 14.361

## Loss Functions

In [None]:
#export
def mse(x,y):
    "mean squared error: the mean of the squared differences between `x` and `y`"
    diff = x - y
    return (diff**2).mean()
def rmse(x,y):
    "root mean squared error: the square root of `mse(x, y)`"
    mean_sq_err = mse(x, y)
    return np.sqrt(mean_sq_err)

In [None]:
# fixed sample data, hard-coded so the asserts below are deterministic
# (presumably produced once with something like the two commented lines - TODO confirm)
# x = np.linspace(-1, 1, 10)
# y = x + np.random.random(x.shape)
x = np.array([-1., -0.778, -0.556, -0.332, -0.115, 0.119,  0.331,  0.556,  0.777,  1.])
y = np.array([-0.721,  0.036,  0.366,  0.490,  0.565, 1.080, 0.414, 1.125, 1.483, 1.569])
assert np.round(mse(x, y), 3) == 0.480
assert np.round(rmse(x, y), 3) == 0.693 # expect 0.6931791254791217

In [None]:
#hide
from nbdev.export import *
# run nbdev's notebook conversion; it prints one "Converted <notebook>" line per notebook processed
notebook2script()

Converted 000_target_module.ipynb.
Converted 001_exports_to_target_module.ipynb.
Converted 002_target_module.ipynb.
Converted 00_core.ipynb.
Converted 10_data.ipynb.
Converted 20_models.ipynb.
Converted 21_models-extra.ipynb.
Converted 30_test_flag.ipynb.
Converted 40_test_export.ipynb.
Converted 50_test_doc.ipynb.
Converted 51_test_show_doc.ipynb.
Converted 60_all_test.ipynb.
Converted 61_test_add2__all__.ipynb.
Converted 70_multi_all_test_flag.ipynb.
Converted 71_tensor_patch.ipynb.
Converted 72_if__name__.ipynb.
Converted 73_in_ipython.ipynb.
Converted 80_test_coverage.ipynb.
Converted 81_test_coverage.ipynb.
Converted index.ipynb.
