In [None]:
# Install extra packages: uncomment if in google collab

#holoviews
#hvplot
#toolz
#

### Recitation 2: Logic gates and information theory. 

> The goal of this tutorial is to get you geared up for solving the homework in an efficient manner using python. We'll learn how to compute mutual information between two discrete random variables using numpy. 

#### Introduction

Information theory is the mathematical theory that aims to describes how to communicate efficiently (with low error and at a fast rate) through an imperfect channel. As we've seen in class, this is a major tool for analyzing biological systems, since it provides a way to quantify the non-linear coupling between random variables. During this tutorial, we will cover some of the concepts central to information theory and how to compute them using python. 

There is a very interesting history about how information theory came about, with big contributions from the ["Young Turks"](https://en.wikipedia.org/wiki/Young_Turks_(Bell_Labs)), a group of scientists and mathematicians working at Bell Labs. Of utmost importance, Claude Shannon, developed the foundations in a landmark paper titled [A Mathematical Theory of Communication](http://people.math.harvard.edu/~ctm/home/text/others/shannon/entropy/entropy.pdf). I highly recommend for you to read it, as it is a beautiful display of probabilistic and mathematical thinking. 

In this paper, Shannon defined the surprise (or information gain) of learning that an event $x$ with probability $p(x)$ will occur as:

$$
\text{surprise / self-information (x)} = \mathrm{log}_2 \left( \frac{1}{p(x)} \right)
$$

This quantity is measured in a unit called bits. Low-probability events will have high surprise, while an event that has probability 1 of occurring has zero surprise. The $\mathrm{log}$ is there so that if we observe multiple events, the total surprise is additive. The $\mathrm{log}$ is abse 2 so that if we learn that an event with probability $\frac{1}{2}$ happened, the surprise is 1 (we'll calculate this later on), we gain 1 bit of information. 



#### Entropy is the average surprise of learning a random variable

As we saw in class, for a discrete r.v. $X$ that takes values $x_1, x_2, ...,x_n$ with probabilities $p_1, p_2, ..., p_n$ with $\sum_{i} P(X = x_i) = \sum_i^n p_i = 1$, the entropy of X is defined as the expected or average surprise of learning the value of X: 

$$
H(X) = \mathbb{E}_X \left[ \mathrm{log_2} \, \left( \frac{1}{P(X)} \right) \right] \\[1.4em]
= \sum_{i = 1}^n p_i \, \mathrm{log_2}\, \left( \frac{1}{p_i} \right)\\[1.2em]
= - \sum_{i=1}^n  p_i \, \mathrm{log_2} \, \left( p_i \right)
$$

Where the second equality arises from evaluating the formula of the generalized expectation or [law of unconscious statistician (LOTUS)](https://en.wikipedia.org/wiki/Law_of_the_unconscious_statistician). **Note that the entropy doesn't depend on the values the random variable takes, but only on the probability values !**

Let's visualize the entropy for a binary random variable. 

In [None]:
import numpy as np
import pandas as pd 
import itertools as it
import matplotlib.pyplot as plt
import holoviews as hv
import string
import seaborn as sns 

sns.set_style('dark')
hv.extension('bokeh')

#%load_ext blackcellmagic
%config InlineBackend.figure_format = 'retina'

Exercise: write down a function that returns the entropy of a Bernoulli random variable. 

In [None]:
def binary_entropy(p):
    "Returns the entropy of a Bernoulli r.v. with probability p"
    
    #Write your code here
    entropy = 
    
    return entropy

In [None]:
# Test the function ! 
binary_entropy(0.1)

We can see that our function works. Now let's see if it actually returns the right values by calculating the entropy by hand. 

The entropy of a Bernoulli random variable with probability of success $p = 0.5$ is  : 

$$
H(X) = \mathbb{E}_X (- \mathrm{log} \, P(X)) \\[1.4em]
= \sum_{x \in \{0,1 \}} P(X=x) \,  \mathrm{log}_2 \, \left(\frac{1}{P(X = x)} \right)\\[1em]
= P(X = 0) \times \mathrm{log_2} \frac{1}{P(X = 0)} + P(X = 1) \times \mathrm{log_2} \frac{1}{P(X = 1)}\\[1em]
= \frac{1}{2} \times \mathrm{log_2} (2) + \frac{1}{2} \times \mathrm{log_2} (2)\\[1em]
\mathrm{log_2}(2) = 1
$$

Let's see if we do get the answer. 

In [None]:
# Should return 1
binary_entropy(0.5)

Let's define a set of values for different probabilities $p_i$, and plot the resulting curve.

In [None]:
# Define an array of probability of success of 
# Bernoulli r.v.s
ps = np.linspace(0, 1, 20)

In [None]:
# Initalize list 
entropies = []

for p in ps: 
#

Whoa, we got an error! Let's inspect the binary entropy arrays. 

In [None]:
# Take a look 
binary_entropy

We can see that we got a Not-a-Number (NaN) value when we evaluate `binary_entropy()` with `p` = 0 and `p` = 1.

In [None]:
#We need to avoid this! 
np.log2(0)

Exercise: Write a function that returns the entropy for a random variable 

In [None]:
ps = np.array([0.5, 0.5, 0])

In [None]:
ps[np.nonzero(ps)]

In [None]:
def entropy(ps): 
    
    # Get nonzero indices
    nz = 

    # Compute entropy for nonzero indices
    entropy = 
    
    return entropy

In [None]:
entropy(ps)

Let's make an array of probabilities. 

In [9]:
# Intialize empty array
p_ = #

q_ = #

ps = np.hstack([p_.reshape(-1,1), q_.reshape(-1,1)])

Let's re-evaluate the entropies

In [None]:
entropies = []

for i in range(len(ps)): 



We can see that we got no error back. 

In [None]:
entropies

Let's plot the entropies ! 

In [None]:
entropy_plot = hv.Curve((p_success, entropies)).opts(
    title="Bern(p) entropy", xlabel="probability of success", ylabel="entropy", color = 'salmon', 
    padding = 0.1
)

entropy_plot

Does this plot ring a bell ? In lecture we saw that the variance of a Bernoulli random variable is $p(1-p)$. Let's quickly make a function of this formula and plot the variances for the $p_i$.

In [None]:
def variance_bernoulli(p): 
    return p*(1-p)

In [None]:
variances = [  ]

In [None]:
variance_plot = hv.Curve((p_success, variances)).opts(
    title="Bern(p) variance", xlabel="probability of success", ylabel="entropy", color = 'dodgerblue', 
    padding = 0.1
)

In [None]:
variance_plot

In [None]:
variance_plot + entropy_plot

### Quantifying information from coupled random variables 


Remember that the goal of this tutorial is to get you ready for computing the mutual information of noisy logic gates. In order to do so we have define a couple more concepts in order to make this tutorial self-contained. 

#### The Kullback - Leibler divergence quantifies the *distance* between distributions.

Building up towards the mutual information, an important metric to fully grasp the concepts is the Kullback-Leibler (KL) divergence. It is a metric that quantifies the distance between two probability distributions. Let $\mathbf{q} = (q_1, q_2, ..., q_n)$ and $\mathbf{p} = (p_1, p_2, .., p_n)$ be two discrete PMFs. The KL divergence between $\mathbf{q}$ and $\mathbf{p}$ is defined as: 



$$
\text{D}( \mathbf{q} || \mathbf{p} ) = \sum_j q_j \mathrm{log} \left( \frac{1}{p_j} \right)- \sum_j q_j \mathrm{log} \left(\frac{1}{q_j} \right)
$$


This metric thus represents the difference between the average surprise we will experience when the actual probabilities are $\mathbf{q}$ but we are instead using $\mathbf{p}$ as our current guess for $\mathbf{q}$  and the average surprise of  $\mathbf{q}$. 

The intuition is that because, we're using $\mathbf{p}$ as a proxy for $\mathbf{q}$, the term on the left $\mathbb{E}_q [ \mathrm{log_2} (1 / p)]$ will always be higher than the entropy of the true variable $\mathbb{E}_q [ \mathrm{log_2} (1 / q)]$. In other words, we will always be at least as surprised or more when working with an estimate of the true distribution that with the true distribution itself. There is a neat way to prove that the KL divergence is nonnegative using one of the most important inequalities in stats: the Jensen Inequality.

#### Brief detour: Jensen's inequality 

Let $X$ be a random variable. If $g$ is a convex function (in the sense that $g''(x) > 0$), then 

$$
\mathbb{E}(g(X)) > g ( \mathbb{E} (X))
$$

if $g$ is a concave function, then the equality flips. Here is are some intuitive examples of Jensen's inequality. 

$$
\mathrm{Var} (x) = \mathbb{E} (x^2) - \mathbb{E} (x)^2. \\[.8em]
\text{because } \mathrm{Var}(x) \ge 0 \\[.8em]
\mathbb{E}(x^2) \ge (\mathbb{E}(x))^2 \\[.8em]
\text{reciprocally because log is a concave function}  \\[.7em]
\mathbb{E}(\mathrm {log}(x)) \le  \mathrm \log \left(\mathbb{E} (X) \right)
$$

#### Conditional entropy

Like conditional probabilities, we can have conditional entropies. The conditional entropy $H(X|Y)$ is the information you gain from observing a random variable $X$ conditioning on knowing the values of another random variable $Y$. Another way to think of the conditional entropy is as the average surprise of the conditional distribution $P(X | Y)$ after observing the distribution $P(Y)$.

The conditional entropy is defined as : 

$$
H(X | Y) = \mathbb{E}_{P(y) } \left[ - \mathrm{log_2} P(X | Y) \right] = - \sum_{y \in Y} P(Y = y) \, \mathrm{log}_2 \, P(X | Y = y)  
$$

Similarly, the conditional entropy $H(Y|X)$ can be written as: 

$$
H(Y|X)  = \mathbb{E}_{P(X)} \left[ - \mathrm{log_2} \, P(Y|X) \right]
$$

Using both of this notions, we can finally now define the mutual information. 

#### Mutual information 

The mutual information of two discrete random variables $X$, $Y$ ($\text{I(X,Y)}$) is the reduction in uncertainty of a random variable $X$ when get to know see the conditional distribution with another r.v. $(X|Y)$. Congruent with this notion, the mutual information is defined as:

$$
\text{I(X,Y)} = \text{H} (X) - \text{H}(X|Y) = \text{H} (Y) - \text{H} (Y | X)
$$

Where $\text{H}(X)$ is the entropy of $X$. A common way to visualize the concepts of entropy, conditional entropy and mutual information is the following: 

In [None]:
from IPython.display import Image

In [None]:
Image(
    url = 'https://upload.wikimedia.org/wikipedia/commons/2/2e/Conditional_entropy.png',
    format = 'png',
    width = 500,
    height = 500
)

#### The mutual information is the KL divergence between the joint and the marginal distributions. 

We know that by definition, the mutual information between two discrete random variables is the KL divergence between the joint and the marginal distributions, i.e.: 
$$
\text{MI(X,Y)} = \sum_{y \in Y} \sum_{x \in X} P_{X,Y}(x, y) \,  \mathrm{log_2} \left( \frac{P_{X,Y}(x,y)}{P_{X}(x)P_Y(y))} \right)
$$

This is a nice conceptual version, because we can see that in the limit where the joint and the marginal distributions are equal, i.e., the r.v.s X and Y are independent, the mutual information is close to zero.



### Applying mutual information on the english language. 

In [14]:
import toolz as tz
from toolz import curried as c
from collections import Counter
import itertools as it

In [None]:
#path = '../../bi195_tutorials/data/california_love.txt'
#! wget 

In [None]:
! head -10 {path}

We can see that there are empty lines

In [None]:
import unicodedata

In [None]:
import re

In [None]:
re.sub(pattern = '[^a-zA-Z]+', repl = '', string = 'Hello :')

 This uses on-memory filtering, so this function alone makes three passes to the data. Caution is neeeded 

In [None]:
def read_file(path_to_file): 
    """
    Return a list of strings from a text file.
    """
    with open(path_to_file, 'r') as f: 
        corpus = f.readlines()

    # Remove empty lines 
    corpus = list(filter(not_empty_line, corpus))
    
    # Substitute all symbols not in the alphabet 
    # and lowercase all words 
    corpus = [re.sub('[^A-Za-z]', ' ', line.strip().lower()) for line in corpus]
    
    return corpus

In [None]:
lines = read_file(path)

In [None]:
all_lines = ' '.join(lines)

In [None]:
all_lines[:100]

In [None]:
len(all_lines)

In [None]:
alphabet= set(all_lines)

In [None]:
alphabet_letters = sorted(list(alphabet))[1:]

In [None]:
alphabet_letters[:10]

In [None]:
len(alphabet_letters)

In [None]:
def process_text(text): 
    "Substitutes symbols in a string with a space."
    text = re.sub('[^A-Za-z]', ' ', text.strip().lower())
    return text

In [None]:
def stream_file(fname): 
    with open(fname) as f:
        for line in f:
            yield line

In [None]:
next(stream_file(path))

In [None]:
g = stream_file(path)

In [None]:
next(g)

I've created a function that allows us to stream large files and get kmer counts already, so we can just go ahead and apply it to large datasets (see exercise at the final of the notebook).

In [None]:
def get_counts_kmer_streaming(path_to_file, k=1): 
    """
    Returns a dictionary of kmer counts using streaming.
    This function is designed to handle large files on-disk.
    
    Params 
    ------
    path_to_file(str):
        Path to the file to process. 
    
    k(int, default = 1):
        Size of the k-mer. 
        
    Returns 
    -------
    dict:
        A dictionary whose keys are unique k-mers found in the file 
        and values are the frequencies of appearance. 
    """
    
    return tz.pipe(
        path_to_file,
        stream_file, # Initializes line generator 
        c.filter(not_empty_line), # Gets lines with strings (not symbols or whitespaces)
        c.map(process_text), # Eliminate symbols 
        c.map(str.strip), # Eliminate white space at the end of line
        c.map(str.split), # Split into words
        tz.concat, # Concatenate word iterator
        c.map(c.sliding_window(k)), # Make k-mer tuples
        tz.concat, # Concatenate tuple iterator
        c.map(''.join), # Unpack tuples 
        tz.frequencies # Get kmer frequencies
    )

In [None]:
counts = get_counts_kmer_streaming(path, k = 2)

In [None]:
alphabet_letters = sorted(list(set(''.join(list(counts.keys())))))

In [None]:
n_letters = len(alphabet_letters)

In [None]:
# Get total counts
total_counts = sum(counts.values())

In [None]:
total_counts

In [None]:
# Sort freqs by alphabetical order 
freqs = sorted(counts.items(), key = lambda x: x[0])

In [None]:
#counts

In [None]:
freqs[:10]

In [None]:
alphabet_dict = dict(zip(alphabet_letters, np.arange(n_letters)))

alphabet_dict

In [None]:
bigram_pairs = list(it.product(alphabet_letters, alphabet_letters))

In [None]:
bigram_pairs[0]

In [None]:
position_dict = {
    (str(i + j)): (alphabet_dict[i], alphabet_dict[j]) for (i,j) in bigram_pairs
}

In [None]:
position_dict[('aa')]

In [None]:
position_dict[('ie')]

In [None]:
freqs[:10]

In [None]:
# Initialize joint distribution matrix 
joint_distro = np.zeros((n_letters,n_letters))

for bigram, counts in freqs: 
    
    # Get indices for given bigram
    index = position_dict[bigram]
    
    # Set counts in array 
    joint_distro[index] = counts

In [None]:
joint_distro = joint_distro / total_counts

In [None]:
# Double check normalization condition 
joint_distro.sum()

In [None]:
len(alphabet_letters)

In [None]:
df_joint= pd.DataFrame(
    joint_distro, 
    index = pd.Index(list(alphabet_letters), name = 'first letter'),
    columns = pd.Index(list(alphabet_letters), name = 'second letter')
)

In [None]:
# Take a look 
df_joint

Let's drop columns and rows with nonzero values. 

In [None]:
(df_joint != 0 ).any(axis  = 0)

In [None]:
(df_joint != 0 ).any(axis  = 1)

In [None]:
col_selector = (df_joint != 0).any(axis  = 0)
row_selector = (df_joint != 0).any(axis  = 1)

In [None]:
row_selector

In [None]:
df_sel = df_joint.loc[row_selector, col_selector]

In [None]:
df_melt = df_sel.unstack().reset_index().rename(columns = {0 : 'P(x,y)'})

In [None]:
import hvplot.pandas

In [None]:
df_melt.head()

In [None]:
df_melt.hvplot.heatmap(
    x = 'first letter',
    y = 'second letter', 
    C = 'P(x,y)',
    height = 500,
    width = 600, 
    cmap = 'viridis', 
    title = 'Joint distribution P(x,y)'
)

With the joint distribution in place, we can calculate the entropy of the letters in english.

In [None]:
#Now let's compute the mutual information between the first and second letters.

p_x = df_sel.values.sum(axis = 1)
p_y = df_sel.values.sum(axis = 0)

In [None]:
entropy(p_x)

This is indeed what's reported ([see F1 in this post](https://cs.stanford.edu/people/eroberts/courses/soco/projects/1999-00/information-theory/entropy_of_english_9.html) and [Shannon's paper on English language redundancy](https://www.princeton.edu/~wbialek/rome/refs/shannon_51.pdf)).

This is super interesting, but **how do we know if this entropy is a lot or a little? In other words, what would be a null model for the entropy of a distribution?**.


###### Proposition : the uniform distribution is the  distribution with maximum entropy

Intuititively, we can assert that the above proposition is true because, if a uniform distribution every value is equally likely, and thus whichever value we see sampled, will be "surprising". On the other hand, if we think of a very sharp peaked distribution, with the probability mass concentrated about the mean, we would expect that if we sample from the distribution, we would get values close to the mean, and thus, these samples wouldn't be surprising. 

*Proof:* Let $X$~DUnif(0,n) so that every value $P(x = i) = \frac{1}{n}$ for $0 \le i \le n$ : 

$$
H(X) = \mathbb{E} \left[\mathrm{log_2} \left( \frac{1}{p(x)} \right) \right] = \sum_i^n \frac{1}{n} \mathrm{log_2} (n) = \mathrm{log_2 } (n)
$$

Now, let $Y$ be an r.v. that takes on values $ \frac{1}{p_1}, ..., \frac{1}{p_n} $ with probabilities $p_1, .., p_n$. Then $H(Y) = \sum_i^n p_i \mathrm {log} (1 / p_i) = \mathbb{E}[ \mathrm{log} Y]$ by evaluating $Y$ and using LOTUS. By Jensen's : 

$$
H(Y) = \mathbb{E}[ \mathrm{log} Y]  \le \mathrm{log} (\mathbb{E}(Y)) = \mathrm{log} (n) = H(X)
$$

Because the entropy of a random variable doesn't depend on its support (i.e. the values the r.v. takes), then $Y$ is unchanges if we change the support from $p_1, .., p_n$ to $a_1, ..., a_n$ . Therefore $X$ which is uniform has entropy at least as large as that of any other r.v. with support on $a_1, a_2, ..., a_n$. 

#### Redundancy is the ratio that one can compress a random variable

With the notion that the uniform distribution is the one distribution with maximum entropy, the information redundancy of a random variable is defined as: 

$$
\text{redundancy}= 1 - \frac{H(X) }{H_{ \text{unif}}} = 1 - \frac{H(X)}{\mathrm{log_2} n}
$$

where $n$ is taken as as the size of the alphabet. 

We can now compute the information redundancy of english letters !

In [None]:
entropy(p_x)

In [None]:
entropy(p_x) / np.log2(26)

In [None]:
np.log2(26)

In [None]:
redundancy = 1 - (entropy(p_x)/ np.log2(26))
print(f'The redundancy of the english letters is { round(100*redundancy, 2) } %' )

Shannon, indeed a different and very ingenious way of calculating the entropy of the letters in english, by estimating the entropy over words. Let's continue with our calculations. 

Conditional distributions have to follow all of the properties of distributions, that is, have nonnegative probabilities and sum to one. We can thus then get the conditional distributions by normalizing the joint distribution over rows and columns. 

In [None]:
df_sel.head()

In [None]:
joint_distro_filt = df_sel.values

In [None]:
# Normalize over rows
p_ygx = (joint_distro_filt / p_x.reshape(-1, 1))

# Normalize over columns
p_xgy = joint_distro_filt / p_y

In [None]:
df_xgy = df_sel.div(df_sel.sum(axis=0), axis = 1)

df_ygx = df_sel.div(df_sel.sum(axis=1), axis = 0)

In [None]:
# Check that both methods match
np.all(df_xgy.values ==p_xgy)

In [None]:
# Check that conditional distributions sum to 1
p_xgy.sum(axis = 0) # over columns
p_ygx.sum(axis = 1) # over rows

Now we need a clever way to compute the conditional entropy from our array directly. 

In [None]:
def element_wise_information(px):
    """
    Returns a numpy array with element wise information calculated as -log_2(p_i).
    This quantity is also know as information or self-information:
    https://en.wikipedia.org/wiki/Information_content
    
    Params
    ------
    px (np.array)
        Array of individual probabilities, i.e. a probability vector or distribution.
    
    Returns
    -------
    information (np.array)
        Array of element-wise information content.
    """
    if isinstance(px, list):
        px = np.array(px)
        
    # Make a copy of input array
    information_content = px.copy()
    
    # Get indices of nonzero probability values
    nz = np.nonzero(information_content)
    
    # Compute -log_2(p_i) element-wise
    information_content[nz] *= - np.log2(information_content[nz])
    
    return information_content

In [None]:
h_xgy = np.sum(p_y * element_wise_information(p_xgy))

In [None]:
# Mutual information, finally!!
mi = h_x - h_xgy

In [None]:
mi

In [None]:
h_y = entropy(p_y)

In [None]:
# Joint entropy 
h_xy = h_x  + h_y - mi 

In [None]:
h_xy

In [None]:
redundancy_bigram = 1 - h_xy / np.log2(27**2)

print(f'The information redundancy of bigrams is {round(redundancy_bigram*100, 2) } %')

### Brief on logic gates 

Let's start by making a binary sequence. 

In [None]:
x = (0,1)

We can get all combinations of 0 and 1 by taking the cartesian product. 

In [None]:
xy = np.array(list(it.product(x,x)))

In [None]:
xy

Now let's define a new vector storing all of the values evaluating the function OR(X,Y).

In [None]:
# Initialize array 
or_expression = np.zeros(xy.shape[0])

# Evaluate OR gate
for ix in range(xy.shape[0]):
    # Unpack each row 
    x, y = xy[ix]
    
    # Evaluate x OR y
    or_expression[ix] = x | y

The expressions for AND and XOR are `&` and `^` respectively. You can read about how to implement more logic functions [here](https://www.journaldev.com/42242/logic-gates-in-python).

In [None]:
# Check it out 
or_expression

In [None]:
or_expression.reshape(-1,1).shape

In [None]:
or_truth_table = np.hstack([xy, or_expression.reshape(-1, 1)]).astype(int)

In [None]:
or_truth_table

In [None]:
or_df = pd.DataFrame(
    or_truth_table, 
    columns = ['x', 'y', 'z (OR)']
)

In [None]:
or_df

Let's start by getting the joint distribution $P(x,z)$.

In [None]:
contingency_table = np.zeros((2,2))

In [None]:
x = or_df['x'].values
z = or_df['z (OR)'].values

In [None]:
(x == 0) & (z == 0)

In [None]:
np.sum((x == 1) & (z == 0))

In [None]:
for i in [0,1]:
    for j in [0,1]: 
        contingency_table[i,j] = np.sum((x == i) & (z == j))

In [None]:
contingency_table

In [None]:
contingency_table = np.zeros((2,2))

In [None]:
for i,j in zip(x,z):
    contingency_table[i,j] +=1

In [None]:
p_xz = contingency_table/ contingency_table.sum()

In [None]:
df_joint = pd.DataFrame(
    p_xz, 
    index = pd.Index(['x = 0', 'x = 1']),
    columns = pd.Index(['z = 0', 'z = 1'])
)

In [None]:
df_joint

#### Extra exercise

I've uploaded some sequences from PFAM. With the functions above, calculate the redundancy of the aminoacid code for bigrams. 

*Food for thought*: How would you go about getting the redundancy for trigrams ? What data structure would you use if you wanted to use the same approach we did for bigrams ? What other clever tricks you can think of. 