In [2]:
import sys
import os
import time

# Add the project root directory to sys.path (go up 1 level from notebooks/)
# Use os.getcwd() and navigate up since __file__ is not available in notebooks
current_dir = os.getcwd()
if 'notebooks' in current_dir:
    # Navigate up to discrete-gflownet project root (just 1 level up)
    project_root = os.path.abspath(os.path.join(current_dir, '..'))
else:
    # Already in project root or somewhere else
    project_root = current_dir

if project_root not in sys.path:
    sys.path.append(project_root) 
print(f"Project root: {project_root}")

from reward_func.evo_devo import coord_reward_func, oscillator_reward_func

test_state = (50, -53, -57, 8, 9, -6, -117, 81, 8)
start_time = time.perf_counter_ns()
test_reward = coord_reward_func(test_state)
end_time = time.perf_counter_ns()
print(f"Time taken to run coord_reward_func: {(end_time - start_time)/1e9:.9f} seconds")
print(f"Test reward for state {test_state}: {test_reward}")
    

Project root: /home/pfrancois/dannyhuang/gfn_test/discrete-gflownet
Time taken to run coord_reward_func: 0.000029811 seconds
Test reward for state (50, -53, -57, 8, 9, -6, -117, 81, 8): 4


In [3]:
# Import plotting function from graph module
from graph.graph import draw_network_motif
import matplotlib.pyplot as plt
from ipywidgets import interact, FloatSlider, IntSlider
import numpy as np
import time
from reward_func.evo_devo import somitogenesis_reward_func, somitogenesis_sol_func, weights_to_matrix, somitogenesis_sparsity_reward_func

In [None]:
test_state=[0, 0, 0, 0, 100, 0] # g(i,t)
# test_state= [0, 0, -100, 0, 0, 0, -100, -80, 0, 100, 100, 100] # 3n Repressilator 
# test_state=[70, 50, 10, -30,30, 20] # 2n somite-half
# test_state= [85, 50, 10, -80, 40, 20] # 2n somite-s
test_state= [100, 0, 40, -100, 30, 20] # 2n somite-s
# test_state= [0, 90, 0, 50, 30, 20]  # 2n somite-1
# test_state=[100, 100, -60, 50, -75, 30] # 2n chaos

# test_state=[37, -89, 88, 89, 76, 51, -56, 43, -57, 35, 1, -16, 36, 7, -53, 6, 0, 0, 31, 0, 32, 0, -51, 0, 31, 6, 56, 1, -50, -2, 1, -32, 1, -30, -5, 0, 80, -100, 58, 75, -50, -50, 0, -30, -75, 76, 100, -26, -5, -39, -18, -36, -25, 51, -61, 1]
# test_state=[37, -89, 88, 89, 76, 51, -56, 43, -57, 35, 1, -16, 36, 7, -53, 6, -36, 26, 31, 56, 32, -55, -51, 10, 31, 6, 56, 1, -50, -2, 1, -32, 1, -30, -5, 5, 80, -100, 58, 75, -50, -50, 0, -30, -75, 76, 100, -26, -5, -39, -18, -36, -25, 51, -61, 1]

test_state=[160, -50, -110, 105, 5, 0, 50, 50, -5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -105, 150, 0, 0, 0, 0, 0]
test_state=[165, -120, -75, 175, 155, -185, 200, -165, 120, -110, 20, -105, -15, -55, 200, 160, 5, -15, -10, 160, 105, 55, 100, -150, 155, -150, -155, 55, 55, 5, -5, 10, -100, 0, 10, 50, -50, 50, 5, -5, -5, 50, 10, 50, 50, 0, 0, -50, 5, -200, 175, 125, -130, -50, 50, -5]




# Calculate number of nodes from the length of state vector
n_nodes = int((-1 + (1 + 4*len(test_state))**0.5) / 2)  # solve quadratic: n^2 + n - len(state) = 0
n_weights = n_nodes * n_nodes

W = weights_to_matrix(test_state[:n_weights])
print(W)

