<a href="https://colab.research.google.com/github/raytheman/Bayesian/blob/main/%5BClean%5D_The_Evolutionary_Origin_of_Bayesian_like_Behavior_and_Finite_Memory.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Imports

In [None]:
import numpy as np
import pandas as pd
import math
from scipy.stats import binom, pearsonr
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import multiprocessing as mp
from google.colab import files, drive
from statsmodels.tsa.arima_process import arma_generate_sample
import warnings
warnings.filterwarnings("ignore")

In [None]:
EPSILON = 1E-8

#Sampling with Limited Information

In [None]:
# Probability of State Given Sample

def binon_right_prob(
    k, p
):
  '''
  k: number of sample
  p: probability that each sample is correct
  '''
  if not isinstance(k, int):
    print('input k must be integer')
    assert(False)
  prob = 1 - binom.cdf(k/2, k, p)
  if k % 2 == 0:
    prob += binom.pmf(k/2, k, p) / 2
  return prob

In [None]:
fig, ax = plt.subplots(figsize=(5,4))
k = np.arange(1,10)
for q in [0.9, 0.8, 0.7, 0.6]:
  plt.plot(
      k, [binon_right_prob(int(x), q) for x in k],
      'o-',
      label='q=' + str(q)
  )
plt.xlabel('Number of samples')
plt.ylabel('Probability to choose the right state')
plt.ylim(0.53, 1.02)
plt.legend(loc='lower right')
plt.show()
fig.savefig('binomial_cdf.png')

In [None]:
#Optimal Growth Rate

def growth_rate_with_opt_behavior(
    mode,
    fitness=3,
    weather_prob=0.8,
    regime_prob=0.5,
    num_sample=None,
    sample_correct_prob=None,
    cost_factor=None,
):
  '''
  mode: version of the model: {no sample, infinite sample, limited sample}
  fitness: number of offspring when env is in favor
  weather_prob: probability of sunny weather in summer, and rainy weather in winter
  regime_prob: probability of summer
  num_sample: number of samples (required when mode=='limited sample'),
  sample_correct_prob: probability that each sample is correct (required when mode=='limited sample'),
  cost_factor: cost factor for each sample (required when mode=='limited sample')
  '''
  p = weather_prob
  if mode == 'no sample':
    growth = regime_prob * math.log(regime_prob * fitness) + (1-regime_prob) * math.log((1-regime_prob) * fitness)
  elif mode == 'infinite sample':
    growth = p * math.log(p * fitness) + (1 - p) * math.log((1 - p) * fitness)
  elif mode == 'limited sample':
    if num_sample is None or sample_correct_prob is None or cost_factor is None:
      print('some input is None when mode is limited sample')
      assert False
    q = binon_right_prob(num_sample, sample_correct_prob)
    f = (1 - p) * (1 - q) + p * q
    growth = (
        p * (q * math.log(f * fitness) + (1 - q) * math.log((1 - f) * fitness))
        + (1 - p) * (q * math.log((1 - f) * fitness)
            + (1 - q) * math.log(f * fitness))
        - math.log(1 + num_sample * cost_factor))

  else:
    print('mode not right')
    assert False
  
  return growth

In [None]:
# Optimal number of samples

def optimal_samples(
    sample_correct_prob,
    cost_factor,
    fitness=3,
    weather_prob=0.8,
    regime_prob=0.5,
    max_samples=1000,
    verbose=False,
):
  '''
  sample_correct_prob: probability that each sample is correct (required when mode=='limited sample'),
  cost_factor: cost factor for each sample (required when mode=='limited sample')
  fitness: number of offspring when env is in favor
  weather_prob: probability of sunny weather in summer, and rainy weather in winter
  regime_prob: probability of summer
  '''
  results = {}
  optimal_sample = -1
  max_alpha = -9999
  for num_sample in np.arange(0, max_samples):
    alpha = growth_rate_with_opt_behavior(
        mode='limited sample',
        num_sample=int(num_sample),
        sample_correct_prob=sample_correct_prob,
        cost_factor=cost_factor,
    )
    results[num_sample] = alpha
    if alpha > max_alpha:
      max_alpha = alpha
      optimal_sample = num_sample
    if verbose and num_sample <= 10:
      print('k=' + str(num_sample) + ' alpha=' + str(alpha))
  return optimal_sample, results


In [None]:
opt_sample, results = optimal_samples(sample_correct_prob=0.8, cost_factor=0.02, verbose=True)
print(opt_sample)

