In [None]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

def get_diffusion_betas(spec, device):
    """Get betas from the hyperparameters."""
    
    if spec['type'] == 'linear':
        # Used by Ho et al. for DDPM, https://arxiv.org/abs/2006.11239.
        # To be used with Gaussian diffusion models in continuous and discrete
        # state spaces.
        # To be used with transition_mat_type = 'gaussian'
        return torch.linspace(spec['start'], spec['stop'], spec['num_timesteps']).to(device)
    
    elif spec['type'] == 'cosine':
        # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
        # To be used with transition_mat_type = 'uniform'.
        def cosine_beta_schedule(timesteps, s=0.008):
            """
            Cosine schedule as described in https://arxiv.org/abs/2102.09672.

            Parameters:
            - timesteps: int, the number of timesteps for the schedule.
            - s: float, small constant to prevent numerical issues.

            Returns:
            - betas: torch.Tensor, beta values for each timestep.
            - alphas: torch.Tensor, alpha values for each timestep.
            - alpha_bars: torch.Tensor, cumulative product of alphas for each timestep.
            """
            steps = timesteps + 1
            x = torch.linspace(0, timesteps, steps)
            alphas_cumprod = torch.cos((x / timesteps + s) / (1 + s) * torch.pi * 0.5) ** 2
            alphas_cumprod = alphas_cumprod / alphas_cumprod[0]

            betas = 1 - alphas_cumprod[1:] / alphas_cumprod[:-1]
            alphas = 1 - betas
            alpha_bars = torch.cumprod(alphas, dim=0)

            return betas, alphas, alpha_bars
        betas, alphas, alpha_bars = cosine_beta_schedule(spec['num_timesteps'])
        return betas.to(device)
    
    elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
        # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
        # To be used with absorbing state models.
        # ensures that the probability of decaying to the absorbing state
        # increases linearly over time, and is 1 for t = T-1 (the final time).
        # To be used with transition_mat_type = 'absorbing'
        return 1. / torch.linspace(spec['num_timesteps'], 1, spec['num_timesteps']).to(device)
    else:
        raise NotImplementedError(spec['type'])
    
def _get_custom_transition_mat(t, betas):
    """
    Generates a 2x2 transition matrix for a discrete diffusion process at timestep t.

    Parameters:
    - t: int, current timestep.
    - device: torch.device, the device to place the tensor on.

    Returns:
    - transition_matrix: torch.Tensor, a 2x2 transition matrix.
    """
    alphas = 1 - betas
    alpha_bars = torch.cumprod(alphas, dim=0)
    alpha_bar_t = alpha_bars[t]
    
    # Compute the transition probabilities
    transition_matrix = torch.tensor([[0.5*alphas[t] + 0.5, 0.5 - 0.5*alphas[t]],
                                    [0.5 - 0.5*alphas[t], 0.5*alphas[t] + 0.5]])
    
    return transition_matrix

betas = get_diffusion_betas({'type': 'cosine', 'num_timesteps': 100}, 'cpu')
#print(betas)
alphas = 1 - betas
#print(alphas)
q_onestep_mats = [_get_custom_transition_mat(t, betas)
                               for t in range(0, 100)]
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, 100):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)
print(q_mats[0])
print(q_mats[50])
print(q_mats[99])


In [None]:
print(q_mats[0])
print(q_mats[50])
print(q_mats[99])

In [None]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
T=50
def get_diffusion_betas(spec, device):
    """Get betas from the hyperparameters."""
    
    if spec['type'] == 'linear':
        # Used by Ho et al. for DDPM, https://arxiv.org/abs/1006.11239.
        # To be used with Gaussian diffusion models in continuous and discrete
        # state spaces.
        # To be used with transition_mat_type = 'gaussian'
        return torch.linspace(spec['start'], spec['stop'], spec['num_timesteps']).to(device)
    elif spec['type'] == 'cosine':
        # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
        # To be used with transition_mat_type = 'uniform'.
        steps = torch.linspace(0, 1, spec['num_timesteps'] + 1, dtype=torch.float64)
        alpha_bar = torch.square(torch.cos(((steps + 0.008) / 1.008) * torch.pi / 2))
        betas = torch.minimum(1 - alpha_bar[1:] / alpha_bar[:-1], torch.tensor(0.999))
        return betas.to(device)
    elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
        # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
        # To be used with absorbing state models.
        # ensures that the probability of decaying to the absorbing state
        # increases linearly over time, and is 1 for t = T-1 (the final time).
        # To be used with transition_mat_type = 'absorbing'
        return 1. / torch.linspace(spec['num_timesteps'], 1, spec['num_timesteps']).to(device)
    else:
        raise NotImplementedError(spec['type'])
    
    