def update_plot(cell_pos, **kwargs):
    params = list(kwargs.values())
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(7, 18)) 
    
    # Plot somite pattern and get reward
    start_time = time.perf_counter_ns()
    # reward = somitogenesis_reward_func(params, plot=True, ax=ax1, SPARSITY_WEIGHT=0.0)
    reward = somitogenesis_sparsity_reward_func(params, plot=True, ax=ax1)
    end_time = time.perf_counter_ns()
    print(f"Reward for somitogenesis: {reward}")
    print(f"Time taken to run somitogenesis_reward_func: {(end_time - start_time)/1e9:.9f} seconds")
    
    # Plot oscillation diagram for selected cell_pos
    t_sim, cell_trajectory, _ = somitogenesis_sol_func(params, cell_position=cell_pos)
    for i in range(n_nodes):
        ax2.plot(t_sim, cell_trajectory[:, i], label=f'Gene {i+1}', linewidth=2)
    ax2.set_xlabel('Time')
    ax2.set_ylabel('Gene Concentration')
    ax2.set_title(f'Gene Expression Dynamics - Cell {cell_pos}')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Draw network motif in last subplot
    draw_network_motif(params, ax=ax3)
    ax3.set_title(f"{n_nodes}-Node Network Motif")
    
    plt.tight_layout()

# Create sliders for all parameters
sliders = {
    'cell_pos': IntSlider(min=0, max=99, step=1, value=1, description='Cell Position')
}
# Weight sliders
for i in range(n_weights):
    default_value = test_state[i] if i < len(test_state) else 0
    sliders[f'w{i+1}'] = FloatSlider(min=-10000, max=10000, step=1, value=default_value)
# D value sliders    
for i in range(n_nodes):
    default_value = test_state[n_weights+i] if n_weights+i < len(test_state) else 0
    sliders[f'd{i+1}'] = FloatSlider(min=-10000, max=10000, step=1, value=default_value)

interact(update_plot, **sliders)


[[ 165  175  200 -105  -10   55    5]
 [ -75 -120  120  -55  105    5   -5]
 [-185 -165  155  160  100   10   10]
 [  20  -15  200 -110  155    0   50]
 [ -15  160   55 -150    5   50    0]
 [-155   55   -5 -100   10 -150    5]
 [  50   -5   50   50    0  -50  -50]]


