<a href="https://colab.research.google.com/github/vladipako/Transfer_Entropy_Project/blob/main/TE_Package.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
from collections import Counter
from typing import Union
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from functools import partial
import matplotlib.pyplot as plt

In [None]:
# %%time
# analysis = TransferEntropy('cumulative', x_series, y_series, x_lag = 1)
# result = analysis.te_calculator.processor()
# print(result)

In [None]:
class Utilities:
  def __init__(self, entropy, mode, x_series, y_series, lag, n_shuffles):
    self.entropy = entropy
    self.x_series = x_series
    self.y_series = y_series
    self.mode = mode
    self.n_shuffles = n_shuffles
    self.lag = lag
    self.entropy = entropy

  def shuffle_series(self, series):
    return np.random.permutation(series)

  def perform_shuffle_test(self):
    shuffle_tes = []
    for _ in tqdm(range(self.n_shuffles), desc='Running through shuffle iterations: '):
        y_shuffled = self.shuffle_series(self.y_series)
        analysis = TransferEntropy(self.mode, self.x_series, y_shuffled, self.lag)
        shuffle_te = analysis.te_calculator.processor()
        shuffle_tes.append(shuffle_te)

    p_value = sum(shuffle_te >= self.entropy for shuffle_te in shuffle_tes) / self.n_shuffles
    return shuffle_tes, p_value

  def shuffle_and_calculate(self, y_shuffled):
    analysis = TransferEntropy(self.mode, self.x_series, y_shuffled, self.lag)
    return analysis.te_calculator.processor()

  def perform_shuffle_test_parallel(self):
    shuffle_func = partial(self.shuffle_and_calculate)
    with Pool(processes=cpu_count()) as pool:

        shuffle_tes = list(tqdm(
            pool.imap(shuffle_func, (self.shuffle_series(self.y_series) for _ in range(self.n_shuffles))),
            total=self.n_shuffles,
            desc='Running through shuffle iterations'
        ))

    p_value = sum(shuffle_te >= self.entropy for shuffle_te in shuffle_tes) / self.n_shuffles
    return shuffle_tes, p_value

In [None]:
class TestDataGenerator:

  def __init__(self,
               n_samples = 10_000,
               x_scale = 1,
               y_scale = 1,
               ):

    self.n_samples = n_samples
    self.x_scale = x_scale
    self.y_scale = y_scale

  def randomData(self):
    x = np.random.normal(loc = 0, scale = self.x_scale, size = self.n_samples)
    y = np.random.normal(loc = 0, scale = self.y_scale, size = self.n_samples)
    return pd.Series(x), pd.Series(y)

  def simpleData(self):
    x = np.random.normal(loc = 0, scale = self.x_scale, size = self.n_samples)
    x_scaled = x * np.pi
    y = np.sin(np.roll(x_scaled, 1)) + np.random.normal(loc = 0, scale = self.y_scale, size = self.n_samples)
    y[0] = np.sin(x_scaled[-1]) + np.random.randn() * self.y_scale
    return pd.Series(x), pd.Series(y)

  def complexData(self):
    x = np.random.normal(loc = 0, scale = self.x_scale, size = self.n_samples)
    y = np.zeros(self.n_samples)

    # Initialize the first two y values (as they depend on non-existent past x values)
    y[0] = np.random.randn() * self.y_scale
    y[1] = np.random.randn() * self.y_scale

    for t in range(2, self.n_samples):
      if x[t] > 0:
          y[t] = np.sin(x[t-1]) + np.random.randn() * self.y_scale
      else:
          y[t] = np.cos(x[t-2])**2 + np.random.randn() * self.y_scale

    return pd.Series(x), pd.Series(y)

