# KALMAN FOLDING IN CLOJURE

Some sample data:

In [138]:
(def zs [-0.178654, 0.828305, 0.0592247, -0.0121089, -1.48014, 
         -0.315044, -0.324796, -0.676357, 0.16301, -0.858164])

#'user/zs

TODO: generate new random data.

TODO: incanter integration

## RUNNING COUNT

The traditional and obvious way:

In [139]:
(reduce (fn [c z] (inc c)) 0 zs)

10

In [140]:
(reductions (fn [c z] (inc c)) 0 zs)

(0 1 2 3 4 5 6 7 8 9 10)

A thread-safe way; overkill, but shows the _let-over-lambda_ style of closing over stateful variables. We can do everything in this paper with pure functions, as above, but this style is more in keeping with commonplace stateful intuitions. It also illustrates Clojure's beautiful handling of _state_ as distinct from _identity_, distinguishing Clojure in a positive light from more standard object-oriented approaches.

In [143]:
(let [initial-count 0]
    (reduce
        (let [running-count (atom initial-count)]
            (fn [c z]
                (swap! running-count inc)
                @running-count)) ; ~~> new value for c
        initial-count
        zs))

10

Here is a version that shows all intermediate results.

In [144]:
(let [initial-count 0]
    (reductions
        (let [running-count (atom initial-count)]
            (fn [c z]
                (swap! running-count inc)
                @running-count)) ; ~~> new value for c
        initial-count
        zs))

(0 1 2 3 4 5 6 7 8 9 10)

A thread-safe way, without `reduce`:

In [106]:
(let [initial-count 0
      running-count (atom initial-count)]
    (dorun
        (map (fn [z]
                 (swap! running-count inc)
                 (print (str @running-count " ")))
             zs)))

1 2 3 4 5 6 7 8 9 10 

In [146]:
(dorun (map 
           (let [initial-count 0
                 running-count (atom initial-count)]
               (fn [z]
                   (swap! running-count inc)
                   (print (str @running-count " "))))
           zs))

1 2 3 4 5 6 7 8 9 10 

## RUNNING MEAN

In [147]:
(dorun (map
           (let [initial-count 0
                 initial-mean  0
                 initial-stats {:count initial-count :mean  initial-mean}
                 running-stats (atom initial-stats)]
               (fn [z]
                   (let [{x :mean, n :count} @running-stats
                         n+1 (inc n)
                         K   (/ 1.0 n+1)]
                       (swap! running-stats conj
                              [:count n+1]
                              [:mean (+ x (* K (- z x)))]))
                   (println @running-stats)))
           zs))

{:count 1, :mean -0.178654}
{:count 2, :mean 0.3248255}
{:count 3, :mean 0.2362919}
{:count 4, :mean 0.1741917}
{:count 5, :mean -0.15667464000000003}
{:count 6, :mean -0.18306953333333337}
{:count 7, :mean -0.20331617142857145}
{:count 8, :mean -0.262446275}
{:count 9, :mean -0.21517335555555556}
{:count 10, :mean -0.27947242}


### HOISTING THE FUNCTION

We can remove the side-effector from inside the function and hoist it, making it more hospitable to application over async streams. The `map` will accumulate the intermediate values. This is like a `scan` in Haskell or a `FoldList` in Wolfram.

In [148]:
(defn make-running-stats-mapper []
    (let [initial-count 0
          initial-mean  0
          initial-stats {:count initial-count :mean initial-mean}
          running-stats (atom initial-stats)]
        (fn [z]
            (let [{x :mean, n :count} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)]
                (swap! running-stats conj
                       [:count n+1]
                       [:mean (+ x (* K (- z x)))]))
            @running-stats)))
(clojure.pprint/pprint (map (make-running-stats-mapper) zs))

({:count 1, :mean -0.178654}
 {:count 2, :mean 0.3248255}
 {:count 3, :mean 0.2362919}
 {:count 4, :mean 0.1741917}
 {:count 5, :mean -0.15667464000000003}
 {:count 6, :mean -0.18306953333333337}
 {:count 7, :mean -0.20331617142857145}
 {:count 8, :mean -0.262446275}
 {:count 9, :mean -0.21517335555555556}
 {:count 10, :mean -0.27947242})


### NUMERICAL CHECK

In [149]:
(defn mean [zs] (/ (reduce + zs) (count zs)))
(mean zs)

-0.27947242

## CORE.ASYNC

In [150]:
(require '[clojure.core.async :refer [onto-chan sliding-buffer 
                                      dropping-buffer buffer
                                      <!! <! >! >!! go chan 
                                      close! thread 
                                      alts! alts!! timeout]])

We can write on the asynch thread and read on the UI thread:

In [151]:
(let [c (chan)]
    (go (>! c 42))
    (println (<!! c))
    (close! c))

42


We can read on the async thread and write on the UI thread:

In [152]:
(let [c (chan)]
    (go (println (<! c)))
    (>!! c 42)
    (close! c))

42


