# Mutual information estimates

In [1]:
# Custom code imports
from generate_time_series import (load_two_body_problem_time_series,
                                  load_belousov_zhabotinsky_time_series,
                                  load_lorenz_attractor_time_series)

from datasets import (chop_time_series_into_chunks,
                      split_chunks_into_windows_and_targets)

In [2]:
# Standard code imports
import numpy as np
import functools 

from typing import Tuple
import numpy.typing
NDArray = numpy.typing.NDArray[np.floating]

In [3]:
# Helpers
def time_series_2_windows_and_targets(time_series: NDArray,
                                      window_len: int = 10,
                                      target_len: int = 1,
                                      reverse: bool = False) -> Tuple[NDArray, NDArray]:
    chunks = chop_time_series_into_chunks(time_series,
                                          chunk_len=window_len+target_len,
                                          reverse=reverse,
                                          take_each_nth_chunk=3)
    windows, targets = split_chunks_into_windows_and_targets(chunks, target_len=target_len)
    return windows, targets


def flatten_last_dim(array: NDArray) -> NDArray:
    assert array.ndim >= 2
    return array.reshape((*array.shape[:-2], -1))

## `sklearn.feature_selection.mutual_info_regression`

In [4]:
# # This function only works with discrete inputs (handy for categorization/clusterization).
# # It is unusable for the float (continuous) vectors we are dealing with here.
# from sklearn.metrics import mutual_info_score

In [5]:
# See https://www.blog.trainindata.com/mutual-information-with-python/
from sklearn.feature_selection import mutual_info_regression

In [6]:
def calculate_mutual_info_for_dataset(ts: NDArray, dim: int = 0) -> Tuple[NDArray, NDArray]:
    assert 0 <= dim < ts.shape[1]

    forward_windows, forward_targets = time_series_2_windows_and_targets(ts)
    backward_windows, backward_targets = time_series_2_windows_and_targets(ts, reverse=True)

    # Each window or target is two-dimensional. I extract just one dimension
    backward_windows = backward_windows[:, :, dim]
    forward_windows = forward_windows[:, :, dim]
    # ... and assume target_len=1, so take the 0-th point in target.
    forward_targets = forward_targets[:, 0, dim]
    backward_targets = backward_targets[:, 0, dim]
    # Note: `mutual_info_regression` only accepts 1-dimensional y's.
    # So I'm forced to pick only one dimension from targets, although I could
    # flatten the windows instead of extracting one dimension from it.

    return (mutual_info_regression(forward_windows, forward_targets),
            mutual_info_regression(backward_windows, backward_targets))

In [7]:
def print_mutual_info(ts: NDArray, comment: str) -> None:
    forward, backward = calculate_mutual_info_for_dataset(ts)
    print(comment, "forward", forward)
    print(comment, "backward", backward)

In [8]:
print_mutual_info(load_two_body_problem_time_series(), "kepler")
print()
print_mutual_info(load_belousov_zhabotinsky_time_series(), "belousov_zhabotinsky")
print()
print_mutual_info(load_lorenz_attractor_time_series(), "lorenz")

kepler forward [4.1544507  4.1679777  4.19014217 4.23306892 4.40380346 4.52204556
 4.6513439  4.83707335 5.07009635 5.38225062]
kepler backward [4.1544507  4.16829816 4.19438907 4.23299843 4.40411548 4.52216755
 4.65179145 4.83874348 5.06217402 5.38334711]

belousov_zhabotinsky forward [5.10297078 5.11591333 5.12894595 5.15271997 5.25938243 5.40564618
 5.44039595 5.48630473 5.54904934 5.63826422]
belousov_zhabotinsky backward [5.10354369 5.11592085 5.12937203 5.1472664  5.22970626 5.39968754
 5.42965822 5.47718146 5.53905855 5.6281999 ]

lorenz forward [1.10491599 1.18068127 1.28136143 1.39081549 1.52221169 1.68715116
 1.89150051 2.17183271 2.57736067 3.26623382]
lorenz backward [1.10105144 1.18418257 1.28136143 1.38803675 1.51959223 1.68800725
 1.90041139 2.17751784 2.57858673 3.25964205]


The numbers are about the same, within reasonable accuracy, for `forward` and `backward`.
This is not what we expect.