In [None]:
class TransferEntropy:

  def __init__(self,
               mode: str,
               x_series: Union[np.ndarray, pd.Series],
               y_series: Union[np.ndarray, pd.Series],
               x_lag: int,
               discretization_method: str = 'quantile',
               n_bins: int = 5,
               base: float = 2,
               ):
    """
    Initialize the TransferEntropyAnalyzer.

    Parameters:
    mode (str): 'cumulative' or 'individual'
    x_series (array-like): Input series X
    y_series (array-like): Input series Y
    x_lag (int): number of lags (cumulative case) or specific lag (individual case)
    n_bins (int): Number of bins for discretization
    discretization_method (str): 'quantile' or 'uniform'
    base (float): Base for logarithm in TE calculation
    """

    self.mode = mode.lower()
    self.x_lag = x_lag
    self.n_bins = n_bins
    self.discretization_method = discretization_method
    self.base = base
    self.x_series = self.discretize_returns(x_series)
    self.y_series = self.discretize_returns(y_series)

    if self.mode == 'cumulative':
        self.te_calculator = CumulativeTransferEntropy(
            self.x_series, self.y_series, self.x_lag, self.n_bins, self.base
            )

    elif self.mode == 'individual':
        self.te_calculator = IndividualTransferEntropy(
            self.x_series, self.y_series, self.x_lag, self.n_bins, self.base
        )

    else:
        raise ValueError("Mode must be either 'cumulative' or 'individual'")

    self.results = None

  def __repr__(self):
    return f'TransferEntropyAnalyzer(mode={self.mode}'

  def shuffleTest(self, size):
    entropy = self.te_calculator.processor()
    utils = Utilities(entropy, self.mode, self.x_series, self.y_series, self.x_lag, size)
    # return utils.perform_shuffle_test()
    return utils.perform_shuffle_test_parallel()

  def calculate(self) -> float:
    return self.te_calculator.processor()

  def discretize_returns(self, returns: Union[np.ndarray, pd.Series]) -> pd.Series:

    if self.discretization_method == 'uniform':
        bins = np.linspace(np.min(returns), np.max(returns), self.n_bins + 1)
        discretized = pd.cut(returns, bins=self.bins, labels=False, include_lowest=True)
    elif self.discretization_method == 'quantile':
        discretized = pd.qcut(returns, q=self.n_bins, labels=False, duplicates='drop')
    else:
        raise ValueError("Discretization method must be either 'uniform' or 'quantile'")

    return pd.Series(discretized, index=getattr(returns, 'index', None))


In [None]:
class IndividualTransferEntropy:


  def __init__(self,
               x_series: Union[np.ndarray, pd.Series],
               y_series: Union[np.ndarray, pd.Series],
               lag: int,
               n_bins: int = 5,
               base: int = 2
               ):
    """
    Initialize the TransferEntropyCalculator.

    Parameters:
    n_bins (int): Number of bins to use for discretization. Default is 5.
    discretization_method (str): Method to use for discretization.
                                  Options are 'quantile' or 'uniform'. Default is 'quantile'.
    base (float): The logarithm base to use for entropy calculations. Default is 2 (bits).

    Attributes:
    n_bins (int): Number of bins for discretization.
    discretization_method (str): Method used for discretization.
    base (float): Logarithm base for entropy calculations.
    """
    self.n_bins = n_bins
    self.base = base

    self.lag = lag

    self.x_discrete = x_series
    self.y_discrete = y_series

  def processor(self):
    joint_probs = self.calculate_joint_probability(self.y_discrete, self.x_discrete, self.lag)
    cond_probs_x = self.calculate_conditional_probability_x(self.y_discrete, self.lag)
    cond_probs_xy = self.calculate_conditional_probability_xy(self.y_discrete, self.x_discrete, self.lag)
    self.te_result = self.calculate_transfer_entropy(joint_probs, cond_probs_x, cond_probs_xy)
    return self.te_result

  def calculate_joint_probability(self, x, y, lag):
      n = len(x)
      joint_states = []
      for t in range(lag, n - 1):
          x_future = x[t + 1]
          x_past = x[t - lag + 1]  # Only consider the specific lag
          y_past = y[t - lag + 1]  # Only consider the specific lag
          joint_state = (x_future, x_past, y_past)
          joint_states.append(joint_state)
      state_counts = Counter(joint_states)
      total_counts = sum(state_counts.values())
      return {state: count / total_counts for state, count in state_counts.items()}

  def calculate_conditional_probability_x(self, x, lag):
      n = len(x)
      state_counts = Counter()
      next_state_counts = Counter()
      for t in range(lag, n - 1):
          current_state = x[t - lag + 1]  # Only consider the specific lag
          next_state = x[t + 1]
          state_counts[current_state] += 1
          next_state_counts[(next_state, current_state)] += 1

      conditional_probs = {}
      for (next_state, current_state), count in next_state_counts.items():
          if current_state in state_counts:
              conditional_probs[(next_state, current_state)] = count / state_counts[current_state]

      return conditional_probs

  def calculate_conditional_probability_xy(self, x, y, lag):
      n = len(x)
      state_counts = Counter()
      next_state_counts = Counter()
      for t in range(lag, n - 1):
          current_state_x = x[t - lag + 1]  # Only consider the specific lag
          current_state_y = y[t - lag + 1]  # Only consider the specific lag
          current_state = (current_state_x, current_state_y)
          next_state = x[t + 1]
          state_counts[current_state] += 1
          next_state_counts[(next_state,) + current_state] += 1

      conditional_probs = {}
      for (next_state, *current_state), count in next_state_counts.items():
          current_state = tuple(current_state)
          if current_state in state_counts:
              conditional_probs[(next_state, current_state)] = count / state_counts[current_state]

      return conditional_probs

  def calculate_transfer_entropy(self, joint_probs, cond_probs_x, cond_probs_xy):
      te = 0
      for (x_next, x_past, y_past), joint_prob in joint_probs.items():
          if (x_next, x_past) in cond_probs_x and (x_next, (x_past, y_past)) in cond_probs_xy:
              p_x = cond_probs_x[(x_next, x_past)]
              p_xy = cond_probs_xy[(x_next, (x_past, y_past))]
              te += joint_prob * np.log2(p_xy / p_x)
      return te