interactive(children=(IntSlider(value=1, description='Cell Position', max=99), FloatSlider(value=165.0, descri…

<function __main__.update_plot(cell_pos, **kwargs)>

In [None]:
"""Plot network motifs and their corresponding somite patterns"""
# from graph.graph import plot_network_motifs_and_somites

# # Define test weights to visualize
# test_weights_list = [
#     # (65, -110, 52, -40, 32, -8, -65, -32, 71),
#     [-5, 200, -6, -51, -26, 5, 5, 1, -125, 25, 30, 100, 0, 60, -5, -25, -1, 5, 0, -75, 30, 0, -1, -200, -25],
#     [60, 32, -38, -85, 70, -63, 22, -27, -7],
#     [60, 32, -38, -85, 70, -63, 22, -27, -7, 0, 0, 0, 0, 0, 0, 0]
# ]

# # Plot network motifs and their corresponding somite patterns
# save_path = plot_network_motifs_and_somites(test_weights_list)
# print(f"Plot saved to: {save_path}")



In [None]:
"""Plot kymograph of g(i,t) function from somitogenesis reward function"""
# import numpy as np
# import matplotlib.pyplot as plt

# # Parameters from somitogenesis_reward_func
# N_CELLS = 100
# N_SIMTIME = 90 
# N_TIMEPOINTS = 200
# A, B = 0.1/5, 0.2/5

# # Create position and time arrays
# positions = np.arange(N_CELLS).reshape(-1, 1)
# t = np.linspace(0, N_SIMTIME, N_TIMEPOINTS)

# # Calculate g(i,t) for all positions and times
# g_values = np.zeros((N_TIMEPOINTS, N_CELLS))
# for i, time in enumerate(t):
#     g = np.minimum(np.exp(A * positions - B * time), 1)
#     g_values[i] = g.flatten()

# # Plot kymograph
# plt.figure(figsize=(10, 6))
# plt.imshow(g_values.T, aspect='auto', cmap='Blues', 
#           extent=[0, N_SIMTIME, N_CELLS, 0])
# plt.colorbar(label='g(i,t)')
# plt.xlabel('Time')
# plt.ylabel('Position')
# plt.title('Kymograph of g(i,t) = min(exp(A*i - B*t), 1)')
# plt.show()

# # Print parameter values used
# print(f"Parameters used:")
# print(f"A = {A:.6f}")
# print(f"B = {B:.6f}")
# print(f"N_CELLS = {N_CELLS}")
# print(f"N_SIMTIME = {N_SIMTIME}")
# print(f"N_TIMEPOINTS = {N_TIMEPOINTS}")


In [None]:
"""Test cases for Masking in GridEnv"""
# import sys
# import os
# import numpy as np
# from types import SimpleNamespace

# # Add the project root directory to sys.path
# current_dir = os.getcwd()
# if 'notebooks' in current_dir:
#     project_root = os.path.abspath(os.path.join(current_dir, '..', '..'))
# else:
#     project_root = current_dir

# if project_root not in sys.path:
#     sys.path.append(project_root)

# from disc_gflownet.envs.grid_env import GridEnv

# # Create a simple test environment with 3 nodes (12 dimensions: 9 weights + 3 diagonals)
# args = SimpleNamespace(
#     n_workers=1,
#     cache_max_size=1000,
#     min_reward=0.001,
#     custom_reward_fn=lambda x: 0,  # Dummy reward function
#     n_steps=20,
#     n_dims=4**2+4,
#     max_nodes=4,
#     max_edges=2,
#     actions_per_dim={'weight': [5, 25, -5, -25], 'diagonal': [5, 25, -5, -25]},
#     grid_bound={'weight': {'min': -100, 'max': 100}, 'diagonal': {'min': -100, 'max': 100}},
#     enable_time=False,
#     consistent_signs=True
# )

# env = GridEnv(args)

# # Print all actions first
# env.print_actions()


# print(f"Environment has {env.n_nodes} nodes, {env.n_dims} dimensions")
# print(f"Action space size: {env.action_dim}")

# print("\nTesting Masking in GridEnv")
# print("==========================")

# # s0
# env.reset()
# print(f"Steps Total: {env._step}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # First action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s1
# print(f"Steps Total: {env._step}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Second action
# action = allowed_indices[3]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s2
# print(f"Steps Total: {env._step}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 


In [None]:
"""Test cases for Progressive Masking in GridEnv2"""
# import sys
# import os
# import numpy as np
# from types import SimpleNamespace

# # Add the project root directory to sys.path
# current_dir = os.getcwd()
# if 'notebooks' in current_dir:
#     project_root = os.path.abspath(os.path.join(current_dir, '..', '..'))
# else:
#     project_root = current_dir

# if project_root not in sys.path:
#     sys.path.append(project_root)

# from disc_gflownet.envs.grid_env2 import GridEnv2

# # Create a simple test environment with 3 nodes (12 dimensions: 9 weights + 3 diagonals)
# args = SimpleNamespace(
#     n_workers=1,
#     cache_max_size=1000,
#     min_reward=0.001,
#     custom_reward_fn=lambda x: 0,  # Dummy reward function
#     actions_per_dim={'weight': [1, 5, 25, -1, -5, -25], 'diagonal': [1, 5, -1, -5]},
#     grid_bound={'weight': {'min': -200, 'max': 200}, 'diagonal': {'min': -20, 'max': 20}},
#     enable_time=False,
#     consistent_signs=True,
#     n_dims=3**2+3,  # 9 weights + 3 diagonals
#     n_steps=2+6+10,  # Total steps for all network sizes
#     steps_per_network={1:2, 2:6, 3:10}  # Steps per network size
# )

# env = GridEnv2(args)

# # Print all actions first
# env.print_actions()

# print("Testing Progressive Masking in GridEnv2")
# print(f"Environment has {env.n_nodes} nodes, {env.n_dims} dimensions")
# print(f"Action space size: {env.action_dim}")


# # s0
# env.reset()
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # First action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s1
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Second action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s2
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Third action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s3
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Fourth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s4
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Fifth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s5
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Sixth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s6
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Seventh action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s7
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Eighth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s8
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Ninth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s9
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 

# # Tenth action
# action = allowed_indices[0]
# next_state, reward, done = env.step(action)
# print("\nTaking action:", action)
# print(f"New state: {env._state}")
# print(f"Done: {done}")

# # s10
# print(f"Current network size: {env.current_network_size}") 
# print(f"Steps Total: {env._step}")
# print(f"_step in current network: {env._step_in_current_network}")
# mask = env.get_forward_mask(env._state)
# print(f"Number of allowed actions: {np.sum(mask)}")
# allowed_indices = np.where(mask)[0]
# print(f"Allowed action indices: {allowed_indices}")
# print(f"--------\n") 


In [None]:
"""test combined sparsity reward"""

import numpy as np
def sparsity_reward_combined(state, w1=0.0, w2=1.0):
    # Entropy-based component
    # Normalize values to probabilities
    abs_values = np.abs(state)
    if sum(abs_values) == 0:
        entropy_reward = 1.0  # maximum sparsity
    else:
        probs = abs_values / sum(abs_values)
        # Calculate entropy (lower entropy = more sparse)
        entropy = -sum(p * np.log(p) for p in probs if p > 0)
        entropy_reward = 1 / (1 + entropy)  # transform to reward
    
    # L0 component (explicitly rewards zeros)
    n_zeros = sum(1 for x in state if x == 0)
    l0_reward = n_zeros / len(state)
    
    return w1 * entropy_reward + w2 * l0_reward

# Example states to test
sparse_state1 = (10, 1, 1, 1, 10, 1, 1, 1, 10)
# sparse_state1 = (10, 0, 0, 0, 10, 0, 0, 0, 10)
sparse_state2 = (0, 0, 0, 0, 0, 0, 0, 0, 0)


SPARSITY_WEIGHT = 5.0
sparsity_factor1 = round(1.0 + (SPARSITY_WEIGHT * sparsity_reward_combined(sparse_state1)), 3)
sparsity_factor2 = round(1.0 + (SPARSITY_WEIGHT * sparsity_reward_combined(sparse_state2)), 3)


# Test combined reward function
print(f"Combined sparsity reward for sparse_state1: { sparsity_factor1 }")
print(f"Combined sparsity reward for sparse_state2: { sparsity_factor2 }")

In [None]:
def sigmoid(z):
    """Sigmoid activation function with overflow protection"""
    return 1 / (1 + np.exp( - np.clip(z, -500, 500)))

# Test the sigmoid function with a simple differential equation
# dx₁/dt = σ(d₁g(t) + w₁₁x₁ + w₁₂x₂ + w₁₃x₃) - s₁x₁

def test_equation(x1, x2, x3, d1, w11, w12, w13, s1, g_value=1.0):
    """Calculate the right side of the differential equation for x1"""
    z = d1 * g_value + w11 * x1 + w12 * x2 + w13 * x3
    return sigmoid(z) - s1 * x1

# Create interactive sliders to test the equation
def update_equation(**kwargs):
    x1 = kwargs['x1']
    x2 = kwargs['x2']
    x3 = kwargs['x3']
    d1 = kwargs['d1']
    w11 = kwargs['w11']
    w12 = kwargs['w12']
    w13 = kwargs['w13']
    s1 = kwargs['s1']
    g_value = kwargs['g']
    
    z = d1 * g_value + w11 * x1 + w12 * x2 + w13 * x3
    # z= 0
    sigmoid_z = sigmoid(z)
    dx1dt = sigmoid_z - s1 * x1
    
    print(f"z = {z:.4f}")
    print(f"σ(z) = {sigmoid_z:.4f}")
    # print(f"dx₁/dt = {dx1dt:.4f}")
    
    return sigmoid_z

# Create sliders for all parameters
equation_sliders = {
    'x1': FloatSlider(min=0, max=1, step=0.1, value=0.5, description='x₁'),
    'x2': FloatSlider(min=0, max=1, step=0.1, value=0.5, description='x₂'),
    'x3': FloatSlider(min=0, max=1, step=0.1, value=0.5, description='x₃'),
    'd1': FloatSlider(min=-10, max=10, step=0.5, value=1.0, description='d₁'),
    'w11': FloatSlider(min=-10, max=10, step=0.5, value=0.0, description='w₁₁'),
    'w12': FloatSlider(min=-10, max=10, step=0.5, value=0.0, description='w₁₂'),
    'w13': FloatSlider(min=-10, max=100, step=0.5, value=0.0, description='w₁₃'),
    's1': FloatSlider(min=0, max=2, step=0.1, value=1.0, description='s₁'),
    'g': FloatSlider(min=0, max=1, step=0.1, value=1.0, description='g(t)')
}


interact(update_equation, **equation_sliders)

In [None]:
# Simple test to understand how @ W.T works with a 2-node system
n_nodes = 2

# Create a simple 2x2 weight matrix
W = np.array([
    [1, 4],
    [3, 2]
])
print("W:\n", W)
print("W.T:\n", W.T)

# Create a simple x array with 3 cells, 2 nodes each
x = np.array([2, 3])  # Flattened array

# # Reshape to (3 cells, 2 nodes)
x_reshaped = x.reshape(-1, 2)
print("x_reshaped:\n", x_reshaped)

# # Calculate x_reshaped @ W.T
result = x_reshaped @ W.T
print("x_reshaped @ W.T:\n", result)

# Alternative way using W @ x_reshaped.T
result_alt = (W @ x_reshaped.T)
print("\nAlternative calculation:")
print("(W @ x_reshaped.T).T:\n", result_alt)

In [None]:
state = [70, 100, -80, 100, -75, 30]
int((-1 + (1 + 4*(6) )**0.5) / 2)