## gregversteeg/NPEET

In [9]:
# Install the module from GitHub
!git clone https://github.com/gregversteeg/NPEET.git

fatal: destination path 'NPEET' already exists and is not an empty directory.


In [10]:
# The module's suggested installation method doesn't work,
# so we just find the right source file in the directory tree.
from NPEET.npeet import entropy_estimators

In [11]:
def calculate_mutual_info_for_dataset(ts: NDArray, window_len: int, target_len: int) -> Tuple[float, float]:
    forward_windows, forward_targets = time_series_2_windows_and_targets(ts, window_len=window_len,
                                                                         target_len=target_len)
    backward_windows, backward_targets = time_series_2_windows_and_targets(ts, window_len=window_len,
                                                                           target_len=target_len, reverse=True)

    # Each window or target is two-dimensional, so I flatten them.
    backward_windows = flatten_last_dim(backward_windows)
    forward_windows = flatten_last_dim(forward_windows)
    backward_targets = flatten_last_dim(backward_targets)
    forward_targets = flatten_last_dim(forward_targets)

    return (entropy_estimators.mi(forward_windows, forward_targets),
            entropy_estimators.mi(backward_windows, backward_targets))

def apply_mi_over_len_grid(ts: NDArray) -> None:
    print("window+target: (backward - forward) / forward")
    for window_len in [3, 5, 10, 15]:
        for target_len in [1, 3, 5, 10]:
            forward, backward = calculate_mutual_info_for_dataset(ts=ts,
                                                                  window_len=window_len,
                                                                  target_len=target_len)
            # Print relative difference between backward and forward.
            print(f"{window_len}+{target_len}:\t{(backward - forward) / forward:.2e}")

In [12]:
apply_mi_over_len_grid(load_two_body_problem_time_series())

window+target: (backward - forward) / forward
3+1:	-5.47e-05
3+3:	2.16e-16
3+5:	-5.48e-05
3+10:	3.23e-03
5+1:	0.00e+00
5+3:	5.48e-05
5+5:	5.49e-05
5+10:	3.24e-03
10+1:	-3.26e-03
10+3:	-3.28e-03
10+5:	-3.23e-03
10+10:	4.31e-05
15+1:	-3.11e-03
15+3:	-3.02e-03
15+5:	-3.06e-03
15+10:	-1.44e-05


In [13]:
apply_mi_over_len_grid(load_lorenz_attractor_time_series())

window+target: (backward - forward) / forward
3+1:	-2.34e-04
3+3:	-4.78e-04
3+5:	1.75e-04
3+10:	1.97e-03
5+1:	-6.92e-04
5+3:	-9.73e-04
5+5:	0.00e+00
5+10:	1.48e-03
10+1:	-1.17e-03
10+3:	-1.95e-03
10+5:	-1.43e-03
10+10:	-2.63e-04
15+1:	-4.15e-03
15+3:	-4.11e-03
15+5:	-3.57e-03
15+10:	-2.08e-03


In [14]:
apply_mi_over_len_grid(load_belousov_zhabotinsky_time_series())

window+target: (backward - forward) / forward
3+1:	7.50e-04
3+3:	1.98e-04
3+5:	6.46e-05
3+10:	-1.58e-02
5+1:	7.33e-04
5+3:	7.39e-04
5+5:	-1.51e-04
5+10:	-1.55e-02
10+1:	1.71e-02
10+3:	1.69e-02
10+5:	1.57e-02
10+10:	2.61e-04
15+1:	1.69e-02
15+3:	1.73e-02
15+5:	1.75e-02
15+10:	6.23e-05


In [15]:
# Let's try to apply it to random noise
apply_mi_over_len_grid(np.random.normal(loc=1, scale=1, size=(4000, 3)))

window+target: (backward - forward) / forward
3+1:	5.97e-01
3+3:	3.51e+00
3+5:	-6.17e-01
3+10:	-3.66e-01
5+1:	-1.57e+00
5+3:	-1.70e+00
5+5:	1.78e-02
5+10:	1.22e+00
10+1:	-9.51e-01
10+3:	5.81e-01
10+5:	7.04e-01
10+10:	1.90e-01
15+1:	6.04e-01
15+3:	-3.08e+00
15+5:	1.13e+00
15+10:	-1.24e+00