x = [0] + list(np.arange(1,12,2))
y = [results[xval] for xval in x]
fig, ax = plt.subplots(figsize=(5,4))
plt.plot(
    x, y,
    'ko-',
    label='Limited Sample w/ Cost'
)
plt.plot(
    [0, 12],
    [alpha_infinite_sample, alpha_infinite_sample],
    'k--',
    label='Infinite Sample w/o Cost'
)
plt.plot(
    [0, 12],
    [alpha_no_sample, alpha_no_sample],
    'k-.',
    label='No Sample'
)
plt.scatter(
    [opt_sample],
    [results[opt_sample]],
    s=200,
    color='k',
)
plt.xlabel('Number of samples')
plt.ylabel('Growth rate')
plt.ylim(0.37, 0.62)
plt.legend(loc='center right')
plt.show()
fig.savefig('optimal_sample.png')

In [None]:
#Optimal number of samples as cost change

cost_vec = [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.5, 1]
opt_sample_vec = []
for cost in cost_vec:
  opt_sample, results = optimal_samples(sample_correct_prob=0.8, cost_factor=cost)
  opt_sample_vec.append(opt_sample)
  print('cost=' + str(cost) + ' opt_sample=' + str(opt_sample))
  
  
fig, ax = plt.subplots(figsize=(5,4))
plt.plot(
    np.log10(np.array(cost_vec)), opt_sample_vec,
    'ko-',
)
plt.xlabel('Log10(cost magnitude)')
plt.ylabel('Optimal samples')
# plt.ylim(-1, 16)
fig.gca().yaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
fig.savefig('optimal_sample_with_cost.png')

In [None]:
# Optimal number of samples as data information increases

probs = np.arange(0.5, 1.01, 0.025)
opt_sample_vec = []
for sample_correct_prob in probs:
  opt_sample, results = optimal_samples(sample_correct_prob=round(sample_correct_prob, 5), cost_factor=0.005)
  opt_sample_vec.append(opt_sample)
  print('sample_correct_prob=' + str(round(sample_correct_prob, 3)) + ' opt_sample=' + str(opt_sample))
  

fig, ax = plt.subplots(figsize=(5,4))
plt.plot(
    probs, opt_sample_vec,
    'ko-',
)
plt.xlabel('Probability that each sample is correct (q)')
plt.ylabel('Optimal samples')
# plt.ylim(-1, 16)
fig.gca().yaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
fig.savefig('optimal_sample_with_information.png')

In [None]:
%%time
log_cost_vec = np.arange(-2.5, -0.49, 0.1)
probs = np.arange(0.5, 1.01, 0.025)

optimal_results_df = pd.DataFrame()

for cost in log_cost_vec:
  print('log cost=' + str(cost))
  for sample_correct_prob in probs:
    opt_sample, _ = optimal_samples(
        sample_correct_prob=round(sample_correct_prob, 5),
        cost_factor=math.pow(10, cost),
        max_samples=100)
    optimal_values = {'optimal_sample': opt_sample}
    optimal_values['sample_correct_prob'] = sample_correct_prob
    optimal_values['log_cost'] = cost
    optimal_results_df = optimal_results_df.append(optimal_values, ignore_index=True)

eta_dim = len(optimal_results_df['log_cost'].unique())
q_dim = len(optimal_results_df['sample_correct_prob'].unique())

df_plot = np.array(optimal_results_df['optimal_sample']).reshape(eta_dim, q_dim)

fig, ax = plt.subplots()
plt.imshow(df_plot, cmap='jet',
           interpolation='bicubic',
           )
ax.set_xlabel('Probability that each sample is correct ($q$)')
ax.set_ylabel('Log of cost magnitude ($\log(\eta)$)')
ax.set_yticks(np.arange(eta_dim)[::4])
ax.set_yticklabels(optimal_results_df['log_cost'].unique()[::4].round(2))
ax.set_xticks(np.arange(q_dim)[::4])
ax.set_xticklabels(optimal_results_df['sample_correct_prob'].unique()[::4].round(2))
cbar = plt.colorbar()
cbar.set_label('Optimal Number of Samples (m*)')
cbar.set_ticks(np.arange(df_plot.min(), df_plot.max(), 4))
plt.tight_layout()
plt.show()
plt.close()


# Finite Memory in Nonstationary Env