def _get_gaussian_transition_mat(t):
    r"""Computes transition matrix for q(x_t|x_{t-1}).

    This method constructs a transition matrix Q with
    decaying entries as a function of how far off diagonal the entry is.
    Normalization option 1:
    Q_{ij} =  ~ softmax(-val^2/beta_t)   if |i-j| <= transition_bands
                1 - \sum_{l \neq i} Q_{il}  if i==j.
                0                          else.

    Normalization option 2:
    tilde{Q}_{ij} =  softmax(-val^2/beta_t)   if |i-j| <= transition_bands
                        0                        else.

    Q_{ij} =  tilde{Q}_{ij} / sum_l{tilde{Q}_{lj}}

    Args:
        t: timestep. integer scalar (or numpy array?)

    Returns:
        Q_t: transition matrix. shape = (num_classes, num_classes).
    """
    num_classes = 2
    device = 'cpu'
    transition_bands = 1
    spec = {'type': 'linear', 'start': 0.4, 'stop': 1.0, 'num_timesteps': T}
    betas = get_diffusion_betas(spec, device=device)
    transition_bands = transition_bands if transition_bands else num_classes - 1

    beta_t = betas[t]

    mat = torch.zeros((num_classes, num_classes),
                    dtype=torch.float64).to(device, non_blocking=True)

    # Make the values correspond to a similar type of gaussian as in the
    # gaussian diffusion case for continuous state spaces.
    values = torch.linspace(torch.tensor(0.), torch.tensor(num_classes-1), num_classes, dtype=torch.float64).to(device, non_blocking=True)
    #print(values)   # [0, 1]
    values = values * 2./ (num_classes - 1.)
    #print(values)   # [0, 2]
    values = values[:transition_bands+1]
    #print(values)   # [0, 2]
    values = -values * values / beta_t
    #print(values)   # [0, -6300]
    
    # To reverse the tensor 'values' starting from the second element
    reversed_values = values[1:].flip(dims=[0])
    #print(reversed_values)  # [-6300]
    # Concatenating the reversed values with the original values
    values = torch.cat([reversed_values, values], dim=0)
    #print(values)   # [-6300, 0, -6300]
    values = F.softmax(values, dim=0)
    #print(values)   # [0, 1, 0]
    values = values[transition_bands:]
    #print(values)   # [1, 0]
    
    for k in range(1, transition_bands + 1):
        off_diag = torch.full((num_classes - k,), values[k], dtype=torch.float64).to(device, non_blocking=True)

        mat += torch.diag(off_diag, k)
        mat += torch.diag(off_diag, -k)

    # Add diagonal values such that rows and columns sum to one.
    # Technically only the ROWS need to sum to one
    # NOTE: this normalization leads to a doubly stochastic matrix,
    # which is necessary if we want to have a uniform stationary distribution.
    diag = 1. - mat.sum(dim=1)
    mat += torch.diag_embed(diag)

    return mat.to(device, non_blocking=True)

def _get_prior_distribution_transition_mat(t):
        """Computes transition matrix for q(x_t|x_{t-1}).
        Use cosine schedule for these transition matrices.

        Args:
        t: timestep. integer scalar.

        Returns:
        Q_t: transition matrix. shape = (num_classes, num_classes).
        """
        device = 'cpu'
        spec = {'type': 'cosine', 'start': 0.4, 'stop': 0.8, 'num_timesteps': T}
        betas = get_diffusion_betas(spec, device=device)
        beta_t = betas[t]
        num_classes = 2
        class_probs = torch.tensor([1 - 5/11000, 5/11000], device=device)
        mat = torch.zeros((num_classes, num_classes), dtype=torch.float64).to(device, non_blocking=True)

        '''for i in range(num_classes):
            for j in range(num_classes):
                if i != j:
                    mat[i, j] = beta_t * class_probs[j]
                else:
                    mat[i, j] = 1 - beta_t + beta_t * class_probs[j]'''
        mat = beta_t * class_probs + (1 - beta_t) * torch.eye(2, device=device)
        
        return mat


q_one_step_mats = [_get_gaussian_transition_mat(t)
                            for t in range(0, T)]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (T,
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, T):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

for t in range(0, T, 9):
    print(q_mats[t])
    # print(_get_gaussian_transition_mat(t))
    # print(_get_prior_distribution_transition_mat(t))

'''import matplotlib.pyplot as plt

spec = {'type': 'linear', 'start': 1.0, 'stop': 100.0, 'num_timesteps': 100}
betas = get_diffusion_betas(spec, device='cpu')
plt.plot(betas)
plt.show()'''

In [None]:
e_marginals = torch.tensor([0.9, 0.1], dtype=torch.float64)
print(e_marginals.unsqueeze(0).expand(2, -1).unsqueeze(0))

