# [dev] PyMC4 Design overview. Generators, Coroutines and all the things

For a brief introduction to coroutines you can first read [PEP0342](https://www.python.org/dev/peps/pep-0342/), but we will cover most of the basics here with a practical example. We will draft a PPL on top of `tfp`. The challenge is to build the graph dynamically while keeping the framework flexible.

## A simple case

In [1]:
import tensorflow_probability as tfp
import tensorflow as tf
from tensorflow_probability import distributions as tfd

Let's first look at "HOW-TO PPL". Essentially, we want to implement a probabilistic program (think of a Bayesian model that you would usually represent as a directed acyclic graph) that does two things: 1. forward sampling to get prior (predictive) samples; 2. reverse evaluation on some inputs to compute the log-probability (if we condition on the observed data, we are evaluating the unnormalized posterior density of the unknown random variables). To compute the log_prob specifically, we need to keep track of the dependencies. For example, if we have something simple like `x ~ Normal(0, 1), y ~ Normal(x, 1)`, in the reverse mode (goal 2 from above) we need to swap `x` with the input `x` (either from the user or programmatically) to essentially do `log_prob = Normal(0, 1).log_prob(x_input) + Normal(x_input, 1).log_prob(y_input)`.
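To make the reverse mode concrete, here is a minimal sketch of that log-probability in plain Python, without `tfp`. The helper names `normal_log_prob` and `joint_log_prob` are made up for this illustration and are not part of any library.

```python
import math


def normal_log_prob(value, loc, scale):
    """Log-density of Normal(loc, scale) evaluated at `value`."""
    z = (value - loc) / scale
    return -0.5 * z * z - math.log(scale) - 0.5 * math.log(2.0 * math.pi)


def joint_log_prob(x_input, y_input):
    # x ~ Normal(0, 1)
    lp_x = normal_log_prob(x_input, loc=0.0, scale=1.0)
    # y ~ Normal(x, 1): the *input* x is plugged in as the location of y
    lp_y = normal_log_prob(y_input, loc=x_input, scale=1.0)
    return lp_x + lp_y


print(joint_log_prob(0.5, 1.0))
```

The point is the second term: the location of `y` is whatever value of `x` we were given, not a fresh sample.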

There are a few approaches to this problem. For example, in PyMC3 with a static graph backend (theano), we are able to write things in a declarative way and use the static computational graph to keep track of the dependencies. This means we are able to do `log_prob = x.log_prob(x) + y.log_prob(y)`, as the value representations of the random variables `x` and `y` are swapped in with some input at runtime. With a dynamic graph we have a problem: because we write the model in forward mode, we essentially lose track of the dependencies when calling it in reverse mode. We need to either keep track of the graph representation ourselves, or write a function that can be re-evaluated to make sure we have the right dependencies.

Ideally, a model would look like this:

In [2]:
def model(x):
    scale = tfd.HalfCauchy(0, 1)
    coefs = tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = tfd.Normal(tf.linalg.matvec(x, coefs), scale)
    return predictions

But this function will not work (you can try it yourself), because there is no random variable concept in `tfp`, meaning that you cannot do `RV_a.log_prob(RV_a)`. (Yes, just think of a random variable in a PPL as a tensor/array-like object that you can do computations with, plus a `log_prob` method that can be evaluated on the variable itself.)

In [None]:
model(tf.random.normal((100, 10)))

Generating a log_prob from this model also won't work, because the computation it requires is different from the function we have written down above.

What we want here is to track function evaluation at runtime, depending on the context (goal 1 or goal 2 from above).

The very first way to cope with this was to write a wrapper over a distribution object. This wrapper was intended to catch a call to the distribution and use the context to figure out what to do: for goal 1, we draw a sample from a random variable and plug the concrete value into the downstream dependencies; for goal 2, we take the given concrete value, evaluate it with the respective random variable, and also plug it into the downstream dependencies. Here we instead use Python coroutines to get dynamic control flow that achieves this kind of deferred execution.

In [2]:
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale)
    return predictions

Now the model reads as expected, but `yield` allows us to defer the program execution. Before evaluating this function, let's figure out what `yield` does.

In [14]:
def generator(message):
    print("I am a generator and I yield", message)
    responce = yield message
    print("I am a generator and I got", responce)
    return "goodbye"

In [15]:
g = generator("(generators are cool)")

In [16]:
mes = g.send(None)

I am a generator and I yield (generators are cool)


In [17]:
print(mes)

(generators are cool)


In [18]:
g.send("(Indeed, generators are cool)")

I am a generator and I got (Indeed, generators are cool)


StopIteration: goodbye

What has happened here:

* we had a simple generator and were able to communicate with it via `send`
* after `send` is called (the first call must pass `None`), the generator runs to the next `yield` expression and yields what it is asked to yield
* the return value of `send` is exactly the message from `yield message`
* the left-hand side of `responce = yield message` is assigned only by the next `send`, not earlier
* once the generator has no `yield` statements left and finally reaches `return`, it raises `StopIteration` with the return value as its first argument
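The rules above can be sketched as a small driver loop. The names `two_step` and `drive` are hypothetical and exist only for this illustration.

```python
def two_step():
    first = yield "request-1"    # pauses here; resumes with the value passed to send()
    second = yield "request-2"
    return first + second


def drive(gen, answers):
    """Feed `answers` to `gen` one by one; return (requests seen, return value)."""
    requests = []
    to_send = None               # the first send() must carry None
    answers = iter(answers)
    while True:
        try:
            request = gen.send(to_send)
        except StopIteration as stop:
            # the generator hit `return`; its value travels inside the exception
            return requests, stop.value
        requests.append(request)
        to_send = next(answers)


requests, result = drive(two_step(), [1, 2])
print(requests, result)
```

Each answer is delivered only on the `send` that follows the corresponding request, which is exactly the deferred execution we need for a model.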

Now we are ready to evaluate our model by hand

In [19]:
state = dict(dists=dict(), samples=dict())

In [20]:
state["input"] = tf.random.normal((3, 10))
m = model(state["input"])

In [21]:
scale_dist = next(m)

In [22]:
print(scale_dist)

tfp.distributions.HalfCauchy("HalfCauchy", batch_shape=[], event_shape=[], dtype=float32)


This means we are here:

```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1) # <--- HERE
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    ...
```

What can we do with this distribution? We can choose forward sampling (in this case we sample from the stateless distribution `HalfCauchy(0, 1)`). But the result needs to be usable seamlessly later on, regardless of the context (goal 1 or 2 above). On the model side, we need to store the intermediate values and their associated distributions (hey! that's a random variable!).

In [23]:
assert scale_dist.name not in state["dists"]
state["samples"][scale_dist.name] = scale_dist.sample()
state["dists"][scale_dist.name] = scale_dist

In [24]:
coefs_dist = m.send(state["samples"][scale_dist.name])

In [25]:
print(coefs_dist)

tfp.distributions.Normal("Normal", batch_shape=[10], event_shape=[], dtype=float32)


```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, ) # <--- WE ARE NOW HERE
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale)
    return predictions
```

We do the same thing

In [26]:
assert coefs_dist.name not in state["dists"]
state["samples"][coefs_dist.name] = coefs_dist.sample()
state["dists"][coefs_dist.name] = coefs_dist

In [27]:
preds_dist = m.send(state["samples"][coefs_dist.name])

In [28]:
print(preds_dist)

tfp.distributions.Normal("Normal", batch_shape=[3], event_shape=[], dtype=float32)


```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale) # <--- NOW HERE
    return predictions
```

We are now facing the predictive distribution. Here we have several options:
* sample from it: we get the prior predictive
* set a custom value instead of a sample: similar to Pearl's `do`-operator, but at the leaf nodes it is equivalent to conditioning. We might be interested in this to compute the unnormalized posterior
* replace it with another distribution, arbitrary magic

In [29]:
assert preds_dist.name not in state["dists"]
state["samples"][preds_dist.name] = tf.zeros(preds_dist.batch_shape)
state["dists"][preds_dist.name] = preds_dist

AssertionError: 

Gotcha: we found duplicate names in our toy graphical model. We can easily tell the user to rewrite the model to get rid of them.

In [30]:
m.throw(RuntimeError(
    "We found duplicate names in your cool model: {}, "
    "so far we have other variables in the model, {}".format(
        preds_dist.name, set(state["dists"].keys()), 
    )
))

RuntimeError: We found duplicate names in your cool model: Normal, so far we have other variables in the model, {'Normal', 'HalfCauchy'}

The good thing is that we *communicate* with the user, and can give meaningful exceptions with little pain.
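As a minimal sketch of what `throw` does: the exception is raised inside the generator at the `yield` where it is paused, so the model code can see it (and the traceback points at the user's own line). The generator `user_model` and the string "distributions" are stand-ins for this illustration.

```python
def user_model():
    try:
        yield "Normal"
        yield "Normal"    # duplicate name; the driver will complain here
    except RuntimeError as err:
        # a model may intercept the error, e.g. to clean up, and then re-raise it
        print("model saw:", err)
        raise


m = user_model()
seen = set()
caught = None
try:
    for _ in range(2):
        name = next(m)
        if name in seen:
            # raised at the paused `yield` inside user_model
            m.throw(RuntimeError("duplicate name: {}".format(name)))
        seen.add(name)
except RuntimeError as err:
    caught = err

print(caught)
```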

The correct model should look like this:

```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="Normal_1") # <--- HERE we asked our user to change the name
    return predictions
```


Let's set all the names according to the new model and interact with user again using the same model

In [31]:
m.gi_running

False
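A side note on `gi_running`: it is `False` whenever the generator is not executing at that very moment, which includes a generator that is merely paused at a `yield`. A more direct way to check for exhaustion is `inspect.getgeneratorstate` from the standard library (or `gi_frame is None`). A minimal sketch with a throwaway generator `gen`:

```python
import inspect


def gen():
    yield 1


g = gen()
states = [inspect.getgeneratorstate(g)]        # not started yet
next(g)
states.append(inspect.getgeneratorstate(g))    # paused at `yield`; gi_running is False here too
try:
    g.throw(RuntimeError("stop"))              # the exception propagates out and ends the generator
except RuntimeError:
    pass
states.append(inspect.getgeneratorstate(g))    # finished

print(states, g.gi_running, g.gi_frame is None)
```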

Our generator finished when the exception propagated out of it, so we can't interact with it any more. Let's create a new one and re-evaluate it with the same sampled values (a hint at how to get the desired `logp` function).

In [32]:
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="Normal_1") # <--- HERE we asked our user to change the name
    return predictions

In [34]:
m = model(state["input"])
print(m.send(None))
print(m.send(state["samples"]["HalfCauchy"]))
print(m.send(state["samples"]["Normal"]))
try:
    m.send(tf.zeros(state["input"].shape[0]))
except StopIteration as e:
    stop_iteration = e
else:
    raise RuntimeError("No exception met")

tfp.distributions.HalfCauchy("HalfCauchy", batch_shape=[], event_shape=[], dtype=float32)
tfp.distributions.Normal("Normal", batch_shape=[10], event_shape=[], dtype=float32)
tfp.distributions.Normal("Normal_1", batch_shape=[3], event_shape=[], dtype=float32)


In [35]:
print(stop_iteration)

tf.Tensor([0. 0. 0.], shape=(3,), dtype=float32)


Instead of returning some value from the last `send`, the generator raises `StopIteration`, because it is exhausted: it reached the `return` statement without meeting another `yield`. As explained in [PEP0342](https://www.python.org/dev/peps/pep-0342/) (and checked here), the return value is carried inside the exception; it is also available as the exception's `value` attribute.

## Automate process above

We are all lazy humans and can't stand doing repetitive things. In our model evaluation we followed pretty simple rules:
* asserting name is not used
* checking if we should sample or place a specific value instead
* recording distributions and samples

The next step is to write a function that does all this for us. In this tutorial let's keep it simple:

In [36]:
def interact(gen, state):
    control_flow = gen()
    return_value = None
    while True:
        try:
            dist = control_flow.send(return_value)
            if dist.name in state["dists"]:
                control_flow.throw(RuntimeError(
                    "We found duplicate names in your cool model: {}, "
                    "so far we have other variables in the model, {}".format(
                        dist.name, set(state["dists"].keys()), 
                    )
                ))
            if dist.name in state["samples"]:
                return_value = state["samples"][dist.name]
            else:
                return_value = dist.sample()
                state["samples"][dist.name] = return_value
            state["dists"][dist.name] = dist
        except StopIteration as e:
            if e.args:
                return_value = e.args[0]
            else:
                return_value = None
            break
    return return_value, state

This implementation assumes that `gen` is a callable taking no arguments and returning a generator; we are just keeping things simple.

In [37]:
preds, state = interact(lambda: model(tf.random.normal((3, 10))), state=dict(dists=dict(), samples=dict()))

In [38]:
state

{'dists': {'HalfCauchy': <tfp.distributions.HalfCauchy 'HalfCauchy' batch_shape=[] event_shape=[] dtype=float32>,
  'Normal': <tfp.distributions.Normal 'Normal' batch_shape=[10] event_shape=[] dtype=float32>,
  'Normal_1': <tfp.distributions.Normal 'Normal_1' batch_shape=[3] event_shape=[] dtype=float32>},
 'samples': {'HalfCauchy': <tf.Tensor: shape=(), dtype=float32, numpy=35.769157>,
  'Normal': <tf.Tensor: shape=(10,), dtype=float32, numpy=
  array([-0.41907588,  0.8893132 , -0.00785866,  2.1496706 , -0.21485494,
         -0.79587907, -0.9849972 ,  1.261421  ,  0.40662226, -0.38024214],
        dtype=float32)>,
  'Normal_1': <tf.Tensor: shape=(3,), dtype=float32, numpy=array([  0.3047135, -33.16318  ,  11.798283 ], dtype=float32)>}}

In [39]:
preds

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([  0.3047135, -33.16318  ,  11.798283 ], dtype=float32)>

We get everything as expected. To calculate `logp` you just iterate over the distributions and match them with the corresponding values. But let's dive deeper.
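A minimal sketch of that `logp` computation over a state of the same shape as above. To keep it runnable without `tfp`, it uses a hypothetical stand-in class `ToyNormal` that only has `name` and `log_prob`; `total_log_prob` and `toy_state` are likewise names invented for this example.

```python
import math


class ToyNormal:
    """Hypothetical stand-in for tfd.Normal: only `name` and `log_prob`."""

    def __init__(self, loc, scale, name):
        self.loc, self.scale, self.name = loc, scale, name

    def log_prob(self, value):
        z = (value - self.loc) / self.scale
        return -0.5 * z * z - math.log(self.scale) - 0.5 * math.log(2.0 * math.pi)


def total_log_prob(state):
    # match every recorded distribution with the value recorded under the same name
    return sum(
        dist.log_prob(state["samples"][name])
        for name, dist in state["dists"].items()
    )


x_value, y_value = 0.5, 1.0
toy_state = {
    # y's distribution was built from the concrete value of x, as the driver would do
    "dists": {"x": ToyNormal(0.0, 1.0, "x"), "y": ToyNormal(x_value, 1.0, "y")},
    "samples": {"x": x_value, "y": y_value},
}
print(total_log_prob(toy_state))
```

With real `tfd` distributions each `log_prob` call returns a tensor of shape `batch_shape`, so the individual terms would need to be reduced (for example with `tf.reduce_sum`) before being added together.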

## One level deeper

Recall the motivating example from [PR#125](https://github.com/pymc-devs/pymc4/pull/125)

In [40]:
def Horseshoe(mu=0, tau=1., s=1., name=None):
    with tf.name_scope(name):
        scale = yield tfd.HalfCauchy(0, s, name="scale")
        noise = yield tfd.Normal(0, tau, name="noise")
        return scale * noise + mu


def linreg(x):
    scale = yield tfd.HalfCauchy(0, 1, name="scale")
    coefs = yield Horseshoe(tf.zeros(x.shape[1]), name="coefs")
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="predictions")
    return predictions

In [41]:
preds, state = interact(lambda: linreg(tf.random.normal((3, 10))), state=dict(dists=dict(), samples=dict()))

AttributeError: 'generator' object has no attribute 'name'

Oops, we have an `AttributeError`. What we want is a nested model, but nesting models is something different from a plain generator. As our model is a generator function itself, the return value of `Horseshoe(tf.zeros(x.shape[1]), name="coefs")` is a generator, and of course this generator has no `name` attribute. Okay, we can ask the user to use the `yield from` construction to delegate to the nested generator.

In [42]:
def linreg_ugly(x):
    scale = yield tfd.HalfCauchy(0, 1, name="scale")
    coefs = yield from Horseshoe(tf.zeros(x.shape[1]), name="coefs")
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="predictions")
    return predictions

In [43]:
preds, state = interact(lambda: linreg_ugly(tf.random.normal((3, 10))), state=dict(dists=dict(), samples=dict()))

Okay, we passed this thing

In [44]:
state["dists"]

{'scale': <tfp.distributions.HalfCauchy 'scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_scale': <tfp.distributions.HalfCauchy 'coefs_scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_noise': <tfp.distributions.Normal 'coefs_noise' batch_shape=[] event_shape=[] dtype=float32>,
 'predictions': <tfp.distributions.Normal 'predictions' batch_shape=[3] event_shape=[] dtype=float32>}
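For reference, a minimal sketch of how `yield from` behaves: the inner generator's requests pass straight through to whoever drives the outer one, and the inner `return` value becomes the value of the `yield from` expression. The generators `inner` and `outer` and the string requests are made up for this illustration.

```python
def inner():
    a = yield "inner/a"
    b = yield "inner/b"
    return a * b                    # becomes the value of the `yield from` expression


def outer():
    scale = yield "outer/scale"
    product = yield from inner()    # inner's yields go directly to the driver
    return scale + product


g = outer()
requests = [g.send(None)]
requests.append(g.send(10))         # answers "outer/scale"
requests.append(g.send(2))          # answers "inner/a"
try:
    g.send(3)                       # answers "inner/b"
except StopIteration as stop:
    result = stop.value

print(requests, result)
```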

We got nested models working, but it requires `yield from`. This is UGLY and potentially confusing for the user. Fortunately, we can rewrite our `interact` function to accept nested models in a few lines, and let Python do the task for us.

In [45]:
import types
def interact_nested(gen, state):
    # for now we should check input type
    if not isinstance(gen, types.GeneratorType):
        control_flow = gen()
    else:
        control_flow = gen

    return_value = None
    while True:
        try:
            dist = control_flow.send(return_value)
            # this makes nested models possible
            if isinstance(dist, types.GeneratorType):
                return_value, state = interact_nested(dist, state)
                # ^ in a few lines of code, go recursive
            else:
                if dist.name in state["dists"]:
                    control_flow.throw(RuntimeError(
                        "We found duplicate names in your cool model: {}, "
                        "so far we have other variables in the model, {}".format(
                            dist.name, set(state["dists"].keys()), 
                        )
                    ))
                if dist.name in state["samples"]:
                    return_value = state["samples"][dist.name]
                else:
                    return_value = dist.sample()
                    state["samples"][dist.name] = return_value
                state["dists"][dist.name] = dist
        except StopIteration as e:
            if e.args:
                return_value = e.args[0]
            else:
                return_value = None
            break
    return return_value, state

Remember that we had problems here:
```python
preds, state = interact(lambda: linreg(tf.random.normal((3, 10))), state=dict(dists=dict(), samples=dict()))
```

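Additionally, we can specify an observed variable by pre-filling `state["samples"]`. The key has to match the distribution's name exactly: the key `"predictions/"` used below does not match `predictions`, so, as the output shows, `predictions` is still sampled instead of being set to the supplied zeros.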

In [46]:
preds, state = interact_nested(lambda: linreg(tf.random.normal((3, 10))), state=dict(dists=dict(), samples={"predictions/":tf.zeros(3)}))

In [47]:
state["dists"]

{'scale': <tfp.distributions.HalfCauchy 'scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_scale': <tfp.distributions.HalfCauchy 'coefs_scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_noise': <tfp.distributions.Normal 'coefs_noise' batch_shape=[] event_shape=[] dtype=float32>,
 'predictions': <tfp.distributions.Normal 'predictions' batch_shape=[3] event_shape=[] dtype=float32>}

In [48]:
state["samples"]

{'predictions/': <tf.Tensor: shape=(3,), dtype=float32, numpy=array([0., 0., 0.], dtype=float32)>,
 'scale': <tf.Tensor: shape=(), dtype=float32, numpy=0.9951895>,
 'coefs_scale': <tf.Tensor: shape=(), dtype=float32, numpy=0.15956731>,
 'coefs_noise': <tf.Tensor: shape=(), dtype=float32, numpy=0.5349592>,
 'predictions': <tf.Tensor: shape=(3,), dtype=float32, numpy=array([-0.2006491 ,  0.08688551, -0.83539486], dtype=float32)>}
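The sketch below shows the conditioning behaviour when the key in `samples` equals the distribution name. It is a reduced, `tfp`-free variant of the driver: `Const` is a hypothetical stand-in for a distribution, `drive_nested` is a simplified version of `interact_nested` (no duplicate-name check), and `sub_model`/`top_model` are toy models.

```python
import types


class Const:
    """Hypothetical stand-in for a distribution: `sample()` always returns `value`."""

    def __init__(self, value, name):
        self.value, self.name = value, name

    def sample(self):
        return self.value


def drive_nested(gen, state):
    # same idea as `interact_nested`, reduced to the essentials
    reply = None
    while True:
        try:
            request = gen.send(reply)
        except StopIteration as stop:
            return stop.value, state
        if isinstance(request, types.GeneratorType):
            reply, state = drive_nested(request, state)    # recurse into the sub-model
        else:
            if request.name not in state["samples"]:
                state["samples"][request.name] = request.sample()
            reply = state["samples"][request.name]         # a pre-filled value wins
            state["dists"][request.name] = request


def sub_model():
    a = yield Const(2.0, name="sub_a")
    return a * 10


def top_model():
    b = yield sub_model()                  # plain `yield`, no `yield from`
    obs = yield Const(-1.0, name="obs")
    return b, obs


result, toy_state = drive_nested(top_model(), {"dists": {}, "samples": {"obs": 0.0}})
print(result, toy_state["samples"])
```

Because the key `"obs"` matches the name of the yielded object, the supplied `0.0` is used and `sample()` is never called for it.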

Cool, we've covered the central idea behind the PyMC4 core engine. There is some extra work to do to make `interact_nested` really powerful:

* resolve transforms
* resolve reparametrizations
* variational inference
* better error messages
* lazy returns in posterior predictive mode

Some of this functionality may be found in the corresponding [PR#125](https://github.com/pymc-devs/pymc4/pull/125)