In [None]:
def ar_k(
    k, coefficient, noise_sigma, n_sample,
):
  '''
  p_t = c / k * (p_{t-1} + ... + p_{t-k}) + e_t
  k: order of AR
  coefficient: coefficient for lag terms
  noise_sigma: standard deviation of the white noise
  n_sample: number of samples to generate
  '''
  warmup = np.random.normal(0, noise_sigma, k)
  series = list(warmup)
  errors = np.random.normal(0, noise_sigma, size=n_sample)
  for i in range(n_sample):
    next_value = coefficient * np.mean(series[-k:]) + errors[i]
    series.append(next_value)
  return np.array(series[k:])

def logistic(x):
  return 1 / (1 + np.exp(-x))

def srw(n_sample):
  '''
  n_sample: number of samples to generate
  '''
  bernoulli = np.random.binomial(1, 0.5, n_sample)
  steps = bernoulli * 2 -1
  return steps.cumsum()

def ar_plus_srw(
    k, coefficient, noise_sigma, n_sample, weight_srw,
):
  '''
  p_t = c / k * (p_{t-1} + ... + p_{t-k}) + e_t
  k: order of AR
  coefficient: coefficient for lag terms
  noise_sigma: standard deviation of the white noise
  n_sample: number of samples to generate
  weight_srw:
  '''
  ar_series = np.array(ar_k(
    k, coefficient, noise_sigma, n_sample,))
  srw_series = np.array(srw(n_sample))
  # print(ar_series)
  # print(srw_series)
  return (1 - weight_srw) + ar_series + weight_srw * srw_series

def explore_combined(
    k, coefficient, noise_sigma, n_sample, weight_srw, autocorr_lag=100,
    show_plots=True,):
  ts_combined = ar_plus_srw(
      k=k, coefficient=coefficient, noise_sigma=noise_sigma, n_sample=n_sample, weight_srw=weight_srw,
  )
  if show_plots:
    plt.plot(np.arange(len(ts_combined)), ts_combined)
    plt.show()
    plt.plot(np.arange(len(ts_combined)), logistic(ts_combined))
    plt.show()

  if show_plots:
    auto_corr = autocorr(ts_combined, lag=autocorr_lag)
    auto_corr_pt = autocorr(logistic(ts_combined), lag=autocorr_lag)
    plt.plot(np.arange(len(auto_corr)), auto_corr,
            label='s_t', alpha=0.6)
    plt.plot(np.arange(len(auto_corr_pt)), auto_corr_pt,
            label='p_t', alpha=0.6)
    plt.legend(loc='best')
    plt.show()

  return ts_combined

In [None]:
def compare_paths(
    k, coefficient, noise_sigma, n_sample, weight_srw):
  fig, ax = plt.subplots(figsize=(4.4, 3.3))
  for i in range(len(k)):
    ts = ar_plus_srw(
        k=k[i], coefficient=coefficient[i], noise_sigma=noise_sigma[i],
        n_sample=n_sample[i], weight_srw=weight_srw[i],
    )
    plt.plot(np.arange(len(ts)), logistic(ts),
             alpha=0.7, 
             label='$\lambda$=' + str(weight_srw[i]))
  plt.xlabel('Generation (t)')
  plt.ylabel('Probability of Sunny Day ($p_t$)')
  plt.legend(loc='best')
  plt.tight_layout()
  plt.show()
  return 0

compare_paths(
    k=[10, 10],
    coefficient=[0.9, 0.9],
    noise_sigma=[1, 1],
    n_sample=[300, 300],
    weight_srw=[0.1, 0.9])


In [None]:
#Population growth

def fitness_prob_matching(f, p, fitness=3):
  '''
  f: behavior
  p: environmental probability
  '''
  return np.where(
      p > 1 - EPSILON,
      np.log(f) + np.log(fitness),
      np.where(
          p < EPSILON,
          np.log(1 - f) + np.log(fitness),
          p * np.log(f) + (1 - p) * np.log(1 - f) + np.log(fitness)
      ))

def evolution(states, fitness, max_memory):
  sim_results = pd.DataFrame({
      't': np.arange(len(states)),
      's_t': states,
      'p_t': logistic(states),
      })

  for memory in np.arange(1, max_memory+1):
    sim_results['S_t_ma_' + str(memory)] = sim_results.shift(1)['s_t'].rolling(window=memory).mean()
    sim_results['f_t_mem_' + str(memory)] = logistic(sim_results['S_t_ma_' + str(memory)])

  sim_results['fitness_t_oracle'] = fitness_prob_matching(
      f=sim_results['p_t'],
      p=sim_results['p_t'],
      fitness=fitness)

  for constant_f in np.arange(0, 1.1, 0.1):
    sim_results['fitness_t_const_' + str(round(constant_f, 1))] = fitness_prob_matching(
        f=round(constant_f, 1),
        p=sim_results['p_t'],
        fitness=fitness)

  for memory in np.arange(1, max_memory+1):
    sim_results['fitness_t_mem_' + str(memory)] = fitness_prob_matching(
        f=sim_results['f_t_mem_' + str(memory)],
        p=sim_results['p_t'],
        fitness=fitness)

  growth_rates = sim_results[[name for name in sim_results.columns if 'fitness' in name]].mean()
  return growth_rates, sim_results