In [None]:
import numpy as onp
def get_diffusion_betas(spec):
  """Get betas from the hyperparameters."""
  if spec['type'] == 'linear':
    # Used by Ho et al. for DDPM, https://arxiv.org/abs/2006.11239.
    # To be used with Gaussian diffusion models in continuous and discrete
    # state spaces.
    # To be used with transition_mat_type = 'gaussian'
    return onp.linspace(spec['start'], spec['stop'], spec['num_timesteps'])
  elif spec['type'] == 'cosine':
    # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
    # To be used with transition_mat_type = 'uniform'.
    steps = (
        onp.arange(spec['num_timesteps'] + 1, dtype=onp.float64) /
        spec['num_timesteps'])
    alpha_bar = onp.cos((steps + 0.008) / 1.008 * onp.pi / 2)
    betas = onp.minimum(1 - alpha_bar[1:] / alpha_bar[:-1], 0.999)
    return betas
  elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
    # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
    # To be used with absorbing state models.
    # ensures that the probability of decaying to the absorbing state
    # increases linearly over time, and is 1 for t = T-1 (the final time).
    # To be used with transition_mat_type = 'absorbing'
    return 1. / onp.linspace(spec['num_timesteps'], 1., spec['num_timesteps'])
  else:
    raise NotImplementedError(spec['type'])
  
spec = {'type': 'linear', 'start': 0.4, 'stop': 0.8, 'num_timesteps': 100}
betas = get_diffusion_betas(spec)
alphas = 1 - betas
alpha_bar = onp.cumprod(alphas)
plt.plot(betas)
plt.plot(alpha_bar)
plt.legend(['betas', 'alpha_bar'])
plt.show()

import scipy
def _get_gaussian_transition_mat(t):
    r"""Computes transition matrix for q(x_t|x_{t-1}).

    This method constructs a transition matrix Q with
    decaying entries as a function of how far off diagonal the entry is.
    Normalization option 1:
    Q_{ij} =  ~ softmax(-val^2/beta_t)   if |i-j| <= transition_bands
             1 - \sum_{l \neq i} Q_{il}  if i==j.
             0                          else.

    Normalization option 2:
    tilde{Q}_{ij} =  softmax(-val^2/beta_t)   if |i-j| <= transition_bands
                     0                        else.

    Q_{ij} =  tilde{Q}_{ij} / sum_l{tilde{Q}_{lj}}

    Args:
      t: timestep. integer scalar (or numpy array?)

    Returns:
      Q_t: transition matrix. shape = (num_pixel_vals, num_pixel_vals).
    """
    transition_bands = 2 - 1
    betas = get_diffusion_betas(spec)
    beta_t = betas[t]

    mat = onp.zeros((2, 2),
                    dtype=onp.float64)

    # Make the values correspond to a similar type of gaussian as in the
    # gaussian diffusion case for continuous state spaces.
    values = onp.linspace(start=0., stop=1., num=2,
                          endpoint=True, dtype=onp.float64)
    print(values)
    values = values * 2./ (2 - 1.)
    print(values)
    values = values[:transition_bands+1]
    print(values)
    values = -values * values / beta_t
    print(values)

    values = onp.concatenate([values[:0:-1], values], axis=0)
    print(values)
    values = scipy.special.softmax(values, axis=0)
    print(values)
    values = values[transition_bands:]
    print(values)
    for k in range(1, transition_bands + 1):
      off_diag = onp.full(shape=(2 - k,),
                          fill_value=values[k],
                          dtype=onp.float64)

      mat += onp.diag(off_diag, k=k)
      mat += onp.diag(off_diag, k=-k)

    # Add diagonal values such that rows and columns sum to one.
    # Technically only the ROWS need to sum to one
    # NOTE: this normalization leads to a doubly stochastic matrix,
    # which is necessary if we want to have a uniform stationary distribution.
    diag = 1. - mat.sum(1)
    mat += onp.diag(diag, k=0)

    return mat
  
_get_gaussian_transition_mat(99)

In [None]:
import numpy as np

def cosine_beta_schedule(T, s=0.008):
    """
    Generate a cosine schedule for diffusion betas.
    
    Args:
    - T (int): The total number of diffusion timesteps.
    - s (float): A small constant to ensure the betas do not reach 0, which could cause numerical stability issues.
    
    Returns:
    - np.array: An array of length T with the beta values for each timestep.
    """
    steps = np.arange(T, dtype=np.float64)
    x = s + (1 - s) * np.cos(0.5 * np.pi * steps / T)
    betas = 1 - x / x.max()
    return betas

# Example usage
T = 1000  # Total number of diffusion steps
betas = cosine_beta_schedule(T)
alphas = 1 - betas
alpha_bar = np.cumprod(alphas)
plt.plot(betas)
#plt.plot(alphas)
plt.plot(alpha_bar)
plt.show()

In [None]:
import torch

