In [13]:
import thermotools.wham as wham
import thermotools.tram as tram
import thermotools.tram_direct as tram_direct
import thermotools.mbar as mbar
import thermotools.mbar_direct as mbar_direct
import thermotools.dtram as dtram
import thermotools.tram as tram
import numpy as np
from numpy.testing import assert_allclose

#   ************************************************************************************************
#   data generation functions
#   ************************************************************************************************

def tower_sample(distribution):
    cdf = np.cumsum(distribution)
    rnd = np.random.rand() * cdf[-1]
    return np.searchsorted(cdf, rnd)

def draw_independent_samples(biased_stationary_distribution, n_samples):
    state_counts = np.zeros(shape=biased_stationary_distribution.shape, dtype=np.intc)
    conf_state_sequence = np.zeros(
        biased_stationary_distribution.shape[0]*n_samples, dtype=np.intc)
    for K in range(biased_stationary_distribution.shape[0]):
        for s in range(n_samples):
            i = tower_sample(biased_stationary_distribution[K, :])
            state_counts[K, i] += 1
            conf_state_sequence[K * n_samples + s] = i
    return conf_state_sequence, state_counts

def draw_transition_counts(transition_matrices, n_samples, x0):
    """generates a discrete Markov chain"""
    count_matrices = np.zeros(shape=transition_matrices.shape, dtype=np.intc)
    conf_state_sequence = np.zeros(
        shape=(transition_matrices.shape[0]*(n_samples+1),), dtype=np.intc)
    state_counts = np.zeros(shape=transition_matrices.shape[0:2], dtype=np.intc)
    h = 0
    for K in range(transition_matrices.shape[0]):
        x = x0
        state_counts[K, x] += 1
        conf_state_sequence[h] = x
        h += 1
        for s in range(n_samples):
            x_new = tower_sample(transition_matrices[K, x, :])
            count_matrices[K, x, x_new] += 1
            x = x_new
            state_counts[K, x] += 1
            conf_state_sequence[h] = x
            h += 1
    return count_matrices, conf_state_sequence, state_counts

#   ************************************************************************************************
#   test class
#   ************************************************************************************************

class TestThreeTwoModel(object):
    @classmethod
    def setup_class(cls):
        cls.energy = np.array([1.0, 2.0, 0.0], dtype=np.float64)
        cls.bias_energies = np.array([[0.0, 0.0, 0.0], 2.0 - cls.energy], dtype=np.float64)
        cls.stationary_distribution = np.exp(-cls.energy) / np.exp(-cls.energy).sum()
        cls.conf_energies = -np.log(cls.stationary_distribution)
        cls.biased_conf_energies = cls.conf_energies[np.newaxis, :] + cls.bias_energies
        cls.conf_partition_function = np.exp(-cls.biased_conf_energies)
        cls.partition_function = cls.conf_partition_function.sum(axis=1)
        cls.therm_energies = -np.log(cls.partition_function)
        cls.biased_stationary_distribution = np.exp(-cls.bias_energies) * \
            cls.stationary_distribution[np.newaxis, :] / cls.partition_function[:, np.newaxis]
        metropolis = cls.energy[np.newaxis, :] - cls.energy[:, np.newaxis]
        metropolis[(metropolis < 0.0)] = 0.0
        selection = np.array([[0.5, 0.5, 0.0], [0.5, 0.0, 0.5], [0.0, 0.5, 0.5]], dtype=np.float64)
        metr_hast = selection * np.exp(-metropolis)
        for i in range(metr_hast.shape[0]):
            metr_hast[i, i] = 0.0
            metr_hast[i, i] = 1.0 - metr_hast[i, :].sum()
        cls.transition_matrices = np.array([metr_hast, selection])
        cls.n_samples = 10000
        cls.conf_state_sequence_ind, cls.state_counts_ind = draw_independent_samples(
            cls.biased_stationary_distribution, cls.n_samples)
        cls.count_matrices, cls.conf_state_sequence, cls.state_counts = draw_transition_counts(
            cls.transition_matrices, cls.n_samples, 0)
    @classmethod

    def test_tram_direct(self):
        bias_energies = np.ascontiguousarray(self.bias_energies[:,self.conf_state_sequence].T)
        # this serves as template for a test system for DHAMed implementation
        biased_conf_energies, conf_energies, therm_energies, log_lagrangian_mult, error_history, logL_history = tram_direct.estimate(
            self.count_matrices, self.state_counts, [bias_energies], [self.conf_state_sequence],
            maxiter=10000, maxerr=1.0E-12, save_convergence_info=10)
        return biased_conf_energies, conf_energies, therm_energies
        
    def test_dhamed_direct(self):
        bias_energies = np.ascontiguousarray(self.bias_energies[:,self.conf_state_sequence].T)
        o = tram_direct.estimate(
            self.count_matrices, self.state_counts, [bias_energies], [self.conf_state_sequence],
            maxiter=400000, maxerr=1.0E-12, save_convergence_info=0, dhamed_true=True)
        return o

In [14]:
t = TestThreeTwoModel()
t.setup_class()

In [15]:
biased_conf_energies, conf_energies, therm_energies = t.test_tram_direct()

In [16]:
biased_conf_energies # first ensemble has bias zero

array([[ 1.40648408,  2.41734668,  0.40670746],
       [ 2.40648408,  2.41734668,  2.40670746]])

In [17]:
conf_energies # 

array([ 1.40648408,  2.41734668,  0.40670746])

In [18]:
therm_energies # free energies of the different ensembles kT=1

array([ -2.23072116e-11,   1.31155428e+00])

In [19]:
o = t.test_dhamed_direct()



In [20]:
o[0]

array([[ 0.58108626,  0.58108626,  0.58108626],
       [ 0.58108626,  0.58108626,  0.58108626]])

In [21]:
biased_conf_energies

array([[ 1.40648408,  2.41734668,  0.40670746],
       [ 2.40648408,  2.41734668,  2.40670746]])

In [22]:
o[1]

array([ 2.03958727,  1.86175697,  0.33614674])

In [23]:
conf_energies

array([ 1.40648408,  2.41734668,  0.40670746])

In [24]:
o[2]

array([-0.51752603, -0.51752603])

In [25]:
therm_energies

array([ -2.23072116e-11,   1.31155428e+00])