# COMPOSABLE STATISTICS

__Brian Beckman__

__June 2017__ - __June 2019__

## CLOJURE

We prefer Clojure to Python for this exercise due to Clojure's concurrency primitives, especially atoms and core.async. Python is growing and improving rapidly, so we may return to it someday. 

Section [HOW TO USE THIS DOCUMENT](#HOW-TO-USE-THIS-DOCUMENT) explains how to get Clojure working inside Jupyter. 

The best sites for learning Clojure by example are [clojuredocs.org](http://clojuredocs.org) and [4clojure.org](http://4clojure.org). A recommended book is [Clojure for the Brave and True](http://braveclojure.com).

## HOW TO USE THIS DOCUMENT

1. Install leiningen https://leiningen.org/ (this is all you need for Clojure)
2. Install [jupyter notebook](http://jupyter.readthedocs.io/en/latest/install.html)
3. https://github.com/clojupyter/clojupyter
4. At a bash prompt, type `jupyter notebook`
5. A web page will open automatically; navigate to the file you're reading right now
6. Evaluate cells by typing `Shift-Enter`

Consider doing all Python work in [pipenv](https://pipenv.readthedocs.io/). It keeps virtual environments outside your local folders. It works for this `clojupyter` notebook as well.

# INTRODUCTION

We want to compute descriptive statistics in constant memory. We want exactly the same code to run over sequences distributed in space as runs over sequences distributed in time. Sequences distributed in space are vectors, lists, arrays, lazy or not. Sequences distributed over time are asynchronous streams. Descriptive statistics range from `count`, `mean`, `max`, `min`, and `variance` to Kalman filters and Gaussian processes. We decouple computation from data delivery by packaging computation in composable functions.

Some sample scalar data:

In [1]:
(def zs [-0.178654, 0.828305, 0.0592247, -0.0121089, -1.48014, 
         -0.315044, -0.324796, -0.676357, 0.16301, -0.858164])

#'user/zs

TODO: generate new random data.

TODO: incanter integration

# RUNNING COUNT

The traditional and obvious way with `reduce` and `reductions` (https://clojuredocs.org/clojure.core/reduce). _Reduce_ takes three arguments: a binary function, an initial value, and a space-sequence of inputs.

In [2]:
(reduce 
    (fn [count datum] (inc count)) ; binary function
    0                              ; initial value
    zs)                            ; space sequence

10

... with all intermediate results:

In [3]:
(reductions (fn [c z] (inc c)) 0 zs)

(0 1 2 3 4 5 6 7 8 9 10)

## THREAD-SAFE

Overkill for sequences in space, but safe for multiple threads from asynchronous streams. It also shows (1) _let-over-lambda_ (LOL): closing over mutable state variables, and (2) transactional mutation, i.e., _atomic updates_. LOL is sematically equivalent to data encapsulation in OOP, and transactions are easier to verify than is OOP with locks and mutexes.

The following has a defect: we need `initial-count` both to initialize the atom and to initialize the `reduce` call. This defect must be traded off against the generalizable form or _functional type_ of the reducible, namely $(\textrm{estimate}, \textrm{measurement})\rightarrow\textrm{estimate}$. We get rid of this defect later.

In [6]:
(let [initial-count 0] ; Must use this twice below.
    (reduce           
        ; Let-over-lambda (anonymous "object") follows. 
        ; "Atom" is a transactional (thread-safe) type in Clojure.
        (let [running-count (atom initial-count)] 
            ; That was the "let" of "LOL." Here comes the lambda:
            ; Reducible closure over "running-count."
            (fn [c z] ; Here's the "lambda" of "LOL"
                (swap! running-count inc) ; transactional update
                @running-count)) 
                ; safe "read" of the atom ~~> new value for c
        initial-count
        zs))

10

Showing all intermediate results:

In [5]:
(let [initial-count 0]
    (reductions ; <-- this is the only difference to above
        (let [running-count (atom initial-count)]
            (fn [c z]
                (swap! running-count inc)
                @running-count)) ; ~~> new value for c
        initial-count
        zs))

(0 1 2 3 4 5 6 7 8 9 10)

## AVOIDING _REDUCE_

`Reduce` only works in space, not in time. Avoiding `reduce` decouples the statistics code ("business logic") from the space environment ("plumbing"). That spaces environment delivers data from vectors, lists, etc.). We want to be able to switch out an environment that delivers data from space for an environment that delivers data points $z$ from time.

The following is a thread-safe LOL, without `reduce`. We _map_ the LOL over a space-sequence in memory to produce exactly the same result as with `reduce`. The mappable LOL does not need an accumulator argument for `count`. 

Below, we map _exactly_ the same mappable LOL over asynchronous streams.

A subtle defect: the output is still coupled to the computing environment through `print`. We get rid of that, too, [below](#REMOVING-OUTPUT-COUPLING). 

In [7]:
(dorun ; <-- Discard 'nil's produced by "print."
    (map 
        (let [running-count (atom 0)]
            (fn [z] ; <-- one fewer argument
                (swap! running-count inc)
                (print (str @running-count " "))))
        zs))

1 2 3 4 5 6 7 8 9 10 

nil

# RUNNING MEAN

The following scheme for numbering equations does not work (see [stackoverflow question 41241984](https://stackoverflow.com/questions/41241984/equation-numbering-in-jupyter-notebooks)). Not sure why not. We get by without them for now (TODO).

A general scheme for recurrence: ___a new statistic is an old statistic plus a correction___. 

The _correction_ is a _gain_ times a _residual_. For running mean, the residual is the difference between the new measurement $z$ and the old mean $x$. The gain is $1/(n+1)$, where $n$ is _count-so-far_. $n$ is a statistic, too, so it is an _old_ value, computed and saved before the current observation $z$ arrived. 

_The correction therefore depends only on the new input $z$ and on old statistics $x$ and $n$. The correction does not depend on new statistics_.

Mathematically, write the general recurrence idea without subscripts as 

$$x\leftarrow{x+K\,(z-x)}$$

or, with Lamport's notation, wherein new versions of old values get a prime, as an equation

$$x'=x+K\,(z-x)$$

($z$ does not have a prime; it is the only exception to the rule that new versions of old quantities have primes). 

Contrast the noisy traditional form, which introduces another variable, the index $n$. This traditional form is objectively more complicated than either of the two above:

$$x_{n+1}=x_n+K(n)\,(z_{n+1}-x_n)$$

In [8]:
(dorun 
    (map
        (let [running-stats (atom {:count 0, :mean 0})]
            (fn [z]
                (let [{x :mean, n :count} @running-stats
                      n+1 (inc n) ; cool variable name!
                      K   (/ 1.0 n+1)]
                    (swap! running-stats conj
                           [:count n+1]
                           [:mean (+ x (* K (- z x)))]))
                (println @running-stats)))
        zs))

{:count 1, :mean -0.178654}
{:count 2, :mean 0.3248255}
{:count 3, :mean 0.2362919}
{:count 4, :mean 0.1741917}
{:count 5, :mean -0.15667464000000003}
{:count 6, :mean -0.18306953333333337}
{:count 7, :mean -0.20331617142857145}
{:count 8, :mean -0.262446275}
{:count 9, :mean -0.21517335555555556}
{:count 10, :mean -0.27947242}


nil

The `swap` above calls `conj` on the current contents of the atom `running-stats` and on the rest of the arguments, namely `[:count n+1, :mean ...]`. `conj` is the idiom for "updating" a hashmap, the hashmap in the atom, the hashmap that starts off as `{:count 0, :mean 0}`.

## REMOVING OUTPUT COUPLING

Remove `println` from inside the LOL function of $z$. Now the LOL function of $z$ is completely decoupled from its environment. Also, abstract a "factory" method for the LOL, _make-running-stats-mapper_, to clean up the line that does the printing.

### MAKE-RUNNING-STATS-MAPPER

In [76]:
(defn make-running-stats-mapper []
    (let [running-stats (atom {:count 0 :mean 0 :datum 0})]
        (fn [z]
            (let [{x :mean, n :count, _ :datum} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)]
                (swap! running-stats conj
                       [:count n+1]
                       [:mean (+ x (* K (- z x)))]
                       [:datum z]))
            @running-stats)))


(clojure.pprint/pprint (map (make-running-stats-mapper) zs))

({:count 1, :mean -0.178654, :datum -0.178654}
 {:count 2, :mean 0.3248255, :datum 0.828305}
 {:count 3, :mean 0.2362919, :datum 0.0592247}
 {:count 4, :mean 0.1741917, :datum -0.0121089}
 {:count 5, :mean -0.15667464000000003, :datum -1.48014}
 {:count 6, :mean -0.18306953333333337, :datum -0.315044}
 {:count 7, :mean -0.20331617142857145, :datum -0.324796}
 {:count 8, :mean -0.262446275, :datum -0.676357}
 {:count 9, :mean -0.21517335555555556, :datum 0.16301}
 {:count 10, :mean -0.27947242, :datum -0.858164})


nil

## NUMERICAL CHECK

The last value of the running mean is $-0.279...42$. Check that against an independent calculation.

In [75]:
(defn mean [zs] (/ (reduce + zs) (count zs)))
(mean zs)

-0.27947242

# CORE.ASYNC

For data distributed over time, we'll use Clojure's core.async. Core.async has some subtleties that we analyze below. 

In [11]:
(require 
    '[clojure.core.async 
      :refer 
      [sliding-buffer dropping-buffer buffer
       <!! <! >! >!! go chan onto-chan close! 
       thread alts! alts!! timeout]])

nil

## SHALLOW TUTORIAL

https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

## DEEP TUTORIAL

The asynchronous, singleton `go` thread is loaded with very lightweight _pseudothreads_ (my terminology, not standard; most things you will read or see about Clojure.async does not carefully distinguish between threads and pseudothreads, and I think that's not helpful). 

Pseudothreads are lightweight state machines that pick up where they left off. It is feasible to have thousands, even millions of them. Pseudothreads don't block, they _park_. _Parking_ and _unparking_ are very fast. We can write clean code with pseudothreads because our code looks like it's blocked waiting for input or blocked waiting for buffer space. Code with blocking I/O is easy to write and to understand. Code in `go` forms doesn't actually block, just looks like it. 

Some details are tricky and definitely not easy to divine from the documentation. Hickey's video from InfoQ 2013 (https://www.infoq.com/presentations/core-async-clojure) is more helpful, but you can only appreciate the fine points after you've stumbled a bit. I stumbled over the fact that buffered and unbuffered channels have different synchronization semantics. Syntactically, they look the same, but you cannot, in general, run the same code over an unbuffered channel that works on a buffered channel. Hickey says this, but doesn't nail it to the mast; doesn't emphasize it with an example, as I do here in this deep tutorial. He motivates the entire library with the benefits of first-class queues, but fails to emphasize that, by default, a channel is not a queue but a blocking rendezvous. He does mention it, but one cannot fully appreciate the ramifications from a passing glance.

### COMMUNICATING BETWEEN THREADS AND PSEUDOTHREADS

Write output to unbuffered channel `c` via `>!` on the asynchronous `go` real-thread and read input from the same channel `c` via `<!!` on the UI/REPL `println` real-thread. We'll see later that writing via `>!!` to an unbuffered channel blocks the UI real-thread, so we can't write before reading unbuffered on the UI/REPL real-thread. However, we can write before reading on a non-blocking pseudothread, and no buffer space is needed.

In [12]:
(let [c (chan)]        ;; unbuffered chan
    (go (>! c 42))     ;; parks if no space in chan
    (println (<!! c))  ;; blocks UI/REPL until data on c
    (close! c))        ;; idiom; may be harmless overkill

42


nil

In general, single-bang forms work on `go` pseudothreads, and double-bang forms work on real, heavyweight, Java threads like the UI/REPL thread behind this notebook. In the rest of this notebook, "thread" means "real thread" and we write "pseudothread" explicitly when that's what we mean.

I don't address thread leakage carefully in this tutorial, mostly because I don't yet understand it well. I may overkill by closing channels redundantly. 

### CHANNEL VOODOO FIRST

Writing before reading seems very reasonable, but it does not work on unbuffered channels, as we see below. Before going there, however, let's understand more corners of the example above.

The `go` form itself returns a channel:

In [13]:
(clojure.repl/doc go)

-------------------------
clojure.core.async/go
([& body])
Macro
  Asynchronously executes the body, returning immediately to the
  calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
  channel operations within the body will block (if necessary) by
  'parking' the calling thread rather than tying up an OS thread (or
  the only JS thread when in ClojureScript). Upon completion of the
  operation, the body will be resumed.

  Returns a channel which will receive the result of the body when
  completed


nil

I believe "the calling thread" above refers to a pseudothread inside the `go` real-thread, but I am not sure because of the ambiguities in the official documentation between "blocking" and "parking" and between "thread" and "well, we don't have a name for them, but Brian calls them 'pseudothreads'."

Is the channel returned by `go` the same channel as `c`?

In [14]:
(let [c (chan)]           
    (println {:c-channel c})
    (println {:go-channel (go (>! c 42))})
    (println {:c-coughs-up (<!! c)})
    (println {:close-c (close! c)}))

{:c-channel #object[clojure.core.async.impl.channels.ManyToManyChannel 0x8e613be clojure.core.async.impl.channels.ManyToManyChannel@8e613be]}
{:go-channel #object[clojure.core.async.impl.channels.ManyToManyChannel 0x64cb44fa clojure.core.async.impl.channels.ManyToManyChannel@64cb44fa]}
{:c-coughs-up 42}
{:close-c nil}


nil

No, `c` is a different channel from the one returned by `go`. Consult the documentation for `go` once more:

In [24]:
(clojure.repl/doc go)

-------------------------
clojure.core.async/go
([& body])
Macro
  Asynchronously executes the body, returning immediately to the
  calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
  channel operations within the body will block (if necessary) by
  'parking' the calling thread rather than tying up an OS thread (or
  the only JS thread when in ClojureScript). Upon completion of the
  operation, the body will be resumed.

  Returns a channel which will receive the result of the body when
  completed


nil

We should be able to read from the channel returned by `go`; call it `d`:

In [15]:
(let [c (chan)
      d (go (>! c 42))] ;; 'let' in Clojure is sequential, 
                        ;; like 'let*' in Scheme or Common Lisp,
                        ;; so 'd' has a value, here.
    (println {:c-coughs-up (<!! c),  ;; won't block
              :d-coughs-up (<!! d)}) ;; won't block
    (close! c)
    (close! d))

{:c-coughs-up 42, :d-coughs-up true}


nil

`d`'s coughing up `true` means that the body of the `go`, namely `(>! c 42)` must have returned `true`, because `d` coughs up "the result of the body when completed." Let's see whether our deduction matches documentation for `>!`:

In [26]:
(clojure.repl/doc >!)

-------------------------
clojure.core.async/>!
([port val])
  puts a val into port. nil values are not allowed. Must be called
  inside a (go ...) block. Will park if no buffer space is available.
  Returns true unless port is already closed.


nil

Sure enough. But something important is true and not obvious from this documentation. Writing to `c` inside the `go` block parks the pseudothread because no buffer space is available: `c` was created with a call to `chan` with no arguments, so no buffer space is allocated. Only when reading from `c` does the pseudothread unpark. How? There is no buffer space. Reading on the UI thread manages to short-circuit any need for a buffer and unpark the pseudothread. Such short-circuiting is called a _rendezvous_ in the ancient literature of concurrency. Would the pseudothread unpark if we read inside a `go` block and not on the UI thread?

In [29]:
(let [c (chan)
      d (go (>! c 42))
      e (go (<! c))]
    (clojure.pprint/pprint {
      :c-channel c, :d-channel d, :e-channel e,
      :e-coughs-up (<!! e),  ;; won't block
      :d-coughs-up (<!! d)}) ;; won't block
    (close! c)
    (close! d)
    (close! e))

{:c-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x7171c824 "clojure.core.async.impl.channels.ManyToManyChannel@7171c824"],
 :d-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x1d30e64d "clojure.core.async.impl.channels.ManyToManyChannel@1d30e64d"],
 :e-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0xa995ddb "clojure.core.async.impl.channels.ManyToManyChannel@a995ddb"],
 :e-coughs-up 42,
 :d-coughs-up true}


nil

Yes, the pseudothread that parked when $42$ is put on `c` via `>!` unparks when $42$ is taken off via `<!`. Channel `d` represents the parking step and channel `e` represents the unparking step. All three channels are different. 

So now we know how to short-circuit or rendezvous unbuffered channels. In fact, the order of reading and writing (taking and putting) does not matter in the nebulous, asynchronous world of pseudothreads. How Einsteinian is that? The following takes (reads) from `c` on `e` before puting (writing) to `c` on `d`. That's the same as above, only in the opposite order.

In [30]:
(let [c (chan)
      e (go (<! c))
      d (go (>! c 42))]
    (clojure.pprint/pprint {
      :c-channel c, :d-channel d, :e-channel e,
      :e-coughs-up (<!! e),  ;; won't block
      :d-coughs-up (<!! d)}) ;; won't block
    (close! c)
    (close! d)
    (close! e))

{:c-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x5d529720 "clojure.core.async.impl.channels.ManyToManyChannel@5d529720"],
 :d-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x3960173 "clojure.core.async.impl.channels.ManyToManyChannel@3960173"],
 :e-channel
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x6c5dd118 "clojure.core.async.impl.channels.ManyToManyChannel@6c5dd118"],
 :e-coughs-up 42,
 :d-coughs-up true}


nil

### PUTS BEFORE TAKES CONSIDERED RISKY

`>!!`, by default, blocks if called too early on an unbuffered real thread. We saw above that parked pseudothreads don't block: you can read and write to channels in `go` blocks in any order. However, that's not true with threads that actually block. The documentation is obscure, though not incorrect, about this fact.

In [31]:
(clojure.repl/doc >!!)

-------------------------
clojure.core.async/>!!
([port val])
  puts a val into port. nil values are not allowed. Will block if no
  buffer space is available. Returns true unless port is already closed.


nil

When is "no buffer space available?" It turns out that the default channel constructor makes a channel with no buffer space allocated by default.

In [32]:
(clojure.repl/doc chan)

-------------------------
clojure.core.async/chan
([] [buf-or-n] [buf-or-n xform] [buf-or-n xform ex-handler])
  Creates a channel with an optional buffer, an optional transducer
  (like (map f), (filter p) etc or a composition thereof), and an
  optional exception-handler.  If buf-or-n is a number, will create
  and use a fixed buffer of that size. If a transducer is supplied a
  buffer must be specified. ex-handler must be a fn of one argument -
  if an exception occurs during transformation it will be called with
  the Throwable as an argument, and any non-nil return value will be
  placed in the channel.


nil

We can test the blocking-on-unbuffered case as follows. The following code will block at the line `(>!! c 42)`, as you'll find if you uncomment the code (remove `#_` at the beginning) and run it. You'll have to interrupt the Kernel using the "Kernel" menu at the top of the notebook, and you might have to restart the Kernel, but you should try it once.

In [33]:
#_(let [c (chan)]
    (>!! c 42)
    (println (<!! c))
    (close! c))

The following variation works fine because we made "buffer space" before writing to the channel. The only difference to the above is the $1$ argument to the call of `chan`. 

In [34]:
(let [c (chan 1)]
    (>!! c 42)
    (println (<!! c))
    (close! c))

42


nil

The difference between the semantics of the prior two examples is not subtle: one hangs the kernel and the other does not. However, the difference in the syntax is subtle and easy to miss. 

We can read on the asynchronous `go` pool from the buffered channel `c` because the buffered write `(>!! c)` on the UI thread doesn't block:

In [16]:
(let [c (chan 1)]
    (>!! c 42)
    (println {:go-channel-coughs-up (<!! (go (<! c)))})
    (close! c))

{:go-channel-coughs-up 42}


nil

#### ORDER DOESN'T MATTER, SOMEIMES

We can do things backwards, reading before writing, even without a buffer. Read from channel `(<! c)` on the async `go` thread "before" writing to `(>!! c 42)` on the REPL / UI thread. "Before," here, of course, means syntactically or lexically "before," not temporally. 

In [25]:
(let [c (chan) ;; NO BUFFER!
      d (go (<! c)) ;; park a pseudothread to read c
      e (>!! c 42)] ;; blocking write unparks c's pseudothread
    (println {:c-hangs '(<!! c),
              :d-coughs-up (<!! d),
              :what's-e    e})
    (close! c) (close! d))

{:c-hangs (<!! c), :d-coughs-up 42, :what's-e true}


nil

Why did `>!!` produce `true`? Look at docs again:

In [26]:
(clojure.repl/doc >!!)

-------------------------
clojure.core.async/>!!
([port val])
  puts a val into port. nil values are not allowed. Will block if no
  buffer space is available. Returns true unless port is already closed.


nil

Ok, now I fault the documentation. `>!!` will block if there is no buffer space available _and_ if there is no _rendezvous_ available, that is, no pseudothread parked waiting for `<!`. I have an open question in the Google group for Clojure about this issue with the documentation.

To get the value written in into `c`, we must read `d`. If we tried to read it from `c`, we would block forever because `>!!` blocks when there is no buffer space, and `c` never has buffer space. We get the value out of the `go` nebula by short-circuiting the buffer, by a rendezvous, as explained above.

`e`'s being true means that `c` wasn't closed. `(>!! c 42)` should hang.

In [29]:
(let [c (chan) ;; NO BUFFER!
      d (go (<! c)) ;; park a pseudothread to read c
      e (>!! c 42)  ;; blocking write unparks c's pseudothread
      f '(hangs (>!! c 43))] ;; is `c` closed?
    (println {:c-coughs-up '(hangs (<!! c)),
              :d-coughs-up (<!! d),
              :what's-e    e,
              :what's-f    f})
    (close! c) (close! d))

{:c-coughs-up (hangs (<!! c)), :d-coughs-up 42, :what's-e true, :what's-f (hangs (>!! c 43))}


nil

Stackoverflow reveals a way to find out whether a channel is closed by peeking under the covers (https://stackoverflow.com/questions/24912971):

In [31]:
(let [c (chan) ;; NO BUFFER!
      d (go (<! c)) ;; park a pseudothread to read c
      e (>!! c 42)  ;; blocking write unparks c's pseudothread
      f (clojure.core.async.impl.protocols/closed? c)]
    (println {:c-coughs-up '(hangs (<!! c)),
              :d-coughs-up (<!! d),
              :c-is-open-at-e?  e,
              :c-is-open-at-f?  f})
    (close! c) (close! d))

{:c-coughs-up (hangs (<!! c)), :d-coughs-up 42, :c-is-open-at-e? true, :c-is-open-at-f? false}


nil

#### ORDER DOES MATTER, SOMETIMES

Order does matter this time: Writing blocks the UI thread without a buffer and no parked read (rendezvous) in the `go` nebula beforehand. I hope you can predict that the following will block even before you run it. To be sure, run it, but you'll have to interrupt the kernel as before.

In [46]:
#_(let [c (chan)
      e (>!! c 42) ;; blocks forever
      d (go (<! c))]
    (println {:c-coughs-up '(this will hang (<!! c)),
              :d-coughs-up (<!! d),
              :what's-e    e})
    (close! c) (close! d))

### TIMEOUTS: DON'T  BLOCK FOREVER

In all cases, blocking calls like [`>!!`](https://clojuredocs.org/clojure.core.async/%3E!!) to unbuffered channels without timeout must appear _last_ on the UI, non-`go`, thread, and then only if there is some parked pseudothread that's waiting to read the channel by short-circuit (rendezvous). If we block too early, we won't get to the line that launches the async `go` nebula and parks the short-cicuitable pseudothread---parks the rendezvous.

The UI thread won't block forever if we add a timeout. `alts!!` is a way to do that. The [documentation](https://clojuredocs.org/clojure.core.async/alts!!) and [examples](https://clojuredocs.org/clojure.core.async/alts!!) are difficult, but, loosely quoting (emphasis and edits are mine, major ones in square brackets):

> `(alts!! ports & {:as opts})`

This destructures all keyword options into opts. We don't need opts or the `:as` keyword here.

> Completes at most one of several channel operations. [_Not for use
inside a (go ...) block._] **ports is a vector of channel endpoints**,
which can be either a channel to take from or a vector of
`[channel-to-put-to val-to-put]`, in any combination. Takes will be
made as if by `<!!`, and puts will be made as if by `>!!`. Unless
the `:priority` option is true, if more than one port operation is
ready, a non-deterministic choice will be made. If no operation is
ready and a `:default` value is supplied, [`default-val :default`] will
be returned, otherwise `alts!!` will [_block_ xxxxpark ?] until the first operation to
become ready completes. **Returns `[val port]` of the completed
operation**, where `val` is the value taken for takes, and a
boolean (`true` unless already closed, as per `put!`) for puts.
 `opts` are passed as `:key val` ... Supported options:
 `:default val` - the value to use if none of the operations are immediately ready
`:priority true` - (default `nil`) when `true`, the operations will be tried in order.
 Note: there is no guarantee that the port exps or val exprs will be
used, nor in what order should they be, so they should not be
depended upon for side effects.

`(alts!! ...)` returns a `[val port]` 2-vector.

`(second (alts!! ...))` is a wrapper of channel `c` We can't write to the resulting `timeout` channel because we didn't give it a name.

That's a lot of stuff, but we can divine an idiom: pair a channel `c` that _might_ block with a fresh `timeout` channel in an `alts!!`. At most one will complete. If `c` blocks, the `timeout` will cough up. If `c` coughs up before the `timeout` expires, the `timeout` quietly dies (question, is it closed? Will it be left open and leak?)

For a first example, let's make a buffered thread that won't block and pair it with a long timeout. You will see that it's OK to write $43$ into this channel (the `[c 43]` term is an implied write; that's clear from the documentation). `c` won't block because it's buffered, it returns immediately, long before the `timeout` could expire. 

In [47]:
(let [c (chan 1)
      a (alts!! ; outputs a [val port] pair; throw away the val
                ; here are the two channels for `alts!!`
        [[c 43] (timeout 2500)])]
    (clojure.pprint/pprint {:c c, :a a})
    (let [d (go (<! c))]
        (println {:d-returns (<!! d)}))
    (close! c))

{:c
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x278f2d1c "clojure.core.async.impl.channels.ManyToManyChannel@278f2d1c"],
 :a
 [true
  #object[clojure.core.async.impl.channels.ManyToManyChannel 0x278f2d1c "clojure.core.async.impl.channels.ManyToManyChannel@278f2d1c"]]}
{:d-returns 43}


nil

But, if we take away the buffer, the `timeout` channel wins. The only difference to the above is that instead of creating `c` via `(chan 1)`, that is, with a buffer of length $1$, we create it with no buffer (and we quoted out the blocking read of `d` with a tick mark).

In [48]:
(let [c (chan)
      a (alts!! ; outputs a [val port] pair; throw away the val
                ; here are the two channels for `alts!!`
        [[c 43] (timeout 2500)])]
    (clojure.pprint/pprint {:c c, :a a})
    (let [d (go (<! c))]
        (println {:d-is d})
        '(println {:d-returns (<!! d)})) ;; blocks
    (close! c))

{:c
 #object[clojure.core.async.impl.channels.ManyToManyChannel 0x7b0ab79 "clojure.core.async.impl.channels.ManyToManyChannel@7b0ab79"],
 :a
 [nil
  #object[clojure.core.async.impl.channels.ManyToManyChannel 0x292dfa5c "clojure.core.async.impl.channels.ManyToManyChannel@292dfa5c"]]}
{:d-is #object[clojure.core.async.impl.channels.ManyToManyChannel 0x7d67e933 clojure.core.async.impl.channels.ManyToManyChannel@7d67e933]}


nil

# ASYNC DATA STREAMS

The following writes at random times (`>!`) to a parking channel `echo-chan` on an async `go` fast pseudothread. The UI thread block-reads (`<!!`) some data from `echo-chan`. The UI thread leaves values in the channel and thus leaks the channel according to the documentation for `close!` here https://clojure.github.io/core.async/api-index.html#C. To prevent the leak permanently, we close the channel explicitly.

In [36]:
(def echo-chan (chan))

(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3] (println (<!! echo-chan)))

(println {:echo-chan-closed? (clojure.core.async.impl.protocols/closed? echo-chan)})
(close! echo-chan)
(println {:echo-chan-closed? (clojure.core.async.impl.protocols/closed? echo-chan)})

0.828305
-0.0121089
-0.676357
{:echo-chan-closed? false}
{:echo-chan-closed? true}


nil

We can chain channels, again with leaks that we explicitly close:

In [37]:
(def echo-chan (chan))
(def repl-chan (chan))

(dotimes [_ 10] (go (>! repl-chan (<! echo-chan))))
(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3] 
    (println (<!! (second (alts!! [repl-chan (timeout 500)])))))

(close! echo-chan)
(close! repl-chan)

-1.48014
-0.0121089
0.828305


nil

Reading from `echo-chan` may hang the UI thread because the UI thread races the internal thread that reads `echo-chan`, but the timeout trick works here as above. 

In [58]:
(def echo-chan (chan))
(def repl-chan (chan))

(dotimes [_ 10] (go (>! repl-chan (<! echo-chan))))
(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3] (println (<!! (second (alts!! [echo-chan (timeout 500)])))))

(close! echo-chan)
(close! repl-chan)

nil
nil
nil


nil

`println` on a `go` pseudoprocess works if we wait long enough. This, of course, is bad practice or "code smell."

In [59]:
(def echo-chan (chan))

(doseq   [z zs] (go (Thread/sleep (rand 100)) (>! echo-chan z)))
(dotimes [_ 3]  (go (println (<! echo-chan))))

(Thread/sleep 500) ; no visible output if you remove this line.
(close! echo-chan)

-0.315044
-1.48014
-0.324796


nil

## ASYNC RUNNING MEAN

### ASYNC-RANDOMIZED-SCAN

We want `running-stats` called at random times and with data in random order. A _transducer_, `(map mapper)`, lets us collect items off the buffer. The size of the buffer does not matter, but we must specify it. Notice that the side-effector `effector` is passed in, so `async-randomized-scan` remains decoupled from its environment. 

In this style of programming, the asynchronous stream might sometimes be called a _functor_, which is anything that's mappable, anything you can `map` over.

In [32]:
(defn async-randomized-scan [zs mapper effector]
    (let [transducer (map mapper)
          ; give buffer length if there is a transducer
          echo-chan (chan (buffer 1) transducer)]
        (doseq [z zs] 
            (go (Thread/sleep (rand 100)) (>! echo-chan z)))
        (dotimes [_ (count zs)] (effector (<!! echo-chan)))
        (close! echo-chan)))

(async-randomized-scan zs (make-running-stats-mapper) println)

{:count 1, :mean -1.48014, :datum -1.48014}
{:count 2, :mean -0.897592, :datum -0.315044}
{:count 3, :mean -0.8844493333333333, :datum -0.858164}
{:count 4, :mean -0.6225845, :datum 0.16301}
{:count 5, :mean -0.3324066, :datum 0.828305}
{:count 6, :mean -0.3311381666666667, :datum -0.324796}
{:count 7, :mean -0.3804551428571429, :datum -0.676357}
{:count 8, :mean -0.3254951625, :datum 0.0592247}
{:count 9, :mean -0.30917947777777777, :datum -0.178654}
{:count 10, :mean -0.27947242, :datum -0.0121089}


nil

We don't need to explicitly say `buffer`, but I prefer the example above to the one below. The only difference is the nested call to `buffer` above versus the naked buffer length below. I think the explicit call of `buffer` is easier to read because you don't have to remember the meaning of the naked $1$.

In [33]:
(defn async-randomized-scan [zs mapper effector]
    (let [transducer (map mapper)
          ; give buffer length if there is a transducer
          echo-chan (chan 1 transducer)]
        (doseq [z zs] 
            (go (Thread/sleep (rand 100)) (>! echo-chan z)))
        (dotimes [_ (count zs)] (effector (<!! echo-chan)))
        (close! echo-chan)))

(async-randomized-scan zs (make-running-stats-mapper) println)

{:count 1, :mean -0.315044, :datum -0.315044}
{:count 2, :mean -0.4957005, :datum -0.676357}
{:count 3, :mean -0.31072543333333336, :datum 0.0592247}
{:count 4, :mean -0.2360713, :datum -0.0121089}
{:count 5, :mean -0.25381624, :datum -0.324796}
{:count 6, :mean -0.18434520000000004, :datum 0.16301}
{:count 7, :mean -0.03968088571428577, :datum 0.828305}
{:count 8, :mean -0.05705252500000005, :datum -0.178654}
{:count 9, :mean -0.21517335555555558, :datum -1.48014}
{:count 10, :mean -0.27947242000000005, :datum -0.858164}


nil

### NOT RANDOMIZED

Of course, the `mean` of any permutation of the data `zs` is the same, so the order in which data arrive does not change the final result.

In [34]:
(defn async-non-random-scan [zs mapper effector]
    (let [transducer (map mapper)
          echo-chan (chan (buffer 1) transducer)]
        (go (doseq [z zs] (>! echo-chan z)))
        (dotimes [_ (count zs)] (effector (<!! echo-chan)))
        (close! echo-chan)))

(async-non-random-scan zs (make-running-stats-mapper) println)

{:count 1, :mean -0.178654, :datum -0.178654}
{:count 2, :mean 0.3248255, :datum 0.828305}
{:count 3, :mean 0.2362919, :datum 0.0592247}
{:count 4, :mean 0.1741917, :datum -0.0121089}
{:count 5, :mean -0.15667464000000003, :datum -1.48014}
{:count 6, :mean -0.18306953333333337, :datum -0.315044}
{:count 7, :mean -0.20331617142857145, :datum -0.324796}
{:count 8, :mean -0.262446275, :datum -0.676357}
{:count 9, :mean -0.21517335555555556, :datum 0.16301}
{:count 10, :mean -0.27947242, :datum -0.858164}


nil

### SYNC-SCAN WITH TRANSDUCER

Here is the modern way, with `transduce`, to reduce over a sequence of data, in order. It's equivalent to the non-random async version above. The [documentation for transduce](https://clojuredocs.org/clojure.core/transduce) writes its parameters as `xform f coll`, and then says

> reduce with a transformation of `f (xf)`. If `init` is not
supplied, `(f)` will be called to produce it. 

Our `xform` is `transducer`, or `(map mapper)`, and our `f` is `conj`, so this is an idiom for mapping because `(conj)`, with no arguments, returns `[]`, an appropriate `init`.

In [35]:
(defn sync-scan [zs mapper]
    (let [transducer (map mapper)]
        (transduce transducer conj zs)))

(clojure.pprint/pprint (sync-scan zs (make-running-stats-mapper)))

[{:count 1, :mean -0.178654, :datum -0.178654}
 {:count 2, :mean 0.3248255, :datum 0.828305}
 {:count 3, :mean 0.2362919, :datum 0.0592247}
 {:count 4, :mean 0.1741917, :datum -0.0121089}
 {:count 5, :mean -0.15667464000000003, :datum -1.48014}
 {:count 6, :mean -0.18306953333333337, :datum -0.315044}
 {:count 7, :mean -0.20331617142857145, :datum -0.324796}
 {:count 8, :mean -0.262446275, :datum -0.676357}
 {:count 9, :mean -0.21517335555555556, :datum 0.16301}
 {:count 10, :mean -0.27947242, :datum -0.858164}]


nil

We now have complete parity between space (vector `zs`) and time (values on `echo-chan` in random order).

# RUNNING STDDEV

## BRUTE-FORCE (SCALAR VERSION)

The definition of variance is the following, for $N>1$:

$$\frac{1}{N-1}\sum\limits_{i=1}^{N}\left({z_i-\bar{z}_N}\right)^2\tag{1}$$

The sum is the _sum of squared residuals_. Each residual is the difference between the $i$&#8209;th datum $z_i$ and the mean $\bar{z}_N$ of all $N$ data in the sample. The outer constant, $1/(N-1)$ is [Bessel's correction](https://en.wikipedia.org/wiki/Bessel%27s_correction).

### SSR: SUM OF SQUARED RESIDUALS

The following is _brute-force_ in the sense that it requires all data up-front so that it can calculate the mean.

In [67]:
(defn ssr [sequ]
    (let [m (mean sequ)]
        (reduce #(+ %1 (* (- %2 m) (- %2 m)))
                0 sequ)))
(ssr zs)

3.5566483654807355

### VARIANCE

Call `ssr` to compute variance:

In [68]:
(defn variance [sequ]
    (let [n (count sequ)]
        (case n
            0 0
            1 (first sequ)
            #_default (/ (ssr sequ) (- n 1.0)))))
(variance zs)

0.3951831517200817

### SMALLER EXAMPLE

Let's do a smaller example:

In [69]:
(def z2s [55. 89. 144.])
(variance z2s)

2017.0

#### REALLY DUMB RECURRENCE

Remember our general form for recurrences, $x\leftarrow{}x + K\times{}(z-x)$?

This is really dumb because
1. it requires the whole sequence up front, so it doesn't run in constant memory
3. the intermediate values are meaningless because they refer to the final mean and count, not to the intermediate ones

But, the final value is correct.

In [70]:
(reductions 
    (let [m (mean z2s) ; uh-oh, we refer to _all_ the data ??
          c (count z2s)]
        (fn [var z] (+ var (let [r (- z m)] ; residual
                               (/ (* r r) (- c 1.0))))))
    0 z2s)

(0 840.5 865.0 2017.0)

This is sufficiently dumb that we won't bother with a thread-safe, stateful, or asynchronous form.

#### SCHOOL VARIANCE

Easy, school-level exercise: prove the following equation:

$$\frac{1}{N-1}\sum\limits_{i=1}^{N}\left({z_i-\bar{z}_N}\right)^2 =
\frac{1}{N-1}\left(\sum\limits_{i=1}^{N}\left(z_i^2\right)-N\,{\bar{z}_N^2}\right)\tag{2}$$

Instead of the sum of squared residuals, $ssr$, accumulate the sum of squares, $ssq$. 

_School variance_ is exposed to _catastrophic cancellation_ because $ssq$ grows quickly. We fix that defect below. 

We see that something is not best with this form because we don't use the old variance to compute the new variance. We do better below.

Of course, the same mapper works synchronously and asynchronously.

### MAKE-SCHOOL-STATS-MAPPER

Test it both synchronously and asynchronously, randomized and not:

In [76]:
(defn make-school-stats-mapper []
    (let [running-stats (atom {:count 0, :mean 0, 
                               :variance 0, :ssq 0})]
        (fn [z]
            (let [{x :mean, n :count, s :ssq} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)
                  r   (- z x)
                  x'  (+ x (* K r)) ;; Isn't it nice we can use prime notation?
                  s'  (+ s (* z z))]
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x' ]
                       [:ssq      s']
                       [:variance (/ (- s' (* n+1 x' x')) (max 1 n))]))
            @running-stats)))

(clojure.pprint/pprint (sync-scan z2s (make-school-stats-mapper)))

(async-randomized-scan z2s (make-school-stats-mapper) println)

(async-non-random-scan z2s (make-school-stats-mapper) println)

[{:count 1, :mean 55.0, :variance 0.0, :ssq 3025.0}
 {:count 2, :mean 72.0, :variance 578.0, :ssq 10946.0}
 {:count 3, :mean 96.0, :variance 2017.0, :ssq 31682.0}]
{:count 1, :mean 89.0, :variance 0.0, :ssq 7921.0}
{:count 2, :mean 72.0, :variance 578.0, :ssq 10946.0}
{:count 3, :mean 96.0, :variance 2017.0, :ssq 31682.0}
{:count 1, :mean 55.0, :variance 0.0, :ssq 3025.0}
{:count 2, :mean 72.0, :variance 578.0, :ssq 10946.0}
{:count 3, :mean 96.0, :variance 2017.0, :ssq 31682.0}


nil

#### RECURRENT VARIANCE

We already know the recurrence for the mean:

$$x\leftarrow{x+K\cdot(z-x)=x+\frac{1}{n+1}(z-x)}\tag{3}$$

The recurrence for the variance takes a little work to prove, but it's still a school-level exercise:

$$v\leftarrow\frac{\left(n-1\right)v+K\,n\,\left(z-x\right)^2}{\max(1,n)}\tag{4}$$

In [77]:
(defn make-recurrent-stats-mapper []
    (let [running-stats (atom {:count 0, :mean 0, 
                               :variance 0, :ssq 0})]
        (fn [z]
            (let [{x :mean, n :count, v :variance} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 (inc n))
                  r   (- z x)
                  x'  (+ x (* K r))
                  ssr (+ (* (- n 1) v) ; old ssr is (* (- n 1) v)
                         (* K n r r))]
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x' ]
                       [:variance (/ ssr  (max 1 n))]))
            @running-stats)))
(async-non-random-scan z2s (make-recurrent-stats-mapper) println)

{:count 1, :mean 55.0, :variance 0.0, :ssq 0}
{:count 2, :mean 72.0, :variance 578.0, :ssq 0}
{:count 3, :mean 96.0, :variance 2017.0, :ssq 0}


nil

#### WELFORD'S VARIANCE

The above is equivalent, algebraically and numerically, to Welford's famous recurrence for the sum of squared residuals $S$. In recurrences, we want everything on the right-hand sides of equations or left arrows to be be old, _prior_ statistics, except for the new observation / measurement / input $z$. Welford's requires the new, _posterior_ mean on the right-hand side, so it's not as elegant as our recurrence above. However, it is easier to remember!

$$S\leftarrow{S} + \left(z-x_N\right)\left(z-x_{N+1}\right)=S+\left(z-x\right)\left(z-\left(x+K\,\left(z-x\right)\right)\right)\tag{5}$$

In [78]:
(defn make-welfords-stats-mapper []
    (let [running-stats (atom {:count 0, :mean 0, :variance 0})]
        (fn [z]
            (let [{x :mean, n :count, v :variance} @running-stats
                  n+1 (inc n)
                  K   (/ 1.0 n+1)
                  r   (- z x)
                  x'  (+ x (* K r))
                  ssr (+ (* (- n 1) v) 
                         (* (- z x) (- z x')))] ; <-- only difference to recurrent variance
                (swap! running-stats conj
                       [:count    n+1]
                       [:mean     x' ]
                       [:variance (/ ssr  (max 1 n))]))
            @running-stats)))
(async-non-random-scan z2s (make-welfords-stats-mapper) println)

{:count 1, :mean 55.0, :variance 0.0}
{:count 2, :mean 72.0, :variance 578.0}
{:count 3, :mean 96.0, :variance 2017.0}


nil

# WINDOWED STATISTICS

Suppose we want running statistics over a history of fixed, finite length. For example, suppose we have $N=10$ data and we want the statitics in a window of length $w=3$ behind the current value, inclusively. When the first datum arrives, the window and the total include one datum. The window overhangs the left until the third datum. When the fourth datum arrives, the window contains three data and the total contains four data. After the tenth datum, we may consider three more steps marching the window "off the cliff" to the right. The following figure illustrates (the first row corresponds to $n=0$, not to $n=1$):

<img src="sliding-window.png" style="width: 200px;"/>

We won't derive the following formulas, but rather say that they have been vetted at least twice independently (in a C program and in a Mathematica program). The following table shows a unit test that we reproduce. The notation is explained after the table.

<img src="sliding-window-unit-test-001.png" style="width: 800px;"/>

Denote prior statistics by plain variables like $m$ and corresponding posteriors by the same variables with primes like $m'$. The posteriors $j$ and $u$ do not have a prime. 

variable | description 
---------|:-------------
$n$      | prior count of data points; equals $0$ when considering the first point
$z$      | current data point
$w$      | fixed, constant, maximum width of window; $w\geq{1}$
$j$      | posterior number of points left of the window; $j\geq{0}$
$u$      | posterior number of points including $z$ in the running window; $1\leq{u}\leq{w}$
$m$      | prior mean of all points, not including $z$
$m'$     | posterior mean of all points including $z$
$m_j$    | prior mean of points left of the window, lagging $w$ behind $m$
$m'_j$   | posterior mean of points left of the window
$m'_w$   | posterior mean of points in the window, including the current point $z$
$v$      | prior variance, not including $z$
$v'$     | posterior variance of all points including $z$
$v_j$    | prior variance of points left of the window, lagging $w$ behind $u_n$
$v'_j$   | posterior variance of points left of the window
$v'_w$   | posterior variance of points within the window

The recurrences for $m$, $v$, $m_j$, and $v_j$ have only priors (no primes) on their right-hand sides. The values of $m_w$ and $v_w$ are not recurrences because the non-primed versions do not appear on the right-hand sides of equations 10 and 13. Those equations are simply transformations of the posteriors (values with primes) $m'$, $m'_j$, $v'$, and $v'_j$.

$$\begin{align}
j     &= \max(0,n+1-w)               \tag{6}\\
u     &= n-j+1                       \tag{7}\\
m'    &= m+\frac{z-m}{n+1}           \tag{8}\\
m'_j  &= \begin{cases} 
  m_j+\frac{z_j-m_j}{j} & j>0 \\
  0 & \mathrm{otherwise}
\end{cases}                            \tag{9}\\
m'_w  &= \frac{(n+1)\,m'-j\,m'_j}{u} \tag{10}\\
v'    &= \frac{(n-1)\,v+\frac{n}{n+1}\left(z-m\right)^2}{\max(1,n)}               \tag{11}\\
v'_j  &= \begin{cases}
  \frac{j-2}{j-1}\,v_j+\frac{1}{j}\,\left(z_j-m_j\right)^2 & j>1 \\
  0 & \mathrm{otherwise}
\end{cases}                                                                         \tag{12}\\
v'_w  &= \frac{n\,v'+(n-w)\,v'_j+(n+1)\,{m'}^2-j\,{m'_j}^2-u\,{m'_w}^2}{\max(1,u-1)}    \tag{13}
\end{align}$$

Here is sample data we can compare with the unit test above.

In [79]:
(def z3s [0.857454, 0.312454, 0.705325, 0.839363, 1.63781, 0.699257, -0.340016, -0.213596, -0.0418609, 0.054705])

#'user/z3s

The best algorithm we have found for tracking historical data is to keep a FIFO queue in a Clojure _vector_ of length $w$. This is still constant memory because it depends only on the length $w$ of the window, not on the length of the data stream.

In [80]:
(defn push-to-back [item vek]
    (conj (vec (drop 1 vek)) item))

#'user/push-to-back

In [81]:
(defn make-sliding-stats-mapper [w]
    (let [running-stats (atom {:n 0, :m 0, :v 0,
                               :win (vec (repeat w 0)),
                               :mw 0, :vw 0,
                               :mj 0, :vj 0})]
        (fn [z]
            (let [{:keys [m n v win mj vj]} @running-stats
                  zj   (first win)
                  win' (push-to-back z win)
                  n+1  (double (inc n))
                  n-1  (double (dec n))
                  K    (/ 1.0 n+1)
                  Kv   (* n K)
                  r    (- z m)
                  j    (max 0, (- n+1 w))
                  u    (- n+1 j)
                  m'   (+ m (* K r))
                  rj   (- zj mj)
                  mj'  (if (> j 0), (+ mj (/ rj j)), 0)
                  mw'  (/ (- (* n+1 m') (* j mj')) u)
                  v'   (/  (+ (* n-1 v) (* Kv r r))
                           (max 1 n))
                  vj'  (if (> j 1)
                           (let [j21 (/ (- j 2.0) 
                                        (- j 1.0))]
                               (+ (* j21 vj) 
                                  (/ (* rj rj) j)))
                           0)
                  vw'  (let [t1 (- (* n v')
                                   (* (- n w) vj'))
                             t2 (- (* n+1 m' m')
                                   (* j mj' mj'))
                             t3 (- (* u mw' mw'))]
                           (/  (+ t1 t2 t3)
                               (max 1 (- u 1))))
                  ]
                (swap! running-stats conj
                       [:n    n+1 ]
                       [:m    m'  ]
                       [:v    v'  ]
                       [:mj   mj' ]
                       [:vj   vj' ]
                       [:mw   mw' ]
                       [:vw   vw' ]
                       [:win  win']))
            @running-stats)))

(clojure.pprint/print-table 
    [:n :mw :vw]
    (sync-scan z3s (make-sliding-stats-mapper 3)))


|   :n |                  :mw |                  :vw |
|------+----------------------+----------------------|
|  1.0 |             0.857454 |                  0.0 |
|  2.0 |             0.584954 |  0.14851250000000005 |
|  3.0 |   0.6250776666666666 |  0.07908597588033339 |
|  4.0 |   0.6190473333333332 |  0.07499115039433346 |
|  5.0 |   1.0608326666666668 |   0.2541686787463333 |
|  6.0 |              1.05881 |  0.25633817280899995 |
|  7.0 |   0.6656836666666668 |   0.9787942981023336 |
|  8.0 |  0.04854833333333334 |   0.3215618307563336 |
|  9.0 | -0.19849096666666663 | 0.022395237438003604 |
| 10.0 | -0.06691730000000007 |  0.01846722403596973 |


nil

This passes the unit test.

# KALMAN FILTER

## BASIC LINEAR ALGEBRA

Go for high performance with CUDA or Intel KML later.

Following the [getting-started guide here](https://github.com/mikera/core.matrix/wiki/Getting-Started-Guide), add the following lines to `project.clj` of `clojupyter`:

Recompile `clojupyter` (`make` and `make install` in its directory) and restart the kernel in this notebook (`Kernel` menu above).

Smoke test:

In [40]:
(require '[clojure.core.matrix :as ccm])
(ccm/set-current-implementation :vectorz)

:vectorz

In [41]:
(ccm/shape
    (ccm/array [[1 2 3]
                [1 3 8]
                [2 7 4]]))

[3 3]

Bits and pieces we will need:

In [42]:
(ccm/transpose
    (ccm/array [[1 2 3]
                [1 3 8]
                [2 7 4]]))

#vectorz/matrix [[1.0,1.0,2.0],
[2.0,3.0,7.0],
[3.0,8.0,4.0]]

`mmul` is multiadic (takes more than two arguments). This is possible because matrix multiplication is associative.

In [43]:
(let [A (ccm/array [[1 2 3]
                    [1 3 8]
                    [2 7 4]])]
    (ccm/mmul (ccm/transpose A) A (ccm/inverse A)))

#vectorz/matrix [[1.000000000000003,1.0,2.0000000000000004],
[2.0000000000000093,3.000000000000001,6.999999999999998],
[3.000000000000006,8.0,3.999999999999999]]

### LINSPACE

In [44]:
(defn linspace
  "A sequence of $n$ equally spaced points in the doubly closed
 interval $[a,b]$, that is, inclusive of both ends."
  [a b n]
  (let [d (/ (- b a) (dec n))]
    (map (fn [x] (+ a (* x d))) (range n))))

#'user/linspace

In [45]:
(clojure.pprint/pprint (linspace 2 3. 3))

(2.0 2.5 3.0)


nil

### SYMMETRIC AND ANTISYMMETRIC PARTS

Symmetric part:

In [46]:
(defn symmetric-part [M]
    (ccm/div (ccm/add M (ccm/transpose M)) 2.0))
(symmetric-part [[1 2 3]
                 [1 3 8]
                 [2 7 4]])

[[1.0 1.5 2.5] [1.5 3.0 7.5] [2.5 7.5 4.0]]

Anti-symmetric part:

In [47]:
(defn anti-symmetric-part [M]
    (ccm/div (ccm/sub M (ccm/transpose M)) 2.0))
(anti-symmetric-part [[1 2 3]
                      [1 3 8]
                      [2 7 4]])

[[0.0 0.5 0.5] [-0.5 0.0 0.5] [-0.5 -0.5 0.0]]

In [48]:
(let [M [[1 2 3]
         [1 3 8]
         [2 7 4]]]
    (ccm/sub (ccm/add (symmetric-part M)
                (anti-symmetric-part M))
             M))

[[0.0 0.0 0.0] [0.0 0.0 0.0] [0.0 0.0 0.0]]

### NEAR-EQUALITY FOR MATRICES

In [49]:
(require '[clojure.algo.generic.math-functions :as gmf])

nil

This isn't the best solution: neither relative nor absolute differences are robust. Units in last place is a better criterion, however, this will unblock us for now.

In [51]:
(defn matrix-almost= 
    ([m1 m2 eps]
     "Checks for near equality against a given absolute difference."
    (mapv (fn [row1 row2]
              (mapv (fn [e1 e2] (gmf/approx= e1 e2 eps))
                    row1 row2))
          m1 m2))
    ([m1 m2] 
     "Checks for near equality against a default absolute difference of 1.0e-9"
     (matrix-almost= m1 m2 1.0e-9)))

(let [M [[1 2 3]
         [1 3 8]
         [2 7 4]]]
    (matrix-almost= (ccm/add (symmetric-part M)
                             (anti-symmetric-part M))
                    M))

[[true true true] [true true true] [true true true]]

### SIMILARITY TRANSFORM

In [52]:
(defn similarity-transform [A M]
    (ccm/mmul A M (ccm/transpose A)))

#'user/similarity-transform

### VECTORS, ROW VECTORS, COLUMN VECTORS

The library (like many others) is loose about matrices times vectors.

In [53]:
(ccm/mmul
    (ccm/matrix [[1 2 3]
                 [1 3 8]
                 [2 7 4]])
    (ccm/array [22 23 42]))

#vectorz/vector [194.0,427.0,373.0]

Pedantically, a matrix should only be allowed to left-multiply a column vector, i.e., a $1\times{3}$ matrix. The Clojure library handles this case.

In [54]:
(ccm/mmul
    (ccm/matrix [[1 2 3]
                 [1 3 8]
                 [2 7 4]])
    (ccm/array [[22] [23] [42]]))

#vectorz/matrix [[194.0],
[427.0],
[373.0]]

Non-pedantic multiplication of a vector on the right by a matrix:

In [55]:
(ccm/mmul
    (ccm/array [22 23 42])
    (ccm/matrix [[1 2 3]
                 [1 3 8]
                 [2 7 4]]))

#vectorz/vector [129.0,407.0,418.0]

Pedantic multiplication of a row vector on the right by a matrix:

In [56]:
(ccm/mmul
    (ccm/array [[22 23 42]])
    (ccm/matrix [[1 2 3]
                 [1 3 8]
                 [2 7 4]]))

#vectorz/matrix [[129.0,407.0,418.0]]

### SOLVING INSTEAD OF INVERTING

Textbooks will tell you that, if you have $\boldsymbol{A}\boldsymbol{x}=\boldsymbol{b}$ and you want $\boldsymbol{x}$, you should compute $\boldsymbol{A}^{-1}\boldsymbol{b}$. Don't do this; the inverse is numerically risky and almost never needed:

In [57]:
(ccm/mmul
    (ccm/inverse
        (ccm/array [[1 2 3]
                    [1 3 8]
                    [2 7 4]]))
    (ccm/array [22 23 42]))

#vectorz/vector [22.05882352941177,-0.4705882352941142,0.2941176470588234]

Instead, use a linear solver. Almost everywhere that you see $\boldsymbol{A}^{-1}\boldsymbol{b}$, visualize $\text{solve}(\boldsymbol{A},\boldsymbol{b})$. You will get a more stable answer. Notice the difference in the low-significance digits below. The following is a more reliable answer:

In [58]:
(require '[clojure.core.matrix.linear :as ccml])

nil

In [59]:
(ccml/solve 
    (ccm/array [[1 2 3]
                [1 3 8]
                [2 7 4]])
    (ccm/array [22 23 42]))

#vectorz/vector [22.058823529411764,-0.4705882352941176,0.2941176470588236]

In [60]:
(ccml/solve 
    (ccm/matrix [[1 2 3]
                [1 3 8]
                [2 7 4]])
    (ccm/matrix [22 23 42]))

#vectorz/vector [22.058823529411764,-0.4705882352941176,0.2941176470588236]

In [61]:
(ccm/shape (ccm/matrix [[22] [23] [42]]))

[3 1]

We need `solve` to work on matrices:

In [62]:
(defn solve-matrix
  "The 'solve' routine in clojure.core.matrix only works on Matrix times Vector.
  We need it to work on Matrix times Matrix. The equation to solve is

  Ann * Xnm = Bnm

  Think of the right-hand side matrix Bnm as a sequence of columns. Iterate over
  its transpose, treating each column as a row, then converting that row to a
  vector, to get the transpose of the solution X."
  [Ann Bnm]
  (ccm/transpose (mapv (partial ccml/solve Ann) (ccm/transpose Bnm))))

#'user/solve-matrix

In [63]:
(solve-matrix 
    (ccm/matrix [[1 2 3]
                [1 3 8]
                [2 7 4]])
    (ccm/matrix [[22] [23] [42]]
               ))

[[22.058823529411764] [-0.4705882352941176] [0.2941176470588236]]

In [64]:
(solve-matrix 
    (ccm/matrix [[1 2 3]
                [1 3 8]
                [2 7 4]])
    (ccm/matrix [[22 44] 
                 [23 46] 
                 [42 84]]))

[[22.058823529411764 44.11764705882353] [-0.4705882352941176 -0.9411764705882352] [0.2941176470588236 0.5882352941176472]]

## GENERAL KALMAN FILTER

Use Clojure's destructuring to write the Kalman filter as a binary function. See http://vixra.org/abs/1606.0348

`xn1` denotes a vector $\boldsymbol{x}$ with dimension $n\times{1}$, that is, a column vector of height $n$. `Pnn` denotes a covariance matrix of dimension $n\times{n}$, and So on.

The math is as follows (notice step 6 has the same form as all our prior statistics calculations):

Letting inputs:

* $\boldsymbol{x}_{n,1}$ be the current, best estimate of the $n$-dimensional state of a system
* $\boldsymbol{P}_{n,n}$ be the current, best estimate of the $n\times{n}$ covariance of state $\boldsymbol{x}_{n,1}$
* $\boldsymbol{z}_{m,1}$ be the current, $m$-dimensional observation
* $\boldsymbol{H}_{m,n}$ be linearized observation model to be inverted:  $\boldsymbol{z}_{m,1}=\boldsymbol{H}_{m,n}\cdot\boldsymbol{x}_{n,1}$
* $\boldsymbol{A}_{n,n}$ be linearized dynamics
* $\boldsymbol{Q}_{n,n}$ be process noise (covariance) accounting for uncertainty in $\boldsymbol{A}_{n,n}$
* $\boldsymbol{R}_{m,m}$ be observation noise (covariance) accounting for uncertainty in $\boldsymbol{z}_{m,1}$

and intermediates and outputs: 

* $\boldsymbol{x}'_{n,1}$ (intermediate; _update_) be the estimate of the state after enduring one time step of linearized dynamics
* $\boldsymbol{x}''_{n,1}$ (output; _prediction_) be the estimate of the state after dynamics and after information from the observation $\boldsymbol{z}_{m,1}$ 
* $\boldsymbol{P}'_{n,n}$ (intermediate; _update_) be the current, best estimate of the $n\times{n}$ covariance of state $\boldsymbol{x}_{n,1}$ after dynamics
* $\boldsymbol{P}''_{n,n}$ (output; _prediction_) be the current, best estimate of the $n\times{n}$ covariance of state $\boldsymbol{x}_{n,1}$ after dynamics and oservation $\boldsymbol{z}_{m,1}$
 
The steps are:

1. _Update state estimate_: $\boldsymbol{x}'_{n,1} = \boldsymbol{A}_{n,n}\;\boldsymbol{x}_{n,1}$
2. _Update state covariance_: $\boldsymbol{P}'_{n,n} = \boldsymbol{Q}_{n,n} + \left(\boldsymbol{A}_{n,n}\;\boldsymbol{P}_{n,n}\;\boldsymbol{A}_{n,n}^\intercal\right)$
3. _Covariance-update scaling matrix_: $\boldsymbol{D}_{m,m} = \boldsymbol{R}_{m,m} + \left(\boldsymbol{H}_{m,n}\;\boldsymbol{P}'_{n,n}\;\boldsymbol{H}_{m,n}^\intercal\right)$
4. _Kalman gain_: $\boldsymbol{K}_{n,m}=\boldsymbol{P}_{n,n}\;\boldsymbol{H}_{m,n}^\intercal\;\boldsymbol{D}_{m,m}^{-1}$
    1. written as $\boldsymbol{K}_{n,m}^\intercal=\text{solve}\left(\boldsymbol{D}_{m,m}^{\intercal},\boldsymbol{H}_{m,n}\;\boldsymbol{P}_{n,n}^\intercal\right)$
5. _Innovation: predicted observation residual_: $\boldsymbol{r}_{m,1}=\boldsymbol{z}_{m,1} - \boldsymbol{H}_{m,n}\;\boldsymbol{x}'_{n,1}$
6. _State prediction_: $\boldsymbol{x}''_{n,1} = \boldsymbol{x}'_{n,1} + \boldsymbol{K}_{n,m}\;\boldsymbol{r}_{m,1}$
7. _Covariance reduction matrix_: $\boldsymbol{L}_{n,n}=\boldsymbol{I}_{n,n} - \boldsymbol{K}_{n,m}\;\boldsymbol{H}_{m,n}$
8. _Covariance prediction_: $\boldsymbol{P}''_{n,n}=\boldsymbol{L}_{n,n}\;\boldsymbol{P}'_{n,n}$

In [78]:
(defn kalman-update [{:keys [xn1 Pnn]} {:keys [zm1 Hmn Ann Qnn Rmm]}]
  (let [x'n1   (ccm/mmul Ann xn1)                    ; Predict state
        P'nn   (ccm/add
                Qnn (similarity-transform Ann Pnn))  ; Predict covariance
        Dmm    (ccm/add
                Rmm (similarity-transform Hmn P'nn)) ; Gain precursor
        DTmm   (ccm/transpose Dmm)                   ; Support for "solve"
        HP'Tmn (ccm/mmul Hmn (ccm/transpose P'nn))   ; Support for "solve"
        KTmn   (solve-matrix DTmm HP'Tmn)            ; Eqn 3 of http://vixra.org/abs/1606.0328
        Knm    (ccm/transpose KTmn)                  ; Kalman gain
        rm1    (ccm/sub zm1  (ccm/mmul Hmn x'n1))    ; innovation = predicted obn residual
        x''n1  (ccm/add x'n1 (ccm/mmul Knm rm1))     ; final corrected estimate
        n      (ccm/dimension-count xn1 0)
        Lnn    (ccm/sub (ccm/identity-matrix n)      ; Support for new covariance ...
                        (ccm/mmul Knm Hmn))          ; ...  ? catastrophic cancellation ?
        P''nn  (ccm/mmul Lnn P'nn)]                  ; New covariance

      {:xn1 x''n1, :Pnn P''nn}))

#'user/kalman-update

### UNIT TEST

Let the measurement model be a cubic:

In [66]:
(defn Hmn-t [t]
  (ccm/matrix [[(* t t t) (* t t) t 1]]))

#'user/Hmn-t

Ground truth state, constant with time in this unit test:

In [67]:
(def true-x
    (ccm/array [-5 -4 9 -3]))

#'user/true-x

In [68]:
(require '[clojure.core.matrix.random :as ccmr])

nil

In [69]:
(defn fake [n]
  (let [times   (range -2.0 2.0 (/ 2.0 n))
        Hmns    (mapv Hmn-t times)
        true-zs (mapv #(ccm/mmul % true-x) Hmns)
        zm1s    (mapv #(ccm/add
                        % (ccm/array
                           [[(ccmr/rand-gaussian)]]))
                      true-zs)]
    {:times times, :Hmns Hmns, :true-zs true-zs, :zm1s zm1s}))

#'user/fake

In [70]:
(def test-data (fake 7))

#'user/test-data

A state cluster is a vector of $\boldsymbol{x}$ and $\boldsymbol{P}$:

In [71]:
(def state-cluster-prior
  {:xn1 (ccm/array [[0.0] [0.0] [0.0] [0.0]])
   :Pnn (ccm/mul 1000.0 (ccm/identity-matrix 4))})

#'user/state-cluster-prior

An obn-cluster is a vector of $\boldsymbol{z}$, $\boldsymbol{H}$, $\boldsymbol{A}$, $\boldsymbol{Q}$, and $\boldsymbol{R}$. _Obn_ is short for _observation_.

In [72]:
(def obn-clusters
  (let [c (count (:times test-data))]
    (mapv (fn [zm1 Hmn Ann Qnn Rmm]
            {:zm1 zm1, :Hmn Hmn, :Ann Ann, :Qnn Qnn, :Rmm Rmm})
          (:zm1s test-data)
          (:Hmns test-data)
          (repeat c (ccm/identity-matrix 4))
          (repeat c (ccm/zero-matrix 4 4))
          (repeat c (ccm/identity-matrix 1))
          )))

#'user/obn-clusters

In [80]:
(clojure.pprint/pprint (reduce kalman-update state-cluster-prior obn-clusters))

{:xn1 #vectorz/matrix [[-5.0620526513628885],
[-3.9911699170558035],
[9.218349402929167],
[-2.719776178504107]],
 :Pnn
 #vectorz/matrix [[0.03208215055213958,-5.478256737134757E-15,-0.0874691388122202,-8.770761894538737E-15],
[-2.3568386825489895E-15,0.03637145347999561,-5.2632377622874316E-14,-0.05541947257604415],
[-0.08746913881223455,-2.570860191397628E-14,0.2822249372573019,-1.1334683192032458E-14],
[4.6455894686658894E-15,-0.05541947257607027,-6.734196533741965E-15,0.15110531309503664]]}


nil

Notice how close it is to the ground truth, $[-5, -4, 9, -3]$ for $\boldsymbol{x}$. A chi-squared test would be appropriate to complete the verification (TODO).

### MAKE-KALMAN-MAPPER

Just as we did before, we can convert a _foldable_ into a _mappable_ transducer and bang on an asynchronous stream of data. This only needs error handling to be deployable at scale. Not to minimize error handling: it's a big but separable engineering task.

In [86]:
(defn make-kalman-mapper [{:keys [xn1 Pnn]}]
    ;; let-over-lambda (LOL); here are the Bayesian priors
    (let [estimate-and-covariance (atom {:xn1 xn1, ;; prior-estimate
                                         :Pnn Pnn, ;; prior-covariance
                                         })]
        ;; here is the mapper (mappable)
        (fn [{:keys [zm1 Hmn Ann Qnn Rmm]}]
            (let [{xn1 :xn1, Pnn :Pnn} @estimate-and-covariance]
                (let [ ;; out-dented so we don't go crazy reading it
    x'n1   (ccm/mmul Ann xn1)                    ; Predict state
    P'nn   (ccm/add
            Qnn (similarity-transform Ann Pnn))  ; Predict covariance
    Dmm    (ccm/add
            Rmm (similarity-transform Hmn P'nn)) ; Gain precursor
    DTmm   (ccm/transpose Dmm)                   ; Support for "solve"
    HP'Tmn (ccm/mmul Hmn (ccm/transpose P'nn))   ; Support for "solve"
    KTmn   (solve-matrix DTmm HP'Tmn)            ; Eqn 3 of http://vixra.org/abs/1606.0328
    Knm    (ccm/transpose KTmn)                  ; Kalman gain
    rm1    (ccm/sub zm1  (ccm/mmul Hmn x'n1))    ; innovation = predicted obn residual
    x''n1  (ccm/add x'n1 (ccm/mmul Knm rm1))     ; final corrected estimate
    n      (ccm/dimension-count xn1 0)
    Lnn    (ccm/sub (ccm/identity-matrix n)      ; Support for new covariance ...
                    (ccm/mmul Knm Hmn))          ; ...  ? catastrophic cancellation ?
    P''nn  (ccm/mmul Lnn P'nn)]
                    (swap! estimate-and-covariance conj
                           [:xn1 x''n1]
                           [:Pnn P''nn])   )   )
            @estimate-and-covariance)   ))

#_(clojure.pprint/pprint (last 
                           (map (make-kalman-mapper state-cluster-prior) 
                           obn-clusters)))

(async-randomized-scan obn-clusters
                       (make-kalman-mapper state-cluster-prior)
                       clojure.pprint/pprint)

{:xn1 #vectorz/matrix [[2.3125808246442032],
[-2.6980109620849024],
[3.147679455765718],
[-3.6722926983933357]],
 :Pnn
 #vectorz/matrix [[851.5854216510306,173.15034140713095,-202.00873164165267,235.676853581928],
[173.15034140713095,797.9912683583474,235.676853581928,-274.9563291789159],
[-202.0087316416527,235.67685358192801,725.0436708210841,320.7823840420684],
[235.67685358192801,-274.9563291789159,320.7823840420684,625.7538852842537]]}
{:xn1 #vectorz/matrix [[-0.29733492697949826],
[-1.6174055436869548],
[3.605732389652434],
[-5.710602136359952]],
 :Pnn
 #vectorz/matrix [[302.5743366767256,400.4620286961495,-105.6546106121292,-193.09349096088863],
[400.46202869614933,703.8754943925106,195.78254465463857,-97.42892919557505],
[-105.65461061212935,195.7825446546386,708.1330529966615,396.0336691302056],
[-193.09349096088835,-97.42892919557511,396.03366913020557,290.8899574129797]]}
{:xn1 #vectorz/matrix [[-7.258239261571873],
[-9.24487380036868],
[8.893682213600231],
[-0.0250986607205

nil

### PYTHON BASELINE

The above is a transcription of the following Python code, developed in another notebook and for cross-validation of the Clojure code above.

## INS / BARO FUSION EXAMPLE

### STATE $\boldsymbol{x}$

Altitude $h$, vertical velocity $\dot{h}$, and barometer bias:

$$\bar{\boldsymbol{x}}=[h,\, \dot{h},\, \mathrm{bias_{baro}}]^\intercal\tag{8.2.1}$$

### PROCESS DYNAMICS $\boldsymbol{A}$, $\bar{\boldsymbol{w}}$

$$\bar{\boldsymbol{x}}_{k+1} = \boldsymbol{A}\, \bar{\boldsymbol{x}}_k + \bar{\boldsymbol{w}}\tag{8.2.2}$$

where
$\boldsymbol{A}
=\begin{bmatrix}
    1 & dt & 0 \\
    0 & 1 & 0  \\
    0 & 0 & 1  \\
\end{bmatrix}$, $\boldsymbol{\bar{w}}$ is drawn from a joint normal distribution of mean 0, covariance $\boldsymbol{Q}$, or, in notation, $\boldsymbol{\bar{w}}\sim\mathcal{N}(\boldsymbol{0},\boldsymbol{Q})$

In [None]:
(defn A [dt] (ccm/array [[1 dt 0]
                         [0  1 0]
                         [0  0 1]]))

In [None]:
(ccmr/sample-normal 3)

### MEASUREMENTS $\bar{\boldsymbol{z}}$

a.k.a. OBSERVATIONS, OBNS

1. $h_{\mathrm{imu}}$: Altitude from the fused IMU/GNSS
2. $\dot{h}_{\mathrm{imu}}$: vertical velocity from the IMU/GNSS
3. $h_{\mathrm{baro}}$: altitude from the barometer
4. a dummy observation, the average of the two height measurements, to artificially make the dimensions of $\boldsymbol{H}$, namely $3\times{4}$ different from the dimensions of $\boldsymbol{A}$, namely $3\times{3}$, for the purpose of validating computational linear algebra code.

$$\bar{\boldsymbol{z}}=[h_{\mathrm{imu}},\,\dot{h}_{\mathrm{imu}},\, h_{\mathrm{baro}},\, \frac{h_{\mathrm{imu}}+h_{\mathrm{baro}}}{2}]^\intercal\tag{8.2.3}$$

### MEASUREMENT PARTIALS $\boldsymbol{H}$: $\partial\kern{.125em}{\boldsymbol{z}}\kern{.125em}/\kern{.125em}\partial\kern{.125em}{\boldsymbol{x}}$

Measurements $\bar{\boldsymbol{z}}$ are related to the state vector $\bar{\boldsymbol{x}}$ by the following linear model: 
1. The measured imu height $h_{\mathrm{imu}}$ is equal to the height $h$ from the state.
2. The measured vertical speed from the imu $\dot{h}_{\mathrm{imu}}$ is equal to $\dot{h}$ from the state. 
3. The measured baro height $h_{\mathrm{baro}}$ is equal to the height $h$ from the state plus the baro bias from the state.
4. The measured dummy value is equal to the height from the state plus half the baro bias from the state.

$\boldsymbol{\bar{\nu}}\sim\mathcal{N}(\boldsymbol{0},\boldsymbol{R})$ noise is added to the measurements.

$$\bar{\boldsymbol{z}}_k = \boldsymbol{H}\, \bar{\boldsymbol{x}}_k+\bar{\boldsymbol{\nu}}\tag{8.2.4}$$

where $\boldsymbol{H}=
\begin{bmatrix}
     1  &  0  &  0 \\
     0  &  1  &  0 \\
     1  &  0  &  1 \\
     1  &  0  & 1/2
\end{bmatrix}$

In [None]:
(def H (ccm/array [[1 0 0]
                   [0 1 0]
                   [1 0 1]
                   [1 0 0.5]]))
H

This sets up the whole problem. We modify $\boldsymbol{Q}$ and $\boldsymbol{R}$ to "tune" the Kalman filter. 

### GROUND TRUTH

In [None]:
(defmacro hashup [& vars]
  (list 'zipmap
    (mapv keyword vars)
    (vec vars)))

In [None]:
(def ground-truth
    (let [n              201
          t1             2.0
          times          (vec (linspace 0.0, t1, n))
          wavelength     1
          wavespeed      1
          f              (* (/ wavespeed wavelength) 2 Math/PI)
          dt             (- (get times 1) (get times 0))
          amplitude      5.0
          ground-level   10.0
          h-true         (mapv #(+ ground-level 
                                   (* amplitude (Math/sin (* f %))))
                               times)
          h-dot-true     (mapv #(* amplitude f (Math/cos (* f %)))
                               times)
          baro-bias-true -3.777]
        (hashup n times f dt h-true h-dot-true baro-bias-true)))

Here is the old Python code:

Add `[quil "2.6.0"]` to `clojupyter` and borrow plotting code from [VDQuil](https://github.com/daveliepmann/vdquil).

### SIMULATION

Significant process noise must be added to the velocity measure. The filter cannot track velocity without it because there are no direct measurements of velocity: it's inferred from the height measurements. Try setting the second element of $\boldsymbol{Q}$ to $0$ or $1$ and watch the bad results. 

#### TODO

This effect needs a better explanation.

## VALIDATE C++ CODE

The files that we read in and plot here were created from C++ code.

### DIFFERENCES BETWEEN PYTHON AND C++: TODO

I see differences in the fifth decimal place between Python and C++. This should not be. Must investigate.

There is no difference between Python and C++ in the true baro bias because they're both constants, namely $3.777\,\mathrm{m}$.

### NON-CLOSURE-STYLE KALMAN

### RESIDUALS

The following groups of plots show identical results from four styles of Kalman filter. There is **no significant speed difference** amongst the four styles, each taking about $39\pm{3}\,\mathrm{ms}$ to process 200 samples.

1. **non-closure style**: All matrices are passed in the argument list of the Kalman filter. This is the most general form, but is potentially not as memory-efficient as "closing" over matrices that are usually constant. This is the simplest form (has the fewest moving parts) and is the easiest to understand, implement, modify, and deploy.
2. **functional closure style**: The Kalman filter is written as a `std::function` of a lambda expression, in which the constant matrices are not function parameters, but "free variables," that is, non-parameters referenced from an "environment" of variable names created by the constructor of the `std::function`. This form has advantages of clarity, but **may entail dynamic heap memory** because we do not have visibility or control over the memory management used by `std::function` (TODO: investigate).
    1. **index-loop test**: reliably produces verifiable and correct results. Suffers from the small additional complications of the index variable, its bound, and its `for` loop. This complication is mitigated by the fact that the index-loop idiom is standard and familiar to all C++ programmers.
    2. **std::accumulate test**: This alternative is the most modern in style, bypassing the index variable, its bound, and its loop. Also reliably produces verifiable and correct results. Requires understanding C++ lambda expressions, especially the difference between the `[=]` and `[&]` options for capture of free variables. It turns out that we want `[=]`, that is, copy semantics, for the Kalman `std::function` itself, and `[&]`, that is, reference semantics, for the accumulator lambda that saves the entire list of estimates. It is a good argument that this subtle difference complicates the code; in fact, it tripped us up in TAG_009. However, there is a mitigating simplicity to removing the index variable, its bound, and its loop. If we can prove that the Kalman filter does not use dynamic memory, this will be the **preferred method** when constant matrices must not be passed in to the Kalman filter every iteration.
3. **object-style closure**: offers higher assurance of avoiding dynamic memory, using a stack-allocated instance of a class struct to "close" over the constant matrices. The free variables in the body of the Kalman filter refer to instance variables that hold the constant matrices. The Kalman filter itself is an ordinary member function or "method." This is an  **acceptable** form when constant matrices must not be passed in to the Kalman filter every iteration. It is easily understood by C++ programmers who don't understand lambda expressions.

#### NON-CLOSURE KALMAN FILTER

#### FUNCTIONAL CLOSURE, INDEX LOOP

#### FUNCTIONAL CLOSURE, `std::accumulate`

#### OBJECT-STYLE CLOSURE, INDEX LOOP

# TODO

## Frinj FOR UNITS OF MEASURE

## VDQuil FOR PLOTTING

## OPEN-CL

In [None]:
(require '[uncomplicate.clojurecl.core :as cclo])
(require '[uncomplicate.clojurecl.info :as ccli])

In [None]:
(clojure.pprint/pprint (map ccli/info (cclo/platforms)))

In [None]:
(clojure.pprint/pprint 
    (map ccli/info
         (cclo/devices 
             (first (cclo/platforms)))))

## CUDA / MKL

Cuda / MKL / Neanderthal

In [None]:
(require '[uncomplicate.neanderthal.core   :as uncore])
(require '[uncomplicate.neanderthal.native :as uncnat])

In [None]:
(def xs (uncnat/dv 1 2 3))
(def ys (uncnat/dv 10 20))

# GAUSSIAN PROCESSES