In all cases, we must do any blocking call without timeout on the UI thread last. The following will hang:

In [153]:
#_(let [c (chan)]
    (>!! c 42)
    (go (println (<! c)))
    (close! c))

We're won't block if we add a timeout, but we don't know how to write to a timeout channel:

In [153]:
(let [c (chan)]
    (>!! (alts!! [c (timeout 500)]) 42)
    (go (println (<! c)))
    (close! c))

IllegalArgumentException No implementation of method: :put! of protocol: #'clojure.core.async.impl.protocols/WritePort found for class: clojure.lang.PersistentVector  clojure.core/-cache-protocol-fn (core_deftype.clj:568)


class java.lang.IllegalArgumentException: 

We can't read from a `timeout` channel either, but at least we won't hang. Here, we do the blocking read too early on the UI thread:

In [153]:
(let [c (chan)]
    (println (<!! (alts!! [c (timeout 500)])))
    (go (>! c 42))
    (close! c))

IllegalArgumentException No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.PersistentVector  clojure.core/-cache-protocol-fn (core_deftype.clj:568)


class java.lang.IllegalArgumentException: 

The following illustrates putting data in a blocking channel at random times and reading some of them on the UI thread. It will leave values in the channel and thus leak the channel according to the documentation for `close!` here https://clojure.github.io/core.async/api-index.html#C.

In [154]:
(def echo-chan (chan))

(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3] (println (<!! echo-chan)))

(close! echo-chan)

-0.676357
-0.315044
0.16301


We can chain channels. Reading from `echo-chan` may hang the UI thread because the UI thread races the internal thread that reads `echo-chan`, but the timeout trick works here as above. This will also leak channels.

In [154]:
(def echo-chan (chan))
(def repl-chan (chan))

(dotimes [_ 10] (go (>! repl-chan (<! echo-chan))))
(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3] (println (<!! (alts!! [echo-chan (timeout 500)]))))

(close! echo-chan)
(close! repl-chan)

IllegalArgumentException No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.PersistentVector  clojure.core/-cache-protocol-fn (core_deftype.clj:568)


class java.lang.IllegalArgumentException: 

`println` on a `go` process works if we wait long enough. This, of course, is bad practice or "code smell."

In [155]:
(def echo-chan (chan))

(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3]  (go (println (<! echo-chan))))

(Thread/sleep 500) ; no visible output if you remove this line.
(close! echo-chan)

-0.324796
-0.315044
-1.48014


### ASYNC RUNNING MEAN

We want our `running-stats` function called with the data delivered at random times and in random order. A transducer lets us collect items off the buffer. The size of the buffer does not matter.

In [156]:
(defn async-scan [zs mapper effector]
    (let [transducer (map mapper)
          echo-chan (chan (buffer 1) transducer)]
        (doseq [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
        (dotimes [_ (count zs)] (effector (<!! echo-chan)))
        (close! echo-chan)))
(async-scan zs (make-running-stats-mapper) println)

{:count 1, :mean -0.676357}
{:count 2, :mean -0.4275055}
{:count 3, :mean -0.5710583333333333}
{:count 4, :mean -0.50705475}
{:count 5, :mean -0.2399828}
{:count 6, :mean -0.20200381666666667}
{:count 7, :mean -0.16468545714285715}
{:count 8, :mean -0.184699275}
{:count 9, :mean -0.1460649111111111}
{:count 10, :mean -0.27947242}


We now have complete parity between space (vector `zs`) and time (values on `echo-chan` in random order).

## RUNNING STDDEV

### BRUTE-FORCE (SCALAR VERSION)

#### SSR: SUM OF SQUARED RESIDUALS

In [157]:
(defn ssr [sequ]
    (let [m (mean sequ)]
        (reduce #(+ %1 (* (- %2 m) (- %2 m)))
                0 sequ)))
(ssr zs)

3.5566483654807355

#### VARIANCE

In [158]:
(defn variance [sequ]
    (let [n (count sequ)]
        (case n
            0 0
            1 (first sequ)
            #_default (/ (ssr sequ) (- n 1.0)))))
(variance zs)

0.3951831517200817

Let's do a smaller example:

In [160]:
(def z2s [55. 89. 144.])
(variance z2s)

2017.0

##### BAD VARIANCE

Here is a really stupid recurrent form. It's stupid because
1. it requires the whole sequence up front, so it cannot function in constant memory
3. the intermediate values are meaningless because they refer to the final mean and count, not the intermediate ones

But, the final value is correct.

In [162]:
(reductions 
    (let [m (mean z2s)
          c (count z2s)]
        (fn [var z] (+ var (let [r (- z m)]
                               (/ (* r r) (- c 1.0))))))
    0 z2s)

(0 840.5 865.0 2017.0)

This is sufficiently stupid that we won't bother with a thread-safe, stateful, or asynchronous form.

##### SCHOOL VARIANCE

Instead of the sum of squared residuals, $ssr$, we accumulate the sum of squares, $ssq$, which grows quickly. This is exposed to _catastrophic cancellation_, but works for our small example.

In [168]:
(defn make-school-stats-mapper []
    (let [initial-count    0
          initial-mean     0
          initial-variance 0
          initial-ssq      0
          initial-stats {:count    initial-count
                         :mean     initial-mean
                         :variance initial-variance
                         :ssq      initial-ssq}
          running-stats (atom initial-stats)]
        (fn [z]
            (let [{x :mean, n :count, v :variance, s :ssq} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)
                  r   (- z x)
                  x2  (+ x (* K r))
                  s2  (+ s (* z z))]
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x2 ]
                       [:ssq      s2]
                       [:variance (/ (- s2 (* n+1 x2 x2)) (max 1 n))]))
            @running-stats)))
(clojure.pprint/pprint (map (make-school-stats-mapper) z2s))

({:count 1, :mean 55.0, :variance 0.0, :ssq 3025.0}
 {:count 2, :mean 72.0, :variance 578.0, :ssq 10946.0}
 {:count 3, :mean 96.0, :variance 2017.0, :ssq 31682.0})


##### RECURRENT VARIANCE

In [187]:
(defn make-recurrent-stats-mapper []
    (let [initial-count    0
          initial-mean     0
          initial-variance 0
          initial-stats {:count    initial-count
                         :mean     initial-mean
                         :variance initial-variance}
          running-stats (atom initial-stats)]
        (fn [z]
            (let [{x :mean, n :count, v :variance} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 (inc n))
                  r   (- z x)
                  x2  (+ x (* K r))
                  ssr (+ (* (- n 1) v) ; old ssr is (* (- n 1) v)
                         (* K n r r))]
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x2 ]
                       [:variance (/ ssr  (max 1 n))]))
            @running-stats)))