def cosine_beta_schedule(timesteps, s=0.008):
    """
    Cosine schedule as described in https://arxiv.org/abs/2102.09672.

    Parameters:
    - timesteps: int, the number of timesteps for the schedule.
    - s: float, small constant to prevent numerical issues.

    Returns:
    - betas: torch.Tensor, beta values for each timestep.
    - alphas: torch.Tensor, alpha values for each timestep.
    - alpha_bars: torch.Tensor, cumulative product of alphas for each timestep.
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos((x / timesteps + s) / (1 + s) * torch.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]

    betas = 1 - alphas_cumprod[1:] / alphas_cumprod[:-1]
    alphas = 1 - betas
    alpha_bars = torch.cumprod(alphas, dim=0)

    return betas, alphas, alpha_bars

# Example usage
timesteps = 100  # Number of timesteps
device = 'cuda' if torch.cuda.is_available() else 'cpu'

betas, alphas, alpha_bars = cosine_beta_schedule(timesteps)
betas = betas.to(device)
alphas = alphas.to(device)
alpha_bars = alpha_bars.to(device)

import matplotlib.pyplot as plt
plt.plot(betas)
plt.plot(alpha_bars)
plt.legend(['betas', 'alpha_bar'])
plt.show()


In [None]:
import torch
import torch.nn.functional as F
T = 100

def get_diffusion_betas(spec, device):
    """Get betas from the hyperparameters."""
    
    if spec['type'] == 'linear':
        # Used by Ho et al. for DDPM, https://arxiv.org/abs/1006.11239.
        # To be used with Gaussian diffusion models in continuous and discrete
        # state spaces.
        # To be used with transition_mat_type = 'gaussian'
        return torch.linspace(spec['start'], spec['stop'], spec['num_timesteps']).to(device)
    elif spec['type'] == 'cosine':
        # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
        # To be used with transition_mat_type = 'uniform'.
        steps = torch.linspace(0, 1, spec['num_timesteps'] + 1, dtype=torch.float64)
        betas, alphas, alpha_bars = cosine_beta_schedule(spec['num_timesteps'])
        return betas.to(device)
    elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
        # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
        # To be used with absorbing state models.
        # ensures that the probability of decaying to the absorbing state
        # increases linearly over time, and is 1 for t = T-1 (the final time).
        # To be used with transition_mat_type = 'absorbing'
        return 1. / torch.linspace(spec['num_timesteps'], 1, spec['num_timesteps']).to(device)
    else:
        raise NotImplementedError(spec['type'])
    
    
def _get_gaussian_transition_mat(t):
    r"""Computes transition matrix for q(x_t|x_{t-1}).

    This method constructs a transition matrix Q with
    decaying entries as a function of how far off diagonal the entry is.
    Normalization option 1:
    Q_{ij} =  ~ softmax(-val^2/beta_t)   if |i-j| <= transition_bands
                1 - \sum_{l \neq i} Q_{il}  if i==j.
                0                          else.

    Normalization option 2:
    tilde{Q}_{ij} =  softmax(-val^2/beta_t)   if |i-j| <= transition_bands
                        0                        else.

    Q_{ij} =  tilde{Q}_{ij} / sum_l{tilde{Q}_{lj}}

    Args:
        t: timestep. integer scalar (or numpy array?)

    Returns:
        Q_t: transition matrix. shape = (num_classes, num_classes).
    """
    num_classes = 2
    device = 'cpu'
    transition_bands = 1
    spec = {'type': 'cosine', 'start': 0.4, 'stop': 1.0, 'num_timesteps': T}
    betas = get_diffusion_betas(spec, device=device)
    # transition_bands = transition_bands if transition_bands else num_classes - 1

    beta_t = betas[t]

    mat = torch.zeros((num_classes, num_classes),
                    dtype=torch.float64).to(device, non_blocking=True)

    # Make the values correspond to a similar type of gaussian as in the
    # gaussian diffusion case for continuous state spaces.
    values = torch.linspace(torch.tensor(0.), torch.tensor(num_classes-1), num_classes, dtype=torch.float64).to(device, non_blocking=True)
    #print(values)   # [0, 1]
    values = values * 2./ (num_classes - 1.)
    #print(values)   # [0, 2]
    values = values[:transition_bands+1]
    #print(values)   # [0, 2]
    values = -values * values / beta_t
    #print(values)   # [0, -6300]
    
    # To reverse the tensor 'values' starting from the second element
    reversed_values = values[1:].flip(dims=[0])
    #print(reversed_values)  # [-6300]
    # Concatenating the reversed values with the original values
    values = torch.cat([reversed_values, values], dim=0)
    #print(values)   # [-6300, 0, -6300]
    values = F.softmax(values, dim=0)
    #print(values)   # [0, 1, 0]
    values = values[transition_bands:]
    #print(values)   # [1, 0]
    
    for k in range(1, transition_bands + 1):
        off_diag = torch.full((num_classes - k,), values[k], dtype=torch.float64).to(device, non_blocking=True)

        mat += torch.diag(off_diag, k)
        mat += torch.diag(off_diag, -k)

    # Add diagonal values such that rows and columns sum to one.
    # Technically only the ROWS need to sum to one
    # NOTE: this normalization leads to a doubly stochastic matrix,
    # which is necessary if we want to have a uniform stationary distribution.
    diag = 1. - mat.sum(dim=1)
    mat += torch.diag_embed(diag)

    return mat.to(device, non_blocking=True)

def _get_prior_distribution_transition_mat(t):
        """Computes transition matrix for q(x_t|x_{t-1}).
        Use cosine schedule for these transition matrices.

        Args:
        t: timestep. integer scalar.

        Returns:
        Q_t: transition matrix. shape = (num_classes, num_classes).
        """
        device = 'cpu'
        spec = {'type': 'cosine', 'start': 0.4, 'stop': 0.8, 'num_timesteps': T}
        betas = get_diffusion_betas(spec, device=device)
        beta_t = betas[t]
        num_classes = 2
        class_probs = torch.tensor([1 - 5/11000, 5/11000], device=device)
        mat = torch.zeros((num_classes, num_classes), dtype=torch.float64).to(device, non_blocking=True)

        '''for i in range(num_classes):
            for j in range(num_classes):
                if i != j:
                    mat[i, j] = beta_t * class_probs[j]
                else:
                    mat[i, j] = 1 - beta_t + beta_t * class_probs[j]'''
        mat = beta_t * class_probs + (1 - beta_t) * torch.eye(2, device=device)
        
        return mat


q_one_step_mats = [_get_gaussian_transition_mat(t)
                            for t in range(0, T)]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (T,
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, T):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

for t in range(0, T, 9):
    print(q_mats[t])
    # print(_get_gaussian_transition_mat(t))
    # print(_get_prior_distribution_transition_mat(t))

'''import matplotlib.pyplot as plt

spec = {'type': 'linear', 'start': 1.0, 'stop': 100.0, 'num_timesteps': 100}
betas = get_diffusion_betas(spec, device='cpu')
plt.plot(betas)
plt.show()'''

In [None]:
T = 50
def get_transition_matrix(t, device='cpu'):
    """
    Generates a 2x2 transition matrix for a discrete diffusion process at timestep t.

    Parameters:
    - t: int, current timestep.
    - device: torch.device, the device to place the tensor on.

    Returns:
    - transition_matrix: torch.Tensor, a 2x2 transition matrix.
    """
    betas, _, alpha_bars = cosine_beta_schedule(T)
    spec = {'type': 'linear', 'start': 0.00001, 'stop': 0.09, 'num_timesteps': T}
    betas = get_diffusion_betas(spec, device=device)
    alphas = 1 - betas
    alpha_bars = torch.cumprod(alphas, dim=0)
    alpha_bar_t = alpha_bars[t].to(device)
    
    # Compute the transition probabilities
    transition_matrix = torch.tensor([[alphas[t], 1 - alphas[t]],
                                      [1 - alphas[t], alphas[t]]], device=device)
    
    return transition_matrix

In [None]:
q_one_step_mats = [get_transition_matrix(t)
                            for t in range(0, T)]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (T,
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, T):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

zero_to_one = []
one_to_zero = []
for t in range(0, T, 1):
    zero_to_one.append(q_mats[t][0][1].item())
    one_to_zero.append(q_mats[t][1][0].item())
    print(q_mats[t])

In [None]:
plt.plot(zero_to_one)
plt.plot(one_to_zero, linestyle='dashed')
plt.legend(['0 -> 1', '1 -> 0'])
plt.title('Transition probabilities with linear noise schedule')
plt.show()

In [None]:
T = 50
def _get_prior_distribution_transition_mat(t):
    """Computes transition matrix for q(x_t|x_{t-1}).
    Use cosine schedule for these transition matrices.

    Args:
    t: timestep. integer scalar.

    Returns:
    Q_t: transition matrix. shape = (num_classes, num_classes).
    """
    device = 'cpu'
    spec = {'type': 'cosine', 'start': 0.01, 'stop': 0.46, 'num_timesteps': T}
    betas = get_diffusion_betas(spec, device=device)
    beta_t = betas[t]
    num_classes = 2
    class_probs = torch.tensor([1 - 5/16000, 5/16000], device=device)
    mat = torch.zeros((num_classes, num_classes), dtype=torch.float64).to(device, non_blocking=True)

    '''for i in range(num_classes):
        for j in range(num_classes):
            if i != j:
                mat[i, j] = beta_t * class_probs[j]
            else:
                mat[i, j] = 1 - beta_t + beta_t * class_probs[j]'''
    mat = beta_t * class_probs + (1 - beta_t) * torch.eye(2, device=device)
    
    return mat

In [None]:
q_one_step_mats = [_get_prior_distribution_transition_mat(t)
                            for t in range(0, T)]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (T,
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, T):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

zero_to_one = []
one_to_zero = []
for t in range(0, T, 1):
    zero_to_one.append(q_mats[t][0][1].item())
    one_to_zero.append(q_mats[t][1][0].item())
    print(q_mats[t])

In [None]:

# Create a figure and a set of subplots
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Plot on the first subplot
axs[0].plot(zero_to_one, label='0 -> 1')
axs[0].legend()  # Add a legend

# Plot on the second subplot
axs[1].plot(one_to_zero, label='1 -> 0')
axs[1].legend()  # Add a legend

# Show the plot
fig.suptitle('Transition probabilities with cosine noise schedule')
plt.show()

In [None]:
print(5/16000)

In [None]:
def _get_transition_mat(t):
        r"""Computes transition matrix for q(x_t|x_{t-1}).

        This method constructs a transition
        matrix Q with
        Q_{ij} = beta_t / num_classes       if |i-j| <= transition_bands
                1 - \sum_{l \neq i} Q_{il} if i==j.
                0                          else.

        Args:
        t: timestep. integer scalar (or numpy array?)

        Returns:
        Q_t: transition matrix. shape = (num_classes, num_classes).
        """
        num_classes = 2
        device = 'cpu'
        spec = {'type': 'linear', 'start': 0.01, 'stop': 0.9, 'num_timesteps': T}
        betas = get_diffusion_betas(spec, device=device)
        beta_t = betas[t]
        
        mat = torch.zeros((num_classes, num_classes),
                        dtype=torch.float64)
        off_diag = torch.full((num_classes - 1,), fill_value=beta_t / float(num_classes), dtype=torch.float64)

        for k in range(1, 1 + 1):
            mat += torch.diag(off_diag, k)
            mat += torch.diag(off_diag, -k)
            off_diag = off_diag[:-1]

        # Add diagonal values such that rows sum to one
        diag = 1. - mat.sum(dim=1)
        mat += torch.diag(diag)
        
        return mat

In [None]:
q_one_step_mats = [_get_transition_mat(t)
                            for t in range(0, T)]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (T,
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, T):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

zero_to_one = []
one_to_zero = []
for t in range(0, T, 1):
    zero_to_one.append(q_mats[t][0][1].item())
    one_to_zero.append(q_mats[t][1][0].item())
    print(q_mats[t])

In [None]:

# Create a figure and a set of subplots
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Plot on the first subplot
axs[0].plot(zero_to_one, label='0 -> 1')
axs[0].legend()  # Add a legend

# Plot on the second subplot
axs[1].plot(one_to_zero, label='1 -> 0')
axs[1].legend()  # Add a legend

# Show the plot
fig.suptitle('Transition probabilities with cosine noise schedule')
plt.show()

In [None]:
import torch
q_mats = torch.stack([torch.tensor([[0.4, 0.6], [0.6, 0.4]]), 
                      torch.tensor([[0.7, 0.3], [0.1, 0.8]]), 
                      torch.tensor([[0.2, 0.8],[0.8, 0.2]])], axis=0)

In [None]:
print(q_mats[1, torch.tensor([0, 0, 0, 1, 0, 1, 0, 0])])

In [None]:
import torch
from torch import tensor
def calculate_fut_ratio(sample_list, ground_truth_fut):
    """
    Calculates the ratio of samples in `sample_list` that have at least n edges in common with the ground truth future trajectory for each n up to future_len.

    Args:
        sample_list (list): A list of samples.
        ground_truth_fut (list): A list of ground truth future trajectories.

    Returns:
        dict: A dictionary where keys are the minimum number of common edges (from 1 to future_len) and values are the ratios of samples meeting that criterion.
    """
    # Initialize counts for each number of common edges from 1 up to future_len
    future_len = 2
    counts = {i: 0 for i in range(1, future_len + 1)}            

    total = 0
    for (sample_sublist, ground_truth_sublist) in zip(sample_list, ground_truth_fut):
        total += len(sample_sublist)
        for i in range(len(sample_sublist)):
            # Convert tensors to lists if they are indeed tensors
            sample = sample_sublist[i].tolist()
            ground_truth = ground_truth_sublist[i].flatten().tolist()

            edges_count = sum(1 for edge in ground_truth if edge in sample)
            for n in range(1, min(edges_count, future_len) + 1):
                counts[n] += 1
        
    if total != 0:
        ratios = {n: counts[n] / total for n in counts}
    else:
        ratios = {n: 0 for n in counts}
    return ratios

In [None]:
sample_list = [[tensor([1, 2]), tensor([3, 4])], [tensor([0, 2]), tensor([4, 6])]]
ground_truth_fut = [[tensor([[2, 1]]), tensor([[1, 4]])], [tensor([[4, 2]]), tensor([[4, 6]])]]
print(calculate_fut_ratio(sample_list, ground_truth_fut))

In [None]:
import torch
def get_diffusion_betas(spec, device):
    """Get betas from the hyperparameters."""
    
    if spec['type'] == 'linear':
        # Used by Ho et al. for DDPM, https://arxiv.org/abs/2006.11239.
        # To be used with Gaussian diffusion models in continuous and discrete
        # state spaces.
        # To be used with transition_mat_type = 'gaussian'
        return torch.linspace(spec['start'], spec['stop'], spec['num_timesteps']).to(device)
    
    elif spec['type'] == 'cosine':
        # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
        # To be used with transition_mat_type = 'uniform'.
        def cosine_beta_schedule(timesteps, s=0.008):
            """
            Cosine schedule as described in https://arxiv.org/abs/2102.09672.

            Parameters:
            - timesteps: int, the number of timesteps for the schedule.
            - s: float, small constant to prevent numerical issues.

            Returns:
            - betas: torch.Tensor, beta values for each timestep.
            - alphas: torch.Tensor, alpha values for each timestep.
            - alpha_bars: torch.Tensor, cumulative product of alphas for each timestep.
            """
            steps = timesteps + 1
            x = torch.linspace(0, timesteps, steps)
            alphas_cumprod = torch.cos((x / timesteps + s) / (1 + s) * torch.pi * 0.5) ** 2
            alphas_cumprod = alphas_cumprod / alphas_cumprod[0]

            betas = 1 - alphas_cumprod[1:] / alphas_cumprod[:-1]
            alphas = 1 - betas
            alpha_bars = torch.cumprod(alphas, dim=0)

            return betas, alphas, alpha_bars
        betas, alphas, alpha_bars = cosine_beta_schedule(spec['num_timesteps'])
        return betas.to(device)
    
    elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
        # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
        # To be used with absorbing state models.
        # ensures that the probability of decaying to the absorbing state
        # increases linearly over time, and is 1 for t = T-1 (the final time).
        # To be used with transition_mat_type = 'absorbing'
        return 1. / torch.linspace(spec['num_timesteps'], 1, spec['num_timesteps']).to(device)
    else:
        raise NotImplementedError(spec['type'])

In [None]:
def _get_prior_distribution_transition_mat(spec, t):
    betas = get_diffusion_betas(spec, 'cpu')
    device = 'cpu'
    beta_t = betas[t]
    num_classes = 2
    class_probs = torch.tensor([1 - 5/11000, 5/11000])
    mat = torch.zeros((num_classes, num_classes), dtype=torch.float64).to(device, non_blocking=True)

    for i in range(num_classes):
        for j in range(num_classes):
            if i != j:
                mat[i, j] = beta_t * class_probs[j]
            else:
                mat[i, j] = 1 - beta_t + beta_t * class_probs[j]
    
    return mat

def _get_custom_transition_mat(spec, t):
    device = 'cpu'
    betas = get_diffusion_betas(spec, 'cpu')
    alphas = 1 - betas
    
    # Compute the transition probabilities
    transition_matrix = torch.tensor([[alphas[t] + 0.5*betas[t], (1 - alphas[t])/2],
                                    [(1 - alphas[t])/2, alphas[t] + 0.5*betas[t]]], device=device)
    
    return transition_matrix

In [None]:
spec = {'type': 'cosine', 'start': 1e-6, 'stop': 0.04, 'num_timesteps': 100}

In [None]:
q_one_step_mats = [_get_custom_transition_mat(spec, t)
                            for t in range(0, spec['num_timesteps'])]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (spec['num_timesteps'],
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, spec['num_timesteps']):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

zero_to_one = []
one_to_zero = []
for t in range(0, spec['num_timesteps'], 1):
    zero_to_one.append(q_mats[t][0][1].item())
    one_to_zero.append(q_mats[t][1][0].item())
    print(q_mats[t])

In [None]:
import matplotlib.pyplot as plt
# Create a figure and a set of subplots
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Plot on the first subplot
axs[0].plot(zero_to_one, label='0 -> 1')
axs[0].legend()  # Add a legend

# Plot on the second subplot
axs[1].plot(one_to_zero, label='1 -> 0')
axs[1].legend()  # Add a legend

# Show the plot
fig.suptitle('Cumulative transition probabilities with cosine noise schedule')
plt.show()

In [None]:
q_one_step_mats = [_get_custom_transition_mat(spec, t)
                            for t in range(0, spec['num_timesteps'])]
q_onestep_mats = torch.stack(q_one_step_mats, axis=0).to('cpu', non_blocking=True)
assert q_onestep_mats.shape == (spec['num_timesteps'],
                                    2,
                                    2)

# Construct transition matrices for q(x_t|x_start)
q_mat_t = q_onestep_mats[0]
q_mats = [q_mat_t]
for t in range(1, spec['num_timesteps']):
    # Q_{1...t} = Q_{1 ... t-1} Q_t = Q_1 Q_2 ... Q_t
    q_mat_t = torch.tensordot(q_mat_t, q_onestep_mats[t],
                            dims=[[1], [0]])
    q_mats.append(q_mat_t)
q_mats = torch.stack(q_mats, axis=0)

zero_to_one = []
one_to_zero = []
for t in range(0, spec['num_timesteps'], 1):
    zero_to_one.append(q_mats[t][0][1].item())
    one_to_zero.append(q_mats[t][1][0].item())
    print(q_mats[t])

In [None]:
import matplotlib.pyplot as plt
# Create a figure and a set of subplots
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Plot on the first subplot
axs[0].plot(zero_to_one, label='0 -> 1')
axs[0].legend()  # Add a legend

# Plot on the second subplot
axs[1].plot(one_to_zero, label='1 -> 0')
axs[1].legend()  # Add a legend

# Show the plot
fig.suptitle('Cumulative transition probabilities with cosine noise schedule')
plt.show()

In [None]:
import torch
def get_diffusion_betas(spec, device):
    """Get betas from the hyperparameters."""
    
    if spec['type'] == 'linear':
        # Used by Ho et al. for DDPM, https://arxiv.org/abs/2006.11239.
        # To be used with Gaussian diffusion models in continuous and discrete
        # state spaces.
        # To be used with transition_mat_type = 'gaussian'
        return torch.linspace(spec['start'], spec['stop'], spec['num_timesteps']).to(device)
    
    elif spec['type'] == 'cosine':
        # Schedule proposed by Hoogeboom et al. https://arxiv.org/abs/2102.05379
        # To be used with transition_mat_type = 'uniform'.
        def cosine_beta_schedule(timesteps, s=0.008):
            """
            Cosine schedule as described in https://arxiv.org/abs/2102.09672.

            Parameters:
            - timesteps: int, the number of timesteps for the schedule.
            - s: float, small constant to prevent numerical issues.

            Returns:
            - betas: torch.Tensor, beta values for each timestep.
            - alphas: torch.Tensor, alpha values for each timestep.
            - alpha_bars: torch.Tensor, cumulative product of alphas for each timestep.
            """
            steps = timesteps + 1
            x = torch.linspace(0, timesteps, steps)
            alphas_cumprod = torch.cos((x / timesteps + s) / (1 + s) * torch.pi * 0.5) ** 2
            alphas_cumprod = alphas_cumprod / alphas_cumprod[0]

            betas = 1 - alphas_cumprod[1:] / alphas_cumprod[:-1]
            alphas = 1 - betas
            alpha_bars = torch.cumprod(alphas, dim=0)

            return betas, alphas, alpha_bars
        betas, alphas, alpha_bars = cosine_beta_schedule(spec['num_timesteps'])
        return betas.to(device)
    
    elif spec['type'] == 'jsd':  # 1/T, 1/(T-1), 1/(T-2), ..., 1
        # Proposed by Sohl-Dickstein et al., https://arxiv.org/abs/1503.03585
        # To be used with absorbing state models.
        # ensures that the probability of decaying to the absorbing state
        # increases linearly over time, and is 1 for t = T-1 (the final time).
        # To be used with transition_mat_type = 'absorbing'
        return 1. / torch.linspace(spec['num_timesteps'], 1, spec['num_timesteps']).to(device)
    else:
        raise NotImplementedError(spec['type'])

In [None]:
import matplotlib.pyplot as plt
print(plt.style.available)
plt.style.use('seaborn-v0_8-paper')  # or 'ggplot', 'fivethirtyeight', etc.
plt.rcParams.update({
    'axes.labelsize': 12,
    'xtick.labelsize': 10,
    'ytick.labelsize': 10,
    'legend.fontsize': 10,
    'figure.figsize': (8, 6),
    'axes.grid': True,
    'grid.alpha': 0.2
})

In [None]:
spec = {'type': 'cosine', 'start': 0.01, 'stop': 0.99, 'num_timesteps': 1000}
betas = get_diffusion_betas(spec, 'cpu')
alphas = 1 - betas
alpha_bars = torch.cumprod(alphas, dim=0)
t_over_T = torch.linspace(0, 1, spec['num_timesteps'])
spec_linear = {'type': 'linear', 'start': 0.01, 'stop': 0.01, 'num_timesteps': 1000}
betas_linear = get_diffusion_betas(spec_linear, 'cpu')
alphas_linear = 1 - betas_linear
alpha_bars_linear = torch.cumprod(alphas_linear, dim=0)

In [None]:
print(betas[0], betas[-1])
print(betas_linear[0], betas_linear[-1])

In [None]:
plt.plot(t_over_T, alpha_bars_linear, label='Linear')
plt.plot(t_over_T, alpha_bars, label='Cosine')
plt.xlabel(r'Diffusion Step ($t/T$)')
plt.ylabel(r'$\bar{\alpha}_t$')
plt.legend()
plt.show()