In [None]:
#Optimal behavior and memory

def optimal_behavior(growth_rates):
  mu_oracle = growth_rates['fitness_t_oracle']
  f_const = growth_rates[[name for name in growth_rates.index if 'const' in name]].idxmax()
  mu_const = growth_rates[[name for name in growth_rates.index if 'const' in name]].max()
  f_mem = growth_rates[[name for name in growth_rates.index if 'mem' in name]].idxmax()
  mu_mem = growth_rates[[name for name in growth_rates.index if 'mem' in name]].max()
  return {
      'mu_oracle': mu_oracle,
      'mu_const': mu_const,
      'mu_mem': mu_mem,
      'f_star_const': float(f_const[len('fitness_t_const_'):]),
      'mem_star': int(f_mem[len('fitness_t_mem_'):]),
  }

In [None]:
##Dependence on Markov order and non-stationarity

%%time

def OneSim(lag_vec, weight_srw_vec, n_gen, sim_count, seed):
  print('-----Running Sim #' + str(sim_count) + ' with seed=' + str(seed))
  np.random.seed(seed)
  optimal_results_df = pd.DataFrame()
  for lag in lag_vec:
    for weight_srw in weight_srw_vec:
      states = explore_combined(
          k=lag,
          coefficient=0.9,
          noise_sigma=1,
          n_sample=n_gen,
          weight_srw=weight_srw,
          show_plots=False)
      growth_rates, _ = evolution(states, fitness=3, max_memory=30)
      optimal_values = optimal_behavior(growth_rates)
      optimal_values['sim'] = sim_count
      optimal_values['AR_lag'] = lag
      optimal_values['weight_srw'] = weight_srw
      optimal_results_df = optimal_results_df.append(optimal_values, ignore_index=True)
  return optimal_results_df

def SimForOptMemory(lag_vec, weight_srw_vec, n_gen, n_sim = 30, n_workers = 4, max_seed=99999999):
  pool = mp.Pool(processes = n_workers)
  seeds = np.random.randint(max_seed, size=n_sim, dtype=np.int32)
  results = [pool.apply(OneSim, args=(lag_vec, weight_srw_vec, n_gen, k, seeds[k])) for k in range(n_sim)]
  # results is a list with length=n_sim, each element is the returned dataframe from OneSim
  return pd.concat(results)


lag_vec = np.arange(1, 31, 1)
weight_srw_vec = np.arange(0.025, 1.0, 0.025)
n_gen = 30000
n_sim = 50

sim_results = SimForOptMemory(lag_vec, weight_srw_vec, n_gen, n_sim = n_sim, n_workers = 2, max_seed=99999999)


In [None]:
optimal_values_median = sim_results.groupby(['AR_lag', 'weight_srw'])[['mem_star']].median()

ar_dim = len(optimal_values_median.reset_index()['AR_lag'].unique())
weight_dim = len(optimal_values_median.reset_index()['weight_srw'].unique())

df_plot = np.array(optimal_values_median['mem_star']).reshape(ar_dim, weight_dim)

from mpl_toolkits.axes_grid1 import make_axes_locatable
fig, ax = plt.subplots()
im = plt.imshow(df_plot, cmap='jet',
           interpolation='bicubic',
           aspect=1.2,
           )
ax.set_xlabel('SRW weight ($\lambda$)')
ax.set_ylabel('AR lag ($k$)')

ax.set_xticks(np.arange(weight_dim)[3::4])
ax.set_xticklabels(optimal_values_median.reset_index()['weight_srw'].unique().round(2)[3::4])
ax.set_xlim(0, weight_dim-1)

ax.set_yticks(np.arange(ar_dim)[::4])
ax.set_yticklabels(optimal_values_median.reset_index()['AR_lag'].unique().astype(int)[::4])
ax.set_ylim(ar_dim-1, 0)

divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="3.5%", pad=-0.15)
cbar = plt.colorbar(im, cax=cax)
cbar.set_label('Optimal Memory (m*)')
cbar.set_ticks(np.arange(df_plot.min(), df_plot.max(), 4))
plt.tight_layout()
plt.show()