(clojure.pprint/pprint (map (make-recurrent-stats-mapper) z2s))

({:count 1, :mean 55.0, :variance 0.0}
 {:count 2, :mean 72.0, :variance 578.0}
 {:count 3, :mean 96.0, :variance 2017.0})


Of course, this works asynchronously, with potentially different intermediate values because the order is random.

In [194]:
(async-scan z2s (make-recurrent-stats-mapper) println)

{:count 1, :mean 144.0, :variance 0.0}
{:count 2, :mean 116.5, :variance 1512.5}
{:count 3, :mean 96.0, :variance 2017.0}


#### WELFORD'S VARIANCE

The above is equivalent, algebraically and numerically, to Welford's famous formula.

In [191]:
(defn make-welfords-stats-mapper []
    (let [initial-count    0
          initial-mean     0
          initial-variance 0
          initial-stats {:count    initial-count
                         :mean     initial-mean
                         :variance initial-variance}
          running-stats (atom initial-stats)]
        (fn [z]
            (let [{x :mean, n :count, v :variance} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)
                  r   (- z x)
                  x2  (+ x (* K r))
                  ssr (+ (* (- n 1) v) 
                         (* (- z x) (- z x2)))] ; <-- only difference to recurrent variance
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x2 ]
                       [:variance (/ ssr  (max 1 n))]))
            @running-stats)))
(clojure.pprint/pprint (map (make-welfords-stats-mapper) z2s))

({:count 1, :mean 55.0, :variance 0.0}
 {:count 2, :mean 72.0, :variance 578.0}
 {:count 3, :mean 96.0, :variance 2017.0})


## WINDOWED STATISTICS

variable | description 
---------|-------------
$n$      | how many points seen before this one
$z$      | $=z_{n+1}$ current data point, at index $n + 1$
$l$      | index of first point in window at least 1 wide and no more than $w$ wide
$w$      | width of window
$u$      | number of points including $z$ in the running window; converges to $w$.
$m_{n+1}$| the mean of all points through $z_{n+1}$
$m_l$    | the mean of all points through index $l-1 = n+1-w$, lagging $w$ behind $m_n$
$m_w$    | the mean of all points within the window
$u_{n+1}$| variance of all points through $z_{n+1}$
$u_l$    | variance of all points through index $l-1 = n+1-w$, lagging $w$ behind $u_n$
$u_w$    | variance of all points within the window

$l=\max(1,n-w+2)$

$u=n-l+2$

$m_{n+1}=x+\frac{z-x}{n+1}$

$ml=\begin{cases} 
  x_w+\frac{z_{n+1-w}}{} & x\leq 0 \\
  \frac{100-x}{100} & 0\leq x\leq 100 \\
  0 & 100\leq x 
\end{cases}$

$w$      | width of window
$u$      | number of points including $z$ in the running window; converges to $w$.
$m_n$    | the mean of all points through $z_{n+1}$
$m_l$    | the mean of all points through index $l-1 = n+1-w$, lagging $w$ behind $m_n$
$m_w$    | the mean of all points within the window
$u_n$    | variance of all points through $z_{n+1}$
$u_l$    | variance of all points through index $l-1 = n+1-w$, lagging $w$ behind $u_n$
$u_w$    | variance of all points within the window