In [None]:
class CumulativeTransferEntropy:

  def __init__(self,
               x_series: Union[np.ndarray, pd.Series],
               y_series: Union[np.ndarray, pd.Series],
               lag: int,
               n_bins: int = 5,
               base: int = 2):

    """
    Initialize the TransferEntropyCalculator.

    Parameters:
    n_bins (int): Number of bins to use for discretization. Default is 5.
    discretization_method (str): Method to use for discretization.
                                  Options are 'quantile' or 'uniform'. Default is 'quantile'.
    base (float): The logarithm base to use for entropy calculations. Default is 2 (bits).

    Attributes:
    n_bins (int): Number of bins for discretization.
    discretization_method (str): Method used for discretization.
    base (float): Logarithm base for entropy calculations.
    """

    self.n_bins = n_bins
    self.base = base

    self.x_discrete = x_series
    self.y_discrete = y_series

    self.lag = lag
    self.te = None

  def __repr__(self):
    return f"TransferEntropyCalculator(n_bins={self.n_bins}, " \
            f"base={self.base})"

  def processor(self):
    joint_probs = self.calculate_joint_probability(self.y_discrete, self.x_discrete, self.lag, self.lag)
    cond_probs_x = self.calculate_conditional_probability_x(self.y_discrete, self.lag)
    cond_probs_xy = self.calculate_conditional_probability_xy(self.y_discrete, self.x_discrete, self.lag, self.lag)
    self.te_result = self.calculate_transfer_entropy(joint_probs, cond_probs_x, cond_probs_xy, self.lag)
    return self.te_result

  def calculate_joint_probability(self, x, y, k, l):
      n = len(x)
      joint_states = []
      for t in range(max(k, l), n - 1):
          x_future = x[t + 1]
          x_past = tuple(x[t - i] for i in range(k))
          y_past = tuple(y[t - i] for i in range(l))
          joint_state = (x_future,) + x_past + y_past
          joint_states.append(joint_state)
      state_counts = Counter(joint_states)
      total_counts = sum(state_counts.values())
      return {state: count / total_counts for state, count in state_counts.items()}

  def calculate_conditional_probability_x(self, x, k):
      n = len(x)
      state_counts = Counter()
      next_state_counts = Counter()
      for t in range(k, n - 1):
          current_state = tuple(x[t - i] for i in range(k))
          next_state = x[t + 1]
          state_counts[current_state] += 1
          next_state_counts[(next_state,) + current_state] += 1

      conditional_probs = {}
      for full_state, count in next_state_counts.items():
          next_state = full_state[0]
          current_state = full_state[1:]
          if current_state in state_counts:
              conditional_probs[(next_state, current_state)] = count / state_counts[current_state]

      return conditional_probs

  def calculate_conditional_probability_xy(self, x, y, k, l):
      n = len(x)
      state_counts = Counter()
      next_state_counts = Counter()
      for t in range(max(k, l), n - 1):
          current_state_x = tuple(x[t - i] for i in range(k))
          current_state_y = tuple(y[t - i] for i in range(l))
          current_state = current_state_x + current_state_y
          next_state = x[t + 1]
          state_counts[current_state] += 1
          next_state_counts[(next_state,) + current_state] += 1

      conditional_probs = {}
      for full_state, count in next_state_counts.items():
          next_state = full_state[0]
          current_state = full_state[1:]
          if current_state in state_counts:
              conditional_probs[(next_state, current_state)] = count / state_counts[current_state]

      return conditional_probs

  def calculate_transfer_entropy(self, joint_probs, cond_probs_x, cond_probs_xy, lag):
      te = 0
      for (x_next, *state), joint_prob in joint_probs.items():
          state = tuple(state)
          x_state = state[:lag]
          xy_state = state
          if (x_next, x_state) in cond_probs_x and (x_next, xy_state) in cond_probs_xy:
              p_x = cond_probs_x[(x_next, x_state)]
              p_xy = cond_probs_xy[(x_next, xy_state)]
              te += joint_prob * np.log2(p_xy / p_x)
      return te

  def shuffle_series(self, series):
      return pd.Series(np.random.permutation(series.values), index=series.index)


In [None]:
generator = TestDataGenerator(n_samples = 1_000)
x_series, y_series = generator.complexData()

analysis = TransferEntropy('individual', x_series, y_series, x_lag = 2)
result = analysis.te_calculator.processor()

te, p = analysis.shuffleTest(500)
print(f'\n TE for the given lag is {result}, with the p-value of {p}')
# print(f'TE for the given lag is {result}')

Running through shuffle iterations: 100%|██████████| 500/500 [00:15<00:00, 32.35it/s]



 TE for the given lag is 0.07802499562194608, with the p-value of 0.044
